Add real-time desktop streaming for watching AI agent work (#17)

* iOS: Improve mission UI, add auto-reconnect, and refine input field

- Fix missions showing "Default" label by using mission ID instead when no model override
- Add ConnectionState enum to track SSE stream health with reconnecting/disconnected states
- Implement automatic reconnection with exponential backoff (1s→30s)
- Show connection status in toolbar when disconnecting, hide error bubbles for connection issues
- Fix status event filtering to only apply to currently viewed mission
- Reset run state when creating new mission or switching missions
- Redesign input field to ChatGPT style: clean outline, no background fill, integrated send button

* Add real-time desktop streaming with WebSocket MJPEG

Implements desktop streaming feature to watch the AI agent work in real-time:

- Backend: WebSocket endpoint at /api/desktop/stream using MJPEG frames
- iOS: Bottom sheet UI with play/pause, FPS and quality controls
- Web: Side-by-side split view with toggleable desktop panel
- Better OpenCode error messages for debugging

* Fix Bugbot review issues

- Fix WebSocket reconnection on slider changes by using initial values for URL params
- Fix iOS connected status set before WebSocket actually connects
- Fix mission state mapping to properly handle waiting_for_tool state

* Change default model from Sonnet 4 to Opus 4.5

Update DEFAULT_MODEL default value to claude-opus-4-5-20251101,
the most capable model in the Claude family.

* Fix additional Bugbot review issues

- Add onerror handler for image loading to prevent memory leaks
- Reset isPaused on disconnect to avoid UI desync
- Fix data race on backoff variable using nonisolated(unsafe)

* Address remaining Bugbot review issues

- Make error filtering more specific to SSE reconnection errors only
- Use refs for FPS/quality values to preserve current settings on reconnect

* Fix initial connection state and task cleanup

- Start iOS connection state as disconnected until first event
- Abort spawned tasks when WebSocket handler exits to prevent resource waste

* Fix connection state and backoff logic in iOS ControlView

- Set connectionState to .disconnected on view disappear (was incorrectly .connected)
- Only reset exponential backoff on successful (non-error) events to maintain proper
  backoff behavior when server is unavailable

* Fix fullscreen state sync and stale WebSocket callbacks

- Web: Don't set fullscreen state synchronously; rely on event listeners
- Web: Add fullscreenerror event handler to catch failed fullscreen requests
- iOS: Add connection ID to prevent stale WebSocket callbacks from corrupting
  new connection state when reconnecting

* Fix user message not appearing when viewing parallel missions

When switching to a parallel mission, currentMission was not being
updated, causing viewingId != currentId. This made the event filter
skip user_message events (which have mission_id: None from main session).

Now always update currentMission when switching, ensuring the filter
passes events correctly.

* Fix web dashboard showing "Agent is working..." for idle missions

Two fixes:
1. Set viewingMissionId immediately when loading mission from URL param
   - Previously viewingMissionId was null, falling back to global runState
   - Now it's set immediately so viewingMissionIsRunning checks runningMissions

2. Add status event filtering by mission_id
   - Status events now only update runState if they match the viewing mission
   - Similar to iOS fix for cross-mission status contamination

* Fix mission not loading when accessed via URL before authentication

When loading a mission via URL param (?mission=...), the initial API
fetch would fail with 401 before the user authenticated. After login,
nothing triggered a re-fetch of the mission data.

Added auth retry mechanism:
- Add signalAuthSuccess() to dispatch event after successful login
- Add authRetryTrigger state and listener in control-client
- Re-fetch mission and providers when auth succeeds

* Fix user message not appearing when viewing a specific mission

The user_message SSE event was being sent with mission_id: None, causing
it to be filtered out by the frontend when viewing a specific mission.
Now we read the current_mission before emitting the event and include
its ID, so the frontend correctly displays the user's message.

* Separate viewed mission from main mission to prevent event leaking

- Thread mission_id through main control runs so assistant/thinking/tool
  events are tagged with the correct mission ID
- Web: Track viewingMission separately from currentMission; filter SSE
  events by mission_id; revert to previous view on load failures
- iOS: Track viewingMission separately from currentMission; filter SSE
  events by mission_id; restore previous view on load failures; parse
  depth from both 'depth' and 'current_depth' SSE fields
- Update "Auto uses" label to Opus 4.5 on web

This prevents mission switching from leaking messages or status updates
across different missions when running parallel missions.

* Fix Bugbot review issues

- Use getValidJwt() and getRuntimeApiBase() in desktop-stream.tsx
  instead of incorrect storage keys
- Show error toast for mission load failures (except 401 auth errors)
  to fix silent failures for already-authenticated users

* Fix additional Bugbot review issues

- Add connectionId guard to desktop stream WebSocket to prevent race
  conditions where stale onclose callbacks incorrectly set disconnected
  state after reconnection
- Fix sync effect in control-client to only update viewingMission when
  viewingMissionId matches currentMission.id, preventing state corruption
- Restore runState, queueLength, progress on iOS mission switch failure
  to avoid mismatched status indicators

* Add race condition guard to URL-based mission loading

* Fix data race in iOS reconnection backoff using OSAllocatedUnfairLock

Replace nonisolated(unsafe) with proper thread-safe synchronization
using OSAllocatedUnfairLock for the receivedSuccessfulEvent boolean
that is written from the stream callback and read after completion.
This commit is contained in:
Thomas Marchand
2026-01-03 12:21:35 +00:00
committed by GitHub
parent 3398dbe271
commit b42ed192cf
14 changed files with 1525 additions and 172 deletions

View File

@@ -65,6 +65,9 @@ import {
Code,
FolderOpen,
Trash2,
Monitor,
PanelRightClose,
PanelRight,
} from "lucide-react";
import {
OptionList,
@@ -79,6 +82,7 @@ import {
import { useScrollToBottom } from "@/hooks/use-scroll-to-bottom";
import { useLocalStorage } from "@/hooks/use-local-storage";
import { useCopyToClipboard } from "@/hooks/use-copy-to-clipboard";
import { DesktopStream } from "@/components/desktop-stream";
type ChatItem =
| {
@@ -587,6 +591,7 @@ export default function ControlClient() {
// Mission state
const [currentMission, setCurrentMission] = useState<Mission | null>(null);
const [viewingMission, setViewingMission] = useState<Mission | null>(null);
const [showStatusMenu, setShowStatusMenu] = useState(false);
const [missionLoading, setMissionLoading] = useState(false);
@@ -629,6 +634,10 @@ export default function ControlClient() {
// Server configuration (fetched from health endpoint)
const [maxIterations, setMaxIterations] = useState<number>(50); // Default fallback
// Desktop stream state
const [showDesktopStream, setShowDesktopStream] = useState(false);
const [desktopDisplayId] = useState(":99");
// Check if the mission we're viewing is actually running (not just any mission)
const viewingMissionIsRunning = useMemo(() => {
if (!viewingMissionId) return runState !== "idle";
@@ -647,6 +656,7 @@ export default function ControlClient() {
const viewingMissionIdRef = useRef<string | null>(null);
const runningMissionsRef = useRef<RunningMissionInfo[]>([]);
const currentMissionRef = useRef<Mission | null>(null);
const viewingMissionRef = useRef<Mission | null>(null);
// Keep refs in sync with state
useEffect(() => {
@@ -661,6 +671,10 @@ export default function ControlClient() {
currentMissionRef.current = currentMission;
}, [currentMission]);
useEffect(() => {
viewingMissionRef.current = viewingMission;
}, [viewingMission]);
// Smart auto-scroll
const { containerRef, endRef, isAtBottom, scrollToBottom } =
useScrollToBottom();
@@ -856,19 +870,54 @@ export default function ControlClient() {
});
}, []);
// Load mission from URL param on mount
// Load mission from URL param on mount (and retry on auth success)
const [authRetryTrigger, setAuthRetryTrigger] = useState(0);
// Listen for auth success to retry loading
useEffect(() => {
const onAuthSuccess = () => {
setAuthRetryTrigger((prev) => prev + 1);
};
window.addEventListener("openagent:auth:success", onAuthSuccess);
return () => window.removeEventListener("openagent:auth:success", onAuthSuccess);
}, []);
useEffect(() => {
const missionId = searchParams.get("mission");
if (missionId) {
const previousViewingMission = viewingMissionRef.current;
setMissionLoading(true);
setViewingMissionId(missionId); // Set viewing ID immediately to prevent "Agent is working..." flash
fetchingMissionIdRef.current = missionId; // Track which mission we're loading
loadMission(missionId)
.then((mission) => {
// Race condition guard: only apply if this is still the mission we want
if (fetchingMissionIdRef.current !== missionId) return;
setCurrentMission(mission);
setViewingMission(mission);
setItems(missionHistoryToItems(mission));
})
.catch((err) => {
// Race condition guard: only handle error if this is still the mission we want
if (fetchingMissionIdRef.current !== missionId) return;
console.error("Failed to load mission:", err);
toast.error("Failed to load mission");
// Show error toast for mission load failures (skip if likely a 401 during initial page load)
const is401 = err?.message?.includes("401") || err?.status === 401;
if (!is401) {
toast.error("Failed to load mission");
}
// Revert viewing state to the previous mission to avoid filtering out events
const fallbackMission = previousViewingMission ?? currentMissionRef.current;
if (fallbackMission) {
setViewingMissionId(fallbackMission.id);
setViewingMission(fallbackMission);
setItems(missionHistoryToItems(fallbackMission));
} else {
setViewingMissionId(null);
setViewingMission(null);
setItems([]);
}
})
.finally(() => setMissionLoading(false));
} else {
@@ -876,6 +925,7 @@ export default function ControlClient() {
.then((mission) => {
if (mission) {
setCurrentMission(mission);
setViewingMission(mission);
setItems(missionHistoryToItems(mission));
router.replace(`/control?mission=${mission.id}`, { scroll: false });
}
@@ -884,7 +934,7 @@ export default function ControlClient() {
console.error("Failed to get current mission:", err);
});
}
}, [searchParams, router, missionHistoryToItems]);
}, [searchParams, router, missionHistoryToItems, authRetryTrigger]);
// Poll for running parallel missions
useEffect(() => {
@@ -907,7 +957,7 @@ export default function ControlClient() {
return () => clearInterval(interval);
}, []);
// Fetch available providers and models for mission creation
// Fetch available providers and models for mission creation (retry on auth success)
useEffect(() => {
listProviders()
.then((data) => {
@@ -916,7 +966,7 @@ export default function ControlClient() {
.catch((err) => {
console.error("Failed to fetch providers:", err);
});
}, []);
}, [authRetryTrigger]);
// Fetch server configuration (max_iterations) from health endpoint
useEffect(() => {
@@ -951,6 +1001,9 @@ export default function ControlClient() {
// Handle switching which mission we're viewing
const handleViewMission = useCallback(
async (missionId: string) => {
const previousViewingId = viewingMissionIdRef.current;
const previousViewingMission = viewingMissionRef.current;
setViewingMissionId(missionId);
fetchingMissionIdRef.current = missionId;
@@ -968,6 +1021,10 @@ export default function ControlClient() {
setItems(historyItems);
// Update cache with fresh data
setMissionItems((prev) => ({ ...prev, [missionId]: historyItems }));
setViewingMission(mission);
if (currentMissionRef.current?.id === mission.id) {
setCurrentMission(mission);
}
} catch (err) {
console.error("Failed to load mission:", err);
@@ -976,10 +1033,19 @@ export default function ControlClient() {
return;
}
// Fallback to cached items if API fails
if (missionItems[missionId]) {
setItems(missionItems[missionId]);
// Revert viewing state to avoid filtering out events
const fallbackMission = previousViewingMission ?? currentMissionRef.current;
if (fallbackMission) {
setViewingMissionId(fallbackMission.id);
setViewingMission(fallbackMission);
setItems(missionHistoryToItems(fallbackMission));
} else if (previousViewingId && missionItems[previousViewingId]) {
setViewingMissionId(previousViewingId);
setViewingMission(null);
setItems(missionItems[previousViewingId]);
} else {
setViewingMissionId(null);
setViewingMission(null);
setItems([]);
}
}
@@ -987,10 +1053,14 @@ export default function ControlClient() {
[missionItems, missionHistoryToItems]
);
// Sync viewingMissionId with currentMission
// Sync viewingMissionId with currentMission only when there's no explicit viewing mission set
useEffect(() => {
if (currentMission && !viewingMissionId) {
setViewingMissionId(currentMission.id);
setViewingMission(currentMission);
} else if (currentMission && viewingMissionId === currentMission.id) {
// Only update viewingMission if we're actually viewing the current mission
setViewingMission(currentMission);
}
}, [currentMission, viewingMissionId]);
@@ -1003,6 +1073,7 @@ export default function ControlClient() {
setMissionLoading(true);
const mission = await createMission(undefined, modelOverride);
setCurrentMission(mission);
setViewingMission(mission);
setViewingMissionId(mission.id); // Also update viewing to the new mission
setItems([]);
setShowParallelPanel(true); // Show the missions panel so user can see the new mission
@@ -1021,10 +1092,16 @@ export default function ControlClient() {
// Handle setting mission status
const handleSetStatus = async (status: MissionStatus) => {
if (!currentMission) return;
const mission = viewingMission ?? currentMission;
if (!mission) return;
try {
await setMissionStatus(currentMission.id, status);
setCurrentMission({ ...currentMission, status });
await setMissionStatus(mission.id, status);
if (currentMission?.id === mission.id) {
setCurrentMission({ ...mission, status });
}
if (viewingMission?.id === mission.id) {
setViewingMission({ ...mission, status });
}
setShowStatusMenu(false);
toast.success(`Mission marked as ${status}`);
} catch (err) {
@@ -1035,16 +1112,22 @@ export default function ControlClient() {
// Handle resuming an interrupted mission
const handleResumeMission = async (cleanWorkspace: boolean = false) => {
if (!currentMission || !["interrupted", "blocked"].includes(currentMission.status)) return;
const mission = viewingMission ?? currentMission;
if (!mission || !["interrupted", "blocked"].includes(mission.status)) return;
try {
setMissionLoading(true);
const resumed = await resumeMission(currentMission.id, cleanWorkspace);
const resumed = await resumeMission(mission.id, cleanWorkspace);
setCurrentMission(resumed);
setViewingMission(resumed);
setViewingMissionId(resumed.id);
const historyItems = missionHistoryToItems(resumed);
setItems(historyItems);
setMissionItems((prev) => ({ ...prev, [resumed.id]: historyItems }));
setShowStatusMenu(false);
toast.success(
cleanWorkspace
? "Mission resumed with clean workspace"
: (currentMission.status === "blocked" ? "Continuing mission" : "Mission resumed")
: (mission.status === "blocked" ? "Continuing mission" : "Mission resumed")
);
} catch (err) {
console.error("Failed to resume mission:", err);
@@ -1116,30 +1199,45 @@ export default function ControlClient() {
const newState =
typeof st === "string" ? (st as ControlRunState) : "idle";
const q = data["queue_len"];
setQueueLen(typeof q === "number" ? q : 0);
// Clear progress when idle
if (newState === "idle") {
setProgress(null);
// Status filtering: only apply status if it matches the mission we're viewing
const statusMissionId = typeof data["mission_id"] === "string" ? data["mission_id"] : null;
let shouldApplyStatus = true;
if (statusMissionId) {
// Status for a specific mission - only apply if viewing that mission
shouldApplyStatus = statusMissionId === viewingId;
} else {
// Status for main session - only apply if viewing main mission or no specific mission
shouldApplyStatus = !viewingId || viewingId === currentMissionId;
}
// If we reconnected and agent is already running, add a visual indicator
setRunState((prevState) => {
// Only show reconnect notice if we weren't already tracking this as running
// and there's no active thinking/phase item (means we missed some events)
if (newState === "running" && prevState === "idle") {
setItems((prevItems) => {
const hasActiveThinking = prevItems.some(
(it) =>
(it.kind === "thinking" && !it.done) || it.kind === "phase"
);
// If there's no active streaming item, the user is seeing stale state
// The "Agent is working..." indicator will show via the render logic
return prevItems;
});
if (shouldApplyStatus) {
setQueueLen(typeof q === "number" ? q : 0);
// Clear progress when idle
if (newState === "idle") {
setProgress(null);
}
return newState;
});
// If we reconnected and agent is already running, add a visual indicator
setRunState((prevState) => {
// Only show reconnect notice if we weren't already tracking this as running
// and there's no active thinking/phase item (means we missed some events)
if (newState === "running" && prevState === "idle") {
setItems((prevItems) => {
const hasActiveThinking = prevItems.some(
(it) =>
(it.kind === "thinking" && !it.done) || it.kind === "phase"
);
// If there's no active streaming item, the user is seeing stale state
// The "Agent is working..." indicator will show via the render logic
return prevItems;
});
}
return newState;
});
}
return;
}
@@ -1354,13 +1452,14 @@ export default function ControlClient() {
}
};
const missionStatus = currentMission
? missionStatusLabel(currentMission.status)
const activeMission = viewingMission ?? currentMission;
const missionStatus = activeMission
? missionStatusLabel(activeMission.status)
: null;
const missionTitle = currentMission?.title
? currentMission.title.length > 60
? currentMission.title.slice(0, 60) + "..."
: currentMission.title
const missionTitle = activeMission?.title
? activeMission.title.length > 60
? activeMission.title.slice(0, 60) + "..."
: activeMission.title
: "New Mission";
return (
@@ -1398,8 +1497,8 @@ export default function ControlClient() {
)}
</div>
<p className="text-xs text-white/40 truncate">
{currentMission
? `Mission ${currentMission.id.slice(0, 8)}...`
{activeMission
? `Mission ${activeMission.id.slice(0, 8)}...`
: "No active mission"}
</p>
</div>
@@ -1407,7 +1506,7 @@ export default function ControlClient() {
</div>
<div className="flex items-center gap-3 shrink-0 flex-wrap">
{currentMission && (
{activeMission && (
<div className="relative" ref={statusMenuRef}>
<button
onClick={() => setShowStatusMenu(!showStatusMenu)}
@@ -1432,7 +1531,7 @@ export default function ControlClient() {
<XCircle className="h-4 w-4 text-red-400" />
Mark Failed
</button>
{(currentMission.status === "interrupted" || currentMission.status === "blocked") && (
{(activeMission?.status === "interrupted" || activeMission?.status === "blocked") && (
<>
<button
onClick={() => handleResumeMission(false)}
@@ -1440,7 +1539,7 @@ export default function ControlClient() {
className="flex w-full items-center gap-2 px-3 py-2 text-sm text-white/70 hover:bg-white/[0.04] disabled:opacity-50"
>
<PlayCircle className="h-4 w-4 text-emerald-400" />
{currentMission.status === "blocked" ? "Continue Mission" : "Resume Mission"}
{activeMission?.status === "blocked" ? "Continue Mission" : "Resume Mission"}
</button>
<button
onClick={() => handleResumeMission(true)}
@@ -1449,11 +1548,11 @@ export default function ControlClient() {
title="Delete work folder and start fresh"
>
<Trash2 className="h-4 w-4 text-orange-400" />
Clean & {currentMission.status === "blocked" ? "Continue" : "Resume"}
Clean & {activeMission?.status === "blocked" ? "Continue" : "Resume"}
</button>
</>
)}
{currentMission.status !== "active" && currentMission.status !== "interrupted" && currentMission.status !== "blocked" && (
{activeMission?.status !== "active" && activeMission?.status !== "interrupted" && activeMission?.status !== "blocked" && (
<button
onClick={() => handleSetStatus("active")}
className="flex w-full items-center gap-2 px-3 py-2 text-sm text-white/70 hover:bg-white/[0.04]"
@@ -1519,7 +1618,7 @@ export default function ControlClient() {
))}
</select>
<p className="text-xs text-white/30 mt-1.5">
Auto uses Claude Sonnet 4
Auto uses Claude Opus 4.5
</p>
</div>
<div className="flex gap-2 pt-1">
@@ -1568,6 +1667,26 @@ export default function ControlClient() {
</button>
)}
{/* Desktop stream toggle */}
<button
onClick={() => setShowDesktopStream(!showDesktopStream)}
className={cn(
"flex items-center gap-2 rounded-lg border px-3 py-2 text-sm transition-colors",
showDesktopStream
? "border-emerald-500/30 bg-emerald-500/10 text-emerald-400"
: "border-white/[0.06] bg-white/[0.02] text-white/70 hover:bg-white/[0.04]"
)}
title={showDesktopStream ? "Hide desktop stream" : "Show desktop stream"}
>
<Monitor className="h-4 w-4" />
<span className="hidden sm:inline">Desktop</span>
{showDesktopStream ? (
<PanelRightClose className="h-4 w-4" />
) : (
<PanelRight className="h-4 w-4" />
)}
</button>
{/* Status panel */}
<div className="flex items-center gap-2 rounded-lg border border-white/[0.06] bg-white/[0.02] px-3 py-2">
{/* Run state indicator */}
@@ -1721,8 +1840,13 @@ export default function ControlClient() {
</div>
)}
{/* Chat container */}
<div className="flex-1 min-h-0 flex flex-col rounded-2xl glass-panel border border-white/[0.06] overflow-hidden relative">
{/* Main content area - Chat and Desktop stream side by side */}
<div className="flex-1 min-h-0 flex gap-4">
{/* Chat container */}
<div className={cn(
"flex-1 min-h-0 flex flex-col rounded-2xl glass-panel border border-white/[0.06] overflow-hidden relative transition-all duration-300",
showDesktopStream && "flex-[2]"
)}>
{/* Messages */}
<div ref={containerRef} className="flex-1 overflow-y-auto p-6">
{items.length === 0 ? (
@@ -1747,26 +1871,26 @@ export default function ControlClient() {
arrive.
</p>
</>
) : currentMission && currentMission.status !== "active" ? (
) : activeMission && activeMission.status !== "active" ? (
<>
<h2 className="text-lg font-medium text-white">
{currentMission.status === "interrupted"
{activeMission.status === "interrupted"
? "Mission Interrupted"
: currentMission.status === "blocked"
: activeMission.status === "blocked"
? "Iteration Limit Reached"
: "No conversation history"}
</h2>
<p className="mt-2 text-sm text-white/40 max-w-sm">
{currentMission.status === "interrupted" ? (
{activeMission.status === "interrupted" ? (
<>This mission was interrupted (server shutdown or cancellation). Click the <strong className="text-amber-400">Resume</strong> button in the mission menu to continue where you left off.</>
) : currentMission.status === "blocked" ? (
) : activeMission.status === "blocked" ? (
<>The agent reached its iteration limit ({maxIterations}). You can continue the mission to give it more iterations.</>
) : (
<>This mission was {currentMission.status} without any messages.
{currentMission.status === "completed" && " You can reactivate it to continue."}</>
<>This mission was {activeMission.status} without any messages.
{activeMission.status === "completed" && " You can reactivate it to continue."}</>
)}
</p>
{currentMission.status === "blocked" && (
{activeMission.status === "blocked" && (
<div className="mt-4 flex gap-2">
<button
onClick={() => handleResumeMission(false)}
@@ -2111,7 +2235,7 @@ export default function ControlClient() {
})}
{/* Continue banner for blocked missions */}
{currentMission?.status === "blocked" && items.length > 0 && (
{activeMission?.status === "blocked" && items.length > 0 && (
<div className="flex justify-center py-4">
<div className="flex items-center gap-3 rounded-xl bg-amber-500/10 border border-amber-500/20 px-5 py-3">
<Clock className="h-5 w-5 text-amber-400" />
@@ -2312,6 +2436,18 @@ export default function ControlClient() {
</form>
</div>
</div>
{/* Desktop Stream Panel */}
{showDesktopStream && (
<div className="flex-1 min-h-0 transition-all duration-300 animate-fade-in">
<DesktopStream
displayId={desktopDisplayId}
className="h-full"
onClose={() => setShowDesktopStream(false)}
/>
</div>
)}
</div>
</div>
);
}

View File

@@ -2,7 +2,7 @@
import { useEffect, useMemo, useState } from 'react';
import { login, getHealth } from '@/lib/api';
import { clearJwt, getValidJwt, setJwt } from '@/lib/auth';
import { clearJwt, getValidJwt, setJwt, signalAuthSuccess } from '@/lib/auth';
import { Lock } from 'lucide-react';
export function AuthGate({ children }: { children: React.ReactNode }) {
@@ -61,6 +61,7 @@ export function AuthGate({ children }: { children: React.ReactNode }) {
setJwt(res.token, res.exp);
setIsAuthed(true);
setPassword('');
signalAuthSuccess();
} catch {
setError('Invalid password');
} finally {

View File

@@ -0,0 +1,420 @@
"use client";
import { useState, useEffect, useRef, useCallback } from "react";
import { cn } from "@/lib/utils";
import { getValidJwt } from "@/lib/auth";
import { getRuntimeApiBase } from "@/lib/settings";
import {
Monitor,
Play,
Pause,
RefreshCw,
X,
Settings,
Maximize2,
Minimize2,
} from "lucide-react";
interface DesktopStreamProps {
displayId?: string;
className?: string;
onClose?: () => void;
initialFps?: number;
initialQuality?: number;
}
type ConnectionState = "connecting" | "connected" | "disconnected" | "error";
export function DesktopStream({
displayId = ":99",
className,
onClose,
initialFps = 10,
initialQuality = 70,
}: DesktopStreamProps) {
const [connectionState, setConnectionState] =
useState<ConnectionState>("connecting");
const [isPaused, setIsPaused] = useState(false);
const [frameCount, setFrameCount] = useState(0);
const [errorMessage, setErrorMessage] = useState<string | null>(null);
const [showControls, setShowControls] = useState(true);
const [fps, setFps] = useState(initialFps);
const [quality, setQuality] = useState(initialQuality);
const [isFullscreen, setIsFullscreen] = useState(false);
const wsRef = useRef<WebSocket | null>(null);
const canvasRef = useRef<HTMLCanvasElement>(null);
const containerRef = useRef<HTMLDivElement>(null);
const connectionIdRef = useRef(0); // Guard against stale callbacks from old connections
// Refs to store current values without triggering reconnection on slider changes
const fpsRef = useRef(initialFps);
const qualityRef = useRef(initialQuality);
// Keep refs in sync with state
fpsRef.current = fps;
qualityRef.current = quality;
// Build WebSocket URL - uses refs to get current values without causing reconnections
const buildWsUrl = useCallback(() => {
const baseUrl = getRuntimeApiBase();
// Convert https to wss, http to ws
const wsUrl = baseUrl
.replace("https://", "wss://")
.replace("http://", "ws://");
// Use refs for current values - refs don't trigger useCallback dependency changes
const params = new URLSearchParams({
display: displayId,
fps: fpsRef.current.toString(),
quality: qualityRef.current.toString(),
});
return `${wsUrl}/api/desktop/stream?${params}`;
}, [displayId]);
// Connect to WebSocket
const connect = useCallback(() => {
// Clean up existing connection
if (wsRef.current) {
wsRef.current.close();
}
// Increment connection ID to invalidate stale callbacks
connectionIdRef.current += 1;
const thisConnectionId = connectionIdRef.current;
setConnectionState("connecting");
setErrorMessage(null);
const url = buildWsUrl();
// Get JWT token using proper auth module
const jwt = getValidJwt();
const token = jwt?.token ?? null;
// Create WebSocket with subprotocol auth
const protocols = token ? ["openagent", `jwt.${token}`] : ["openagent"];
const ws = new WebSocket(url, protocols);
ws.binaryType = "arraybuffer";
ws.onopen = () => {
// Guard against stale callbacks from previous connections
if (connectionIdRef.current !== thisConnectionId) return;
setConnectionState("connected");
setErrorMessage(null);
};
ws.onmessage = (event) => {
// Guard against stale callbacks
if (connectionIdRef.current !== thisConnectionId) return;
if (event.data instanceof ArrayBuffer) {
// Binary data = JPEG frame
const blob = new Blob([event.data], { type: "image/jpeg" });
const imageUrl = URL.createObjectURL(blob);
const img = new Image();
img.onload = () => {
const canvas = canvasRef.current;
if (canvas) {
const ctx = canvas.getContext("2d");
if (ctx) {
// Resize canvas to match image
if (
canvas.width !== img.width ||
canvas.height !== img.height
) {
canvas.width = img.width;
canvas.height = img.height;
}
ctx.drawImage(img, 0, 0);
setFrameCount((prev) => prev + 1);
}
}
URL.revokeObjectURL(imageUrl);
};
img.onerror = () => {
// Revoke URL on failed load to prevent memory leak
URL.revokeObjectURL(imageUrl);
};
img.src = imageUrl;
} else if (typeof event.data === "string") {
// Text message = JSON (error or control response)
try {
const json = JSON.parse(event.data);
if (json.error) {
setErrorMessage(json.message || json.error);
}
} catch {
// Ignore parse errors
}
}
};
ws.onerror = () => {
// Guard against stale callbacks
if (connectionIdRef.current !== thisConnectionId) return;
setConnectionState("error");
setErrorMessage("Connection error");
};
ws.onclose = () => {
// Guard against stale callbacks from previous connections
if (connectionIdRef.current !== thisConnectionId) return;
setConnectionState("disconnected");
};
wsRef.current = ws;
}, [buildWsUrl]);
// Send command to server
const sendCommand = useCallback((cmd: Record<string, unknown>) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(cmd));
}
}, []);
// Control handlers
const handlePause = useCallback(() => {
setIsPaused(true);
sendCommand({ t: "pause" });
}, [sendCommand]);
const handleResume = useCallback(() => {
setIsPaused(false);
sendCommand({ t: "resume" });
}, [sendCommand]);
const handleFpsChange = useCallback(
(newFps: number) => {
setFps(newFps);
sendCommand({ t: "fps", fps: newFps });
},
[sendCommand]
);
const handleQualityChange = useCallback(
(newQuality: number) => {
setQuality(newQuality);
sendCommand({ t: "quality", quality: newQuality });
},
[sendCommand]
);
const handleFullscreen = useCallback(() => {
if (!containerRef.current) return;
if (!isFullscreen) {
// Don't set state here - let the fullscreenchange event handler do it
// This prevents state desync if fullscreen request fails
containerRef.current.requestFullscreen?.();
} else {
document.exitFullscreen?.();
}
}, [isFullscreen]);
// Connect on mount
useEffect(() => {
connect();
return () => {
wsRef.current?.close();
};
}, [connect]);
// Listen for fullscreen changes and errors
useEffect(() => {
const handleFullscreenChange = () => {
setIsFullscreen(!!document.fullscreenElement);
};
const handleFullscreenError = () => {
// Fullscreen request failed - ensure state reflects reality
setIsFullscreen(false);
};
document.addEventListener("fullscreenchange", handleFullscreenChange);
document.addEventListener("fullscreenerror", handleFullscreenError);
return () => {
document.removeEventListener("fullscreenchange", handleFullscreenChange);
document.removeEventListener("fullscreenerror", handleFullscreenError);
};
}, []);
return (
<div
ref={containerRef}
className={cn(
"relative flex flex-col bg-[#0a0a0a] rounded-xl overflow-hidden border border-white/[0.06]",
className
)}
onMouseEnter={() => setShowControls(true)}
onMouseLeave={() => setShowControls(false)}
>
{/* Header */}
<div
className={cn(
"absolute top-0 left-0 right-0 z-10 flex items-center justify-between px-4 py-2 bg-gradient-to-b from-black/80 to-transparent transition-opacity duration-200",
showControls ? "opacity-100" : "opacity-0"
)}
>
<div className="flex items-center gap-3">
<div
className={cn(
"flex items-center gap-2 text-xs",
connectionState === "connected"
? "text-emerald-400"
: connectionState === "connecting"
? "text-amber-400"
: "text-red-400"
)}
>
<div
className={cn(
"w-2 h-2 rounded-full",
connectionState === "connected"
? "bg-emerald-400"
: connectionState === "connecting"
? "bg-amber-400 animate-pulse"
: "bg-red-400"
)}
/>
{connectionState === "connected"
? isPaused
? "Paused"
: "Live"
: connectionState === "connecting"
? "Connecting..."
: "Disconnected"}
</div>
<span className="text-xs text-white/40 font-mono">{displayId}</span>
<span className="text-xs text-white/30">{frameCount} frames</span>
</div>
<div className="flex items-center gap-2">
<button
onClick={handleFullscreen}
className="p-1.5 rounded-lg hover:bg-white/10 text-white/60 hover:text-white transition-colors"
title={isFullscreen ? "Exit fullscreen" : "Fullscreen"}
>
{isFullscreen ? (
<Minimize2 className="w-4 h-4" />
) : (
<Maximize2 className="w-4 h-4" />
)}
</button>
{onClose && (
<button
onClick={onClose}
className="p-1.5 rounded-lg hover:bg-white/10 text-white/60 hover:text-white transition-colors"
title="Close"
>
<X className="w-4 h-4" />
</button>
)}
</div>
</div>
{/* Canvas */}
<div className="flex-1 flex items-center justify-center bg-black min-h-[200px]">
{connectionState === "connected" && !errorMessage ? (
<canvas
ref={canvasRef}
className="max-w-full max-h-full object-contain"
/>
) : connectionState === "connecting" ? (
<div className="flex flex-col items-center gap-3 text-white/60">
<Monitor className="w-12 h-12 animate-pulse" />
<span className="text-sm">Connecting to desktop...</span>
</div>
) : (
<div className="flex flex-col items-center gap-3 text-white/60">
<Monitor className="w-12 h-12 text-red-400/60" />
<span className="text-sm text-red-400">
{errorMessage || "Connection lost"}
</span>
<button
onClick={connect}
className="flex items-center gap-2 px-3 py-1.5 rounded-lg bg-indigo-500 text-white text-sm hover:bg-indigo-600 transition-colors"
>
<RefreshCw className="w-4 h-4" />
Reconnect
</button>
</div>
)}
</div>
{/* Controls */}
<div
className={cn(
"absolute bottom-0 left-0 right-0 z-10 p-4 bg-gradient-to-t from-black/80 to-transparent transition-opacity duration-200",
showControls ? "opacity-100" : "opacity-0"
)}
>
<div className="flex items-center justify-between gap-4">
{/* Play/Pause */}
<div className="flex items-center gap-2">
<button
onClick={isPaused ? handleResume : handlePause}
disabled={connectionState !== "connected"}
className={cn(
"p-2 rounded-full transition-colors",
connectionState === "connected"
? "bg-white/10 hover:bg-white/20 text-white"
: "bg-white/5 text-white/30 cursor-not-allowed"
)}
title={isPaused ? "Resume" : "Pause"}
>
{isPaused ? (
<Play className="w-5 h-5" />
) : (
<Pause className="w-5 h-5" />
)}
</button>
<button
onClick={connect}
className="p-2 rounded-full bg-white/10 hover:bg-white/20 text-white transition-colors"
title="Reconnect"
>
<RefreshCw className="w-4 h-4" />
</button>
</div>
{/* Sliders */}
<div className="flex-1 flex items-center gap-6 max-w-md">
<div className="flex-1 flex items-center gap-2">
<span className="text-xs text-white/40 w-8">FPS</span>
<input
type="range"
min={1}
max={30}
value={fps}
onChange={(e) => handleFpsChange(Number(e.target.value))}
className="flex-1 h-1 bg-white/20 rounded-full appearance-none cursor-pointer [&::-webkit-slider-thumb]:appearance-none [&::-webkit-slider-thumb]:w-3 [&::-webkit-slider-thumb]:h-3 [&::-webkit-slider-thumb]:bg-indigo-500 [&::-webkit-slider-thumb]:rounded-full [&::-webkit-slider-thumb]:cursor-pointer"
/>
<span className="text-xs text-white/60 w-6 text-right tabular-nums">
{fps}
</span>
</div>
<div className="flex-1 flex items-center gap-2">
<span className="text-xs text-white/40 w-12">Quality</span>
<input
type="range"
min={10}
max={100}
step={5}
value={quality}
onChange={(e) => handleQualityChange(Number(e.target.value))}
className="flex-1 h-1 bg-white/20 rounded-full appearance-none cursor-pointer [&::-webkit-slider-thumb]:appearance-none [&::-webkit-slider-thumb]:w-3 [&::-webkit-slider-thumb]:h-3 [&::-webkit-slider-thumb]:bg-indigo-500 [&::-webkit-slider-thumb]:rounded-full [&::-webkit-slider-thumb]:cursor-pointer"
/>
<span className="text-xs text-white/60 w-8 text-right tabular-nums">
{quality}%
</span>
</div>
</div>
</div>
</div>
</div>
);
}

View File

@@ -50,3 +50,8 @@ export function signalAuthRequired(): void {
if (typeof window === 'undefined') return;
window.dispatchEvent(new CustomEvent('openagent:auth:required'));
}
export function signalAuthSuccess(): void {
if (typeof window === 'undefined') return;
window.dispatchEvent(new CustomEvent('openagent:auth:success'));
}

View File

@@ -17,20 +17,22 @@
3DD4D1D2E080C2F89C4881B7 /* ToolUIModels.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8A6128ECBCA632D9E2D415F2 /* ToolUIModels.swift */; };
4B50B97618C0CC469FF64592 /* Theme.swift in Sources */ = {isa = PBXBuildFile; fileRef = 504A1222CE8971417834D229 /* Theme.swift */; };
4D0CF2666262F45370D000DF /* TerminalView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0AC6317C4EAD4DB9A8190209 /* TerminalView.swift */; };
51436A7671B1E3C8478F81A2 /* RunningMissionsBar.swift in Sources */ = {isa = PBXBuildFile; fileRef = E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */; };
5152C5313CD5AC01276D0AE6 /* FileEntry.swift in Sources */ = {isa = PBXBuildFile; fileRef = BA70A2A73D3A386EAFD69FC4 /* FileEntry.swift */; };
652A0AE498D69C9DB728B2DF /* ANSIParser.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD8D224B6758B664864F3987 /* ANSIParser.swift */; };
6865FE997D3E1D91D411F6BC /* LoadingView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2B9834D4EE32058824F9DF00 /* LoadingView.swift */; };
6B87076797C9DFA01E24CC76 /* FilesView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5908645A518F48B501390AB8 /* FilesView.swift */; };
7B064EE6C0C1039360CCE40B /* DesktopStreamService.swift in Sources */ = {isa = PBXBuildFile; fileRef = A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */; };
83BB0F0AAFE4F2735FF76B87 /* NavigationState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3729F39FBF53046124D05BC1 /* NavigationState.swift */; };
999ACAA94B0BD81A05288092 /* GlassCard.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB5A4720378F06807FDE73E1 /* GlassCard.swift */; };
9BC40E40E1B5622B24328AEB /* Mission.swift in Sources */ = {isa = PBXBuildFile; fileRef = D4AB47CF121ABA1946A4D879 /* Mission.swift */; };
AA02567226057045DDD61CB1 /* ContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 99B57FC3136B64DC87413CA6 /* ContentView.swift */; };
BD52A9FE6C97C3CC53810094 /* DesktopStreamView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */; };
CA70EC5A864C3D007D42E781 /* ChatMessage.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3CB591B632D3EF26AB217976 /* ChatMessage.swift */; };
D64972881E36894950658708 /* APIService.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBC90C32FEF604E025FFBF78 /* APIService.swift */; };
DA4634D7424AF3FC985987E7 /* GlassButton.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5267DE67017A858357F68424 /* GlassButton.swift */; };
FA7E68F22D16E1AC0B5F5E22 /* StatusBadge.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */; };
FF9C447978711CBA9185B8B0 /* OpenAgentDashboardApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = 139C740B7D55C13F3B167EF3 /* OpenAgentDashboardApp.swift */; };
3425BC785C1C91F71D626BE5 /* RunningMissionsBar.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */; };
/* End PBXBuildFile section */
/* Begin PBXFileReference section */
@@ -51,6 +53,7 @@
66A48A20D2178760301256C9 /* Assets.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = Assets.xcassets; sourceTree = "<group>"; };
8A6128ECBCA632D9E2D415F2 /* ToolUIModels.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToolUIModels.swift; sourceTree = "<group>"; };
99B57FC3136B64DC87413CA6 /* ContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContentView.swift; sourceTree = "<group>"; };
A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DesktopStreamService.swift; sourceTree = "<group>"; };
A4D419C8490A0C5FC4DCDF20 /* ToolUIOptionListView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToolUIOptionListView.swift; sourceTree = "<group>"; };
A84519FDE8FC75084938B292 /* ControlView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ControlView.swift; sourceTree = "<group>"; };
BA70A2A73D3A386EAFD69FC4 /* FileEntry.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FileEntry.swift; sourceTree = "<group>"; };
@@ -58,9 +61,10 @@
CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StatusBadge.swift; sourceTree = "<group>"; };
CD8D224B6758B664864F3987 /* ANSIParser.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ANSIParser.swift; sourceTree = "<group>"; };
D4AB47CF121ABA1946A4D879 /* Mission.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Mission.swift; sourceTree = "<group>"; };
E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunningMissionsBar.swift; sourceTree = "<group>"; };
EB5A4720378F06807FDE73E1 /* GlassCard.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GlassCard.swift; sourceTree = "<group>"; };
F51395D8FB559D3C79AAA0A4 /* OpenAgentDashboard.app */ = {isa = PBXFileReference; includeInIndex = 0; lastKnownFileType = wrapper.application; path = OpenAgentDashboard.app; sourceTree = BUILT_PRODUCTS_DIR; };
7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RunningMissionsBar.swift; sourceTree = "<group>"; };
FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DesktopStreamView.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXGroup section */
@@ -102,7 +106,7 @@
5267DE67017A858357F68424 /* GlassButton.swift */,
EB5A4720378F06807FDE73E1 /* GlassCard.swift */,
2B9834D4EE32058824F9DF00 /* LoadingView.swift */,
7F50E98346B3C796078CAF21 /* RunningMissionsBar.swift */,
E7FC053808661C9A0E21E83C /* RunningMissionsBar.swift */,
CD6FB2E54DC07BE7A1EB08F8 /* StatusBadge.swift */,
D09E84E812213CF7E52E4FEF /* ToolUI */,
);
@@ -122,6 +126,7 @@
children = (
2EF415E84544334B25BD8E26 /* Components */,
DABAA3652C0B0A54CFC3221B /* Control */,
A688A831235D3E218A0A6783 /* Desktop */,
0C1185300420EEF31B892A3A /* Files */,
5A40B212F0D2055C1C499FCC /* History */,
0D9369EE2F3374EAA1EF332E /* Terminal */,
@@ -129,6 +134,14 @@
path = Views;
sourceTree = "<group>";
};
A688A831235D3E218A0A6783 /* Desktop */ = {
isa = PBXGroup;
children = (
FCA36F5FA00B575DDD336598 /* DesktopStreamView.swift */,
);
path = Desktop;
sourceTree = "<group>";
};
AB86DCEEB152D8EA7E8CBD86 = {
isa = PBXGroup;
children = (
@@ -187,6 +200,7 @@
children = (
CD8D224B6758B664864F3987 /* ANSIParser.swift */,
CBC90C32FEF604E025FFBF78 /* APIService.swift */,
A07EFDD6964AA3B251967041 /* DesktopStreamService.swift */,
3729F39FBF53046124D05BC1 /* NavigationState.swift */,
52DDF35DB8CD7D70F3CFC4A6 /* TerminalState.swift */,
);
@@ -268,6 +282,8 @@
CA70EC5A864C3D007D42E781 /* ChatMessage.swift in Sources */,
AA02567226057045DDD61CB1 /* ContentView.swift in Sources */,
02DB7F25245D03FF72DD8E2E /* ControlView.swift in Sources */,
7B064EE6C0C1039360CCE40B /* DesktopStreamService.swift in Sources */,
BD52A9FE6C97C3CC53810094 /* DesktopStreamView.swift in Sources */,
5152C5313CD5AC01276D0AE6 /* FileEntry.swift in Sources */,
6B87076797C9DFA01E24CC76 /* FilesView.swift in Sources */,
DA4634D7424AF3FC985987E7 /* GlassButton.swift in Sources */,
@@ -277,6 +293,7 @@
9BC40E40E1B5622B24328AEB /* Mission.swift in Sources */,
83BB0F0AAFE4F2735FF76B87 /* NavigationState.swift in Sources */,
FF9C447978711CBA9185B8B0 /* OpenAgentDashboardApp.swift in Sources */,
51436A7671B1E3C8478F81A2 /* RunningMissionsBar.swift in Sources */,
FA7E68F22D16E1AC0B5F5E22 /* StatusBadge.swift in Sources */,
0B5E1A6153270BFF21A54C23 /* TerminalState.swift in Sources */,
4D0CF2666262F45370D000DF /* TerminalView.swift in Sources */,
@@ -285,7 +302,6 @@
3DD4D1D2E080C2F89C4881B7 /* ToolUIModels.swift in Sources */,
03176DF3878C25A0B557462C /* ToolUIOptionListView.swift in Sources */,
3361B14E949CB2A6E75B6962 /* ToolUIView.swift in Sources */,
3425BC785C1C91F71D626BE5 /* RunningMissionsBar.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};

View File

@@ -0,0 +1,182 @@
//
// DesktopStreamService.swift
// OpenAgentDashboard
//
// WebSocket client for MJPEG desktop streaming
//
import Foundation
import Observation
import UIKit
@MainActor
@Observable
final class DesktopStreamService {
static let shared = DesktopStreamService()
nonisolated init() {}
// Stream state
var isConnected = false
var isPaused = false
var currentFrame: UIImage?
var errorMessage: String?
var frameCount: UInt64 = 0
var fps: Int = 10
var quality: Int = 70
private var webSocket: URLSessionWebSocketTask?
private var displayId: String?
// Connection ID to prevent stale callbacks from corrupting state
private var connectionId: UInt64 = 0
// MARK: - Connection
func connect(displayId: String) {
disconnect()
self.displayId = displayId
self.errorMessage = nil
// Increment connection ID to invalidate any pending callbacks from old connections
self.connectionId += 1
guard let url = buildWebSocketURL(displayId: displayId) else {
errorMessage = "Invalid URL"
return
}
let session = URLSession(configuration: .default)
var request = URLRequest(url: url)
// Add JWT token via subprotocol (same pattern as console)
if let token = UserDefaults.standard.string(forKey: "jwt_token") {
request.setValue("openagent, jwt.\(token)", forHTTPHeaderField: "Sec-WebSocket-Protocol")
} else {
request.setValue("openagent", forHTTPHeaderField: "Sec-WebSocket-Protocol")
}
webSocket = session.webSocketTask(with: request)
webSocket?.resume()
// Note: isConnected will be set to true on first successful message receive
// Start receiving frames with current connection ID
receiveMessage(forConnection: connectionId)
}
func disconnect() {
webSocket?.cancel(with: .normalClosure, reason: nil)
webSocket = nil
isConnected = false
isPaused = false // Reset paused state for fresh connection
currentFrame = nil
frameCount = 0
}
// MARK: - Controls
func pause() {
guard isConnected else { return }
isPaused = true
sendCommand(["t": "pause"])
}
func resume() {
guard isConnected else { return }
isPaused = false
sendCommand(["t": "resume"])
}
func setFps(_ newFps: Int) {
fps = newFps
guard isConnected else { return }
sendCommand(["t": "fps", "fps": newFps])
}
func setQuality(_ newQuality: Int) {
quality = newQuality
guard isConnected else { return }
sendCommand(["t": "quality", "quality": newQuality])
}
// MARK: - Private
private func buildWebSocketURL(displayId: String) -> URL? {
let baseURL = UserDefaults.standard.string(forKey: "api_base_url") ?? "https://agent-backend.thomas.md"
// Convert https to wss, http to ws
var wsURL = baseURL
.replacingOccurrences(of: "https://", with: "wss://")
.replacingOccurrences(of: "http://", with: "ws://")
// Build query string
let encodedDisplay = displayId.addingPercentEncoding(withAllowedCharacters: .urlQueryAllowed) ?? displayId
wsURL += "/api/desktop/stream?display=\(encodedDisplay)&fps=\(fps)&quality=\(quality)"
return URL(string: wsURL)
}
private func sendCommand(_ command: [String: Any]) {
guard let webSocket = webSocket,
let data = try? JSONSerialization.data(withJSONObject: command),
let string = String(data: data, encoding: .utf8) else {
return
}
webSocket.send(.string(string)) { [weak self] error in
if let error = error {
Task { @MainActor in
self?.errorMessage = "Send failed: \(error.localizedDescription)"
}
}
}
}
private func receiveMessage(forConnection connId: UInt64) {
webSocket?.receive { [weak self] result in
Task { @MainActor in
guard let self = self else { return }
// Ignore callbacks from stale connections
// This prevents old WebSocket failures from corrupting new connection state
guard self.connectionId == connId else { return }
switch result {
case .success(let message):
// Mark as connected on first successful message
if !self.isConnected {
self.isConnected = true
}
self.handleMessage(message)
// Continue receiving with same connection ID
self.receiveMessage(forConnection: connId)
case .failure(let error):
self.errorMessage = "Connection lost: \(error.localizedDescription)"
self.isConnected = false
}
}
}
}
private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
switch message {
case .data(let data):
// Binary data = JPEG frame
if let image = UIImage(data: data) {
currentFrame = image
frameCount += 1
errorMessage = nil
}
case .string(let text):
// Text message = JSON (error or control response)
if let data = text.data(using: .utf8),
let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] {
if let error = json["error"] as? String {
errorMessage = json["message"] as? String ?? error
}
}
@unknown default:
break
}
}
}

View File

@@ -6,6 +6,7 @@
//
import SwiftUI
import os
struct ControlView: View {
@State private var messages: [ChatMessage] = []
@@ -13,6 +14,7 @@ struct ControlView: View {
@State private var runState: ControlRunState = .idle
@State private var queueLength = 0
@State private var currentMission: Mission?
@State private var viewingMission: Mission?
@State private var isLoading = true
@State private var streamTask: Task<Void, Never>?
@State private var showMissionMenu = false
@@ -21,8 +23,8 @@ struct ControlView: View {
@State private var isAtBottom = true
@State private var copiedMessageId: String?
// Connection state for SSE stream
@State private var connectionState: ConnectionState = .connected
// Connection state for SSE stream - starts as disconnected until first event received
@State private var connectionState: ConnectionState = .disconnected
@State private var reconnectAttempt = 0
// Parallel missions state
@@ -33,7 +35,11 @@ struct ControlView: View {
// Track pending fetch to prevent race conditions
@State private var fetchingMissionId: String?
// Desktop stream state
@State private var showDesktopStream = false
@State private var desktopDisplayId = ":99"
@FocusState private var isInputFocused: Bool
private let api = APIService.shared
@@ -61,12 +67,12 @@ struct ControlView: View {
inputView
}
}
.navigationTitle(currentMission?.displayTitle ?? "Control")
.navigationTitle(viewingMission?.displayTitle ?? "Control")
.navigationBarTitleDisplayMode(.inline)
.toolbar {
ToolbarItem(placement: .principal) {
VStack(spacing: 2) {
Text(currentMission?.displayTitle ?? "Control")
Text(viewingMission?.displayTitle ?? "Control")
.font(.headline)
.foregroundStyle(Theme.textPrimary)
@@ -127,6 +133,18 @@ struct ControlView: View {
}
}
ToolbarItem(placement: .topBarTrailing) {
// Desktop stream button
Button {
showDesktopStream = true
HapticService.lightTap()
} label: {
Image(systemName: "display")
.font(.system(size: 14))
.foregroundStyle(Theme.textSecondary)
}
}
ToolbarItem(placement: .topBarTrailing) {
Menu {
Button {
@@ -134,10 +152,17 @@ struct ControlView: View {
} label: {
Label("New Mission", systemImage: "plus")
}
if let mission = currentMission {
// Desktop stream option in menu too
Button {
showDesktopStream = true
} label: {
Label("View Desktop", systemImage: "display")
}
if let mission = viewingMission {
Divider()
// Resume button for interrupted/blocked missions
if mission.canResume {
Button {
@@ -146,19 +171,19 @@ struct ControlView: View {
Label("Resume Mission", systemImage: "play.circle")
}
}
Button {
Task { await setMissionStatus(.completed) }
} label: {
Label("Mark Complete", systemImage: "checkmark.circle")
}
Button(role: .destructive) {
Task { await setMissionStatus(.failed) }
} label: {
Label("Mark Failed", systemImage: "xmark.circle")
}
if mission.status != .active && !mission.canResume {
Button {
Task { await setMissionStatus(.active) }
@@ -177,10 +202,10 @@ struct ControlView: View {
// Check if we're being opened with a specific mission from History
if let pendingId = nav.consumePendingMission() {
await loadMission(id: pendingId)
viewingMissionId = pendingId
// Also load the current mission in the background for main-session context
await loadCurrentMission(updateViewing: false)
} else {
await loadCurrentMission()
viewingMissionId = currentMission?.id
await loadCurrentMission(updateViewing: true)
}
// Fetch initial running missions
@@ -200,22 +225,27 @@ struct ControlView: View {
nav.pendingMissionId = nil
Task {
await loadMission(id: missionId)
viewingMissionId = missionId
}
}
}
.onChange(of: currentMission?.id) { _, newId in
// Sync viewingMissionId with currentMission when it changes
if viewingMissionId == nil, let id = newId {
viewingMissionId = id
// Sync viewing mission with current mission if nothing is being viewed yet
if viewingMissionId == nil, let id = newId, let mission = currentMission, mission.id == id {
applyViewingMission(mission)
}
}
.onDisappear {
streamTask?.cancel()
connectionState = .connected
connectionState = .disconnected
reconnectAttempt = 0
pollingTask?.cancel()
}
.sheet(isPresented: $showDesktopStream) {
DesktopStreamView(displayId: desktopDisplayId)
.presentationDetents([.medium, .large])
.presentationDragIndicator(.visible)
.presentationBackgroundInteraction(.enabled(upThrough: .medium))
}
}
// MARK: - Running Missions Bar
@@ -601,25 +631,33 @@ struct ControlView: View {
// MARK: - Actions
private func loadCurrentMission() async {
private func applyViewingMission(_ mission: Mission, scrollToBottom: Bool = true) {
viewingMission = mission
viewingMissionId = mission.id
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
if scrollToBottom {
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
shouldScrollToBottom = true
}
}
}
private func loadCurrentMission(updateViewing: Bool) async {
isLoading = true
defer { isLoading = false }
do {
if let mission = try await api.getCurrentMission() {
currentMission = mission
viewingMissionId = mission.id
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
// Scroll to bottom after loading
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
shouldScrollToBottom = true
if updateViewing || viewingMissionId == nil || viewingMissionId == mission.id {
applyViewingMission(mission)
}
}
} catch {
@@ -630,6 +668,9 @@ struct ControlView: View {
private func loadMission(id: String) async {
// Set target immediately for race condition tracking
fetchingMissionId = id
let previousViewingMission = viewingMission
let previousViewingId = viewingMissionId
viewingMissionId = id
isLoading = true
@@ -641,28 +682,25 @@ struct ControlView: View {
return // Another mission was requested, discard this response
}
currentMission = mission
viewingMissionId = mission.id
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
if currentMission?.id == mission.id {
currentMission = mission
}
applyViewingMission(mission)
isLoading = false
HapticService.success()
// Scroll to bottom after loading
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
shouldScrollToBottom = true
}
} catch {
// Race condition guard
guard fetchingMissionId == id else { return }
isLoading = false
print("Failed to load mission: \(error)")
// Revert viewing state to avoid filtering out events
if let fallback = previousViewingMission ?? currentMission {
applyViewingMission(fallback, scrollToBottom: false)
} else {
viewingMissionId = previousViewingId
}
}
}
@@ -670,8 +708,7 @@ struct ControlView: View {
do {
let mission = try await api.createMission()
currentMission = mission
viewingMissionId = mission.id
messages = []
applyViewingMission(mission, scrollToBottom: false)
// Reset status for the new mission - it hasn't started yet
runState = .idle
@@ -696,11 +733,14 @@ struct ControlView: View {
}
private func setMissionStatus(_ status: MissionStatus) async {
guard let mission = currentMission else { return }
guard let mission = viewingMission else { return }
do {
try await api.setMissionStatus(id: mission.id, status: status)
currentMission?.status = status
viewingMission?.status = status
if currentMission?.id == mission.id {
currentMission?.status = status
}
HapticService.success()
} catch {
print("Failed to set status: \(error)")
@@ -709,26 +749,17 @@ struct ControlView: View {
}
private func resumeMission() async {
guard let mission = currentMission, mission.canResume else { return }
guard let mission = viewingMission, mission.canResume else { return }
do {
let resumed = try await api.resumeMission(id: mission.id)
currentMission = resumed
viewingMissionId = resumed.id
// Reload messages to get the resume prompt
messages = resumed.history.enumerated().map { index, entry in
ChatMessage(
id: "\(resumed.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
applyViewingMission(resumed)
// Refresh running missions
await refreshRunningMissions()
HapticService.success()
shouldScrollToBottom = true
} catch {
print("Failed to resume mission: \(error)")
HapticService.error()
@@ -777,14 +808,21 @@ struct ControlView: View {
}
// Start streaming - this will block until the stream ends
// Use OSAllocatedUnfairLock for thread-safe boolean access across actor boundaries
// Track successful (non-error) events separately from all events
let receivedSuccessfulEvent = OSAllocatedUnfairLock(initialState: false)
let streamCompleted = await withCheckedContinuation { continuation in
let innerTask = api.streamControl { eventType, data in
// Only count non-error events as successful for backoff reset
if eventType != "error" {
receivedSuccessfulEvent.withLock { $0 = true }
}
Task { @MainActor in
// Successfully received an event - we're connected
if !self.connectionState.isConnected {
self.connectionState = .connected
self.reconnectAttempt = 0
currentBackoff = 1
}
self.handleStreamEvent(type: eventType, data: data)
}
@@ -797,6 +835,12 @@ struct ControlView: View {
}
}
// Reset backoff only after receiving successful (non-error) events
// This prevents error events from resetting backoff when server is unavailable
if receivedSuccessfulEvent.withLock({ $0 }) {
currentBackoff = 1
}
// Stream ended - check if we should reconnect
guard !Task.isCancelled else { break }
@@ -840,6 +884,11 @@ struct ControlView: View {
guard id != viewingMissionId else { return }
// Set the target mission ID immediately for race condition tracking
let previousViewingMission = viewingMission
let previousViewingId = viewingMissionId
let previousRunState = runState
let previousQueueLength = queueLength
let previousProgress = progress
viewingMissionId = id
fetchingMissionId = id
@@ -847,10 +896,13 @@ struct ControlView: View {
// Determine the run state for this mission from runningMissions
if let runningInfo = runningMissions.first(where: { $0.missionId == id }) {
// This mission is in the running list
if runningInfo.isRunning {
// This mission is in the running list - map state string to enum properly
switch runningInfo.state {
case "running":
runState = .running
} else {
case "waiting_for_tool":
runState = .waitingForTool
default:
runState = .idle
}
queueLength = runningInfo.queueLen
@@ -870,31 +922,14 @@ struct ControlView: View {
return // Another mission was requested, discard this response
}
// If this is not a parallel mission, also update currentMission
if runningMissions.contains(where: { $0.missionId == id }) {
// This is a parallel mission - just load its history
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
} else {
// This is the main mission - load it fully
// Update current mission if this is the main mission, and update the viewed mission
if currentMission?.id == mission.id {
currentMission = mission
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
}
applyViewingMission(mission)
isLoading = false
HapticService.selectionChanged()
shouldScrollToBottom = true
} catch {
// Race condition guard: only show error if this is still the mission we want
guard fetchingMissionId == id else { return }
@@ -902,6 +937,16 @@ struct ControlView: View {
isLoading = false
print("Failed to switch mission: \(error)")
HapticService.error()
// Revert viewing state and status indicators to avoid filtering out events
runState = previousRunState
queueLength = previousQueueLength
progress = previousProgress
if let fallback = previousViewingMission ?? currentMission {
applyViewingMission(fallback, scrollToBottom: false)
} else {
viewingMissionId = previousViewingId
}
}
}
@@ -1053,7 +1098,7 @@ struct ControlView: View {
let total = data["total_subtasks"] as? Int ?? 0
let completed = data["completed_subtasks"] as? Int ?? 0
let current = data["current_subtask"] as? String
let depth = data["current_depth"] as? Int ?? 0
let depth = data["depth"] as? Int ?? data["current_depth"] as? Int ?? 0
if total > 0 {
progress = ExecutionProgress(
@@ -1066,14 +1111,17 @@ struct ControlView: View {
case "error":
if let errorMessage = data["message"] as? String {
// Filter out connection-related errors - these are handled by the reconnection logic
// and shown in the status bar, not as chat messages
let isConnectionError = errorMessage.lowercased().contains("stream connection failed") ||
errorMessage.lowercased().contains("timed out") ||
errorMessage.lowercased().contains("network") ||
errorMessage.lowercased().contains("connection")
// Filter out SSE-specific reconnection errors - these are handled by the reconnection logic
// Use specific patterns to avoid filtering legitimate agent errors
let lower = errorMessage.lowercased()
let isSseReconnectError = lower.contains("stream connection failed") ||
lower.contains("sse connection") ||
lower.contains("event stream") ||
lower == "timed out" ||
lower == "connection reset" ||
lower == "connection closed"
if !isConnectionError {
if !isSseReconnectError {
let message = ChatMessage(
id: "error-\(Date().timeIntervalSince1970)",
type: .error,
@@ -1503,4 +1551,3 @@ private struct MarkdownText: View {
ControlView()
}
}

View File

@@ -0,0 +1,247 @@
//
// DesktopStreamView.swift
// OpenAgentDashboard
//
// Real-time desktop stream viewer with controls
// Designed to be shown in a bottom sheet
//
import SwiftUI
struct DesktopStreamView: View {
@State private var streamService = DesktopStreamService.shared
@State private var showControls = true
@State private var displayId: String
@Environment(\.dismiss) private var dismiss
init(displayId: String = ":99") {
_displayId = State(initialValue: displayId)
}
var body: some View {
ZStack {
// Background
Theme.backgroundPrimary.ignoresSafeArea()
VStack(spacing: 0) {
// Header bar
headerView
// Stream content
streamContent
.frame(maxWidth: .infinity, maxHeight: .infinity)
// Controls (when visible)
if showControls {
controlsView
.transition(.move(edge: .bottom).combined(with: .opacity))
}
}
}
.onAppear {
streamService.connect(displayId: displayId)
}
.onDisappear {
streamService.disconnect()
}
.onTapGesture {
withAnimation(.easeInOut(duration: 0.2)) {
showControls.toggle()
}
}
}
// MARK: - Header
private var headerView: some View {
HStack(spacing: 12) {
// Connection indicator
HStack(spacing: 6) {
Circle()
.fill(streamService.isConnected ? Theme.success : Theme.error)
.frame(width: 8, height: 8)
.overlay {
if streamService.isConnected && !streamService.isPaused {
Circle()
.stroke(Theme.success.opacity(0.5), lineWidth: 2)
.frame(width: 14, height: 14)
.opacity(0.6)
}
}
Text(streamService.isConnected ? (streamService.isPaused ? "Paused" : "Live") : "Disconnected")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textSecondary)
}
Spacer()
// Display ID
Text(displayId)
.font(.caption.monospaced())
.foregroundStyle(Theme.textMuted)
// Frame counter
Text("\(streamService.frameCount) frames")
.font(.caption2.monospaced())
.foregroundStyle(Theme.textMuted)
// Close button
Button {
dismiss()
} label: {
Image(systemName: "xmark")
.font(.system(size: 14, weight: .medium))
.foregroundStyle(Theme.textSecondary)
.frame(width: 28, height: 28)
.background(Theme.backgroundSecondary)
.clipShape(Circle())
}
}
.padding(.horizontal, 16)
.padding(.vertical, 12)
.background(.ultraThinMaterial)
}
// MARK: - Stream Content
@ViewBuilder
private var streamContent: some View {
if let frame = streamService.currentFrame {
// Show the current frame
Image(uiImage: frame)
.resizable()
.aspectRatio(contentMode: .fit)
.background(Color.black)
} else if let error = streamService.errorMessage {
// Show error state
VStack(spacing: 16) {
Image(systemName: "exclamationmark.triangle")
.font(.system(size: 48))
.foregroundStyle(Theme.warning)
Text(error)
.font(.subheadline)
.foregroundStyle(Theme.textSecondary)
.multilineTextAlignment(.center)
.padding(.horizontal)
Button {
streamService.connect(displayId: displayId)
} label: {
Label("Retry", systemImage: "arrow.clockwise")
.font(.subheadline.weight(.medium))
.foregroundStyle(.white)
.padding(.horizontal, 20)
.padding(.vertical, 10)
.background(Theme.accent)
.clipShape(Capsule())
}
}
} else {
// Loading state
VStack(spacing: 16) {
ProgressView()
.progressViewStyle(.circular)
.tint(Theme.accent)
.scaleEffect(1.2)
Text("Connecting to desktop...")
.font(.subheadline)
.foregroundStyle(Theme.textSecondary)
}
}
}
// MARK: - Controls
private var controlsView: some View {
VStack(spacing: 16) {
// Play/Pause and reconnect buttons
HStack(spacing: 16) {
// Play/Pause
Button {
if streamService.isPaused {
streamService.resume()
} else {
streamService.pause()
}
HapticService.lightTap()
} label: {
Image(systemName: streamService.isPaused ? "play.fill" : "pause.fill")
.font(.system(size: 20))
.foregroundStyle(.white)
.frame(width: 48, height: 48)
.background(Theme.accent)
.clipShape(Circle())
}
.disabled(!streamService.isConnected)
.opacity(streamService.isConnected ? 1 : 0.5)
// Reconnect
Button {
streamService.connect(displayId: displayId)
HapticService.lightTap()
} label: {
Image(systemName: "arrow.clockwise")
.font(.system(size: 16, weight: .medium))
.foregroundStyle(Theme.textPrimary)
.frame(width: 44, height: 44)
.background(Theme.backgroundSecondary)
.clipShape(Circle())
}
}
// Quality and FPS sliders
VStack(spacing: 12) {
// FPS slider
HStack {
Text("FPS")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textMuted)
.frame(width: 50, alignment: .leading)
Slider(value: Binding(
get: { Double(streamService.fps) },
set: { streamService.setFps(Int($0)) }
), in: 1...30, step: 1)
.tint(Theme.accent)
Text("\(streamService.fps)")
.font(.caption.monospaced())
.foregroundStyle(Theme.textSecondary)
.frame(width: 30)
}
// Quality slider
HStack {
Text("Quality")
.font(.caption.weight(.medium))
.foregroundStyle(Theme.textMuted)
.frame(width: 50, alignment: .leading)
Slider(value: Binding(
get: { Double(streamService.quality) },
set: { streamService.setQuality(Int($0)) }
), in: 10...100, step: 5)
.tint(Theme.accent)
Text("\(streamService.quality)%")
.font(.caption.monospaced())
.foregroundStyle(Theme.textSecondary)
.frame(width: 40)
}
}
.padding(.horizontal, 8)
}
.padding(16)
.background(.ultraThinMaterial)
}
}
// MARK: - Preview
#Preview {
DesktopStreamView(displayId: ":99")
}

View File

@@ -279,13 +279,21 @@ impl Agent for OpenCodeAgent {
});
}
if response.info.error.is_some() {
if let Some(error) = &response.info.error {
tree.status = "failed".to_string();
if let Some(node) = tree.children.iter_mut().find(|n| n.id == "opencode") {
node.status = "failed".to_string();
}
ctx.emit_tree(tree);
return AgentResult::failure("OpenCode returned an error response", 0)
// Extract error message from the error value
let error_msg = if let Some(msg) = error.get("message").and_then(|v| v.as_str()) {
msg.to_string()
} else if let Some(s) = error.as_str() {
s.to_string()
} else {
error.to_string()
};
return AgentResult::failure(format!("OpenCode error: {}", error_msg), 0)
.with_terminal_reason(TerminalReason::LlmError);
}
@@ -355,13 +363,21 @@ impl OpenCodeAgent {
}
};
if response.info.error.is_some() {
if let Some(error) = &response.info.error {
tree.status = "failed".to_string();
if let Some(node) = tree.children.iter_mut().find(|n| n.id == "opencode") {
node.status = "failed".to_string();
}
ctx.emit_tree(tree);
return AgentResult::failure("OpenCode returned an error response", 0)
// Extract error message from the error value
let error_msg = if let Some(msg) = error.get("message").and_then(|v| v.as_str()) {
msg.to_string()
} else if let Some(s) = error.as_str() {
s.to_string()
} else {
error.to_string()
};
return AgentResult::failure(format!("OpenCode error: {}", error_msg), 0)
.with_terminal_reason(TerminalReason::LlmError);
}

View File

@@ -1716,7 +1716,8 @@ async fn control_actor_loop(
if running.is_none() {
if let Some((mid, msg, model_override)) = queue.pop_front() {
set_and_emit_status(&status, &events_tx, ControlRunState::Running, queue.len()).await;
let _ = events_tx.send(AgentEvent::UserMessage { id: mid, content: msg.clone(), mission_id: None });
let current_mid = current_mission.read().await.clone();
let _ = events_tx.send(AgentEvent::UserMessage { id: mid, content: msg.clone(), mission_id: current_mid });
let cfg = config.clone();
let agent = Arc::clone(&root_agent);
let mem = memory.clone();
@@ -1735,9 +1736,10 @@ async fn control_actor_loop(
};
let tree_ref = Arc::clone(&current_tree);
let progress_ref = Arc::clone(&progress);
running_cancel = Some(cancel.clone());
// Capture which mission this task is working on
running_mission_id = current_mission.read().await.clone();
let mission_id = current_mission.read().await.clone();
running_cancel = Some(cancel.clone());
running_mission_id = mission_id;
running = Some(tokio::spawn(async move {
let result = run_single_control_turn(
cfg,
@@ -1757,6 +1759,7 @@ async fn control_actor_loop(
Some(mission_ctrl),
tree_ref,
progress_ref,
mission_id,
)
.await;
(mid, msg, result)
@@ -2034,6 +2037,7 @@ async fn control_actor_loop(
Some(mission_ctrl),
tree_ref,
progress_ref,
Some(mission_id),
)
.await;
(mid, msg, result)
@@ -2320,11 +2324,14 @@ async fn control_actor_loop(
success: agent_result.success,
cost_cents: agent_result.cost_cents,
model: agent_result.model_used,
mission_id: None,
mission_id: completed_mission_id,
});
}
Err(e) => {
let _ = events_tx.send(AgentEvent::Error { message: format!("Control session task join failed: {}", e), mission_id: None });
let _ = events_tx.send(AgentEvent::Error {
message: format!("Control session task join failed: {}", e),
mission_id: completed_mission_id,
});
}
}
}
@@ -2332,7 +2339,8 @@ async fn control_actor_loop(
// Start next queued message, if any.
if let Some((mid, msg, model_override)) = queue.pop_front() {
set_and_emit_status(&status, &events_tx, ControlRunState::Running, queue.len()).await;
let _ = events_tx.send(AgentEvent::UserMessage { id: mid, content: msg.clone(), mission_id: None });
let current_mid = current_mission.read().await.clone();
let _ = events_tx.send(AgentEvent::UserMessage { id: mid, content: msg.clone(), mission_id: current_mid });
let cfg = config.clone();
let agent = Arc::clone(&root_agent);
let mem = memory.clone();
@@ -2353,7 +2361,8 @@ async fn control_actor_loop(
let progress_ref = Arc::clone(&progress);
running_cancel = Some(cancel.clone());
// Capture which mission this task is working on
running_mission_id = current_mission.read().await.clone();
let mission_id = current_mission.read().await.clone();
running_mission_id = mission_id;
running = Some(tokio::spawn(async move {
let result = run_single_control_turn(
cfg,
@@ -2373,6 +2382,7 @@ async fn control_actor_loop(
Some(mission_ctrl),
tree_ref,
progress_ref,
mission_id,
)
.await;
(mid, msg, result)
@@ -2452,6 +2462,7 @@ async fn run_single_control_turn(
mission_control: Option<crate::tools::mission::MissionControl>,
tree_snapshot: Arc<RwLock<Option<AgentTreeNode>>>,
progress_snapshot: Arc<RwLock<ExecutionProgress>>,
mission_id: Option<Uuid>,
) -> crate::agents::AgentResult {
// Build a task prompt that includes conversation context with size limits.
// Uses ContextBuilder with config-driven limits to prevent context overflow.
@@ -2507,6 +2518,7 @@ async fn run_single_control_turn(
ctx.resolver = Some(resolver);
ctx.tree_snapshot = Some(tree_snapshot);
ctx.progress_snapshot = Some(progress_snapshot);
ctx.mission_id = mission_id;
ctx.mcp = Some(mcp);
let result = root_agent.execute(&mut task, &ctx).await;

264
src/api/desktop_stream.rs Normal file
View File

@@ -0,0 +1,264 @@
//! WebSocket-based MJPEG streaming for virtual desktop display.
//!
//! Provides real-time streaming of the X11 virtual desktop (Xvfb)
//! to connected clients over WebSocket using MJPEG frames.
use std::sync::Arc;
use std::time::Duration;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Query, State,
},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use tokio::process::Command;
use tokio::sync::mpsc;
use super::auth;
use super::routes::AppState;
/// Query parameters for the desktop stream endpoint
#[derive(Debug, Deserialize)]
pub struct StreamParams {
/// Display identifier (e.g., ":99")
pub display: String,
/// Target frames per second (default: 10)
pub fps: Option<u32>,
/// JPEG quality 1-100 (default: 70)
pub quality: Option<u32>,
}
/// Extract JWT from WebSocket subprotocol header
fn extract_jwt_from_protocols(headers: &HeaderMap) -> Option<String> {
let raw = headers
.get("sec-websocket-protocol")
.and_then(|v| v.to_str().ok())?;
// Client sends: ["openagent", "jwt.<token>"]
for part in raw.split(',').map(|s| s.trim()) {
if let Some(rest) = part.strip_prefix("jwt.") {
if !rest.is_empty() {
return Some(rest.to_string());
}
}
}
None
}
/// WebSocket endpoint for streaming desktop as MJPEG
pub async fn desktop_stream_ws(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
Query(params): Query<StreamParams>,
headers: HeaderMap,
) -> impl IntoResponse {
// Enforce auth in non-dev mode
if state.config.auth.auth_required(state.config.dev_mode) {
let token = match extract_jwt_from_protocols(&headers) {
Some(t) => t,
None => return (StatusCode::UNAUTHORIZED, "Missing websocket JWT").into_response(),
};
if !auth::verify_token_for_config(&token, &state.config) {
return (StatusCode::UNAUTHORIZED, "Invalid or expired token").into_response();
}
}
// Validate display format
if !params.display.starts_with(':') {
return (StatusCode::BAD_REQUEST, "Invalid display format").into_response();
}
ws.protocols(["openagent"])
.on_upgrade(move |socket| handle_desktop_stream(socket, params))
}
/// Client command for controlling the stream
#[derive(Debug, Deserialize)]
#[serde(tag = "t")]
enum ClientCommand {
/// Pause streaming
#[serde(rename = "pause")]
Pause,
/// Resume streaming
#[serde(rename = "resume")]
Resume,
/// Change FPS
#[serde(rename = "fps")]
SetFps { fps: u32 },
/// Change quality
#[serde(rename = "quality")]
SetQuality { quality: u32 },
}
/// Handle the WebSocket connection for desktop streaming
async fn handle_desktop_stream(mut socket: WebSocket, params: StreamParams) {
let x11_display = params.display;
let fps = params.fps.unwrap_or(10).clamp(1, 30);
let quality = params.quality.unwrap_or(70).clamp(10, 100);
tracing::info!(
x11_display = %x11_display,
fps = fps,
quality = quality,
"Starting desktop stream"
);
// Channel for control commands from client
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<ClientCommand>();
// Split the socket
let (mut ws_sender, mut ws_receiver) = socket.split();
// Spawn task to handle incoming messages
let cmd_tx_clone = cmd_tx.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = ws_receiver.next().await {
match msg {
Message::Text(t) => {
if let Ok(cmd) = serde_json::from_str::<ClientCommand>(&t) {
let _ = cmd_tx_clone.send(cmd);
}
}
Message::Close(_) => break,
_ => {}
}
}
});
// Streaming state
let mut paused = false;
let mut current_fps = fps;
let mut current_quality = quality;
let mut frame_interval = Duration::from_millis(1000 / current_fps as u64);
// Main streaming loop
let mut stream_task = tokio::spawn(async move {
let mut frame_count: u64 = 0;
loop {
// Check for control commands (non-blocking)
while let Ok(cmd) = cmd_rx.try_recv() {
match cmd {
ClientCommand::Pause => {
paused = true;
tracing::debug!("Stream paused");
}
ClientCommand::Resume => {
paused = false;
tracing::debug!("Stream resumed");
}
ClientCommand::SetFps { fps: new_fps } => {
current_fps = new_fps.clamp(1, 30);
frame_interval = Duration::from_millis(1000 / current_fps as u64);
tracing::debug!(fps = current_fps, "FPS changed");
}
ClientCommand::SetQuality { quality: new_quality } => {
current_quality = new_quality.clamp(10, 100);
tracing::debug!(quality = current_quality, "Quality changed");
}
}
}
if paused {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// Capture frame
match capture_frame(&x11_display, current_quality).await {
Ok(jpeg_data) => {
frame_count += 1;
// Send as binary WebSocket message
if ws_sender.send(Message::Binary(jpeg_data)).await.is_err() {
tracing::debug!("Client disconnected");
break;
}
}
Err(e) => {
// Send error as text message
let err_msg = serde_json::json!({
"error": "capture_failed",
"message": e.to_string()
});
if ws_sender
.send(Message::Text(err_msg.to_string()))
.await
.is_err()
{
break;
}
// Wait a bit before retrying on error
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
// Wait for next frame
tokio::time::sleep(frame_interval).await;
}
tracing::info!(frames = frame_count, "Desktop stream ended");
});
// Wait for either task to complete, then abort the other to prevent resource waste
tokio::select! {
_ = &mut recv_task => {
stream_task.abort();
}
_ = &mut stream_task => {
recv_task.abort();
}
}
}
/// Capture a single frame from the X11 display as JPEG
async fn capture_frame(display: &str, quality: u32) -> anyhow::Result<Vec<u8>> {
// Use import from ImageMagick to capture and convert directly to JPEG
// This avoids writing to disk and is more efficient
let output = Command::new("import")
.args([
"-window",
"root",
"-quality",
&quality.to_string(),
"jpeg:-", // Output JPEG to stdout
])
.env("DISPLAY", display)
.output()
.await
.map_err(|e| anyhow::anyhow!("Failed to run import: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow::anyhow!("import failed: {}", stderr));
}
Ok(output.stdout)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_params_defaults() {
let params = StreamParams {
display: ":99".to_string(),
fps: None,
quality: None,
};
assert_eq!(params.fps.unwrap_or(10), 10);
assert_eq!(params.quality.unwrap_or(70), 70);
}
#[test]
fn test_fps_clamping() {
assert_eq!(0_u32.clamp(1, 30), 1);
assert_eq!(50_u32.clamp(1, 30), 30);
assert_eq!(15_u32.clamp(1, 30), 15);
}
}

View File

@@ -18,6 +18,7 @@
mod auth;
mod console;
pub mod control;
mod desktop_stream;
mod fs;
pub mod mcp;
pub mod mission_runner;

View File

@@ -32,6 +32,7 @@ use crate::tools::ToolRegistry;
use super::auth;
use super::console;
use super::control;
use super::desktop_stream;
use super::fs;
use super::mcp as mcp_api;
use super::types::*;
@@ -103,7 +104,12 @@ pub async fn serve(config: Config) -> anyhow::Result<()> {
.route("/api/health", get(health))
.route("/api/auth/login", post(auth::login))
// WebSocket console uses subprotocol-based auth (browser can't set Authorization header)
.route("/api/console/ws", get(console::console_ws));
.route("/api/console/ws", get(console::console_ws))
// WebSocket desktop stream uses subprotocol-based auth
.route(
"/api/desktop/stream",
get(desktop_stream::desktop_stream_ws),
);
// File upload routes with increased body limit (10GB)
let upload_route = Router::new()

View File

@@ -2,7 +2,7 @@
//!
//! Open Agent uses OpenCode as its execution backend. Configuration can be set via environment variables:
//! - `OPENROUTER_API_KEY` - Optional. Only required for memory embeddings.
//! - `DEFAULT_MODEL` - Optional. The default LLM model to use. Defaults to `claude-sonnet-4-20250514`.
//! - `DEFAULT_MODEL` - Optional. The default LLM model to use. Defaults to `claude-opus-4-5-20251101`.
//! - `WORKING_DIR` - Optional. Default working directory for relative paths. Defaults to `/root` in production, current directory in dev.
//! - `HOST` - Optional. Server host. Defaults to `127.0.0.1`.
//! - `PORT` - Optional. Server port. Defaults to `3000`.
@@ -359,7 +359,7 @@ impl Config {
.unwrap_or(true);
let default_model = std::env::var("DEFAULT_MODEL")
.unwrap_or_else(|_| "claude-sonnet-4-20250514".to_string());
.unwrap_or_else(|_| "claude-opus-4-5-20251101".to_string());
// WORKING_DIR: default working directory for relative paths.
// In production (release build), default to /root. In dev, default to current directory.