Add real-time desktop streaming with WebSocket MJPEG

Implements a desktop streaming feature for watching the AI agent work in real time:

- Backend: WebSocket endpoint at /api/desktop/stream using MJPEG frames
- iOS: Bottom sheet UI with play/pause, FPS and quality controls
- Web: Side-by-side split view with toggleable desktop panel
- Better OpenCode error messages for debugging
This commit is contained in:
Thomas Marchand
2026-01-02 21:40:00 +00:00
parent 0a960e6381
commit c9a2ef45c7
10 changed files with 1199 additions and 17 deletions

View File

@@ -65,6 +65,9 @@ import {
Code,
FolderOpen,
Trash2,
Monitor,
PanelRightClose,
PanelRight,
} from "lucide-react";
import {
OptionList,
@@ -79,6 +82,7 @@ import {
import { useScrollToBottom } from "@/hooks/use-scroll-to-bottom";
import { useLocalStorage } from "@/hooks/use-local-storage";
import { useCopyToClipboard } from "@/hooks/use-copy-to-clipboard";
import { DesktopStream } from "@/components/desktop-stream";
type ChatItem =
| {
@@ -629,6 +633,10 @@ export default function ControlClient() {
// Server configuration (fetched from health endpoint)
const [maxIterations, setMaxIterations] = useState<number>(50); // Default fallback
// Desktop stream state
const [showDesktopStream, setShowDesktopStream] = useState(false);
const [desktopDisplayId] = useState(":99");
// Check if the mission we're viewing is actually running (not just any mission)
const viewingMissionIsRunning = useMemo(() => {
if (!viewingMissionId) return runState !== "idle";
@@ -1568,6 +1576,26 @@ export default function ControlClient() {
</button>
)}
{/* Desktop stream toggle */}
<button
onClick={() => setShowDesktopStream(!showDesktopStream)}
className={cn(
"flex items-center gap-2 rounded-lg border px-3 py-2 text-sm transition-colors",
showDesktopStream
? "border-emerald-500/30 bg-emerald-500/10 text-emerald-400"
: "border-white/[0.06] bg-white/[0.02] text-white/70 hover:bg-white/[0.04]"
)}
title={showDesktopStream ? "Hide desktop stream" : "Show desktop stream"}
>
<Monitor className="h-4 w-4" />
<span className="hidden sm:inline">Desktop</span>
{showDesktopStream ? (
<PanelRightClose className="h-4 w-4" />
) : (
<PanelRight className="h-4 w-4" />
)}
</button>
{/* Status panel */}
<div className="flex items-center gap-2 rounded-lg border border-white/[0.06] bg-white/[0.02] px-3 py-2">
{/* Run state indicator */}
@@ -1721,8 +1749,13 @@ export default function ControlClient() {
</div>
)}
{/* Chat container */}
<div className="flex-1 min-h-0 flex flex-col rounded-2xl glass-panel border border-white/[0.06] overflow-hidden relative">
{/* Main content area - Chat and Desktop stream side by side */}
<div className="flex-1 min-h-0 flex gap-4">
{/* Chat container */}
<div className={cn(
"flex-1 min-h-0 flex flex-col rounded-2xl glass-panel border border-white/[0.06] overflow-hidden relative transition-all duration-300",
showDesktopStream && "flex-[2]"
)}>
{/* Messages */}
<div ref={containerRef} className="flex-1 overflow-y-auto p-6">
{items.length === 0 ? (
@@ -2312,6 +2345,18 @@ export default function ControlClient() {
</form>
</div>
</div>
{/* Desktop Stream Panel */}
{showDesktopStream && (
<div className="flex-1 min-h-0 transition-all duration-300 animate-fade-in">
<DesktopStream
displayId={desktopDisplayId}
className="h-full"
onClose={() => setShowDesktopStream(false)}
/>
</div>
)}
</div>
</div>
);
}

View File

@@ -0,0 +1,393 @@
"use client";
import { useState, useEffect, useRef, useCallback } from "react";
import { cn } from "@/lib/utils";
import {
Monitor,
Play,
Pause,
RefreshCw,
X,
Settings,
Maximize2,
Minimize2,
} from "lucide-react";
/** Props accepted by the DesktopStream component. */
interface DesktopStreamProps {
/** Display identifier sent as the `display` query parameter; defaults to ":99". */
displayId?: string;
/** Extra class names merged onto the root container. */
className?: string;
/** When provided, a close button is rendered in the header and calls this on click. */
onClose?: () => void;
/** Starting value for the FPS slider; defaults to 10. */
initialFps?: number;
/** Starting value for the quality slider; defaults to 70. */
initialQuality?: number;
}
type ConnectionState = "connecting" | "connected" | "disconnected" | "error";
export function DesktopStream({
displayId = ":99",
className,
onClose,
initialFps = 10,
initialQuality = 70,
}: DesktopStreamProps) {
const [connectionState, setConnectionState] =
useState<ConnectionState>("connecting");
const [isPaused, setIsPaused] = useState(false);
const [frameCount, setFrameCount] = useState(0);
const [errorMessage, setErrorMessage] = useState<string | null>(null);
const [showControls, setShowControls] = useState(true);
const [fps, setFps] = useState(initialFps);
const [quality, setQuality] = useState(initialQuality);
const [isFullscreen, setIsFullscreen] = useState(false);
const wsRef = useRef<WebSocket | null>(null);
const canvasRef = useRef<HTMLCanvasElement>(null);
const containerRef = useRef<HTMLDivElement>(null);
// Latest slider values mirrored into refs. Reading them through refs keeps the
// identity of `buildWsUrl` (and therefore of `connect`) stable when a slider
// moves, so the connect-on-mount effect does not tear down and reopen the
// socket on every FPS/quality change; live changes are sent as commands.
const fpsRef = useRef(fps);
const qualityRef = useRef(quality);
useEffect(() => {
  fpsRef.current = fps;
  qualityRef.current = quality;
}, [fps, quality]);

// Build WebSocket URL
//
// Resolves the API base URL (localStorage override, then the build-time
// NEXT_PUBLIC_API_URL, then the hosted default), converts it to a ws(s) URL
// and appends the stream path with the current display/fps/quality settings.
const buildWsUrl = useCallback(() => {
  const baseUrl =
    typeof window !== "undefined"
      ? localStorage.getItem("api_base_url") ||
        process.env.NEXT_PUBLIC_API_URL ||
        "https://agent-backend.thomas.md"
      : "";

  // Convert https to wss, http to ws. The patterns are anchored so only the
  // scheme is rewritten, and trailing slashes are dropped so the joined path
  // never contains "//api".
  const wsUrl = baseUrl
    .replace(/\/+$/, "")
    .replace(/^https:\/\//i, "wss://")
    .replace(/^http:\/\//i, "ws://");

  const params = new URLSearchParams({
    display: displayId,
    fps: fpsRef.current.toString(),
    quality: qualityRef.current.toString(),
  });

  return `${wsUrl}/api/desktop/stream?${params}`;
}, [displayId]);
// Connect to WebSocket
//
// Replaces any existing socket with a fresh one, authenticating through the
// WebSocket subprotocol list. Binary messages are treated as JPEG frames and
// painted onto the canvas; text messages are parsed as JSON and surfaced as an
// error when they carry an `error` field.
const connect = useCallback(() => {
  // Clean up existing connection. Its handlers are detached first: the old
  // socket's close event fires asynchronously and would otherwise flip the
  // state to "disconnected" after the replacement is already connecting/live.
  const previous = wsRef.current;
  if (previous) {
    previous.onopen = null;
    previous.onmessage = null;
    previous.onerror = null;
    previous.onclose = null;
    previous.close();
    wsRef.current = null;
  }

  setConnectionState("connecting");
  setErrorMessage(null);
  // The stream URL carries no pause flag, so a new connection is presumed to
  // start live. NOTE(review): confirm against the server's initial state.
  setIsPaused(false);

  const url = buildWsUrl();

  // Get JWT token
  const token =
    typeof window !== "undefined"
      ? sessionStorage.getItem("jwt") || localStorage.getItem("jwt_token")
      : null;

  // Create WebSocket with subprotocol auth
  const protocols = token ? ["openagent", `jwt.${token}`] : ["openagent"];

  let ws: WebSocket;
  try {
    ws = new WebSocket(url, protocols);
  } catch {
    // The constructor throws synchronously on a malformed URL or an invalid
    // subprotocol value; show that instead of crashing the component.
    setConnectionState("error");
    setErrorMessage("Could not open stream connection");
    return;
  }
  ws.binaryType = "arraybuffer";

  ws.onopen = () => {
    setConnectionState("connected");
    setErrorMessage(null);
  };

  ws.onmessage = (event) => {
    if (event.data instanceof ArrayBuffer) {
      // Binary data = JPEG frame
      const blob = new Blob([event.data], { type: "image/jpeg" });
      const imageUrl = URL.createObjectURL(blob);
      const img = new Image();

      img.onload = () => {
        const canvas = canvasRef.current;
        const ctx = canvas?.getContext("2d");
        if (canvas && ctx) {
          // Resize canvas to match image
          if (canvas.width !== img.width || canvas.height !== img.height) {
            canvas.width = img.width;
            canvas.height = img.height;
          }
          ctx.drawImage(img, 0, 0);
          setFrameCount((prev) => prev + 1);
        }
        URL.revokeObjectURL(imageUrl);
      };
      // A frame that fails to decode must still release its object URL,
      // otherwise every corrupt frame leaks a blob.
      img.onerror = () => {
        URL.revokeObjectURL(imageUrl);
      };

      img.src = imageUrl;
    } else if (typeof event.data === "string") {
      // Text message = JSON (error or control response)
      try {
        const json = JSON.parse(event.data);
        if (json.error) {
          setErrorMessage(json.message || json.error);
        }
      } catch {
        // Ignore parse errors
      }
    }
  };

  ws.onerror = () => {
    setConnectionState("error");
    setErrorMessage("Connection error");
  };

  ws.onclose = () => {
    setConnectionState("disconnected");
  };

  wsRef.current = ws;
}, [buildWsUrl]);
// Send command to server
//
// Serializes `cmd` as JSON and writes it to the current socket. The command is
// dropped silently when there is no socket or it is not in the OPEN state.
const sendCommand = useCallback((cmd: Record<string, unknown>) => {
  const socket = wsRef.current;
  if (!socket || socket.readyState !== WebSocket.OPEN) {
    return;
  }
  socket.send(JSON.stringify(cmd));
}, []);
// Control handlers
//
// Pause/resume: notify the server and record the paused flag locally.
const handlePause = useCallback(() => {
  sendCommand({ t: "pause" });
  setIsPaused(true);
}, [sendCommand]);

const handleResume = useCallback(() => {
  sendCommand({ t: "resume" });
  setIsPaused(false);
}, [sendCommand]);
// Slider handlers: push the new value to the server as a live command and
// store it in state so the slider and its label reflect it.
const handleFpsChange = useCallback(
  (newFps: number) => {
    sendCommand({ t: "fps", fps: newFps });
    setFps(newFps);
  },
  [sendCommand]
);

const handleQualityChange = useCallback(
  (newQuality: number) => {
    sendCommand({ t: "quality", quality: newQuality });
    setQuality(newQuality);
  },
  [sendCommand]
);
// Toggle fullscreen for the stream container.
//
// The decision is based on the document's actual fullscreen element rather
// than on React state, and `isFullscreen` is not set here: the
// `fullscreenchange` listener updates it only once the browser has really
// switched. A rejected request (denied permission, unsupported element)
// therefore leaves the icon and tooltip accurate.
const handleFullscreen = useCallback(() => {
  const container = containerRef.current;
  if (!container) return;

  const onRejected = (err: unknown) => {
    console.warn("Fullscreen request failed", err);
  };

  if (document.fullscreenElement) {
    document.exitFullscreen?.()?.catch(onRejected);
  } else {
    container.requestFullscreen?.()?.catch(onRejected);
  }
}, []);
// Connect on mount
// The effect also re-runs whenever the identity of `connect` changes; its
// cleanup closes whichever socket is current at that moment (including on
// unmount).
useEffect(() => {
connect();
return () => {
wsRef.current?.close();
};
}, [connect]);
// Listen for fullscreen changes
// Mirrors the document's fullscreen status into `isFullscreen`, so the state
// also follows exits that do not go through the toggle button.
useEffect(() => {
const handleFullscreenChange = () => {
setIsFullscreen(!!document.fullscreenElement);
};
document.addEventListener("fullscreenchange", handleFullscreenChange);
return () =>
document.removeEventListener("fullscreenchange", handleFullscreenChange);
}, []);
return (
<div
ref={containerRef}
className={cn(
"relative flex flex-col bg-[#0a0a0a] rounded-xl overflow-hidden border border-white/[0.06]",
className
)}
onMouseEnter={() => setShowControls(true)}
onMouseLeave={() => setShowControls(false)}
>
{/* Header */}
<div
className={cn(
"absolute top-0 left-0 right-0 z-10 flex items-center justify-between px-4 py-2 bg-gradient-to-b from-black/80 to-transparent transition-opacity duration-200",
showControls ? "opacity-100" : "opacity-0"
)}
>
<div className="flex items-center gap-3">
<div
className={cn(
"flex items-center gap-2 text-xs",
connectionState === "connected"
? "text-emerald-400"
: connectionState === "connecting"
? "text-amber-400"
: "text-red-400"
)}
>
<div
className={cn(
"w-2 h-2 rounded-full",
connectionState === "connected"
? "bg-emerald-400"
: connectionState === "connecting"
? "bg-amber-400 animate-pulse"
: "bg-red-400"
)}
/>
{connectionState === "connected"
? isPaused
? "Paused"
: "Live"
: connectionState === "connecting"
? "Connecting..."
: "Disconnected"}
</div>
<span className="text-xs text-white/40 font-mono">{displayId}</span>
<span className="text-xs text-white/30">{frameCount} frames</span>
</div>
<div className="flex items-center gap-2">
<button
onClick={handleFullscreen}
className="p-1.5 rounded-lg hover:bg-white/10 text-white/60 hover:text-white transition-colors"
title={isFullscreen ? "Exit fullscreen" : "Fullscreen"}
>
{isFullscreen ? (
<Minimize2 className="w-4 h-4" />
) : (
<Maximize2 className="w-4 h-4" />
)}
</button>
{onClose && (
<button
onClick={onClose}
className="p-1.5 rounded-lg hover:bg-white/10 text-white/60 hover:text-white transition-colors"
title="Close"
>
<X className="w-4 h-4" />
</button>
)}
</div>
</div>
{/* Canvas */}
<div className="flex-1 flex items-center justify-center bg-black min-h-[200px]">
{connectionState === "connected" && !errorMessage ? (
<canvas
ref={canvasRef}
className="max-w-full max-h-full object-contain"
/>
) : connectionState === "connecting" ? (
<div className="flex flex-col items-center gap-3 text-white/60">
<Monitor className="w-12 h-12 animate-pulse" />
<span className="text-sm">Connecting to desktop...</span>
</div>
) : (
<div className="flex flex-col items-center gap-3 text-white/60">
<Monitor className="w-12 h-12 text-red-400/60" />
<span className="text-sm text-red-400">
{errorMessage || "Connection lost"}
</span>
<button
onClick={connect}
className="flex items-center gap-2 px-3 py-1.5 rounded-lg bg-indigo-500 text-white text-sm hover:bg-indigo-600 transition-colors"
>
<RefreshCw className="w-4 h-4" />
Reconnect
</button>
</div>
)}
</div>
{/* Controls */}
<div
className={cn(
"absolute bottom-0 left-0 right-0 z-10 p-4 bg-gradient-to-t from-black/80 to-transparent transition-opacity duration-200",
showControls ? "opacity-100" : "opacity-0"
)}
>
<div className="flex items-center justify-between gap-4">
{/* Play/Pause */}
<div className="flex items-center gap-2">
<button
onClick={isPaused ? handleResume : handlePause}
disabled={connectionState !== "connected"}
className={cn(
"p-2 rounded-full transition-colors",
connectionState === "connected"
? "bg-white/10 hover:bg-white/20 text-white"
: "bg-white/5 text-white/30 cursor-not-allowed"
)}
title={isPaused ? "Resume" : "Pause"}
>
{isPaused ? (
<Play className="w-5 h-5" />
) : (
<Pause className="w-5 h-5" />
)}
</button>
<button
onClick={connect}
className="p-2 rounded-full bg-white/10 hover:bg-white/20 text-white transition-colors"
title="Reconnect"
>
<RefreshCw className="w-4 h-4" />
</button>
</div>
{/* Sliders */}
<div className="flex-1 flex items-center gap-6 max-w-md">
<div className="flex-1 flex items-center gap-2">
<span className="text-xs text-white/40 w-8">FPS</span>
<input
type="range"
min={1}
max={30}
value={fps}
onChange={(e) => handleFpsChange(Number(e.target.value))}
className="flex-1 h-1 bg-white/20 rounded-full appearance-none cursor-pointer [&::-webkit-slider-thumb]:appearance-none [&::-webkit-slider-thumb]:w-3 [&::-webkit-slider-thumb]:h-3 [&::-webkit-slider-thumb]:bg-indigo-500 [&::-webkit-slider-thumb]:rounded-full [&::-webkit-slider-thumb]:cursor-pointer"
/>
<span className="text-xs text-white/60 w-6 text-right tabular-nums">
{fps}
</span>
</div>
<div className="flex-1 flex items-center gap-2">
<span className="text-xs text-white/40 w-12">Quality</span>
<input
type="range"
min={10}
max={100}
step={5}
value={quality}
onChange={(e) => handleQualityChange(Number(e.target.value))}
className="flex-1 h-1 bg-white/20 rounded-full appearance-none cursor-pointer [&::-webkit-slider-thumb]:appearance-none [&::-webkit-slider-thumb]:w-3 [&::-webkit-slider-thumb]:h-3 [&::-webkit-slider-thumb]:bg-indigo-500 [&::-webkit-slider-thumb]:rounded-full [&::-webkit-slider-thumb]:cursor-pointer"
/>
<span className="text-xs text-white/60 w-8 text-right tabular-nums">
{quality}%
</span>
</div>
</div>
</div>
</div>
</div>
);
}

View File

@@ -17,20 +17,22 @@
3DD4D1D2E080C2F89C4881B7 /* ToolUIModels.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8A6128ECBCA632D9E2D415F2 /* ToolUIModels.swift */; };
4B50B97618C0CC469FF64592 /* Theme.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504A1222CE8971417834D229 /* Theme.swift */; };
4D0CF2666262F45370D000DF /* TerminalView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0AC6317C4EAD4DB9A8190209 /* TerminalView.swift */; };
51436A7671B1E3C8478F81A2 /* RunningMissionsBar.swift in Sources */ = {isa = PBXBuildFile; fileRef = E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */; };
5152C5313CD5AC01276D0AE6 /* FileEntry.swift in Sources */ = {isa = PBXBuildFile; fileRef = BA70A2A73D3A386EAFD69FC4 /* FileEntry.swift */; };
652A0AE498D69C9DB728B2DF /* ANSIParser.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD8D224B6758B664864F3987 /* ANSIParser.swift */; };
6865FE997D3E1D91D411F6BC /* LoadingView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2B9834D4EE32058824F9DF00 /* LoadingView.swift */; };
6B87076797C9DFA01E24CC76 /* FilesView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5908645A518F48B501390AB8 /* FilesView.swift */; };
7B064EE6C0C1039360CCE40B /* DesktopStreamService.swift in Sources */ = {isa = PBXBuildFile; fileRef = A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */; };
83BB0F0AAFE4F2735FF76B87 /* NavigationState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3729F39FBF53046124D05BC1 /* NavigationState.swift */; };
999ACAA94B0BD81A05288092 /* GlassCard.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB5A4720378F06807FDE73E1 /* GlassCard.swift */; };
9BC40E40E1B5622B24328AEB /* Mission.swift in Sources */ = {isa = PBXBuildFile; fileRef = D4AB47CF121ABA1946A4D879 /* Mission.swift */; };
AA02567226057045DDD61CB1 /* ContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 99B57FC3136B64DC87413CA6 /* ContentView.swift */; };
BD52A9FE6C97C3CC53810094 /* DesktopStreamView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */; };
CA70EC5A864C3D007D42E781 /* ChatMessage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3CB591B632D3EF26AB217976 /* ChatMessage.swift */; };
D64972881E36894950658708 /* APIService.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBC90C32FEF604E025FFBF78 /* APIService.swift */; };
DA4634D7424AF3FC985987E7 /* GlassButton.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5267DE67017A858357F68424 /* GlassButton.swift */; };
FA7E68F22D16E1AC0B5F5E22 /* StatusBadge.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */; };
FF9C447978711CBA9185B8B0 /* OpenAgentDashboardApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = 139C740B7D55C13F3B167EF3 /* OpenAgentDashboardApp.swift */; };
3425BC785C1C91F71D626BE5 /* RunningMissionsBar.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */; };
/* End PBXBuildFile section */
/* Begin PBXFileReference section */
@@ -51,6 +53,7 @@
66A48A20D2178760301256C9 /* Assets.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = Assets.xcassets; sourceTree = "<group>"; };
8A6128ECBCA632D9E2D415F2 /* ToolUIModels.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToolUIModels.swift; sourceTree = "<group>"; };
99B57FC3136B64DC87413CA6 /* ContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContentView.swift; sourceTree = "<group>"; };
A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DesktopStreamService.swift; sourceTree = "<group>"; };
A4D419C8490A0C5FC4DCDF20 /* ToolUIOptionListView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToolUIOptionListView.swift; sourceTree = "<group>"; };
A84519FDE8FC75084938B292 /* ControlView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ControlView.swift; sourceTree = "<group>"; };
BA70A2A73D3A386EAFD69FC4 /* FileEntry.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FileEntry.swift; sourceTree = "<group>"; };
@@ -58,9 +61,10 @@
CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StatusBadge.swift; sourceTree = "<group>"; };
CD8D224B6758B664864F3987 /* ANSIParser.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ANSIParser.swift; sourceTree = "<group>"; };
D4AB47CF121ABA1946A4D879 /* Mission.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Mission.swift; sourceTree = "<group>"; };
E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunningMissionsBar.swift; sourceTree = "<group>"; };
EB5A4720378F06807FDE73E1 /* GlassCard.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GlassCard.swift; sourceTree = "<group>"; };
F51395D8FB559D3C79AAA0A4 /* OpenAgentDashboard.app */ = {isa = PBXFileReference; includeInIndex = 0; lastKnownFileType = wrapper.application; path = OpenAgentDashboard.app; sourceTree = BUILT_PRODUCTS_DIR; };
7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunningMissionsBar.swift; sourceTree = "<group>"; };
FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DesktopStreamView.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXGroup section */
@@ -102,7 +106,7 @@
5267DE67017A858357F68424 /* GlassButton.swift */,
EB5A4720378F06807FDE73E1 /* GlassCard.swift */,
2B9834D4EE32058824F9DF00 /* LoadingView.swift */,
7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */,
E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */,
CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */,
D09E84E812213CF7E52E4FEF /* ToolUI */,
);
@@ -122,6 +126,7 @@
children = (
2EF415E84544334B25BD8E26 /* Components */,
DABAA3652C0B0A54CFC3221B /* Control */,
A688A831235D3E218A0A6783 /* Desktop */,
0C1185300420EEF31B892A3A /* Files */,
5A40B212F0D2055C1C499FCC /* History */,
0D9369EE2F3374EAA1EF332E /* Terminal */,
@@ -129,6 +134,14 @@
path = Views;
sourceTree = "<group>";
};
A688A831235D3E218A0A6783 /* Desktop */ = {
isa = PBXGroup;
children = (
FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */,
);
path = Desktop;
sourceTree = "<group>";
};
AB86DCEEB152D8EA7E8CBD86 = {
isa = PBXGroup;
children = (
@@ -187,6 +200,7 @@
children = (
CD8D224B6758B664864F3987 /* ANSIParser.swift */,
CBC90C32FEF604E025FFBF78 /* APIService.swift */,
A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */,
3729F39FBF53046124D05BC1 /* NavigationState.swift */,
52DDF35DB8CD7D70F3CFC4A6 /* TerminalState.swift */,
);
@@ -268,6 +282,8 @@
CA70EC5A864C3D007D42E781 /* ChatMessage.swift in Sources */,
AA02567226057045DDD61CB1 /* ContentView.swift in Sources */,
02DB7F25245D03FF72DD8E2E /* ControlView.swift in Sources */,
7B064EE6C0C1039360CCE40B /* DesktopStreamService.swift in Sources */,
BD52A9FE6C97C3CC53810094 /* DesktopStreamView.swift in Sources */,
5152C5313CD5AC01276D0AE6 /* FileEntry.swift in Sources */,
6B87076797C9DFA01E24CC76 /* FilesView.swift in Sources */,
DA4634D7424AF3FC985987E7 /* GlassButton.swift in Sources */,
@@ -277,6 +293,7 @@
9BC40E40E1B5622B24328AEB /* Mission.swift in Sources */,
83BB0F0AAFE4F2735FF76B87 /* NavigationState.swift in Sources */,
FF9C447978711CBA9185B8B0 /* OpenAgentDashboardApp.swift in Sources */,
51436A7671B1E3C8478F81A2 /* RunningMissionsBar.swift in Sources */,
FA7E68F22D16E1AC0B5F5E22 /* StatusBadge.swift in Sources */,
0B5E1A6153270BFF21A54C23 /* TerminalState.swift in Sources */,
4D0CF2666262F45370D000DF /* TerminalView.swift in Sources */,
@@ -285,7 +302,6 @@
3DD4D1D2E080C2F89C4881B7 /* ToolUIModels.swift in Sources */,
03176DF3878C25A0B557462C /* ToolUIOptionListView.swift in Sources */,
3361B14E949CB2A6E75B6962 /* ToolUIView.swift in Sources */,
3425BC785C1C91F71D626BE5 /* RunningMissionsBar.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};

View File

@@ -0,0 +1,169 @@
//
// DesktopStreamService.swift
// OpenAgentDashboard
//
// WebSocket client for MJPEG desktop streaming
//
import Foundation
import Observation
import UIKit
/// WebSocket client for the desktop stream endpoint.
///
/// Binary messages are decoded as JPEG frames and published through
/// `currentFrame`; text messages are parsed as JSON and surfaced through
/// `errorMessage` when they contain an `error` field. All state is
/// main-actor isolated and observable.
@MainActor
@Observable
final class DesktopStreamService {
    static let shared = DesktopStreamService()
    nonisolated init() {}

    // Stream state
    var isConnected = false
    var isPaused = false
    var currentFrame: UIImage?
    var errorMessage: String?
    var frameCount: UInt64 = 0
    var fps: Int = 10
    var quality: Int = 70

    private var webSocket: URLSessionWebSocketTask?
    private var displayId: String?

    // MARK: - Connection

    /// Opens a new stream for `displayId`, replacing any existing connection.
    ///
    /// The current `fps` and `quality` values are sent as query parameters.
    /// Sets `errorMessage` and returns without connecting if no valid URL can
    /// be built.
    func connect(displayId: String) {
        disconnect()

        self.displayId = displayId
        errorMessage = nil

        guard let url = buildWebSocketURL(displayId: displayId) else {
            errorMessage = "Invalid URL"
            return
        }

        var request = URLRequest(url: url)

        // Add JWT token via subprotocol (same pattern as console)
        if let token = UserDefaults.standard.string(forKey: "jwt_token") {
            request.setValue("openagent, jwt.\(token)", forHTTPHeaderField: "Sec-WebSocket-Protocol")
        } else {
            request.setValue("openagent", forHTTPHeaderField: "Sec-WebSocket-Protocol")
        }

        // Use the shared session: creating a URLSession per connection without
        // ever invalidating it leaks the session on every reconnect.
        let task = URLSession.shared.webSocketTask(with: request)
        webSocket = task
        task.resume()
        // Optimistic: no delegate is installed, so the handshake result is not
        // observed here. A failed handshake surfaces through the first
        // receive() failure, which clears this flag.
        isConnected = true

        // Start receiving frames
        receiveMessage(on: task)
    }

    /// Closes the current connection (if any) and resets the stream state.
    func disconnect() {
        webSocket?.cancel(with: .normalClosure, reason: nil)
        webSocket = nil
        isConnected = false
        // Pause is a property of one connection; the next one must not show
        // "Paused" before the user has paused it.
        isPaused = false
        currentFrame = nil
        frameCount = 0
    }

    // MARK: - Controls

    /// Asks the server to stop sending frames. No-op when disconnected.
    func pause() {
        guard isConnected else { return }
        isPaused = true
        sendCommand(["t": "pause"])
    }

    /// Asks the server to resume sending frames. No-op when disconnected.
    func resume() {
        guard isConnected else { return }
        isPaused = false
        sendCommand(["t": "resume"])
    }

    /// Stores the new frame rate and, when connected, sends it to the server.
    func setFps(_ newFps: Int) {
        fps = newFps
        guard isConnected else { return }
        sendCommand(["t": "fps", "fps": newFps])
    }

    /// Stores the new JPEG quality and, when connected, sends it to the server.
    func setQuality(_ newQuality: Int) {
        quality = newQuality
        guard isConnected else { return }
        sendCommand(["t": "quality", "quality": newQuality])
    }

    // MARK: - Private

    /// Builds the `ws(s)://…/api/desktop/stream` URL for the configured backend.
    ///
    /// Uses `URLComponents` so query values are escaped correctly (including
    /// `&` and `=`, which `.urlQueryAllowed` leaves untouched) and so a
    /// trailing slash on the base URL does not produce `//api`.
    private func buildWebSocketURL(displayId: String) -> URL? {
        let baseURL = UserDefaults.standard.string(forKey: "api_base_url") ?? "https://agent-backend.thomas.md"

        guard var components = URLComponents(string: baseURL) else {
            return nil
        }

        // Convert https to wss, http to ws
        switch components.scheme?.lowercased() {
        case "https":
            components.scheme = "wss"
        case "http":
            components.scheme = "ws"
        default:
            break
        }

        var basePath = components.path
        while basePath.hasSuffix("/") {
            basePath.removeLast()
        }
        components.path = basePath + "/api/desktop/stream"

        components.queryItems = [
            URLQueryItem(name: "display", value: displayId),
            URLQueryItem(name: "fps", value: String(fps)),
            URLQueryItem(name: "quality", value: String(quality)),
        ]

        return components.url
    }

    /// Serializes `command` to JSON and sends it as a text message.
    /// Does nothing when there is no socket or the command cannot be encoded.
    private func sendCommand(_ command: [String: Any]) {
        guard let task = webSocket,
              let data = try? JSONSerialization.data(withJSONObject: command),
              let string = String(data: data, encoding: .utf8) else {
            return
        }

        task.send(.string(string)) { [weak self] error in
            guard let error else { return }
            Task { @MainActor in
                // Ignore failures reported by a connection that has since
                // been replaced or closed.
                guard let self, self.webSocket === task else { return }
                self.errorMessage = "Send failed: \(error.localizedDescription)"
            }
        }
    }

    /// Receives one message from `task` and re-arms itself on success.
    ///
    /// The callback is bound to the task it was started on: cancelling a task
    /// in `disconnect()` completes its pending receive with a failure, and
    /// without the identity check that late failure would mark a newly opened
    /// connection as lost.
    private func receiveMessage(on task: URLSessionWebSocketTask) {
        task.receive { [weak self] result in
            Task { @MainActor in
                guard let self, self.webSocket === task else { return }

                switch result {
                case .success(let message):
                    self.handleMessage(message)
                    // Continue receiving
                    self.receiveMessage(on: task)

                case .failure(let error):
                    self.errorMessage = "Connection lost: \(error.localizedDescription)"
                    self.isConnected = false
                }
            }
        }
    }

    /// Applies one incoming message to the published state.
    private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
        switch message {
        case .data(let data):
            // Binary data = JPEG frame; undecodable payloads are dropped.
            if let image = UIImage(data: data) {
                currentFrame = image
                frameCount += 1
                errorMessage = nil
            }

        case .string(let text):
            // Text message = JSON (error or control response)
            if let data = text.data(using: .utf8),
               let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] {
                if let error = json["error"] as? String {
                    errorMessage = json["message"] as? String ?? error
                }
            }

        @unknown default:
            break
        }
    }
}

View File

@@ -33,7 +33,11 @@ struct ControlView: View {
// Track pending fetch to prevent race conditions
@State private var fetchingMissionId: String?
// Desktop stream state
@State private var showDesktopStream = false
@State private var desktopDisplayId = ":99"
@FocusState private var isInputFocused: Bool
private let api = APIService.shared
@@ -127,6 +131,18 @@ struct ControlView: View {
}
}
ToolbarItem(placement: .topBarTrailing) {
// Desktop stream button
Button {
showDesktopStream = true
HapticService.lightTap()
} label: {
Image(systemName: "display")
.font(.system(size: 14))
.foregroundStyle(Theme.textSecondary)
}
}
ToolbarItem(placement: .topBarTrailing) {
Menu {
Button {
@@ -134,10 +150,17 @@ struct ControlView: View {
} label: {
Label("New Mission", systemImage: "plus")
}
// Desktop stream option in menu too
Button {
showDesktopStream = true
} label: {
Label("View Desktop", systemImage: "display")
}
if let mission = currentMission {
Divider()
// Resume button for interrupted/blocked missions
if mission.canResume {
Button {
@@ -146,19 +169,19 @@ struct ControlView: View {
Label("Resume Mission", systemImage: "play.circle")
}
}
Button {
Task { await setMissionStatus(.completed) }
} label: {
Label("Mark Complete", systemImage: "checkmark.circle")
}
Button(role: .destructive) {
Task { await setMissionStatus(.failed) }
} label: {
Label("Mark Failed", systemImage: "xmark.circle")
}
if mission.status != .active && !mission.canResume {
Button {
Task { await setMissionStatus(.active) }
@@ -216,6 +239,12 @@ struct ControlView: View {
reconnectAttempt = 0
pollingTask?.cancel()
}
.sheet(isPresented: $showDesktopStream) {
DesktopStreamView(displayId: desktopDisplayId)
.presentationDetents([.medium, .large])
.presentationDragIndicator(.visible)
.presentationBackgroundInteraction(.enabled(upThrough: .medium))
}
}
// MARK: - Running Missions Bar

View File

@@ -0,0 +1,247 @@
//
// DesktopStreamView.swift
// OpenAgentDashboard
//
// Real-time desktop stream viewer with controls
// Designed to be shown in a bottom sheet
//
import SwiftUI
struct DesktopStreamView: View {
@State private var streamService = DesktopStreamService.shared
@State private var showControls = true
@State private var displayId: String
@Environment(\.dismiss) private var dismiss
init(displayId: String = ":99") {
_displayId = State(initialValue: displayId)
}
/// Root layout: header, stream area and (optionally) the controls stacked
/// vertically over the primary background.
var body: some View {
ZStack {
// Background
Theme.backgroundPrimary.ignoresSafeArea()
VStack(spacing: 0) {
// Header bar
headerView
// Stream content
streamContent
.frame(maxWidth: .infinity, maxHeight: .infinity)
// Controls (when visible)
if showControls {
controlsView
.transition(.move(edge: .bottom).combined(with: .opacity))
}
}
}
// The stream connection follows the view: connect when it appears,
// disconnect when it disappears.
.onAppear {
streamService.connect(displayId: displayId)
}
.onDisappear {
streamService.disconnect()
}
// A tap on the view toggles the controls with a short animation.
.onTapGesture {
withAnimation(.easeInOut(duration: 0.2)) {
showControls.toggle()
}
}
}
// MARK: - Header
/// Top bar showing connection status, the display id, the frame counter and
/// a close button that dismisses the view.
private var headerView: some View {
HStack(spacing: 12) {
// Connection indicator
HStack(spacing: 6) {
Circle()
.fill(streamService.isConnected ? Theme.success : Theme.error)
.frame(width: 8, height: 8)
.overlay {
// Extra ring is drawn only while connected and not paused.
if streamService.isConnected && !streamService.isPaused {
Circle()
.stroke(Theme.success.opacity(0.5), lineWidth: 2)
.frame(width: 14, height: 14)
.opacity(0.6)
}
}
// Status label: "Paused" / "Live" when connected, otherwise "Disconnected".
Text(streamService.isConnected ? (streamService.isPaused ? "Paused" : "Live") : "Disconnected")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textSecondary)
}
Spacer()
// Display ID
Text(displayId)
.font(.caption.monospaced())
.foregroundStyle(Theme.textMuted)
// Frame counter
Text("\(streamService.frameCount) frames")
.font(.caption2.monospaced())
.foregroundStyle(Theme.textMuted)
// Close button
Button {
dismiss()
} label: {
Image(systemName: "xmark")
.font(.system(size: 14, weight: .medium))
.foregroundStyle(Theme.textSecondary)
.frame(width: 28, height: 28)
.background(Theme.backgroundSecondary)
.clipShape(Circle())
}
}
.padding(.horizontal, 16)
.padding(.vertical, 12)
.background(.ultraThinMaterial)
}
// MARK: - Stream Content
/// Main area of the view. Shows, in priority order: the latest frame if one
/// exists, otherwise the service's error message with a retry button,
/// otherwise a loading indicator.
@ViewBuilder
private var streamContent: some View {
if let frame = streamService.currentFrame {
// Show the current frame
Image(uiImage: frame)
.resizable()
.aspectRatio(contentMode: .fit)
.background(Color.black)
} else if let error = streamService.errorMessage {
// Show error state
VStack(spacing: 16) {
Image(systemName: "exclamationmark.triangle")
.font(.system(size: 48))
.foregroundStyle(Theme.warning)
Text(error)
.font(.subheadline)
.foregroundStyle(Theme.textSecondary)
.multilineTextAlignment(.center)
.padding(.horizontal)
// Retry reconnects to the same display.
Button {
streamService.connect(displayId: displayId)
} label: {
Label("Retry", systemImage: "arrow.clockwise")
.font(.subheadline.weight(.medium))
.foregroundStyle(.white)
.padding(.horizontal, 20)
.padding(.vertical, 10)
.background(Theme.accent)
.clipShape(Capsule())
}
}
} else {
// Loading state
VStack(spacing: 16) {
ProgressView()
.progressViewStyle(.circular)
.tint(Theme.accent)
.scaleEffect(1.2)
Text("Connecting to desktop...")
.font(.subheadline)
.foregroundStyle(Theme.textSecondary)
}
}
}
// MARK: - Controls
/// Bottom panel with the play/pause and reconnect buttons plus the FPS and
/// quality sliders. Slider changes are forwarded to the stream service.
private var controlsView: some View {
VStack(spacing: 16) {
// Play/Pause and reconnect buttons
HStack(spacing: 16) {
// Play/Pause
Button {
if streamService.isPaused {
streamService.resume()
} else {
streamService.pause()
}
HapticService.lightTap()
} label: {
Image(systemName: streamService.isPaused ? "play.fill" : "pause.fill")
.font(.system(size: 20))
.foregroundStyle(.white)
.frame(width: 48, height: 48)
.background(Theme.accent)
.clipShape(Circle())
}
// Disabled and dimmed while there is no connection.
.disabled(!streamService.isConnected)
.opacity(streamService.isConnected ? 1 : 0.5)
// Reconnect
Button {
streamService.connect(displayId: displayId)
HapticService.lightTap()
} label: {
Image(systemName: "arrow.clockwise")
.font(.system(size: 16, weight: .medium))
.foregroundStyle(Theme.textPrimary)
.frame(width: 44, height: 44)
.background(Theme.backgroundSecondary)
.clipShape(Circle())
}
}
// Quality and FPS sliders
VStack(spacing: 12) {
// FPS slider
HStack {
Text("FPS")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textMuted)
.frame(width: 50, alignment: .leading)
// Bridges the service's Int value to the slider's Double; range 1-30 in steps of 1.
Slider(value: Binding(
get: { Double(streamService.fps) },
set: { streamService.setFps(Int($0)) }
), in: 1...30, step: 1)
.tint(Theme.accent)
Text("\(streamService.fps)")
.font(.caption.monospaced())
.foregroundStyle(Theme.textSecondary)
.frame(width: 30)
}
// Quality slider
HStack {
Text("Quality")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textMuted)
.frame(width: 50, alignment: .leading)
// Same Int/Double bridging; range 10-100 in steps of 5.
Slider(value: Binding(
get: { Double(streamService.quality) },
set: { streamService.setQuality(Int($0)) }
), in: 10...100, step: 5)
.tint(Theme.accent)
Text("\(streamService.quality)%")
.font(.caption.monospaced())
.foregroundStyle(Theme.textSecondary)
.frame(width: 40)
}
}
.padding(.horizontal, 8)
}
.padding(16)
.background(.ultraThinMaterial)
}
}
// MARK: - Preview
// Preview of the stream view pointed at display ":99".
#Preview {
DesktopStreamView(displayId: ":99")
}

View File

@@ -279,13 +279,21 @@ impl Agent for OpenCodeAgent {
});
}
if response.info.error.is_some() {
if let Some(error) = &response.info.error {
tree.status = "failed".to_string();
if let Some(node) = tree.children.iter_mut().find(|n| n.id == "opencode") {
node.status = "failed".to_string();
}
ctx.emit_tree(tree);
return AgentResult::failure("OpenCode returned an error response", 0)
// Extract error message from the error value
let error_msg = if let Some(msg) = error.get("message").and_then(|v| v.as_str()) {
msg.to_string()
} else if let Some(s) = error.as_str() {
s.to_string()
} else {
error.to_string()
};
return AgentResult::failure(format!("OpenCode error: {}", error_msg), 0)
.with_terminal_reason(TerminalReason::LlmError);
}
@@ -355,13 +363,21 @@ impl OpenCodeAgent {
}
};
if response.info.error.is_some() {
if let Some(error) = &response.info.error {
tree.status = "failed".to_string();
if let Some(node) = tree.children.iter_mut().find(|n| n.id == "opencode") {
node.status = "failed".to_string();
}
ctx.emit_tree(tree);
return AgentResult::failure("OpenCode returned an error response", 0)
// Extract error message from the error value
let error_msg = if let Some(msg) = error.get("message").and_then(|v| v.as_str()) {
msg.to_string()
} else if let Some(s) = error.as_str() {
s.to_string()
} else {
error.to_string()
};
return AgentResult::failure(format!("OpenCode error: {}", error_msg), 0)
.with_terminal_reason(TerminalReason::LlmError);
}

260
src/api/desktop_stream.rs Normal file
View File

@@ -0,0 +1,260 @@
//! WebSocket-based MJPEG streaming for virtual desktop display.
//!
//! Provides real-time streaming of the X11 virtual desktop (Xvfb)
//! to connected clients over WebSocket using MJPEG frames.
use std::sync::Arc;
use std::time::Duration;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Query, State,
},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio::process::Command;
use tokio::sync::mpsc;
use super::auth;
use super::routes::AppState;
/// Query parameters for the desktop stream endpoint.
///
/// Extracted from the request's query string by `desktop_stream_ws`.
/// `display` is not optional; `fps` and `quality` may be omitted.
#[derive(Debug, Deserialize)]
pub struct StreamParams {
/// Display identifier (e.g., ":99"). The endpoint rejects values that do
/// not start with `:`.
pub display: String,
/// Target frames per second. Defaults to 10 when omitted; the stream
/// handler clamps the value to 1-30.
pub fps: Option<u32>,
/// JPEG quality. Defaults to 70 when omitted; the stream handler clamps
/// the value to 10-100.
pub quality: Option<u32>,
}
/// Pull the JWT out of the `Sec-WebSocket-Protocol` request header.
///
/// The client offers a comma-separated protocol list such as
/// `openagent, jwt.<token>`. The first entry of the form `jwt.<token>` with a
/// non-empty token wins.
///
/// Returns `None` when the header is absent, cannot be read as a string, or
/// contains no such entry.
fn extract_jwt_from_protocols(headers: &HeaderMap) -> Option<String> {
    let offered = headers.get("sec-websocket-protocol")?.to_str().ok()?;

    offered
        .split(',')
        .filter_map(|entry| entry.trim().strip_prefix("jwt."))
        .find(|token| !token.is_empty())
        .map(str::to_owned)
}
/// WebSocket endpoint that streams the desktop as MJPEG frames.
///
/// Before upgrading the connection this handler:
/// 1. Enforces JWT auth (token carried in the WebSocket subprotocol header)
///    whenever the configuration requires it, answering `401` if the token is
///    missing or fails verification.
/// 2. Rejects any `display` parameter that does not start with `:` with `400`.
///
/// On success the connection is upgraded with the `openagent` subprotocol and
/// handed to `handle_desktop_stream`.
pub async fn desktop_stream_ws(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
    Query(params): Query<StreamParams>,
    headers: HeaderMap,
) -> impl IntoResponse {
    // Auth gate: skipped entirely when the config says auth is not required.
    if state.config.auth.auth_required(state.config.dev_mode) {
        match extract_jwt_from_protocols(&headers) {
            None => {
                return (StatusCode::UNAUTHORIZED, "Missing websocket JWT").into_response();
            }
            Some(token) if !auth::verify_token_for_config(&token, &state.config) => {
                return (StatusCode::UNAUTHORIZED, "Invalid or expired token").into_response();
            }
            Some(_) => {}
        }
    }

    // Only displays written as ":<something>" are accepted.
    let display_looks_valid = params.display.starts_with(':');
    if !display_looks_valid {
        return (StatusCode::BAD_REQUEST, "Invalid display format").into_response();
    }

    let upgrade = ws.protocols(["openagent"]);
    upgrade.on_upgrade(move |socket| handle_desktop_stream(socket, params))
}
/// Client command for controlling the stream.
///
/// Commands are parsed from JSON text messages sent by the client. The `t`
/// field selects the variant, e.g. `{"t":"pause"}` or `{"t":"fps","fps":15}`.
/// Text messages that do not parse as a command are ignored by the handler.
#[derive(Debug, Deserialize)]
#[serde(tag = "t")]
enum ClientCommand {
/// Pause streaming (`{"t":"pause"}`)
#[serde(rename = "pause")]
Pause,
/// Resume streaming (`{"t":"resume"}`)
#[serde(rename = "resume")]
Resume,
/// Change FPS (`{"t":"fps","fps":<n>}`); the handler clamps the value to 1-30
#[serde(rename = "fps")]
SetFps { fps: u32 },
/// Change quality (`{"t":"quality","quality":<n>}`); the handler clamps the
/// value to 10-100
#[serde(rename = "quality")]
SetQuality { quality: u32 },
}
/// Handle the WebSocket connection for desktop streaming
async fn handle_desktop_stream(mut socket: WebSocket, params: StreamParams) {
let x11_display = params.display;
let fps = params.fps.unwrap_or(10).clamp(1, 30);
let quality = params.quality.unwrap_or(70).clamp(10, 100);
tracing::info!(
x11_display = %x11_display,
fps = fps,
quality = quality,
"Starting desktop stream"
);
// Channel for control commands from client
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<ClientCommand>();
// Split the socket
let (mut ws_sender, mut ws_receiver) = socket.split();
// Spawn task to handle incoming messages
let cmd_tx_clone = cmd_tx.clone();
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = ws_receiver.next().await {
match msg {
Message::Text(t) => {
if let Ok(cmd) = serde_json::from_str::<ClientCommand>(&t) {
let _ = cmd_tx_clone.send(cmd);
}
}
Message::Close(_) => break,
_ => {}
}
}
});
// Streaming state
let mut paused = false;
let mut current_fps = fps;
let mut current_quality = quality;
let mut frame_interval = Duration::from_millis(1000 / current_fps as u64);
// Main streaming loop
let stream_task = tokio::spawn(async move {
let mut frame_count: u64 = 0;
loop {
// Check for control commands (non-blocking)
while let Ok(cmd) = cmd_rx.try_recv() {
match cmd {
ClientCommand::Pause => {
paused = true;
tracing::debug!("Stream paused");
}
ClientCommand::Resume => {
paused = false;
tracing::debug!("Stream resumed");
}
ClientCommand::SetFps { fps: new_fps } => {
current_fps = new_fps.clamp(1, 30);
frame_interval = Duration::from_millis(1000 / current_fps as u64);
tracing::debug!(fps = current_fps, "FPS changed");
}
ClientCommand::SetQuality { quality: new_quality } => {
current_quality = new_quality.clamp(10, 100);
tracing::debug!(quality = current_quality, "Quality changed");
}
}
}
if paused {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// Capture frame
match capture_frame(&x11_display, current_quality).await {
Ok(jpeg_data) => {
frame_count += 1;
// Send as binary WebSocket message
if ws_sender.send(Message::Binary(jpeg_data)).await.is_err() {
tracing::debug!("Client disconnected");
break;
}
}
Err(e) => {
// Send error as text message
let err_msg = serde_json::json!({
"error": "capture_failed",
"message": e.to_string()
});
if ws_sender
.send(Message::Text(err_msg.to_string()))
.await
.is_err()
{
break;
}
// Wait a bit before retrying on error
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
// Wait for next frame
tokio::time::sleep(frame_interval).await;
}
tracing::info!(frames = frame_count, "Desktop stream ended");
});
// Wait for either task to complete
tokio::select! {
_ = recv_task => {}
_ = stream_task => {}
}
}
/// Grab one frame of the given X11 display and return it as JPEG bytes.
///
/// Runs ImageMagick's `import` against the root window with `DISPLAY` set to
/// `display`, asking it to write a JPEG of the requested `quality` to stdout,
/// so no temporary file is involved.
///
/// # Errors
///
/// Fails when `import` cannot be launched, or when it exits unsuccessfully
/// (its stderr is included in the error message).
async fn capture_frame(display: &str, quality: u32) -> anyhow::Result<Vec<u8>> {
    let quality_arg = quality.to_string();

    let mut import_cmd = Command::new("import");
    import_cmd
        // "jpeg:-" makes ImageMagick emit the JPEG on stdout.
        .args(["-window", "root", "-quality", quality_arg.as_str(), "jpeg:-"])
        .env("DISPLAY", display);

    let captured = match import_cmd.output().await {
        Ok(finished) => finished,
        Err(e) => return Err(anyhow::anyhow!("Failed to run import: {}", e)),
    };

    if captured.status.success() {
        return Ok(captured.stdout);
    }

    let stderr = String::from_utf8_lossy(&captured.stderr);
    Err(anyhow::anyhow!("import failed: {}", stderr))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Omitted `fps` / `quality` resolve to the fallbacks of 10 and 70.
    #[test]
    fn test_stream_params_defaults() {
        let params = StreamParams {
            display: String::from(":99"),
            fps: None,
            quality: None,
        };

        let resolved_fps = params.fps.unwrap_or(10);
        let resolved_quality = params.quality.unwrap_or(70);

        assert_eq!(resolved_fps, 10);
        assert_eq!(resolved_quality, 70);
    }

    /// Values below, above and inside the 1-30 FPS window clamp as expected.
    #[test]
    fn test_fps_clamping() {
        let cases = [(0_u32, 1_u32), (50, 30), (15, 15)];
        for (requested, expected) in cases {
            assert_eq!(requested.clamp(1, 30), expected);
        }
    }
}

View File

@@ -18,6 +18,7 @@
mod auth;
mod console;
pub mod control;
mod desktop_stream;
mod fs;
pub mod mcp;
pub mod mission_runner;

View File

@@ -32,6 +32,7 @@ use crate::tools::ToolRegistry;
use super::auth;
use super::console;
use super::control;
use super::desktop_stream;
use super::fs;
use super::mcp as mcp_api;
use super::types::*;
@@ -103,7 +104,12 @@ pub async fn serve(config: Config) -> anyhow::Result<()> {
.route("/api/health", get(health))
.route("/api/auth/login", post(auth::login))
// WebSocket console uses subprotocol-based auth (browser can't set Authorization header)
.route("/api/console/ws", get(console::console_ws));
.route("/api/console/ws", get(console::console_ws))
// WebSocket desktop stream uses subprotocol-based auth
.route(
"/api/desktop/stream",
get(desktop_stream::desktop_stream_ws),
);
// File upload routes with increased body limit (10GB)
let upload_route = Router::new()