diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index dbbc2da..1d48160 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -28,6 +28,21 @@ MCPs can be global because and run as child processes on the host or workspace ( - Keep the backend a thin orchestrator: **Start Mission → Stream Events → Store Logs**. - Avoid embedding provider-specific logic in the backend. Provider auth is managed via OpenCode config + dashboard flows. +## Timeout Philosophy + +Open Agent is a **pure pass-through frontend** to OpenCode. We intentionally do NOT impose any timeouts on the SSE event stream from OpenCode. All timeout handling is delegated to OpenCode, which manages tool execution timeouts internally. + +**Why no timeouts in Open Agent?** +- Long-running tools (vision analysis, large file operations, web scraping) should complete naturally +- Users can abort missions manually via the dashboard if needed +- Avoids artificial timeout mismatches between Open Agent and OpenCode +- OpenCode remains the single source of truth for execution limits + +**What this means:** +- The SSE stream in `src/opencode/mod.rs` runs indefinitely until OpenCode sends a `MessageComplete` event, closes the connection, or reading from the stream fails with an error +- The only timeout applied is for initial HTTP connection establishment (`DEFAULT_REQUEST_TIMEOUT`) +- If a mission appears stuck, check OpenCode logs first—any timeout errors originate from OpenCode or downstream clients (like Conductor), not Open Agent + ## Common Entry Points - `src/api/routes.rs` – API routing and server startup. diff --git a/src/opencode/mod.rs b/src/opencode/mod.rs index 00aa8b3..11e2653 100644 --- a/src/opencode/mod.rs +++ b/src/opencode/mod.rs @@ -2,6 +2,21 @@ //! //! Provides the OpenCode HTTP API client needed to run tasks via an external //! OpenCode server, with real-time event streaming. +//! +//! ## Timeout Philosophy +//! +//! Open Agent acts as a **pure pass-through frontend** to OpenCode. We intentionally +//! 
do NOT impose any timeouts on the SSE event stream. All timeout handling is +//! delegated to OpenCode, which manages tool execution timeouts internally. +//! +//! This design ensures: +//! - Long-running tools (vision analysis, large file operations) complete naturally +//! - Users can abort missions manually via the dashboard if needed +//! - No artificial timeout mismatches between Open Agent and OpenCode +//! - OpenCode remains the single source of truth for execution limits +//! +//! The only timeout we apply is `DEFAULT_REQUEST_TIMEOUT` for initial HTTP connections, +//! not for ongoing SSE streaming. use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -10,16 +25,9 @@ use std::collections::HashMap; use std::time::Duration; use tokio::sync::mpsc; -/// Default timeout for OpenCode HTTP requests (5 minutes). -/// Reduced from 10 minutes since we now have SSE inactivity detection. -const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); - -/// Maximum time to wait for SSE activity before checking tool status. -/// If tools are still running, we extend the timeout; otherwise we disconnect. -const SSE_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(180); // 3 minutes - -/// How often to check if tools are still running when SSE is inactive. -const TOOL_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(30); +/// Default timeout for OpenCode HTTP requests (10 minutes). +/// This is just for the initial HTTP connection - SSE streaming has no timeout. +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(600); /// Number of retries for transient network failures. 
const NETWORK_RETRY_COUNT: u32 = 3; @@ -223,109 +231,17 @@ impl OpenCodeClient { tracing::warn!(session_id = %session_id_clone, "SSE curl process started, reading lines"); - let mut last_activity = std::time::Instant::now(); - // Track running tool call IDs to extend timeout while tools execute - let mut running_tools: std::collections::HashSet = std::collections::HashSet::new(); - // Track last time we logged about running tools to avoid spam - let mut last_tool_log: Option = None; - + // No timeout - we let OpenCode handle all timeouts internally. + // The SSE stream will run until OpenCode sends MessageComplete or closes the connection. loop { line.clear(); - // Apply inactivity timeout based on meaningful SSE activity (not just bytes). - // If tools are currently running, we use a longer effective timeout. - let idle = last_activity.elapsed(); - let effective_timeout = if running_tools.is_empty() { - SSE_INACTIVITY_TIMEOUT - } else { - // When tools are running, extend timeout significantly (10 minutes) - // Long-running tools like vision analysis can take several minutes - Duration::from_secs(600) - }; - - if idle >= effective_timeout { - let idle_secs = idle.as_secs(); - tracing::error!( - session_id = %session_id_clone, - idle_secs = idle_secs, - event_count = event_count, - running_tools = running_tools.len(), - "SSE inactivity timeout - OpenCode stopped sending meaningful events" - ); - let _ = event_tx - .send(OpenCodeEvent::Error { - message: format!( - "OpenCode SSE stream inactive for {} seconds - possible internal timeout or crash", - idle_secs - ), - }) - .await; - let _ = child.kill().await; - return; - } - - // Log every 30 seconds when tools are running but SSE is quiet - if !running_tools.is_empty() && idle >= TOOL_STATUS_CHECK_INTERVAL { - let should_log = match last_tool_log { - None => true, - Some(last) => last.elapsed() >= Duration::from_secs(30), - }; - if should_log { - tracing::info!( - session_id = %session_id_clone, - idle_secs = 
idle.as_secs(), - running_tools = ?running_tools, - "SSE quiet but tools still running - extending timeout" - ); - last_tool_log = Some(std::time::Instant::now()); - } - } - - let remaining = effective_timeout.saturating_sub(idle); - let read_result = - tokio::time::timeout(remaining, reader.read_line(&mut line)).await; - - match read_result { - Err(_timeout) => { - // Double-check: if tools are running, reset activity and continue waiting - if !running_tools.is_empty() { - tracing::warn!( - session_id = %session_id_clone, - idle_secs = last_activity.elapsed().as_secs(), - running_tools = ?running_tools, - "Read timeout but tools still running - resetting activity timer" - ); - // Reset activity so we don't immediately timeout on next iteration - last_activity = std::time::Instant::now(); - continue; - } - let idle_secs = last_activity.elapsed().as_secs(); - tracing::error!( - session_id = %session_id_clone, - idle_secs = idle_secs, - event_count = event_count, - "SSE inactivity timeout - OpenCode stopped sending events" - ); - // Send a timeout error event so the caller knows what happened - let _ = event_tx - .send(OpenCodeEvent::Error { - message: format!( - "OpenCode SSE stream inactive for {} seconds - possible internal timeout or crash", - idle_secs - ), - }) - .await; - let _ = child.kill().await; - return; - } - Ok(Ok(0)) => { + match reader.read_line(&mut line).await { + Ok(0) => { tracing::debug!(session_id = %session_id_clone, "SSE curl stdout closed"); break; } - Ok(Ok(_)) => { - // Note: We only update last_activity when we emit a meaningful event, - // not on every line read. This prevents heartbeat events from - // resetting the inactivity timeout. 
+ Ok(_) => { let trimmed = line.trim_end(); if trimmed.is_empty() { @@ -346,32 +262,7 @@ impl OpenCodeClient { &mut sse_state, ); - if parsed.activity { - last_activity = std::time::Instant::now(); - } - - if let Some(ref event) = parsed.event { - // Track running tools to extend timeout while they execute - match event { - OpenCodeEvent::ToolCall { tool_call_id, name, .. } => { - tracing::debug!( - tool_call_id = %tool_call_id, - tool_name = %name, - "Tool started - tracking for timeout extension" - ); - running_tools.insert(tool_call_id.clone()); - } - OpenCodeEvent::ToolResult { tool_call_id, name, .. } => { - tracing::debug!( - tool_call_id = %tool_call_id, - tool_name = %name, - "Tool completed - removing from tracking" - ); - running_tools.remove(tool_call_id); - } - _ => {} - } - + if let Some(ref event) = parsed { event_count += 1; let is_complete = matches!(event, OpenCodeEvent::MessageComplete { .. }); @@ -415,7 +306,7 @@ impl OpenCodeClient { continue; } } - Ok(Err(e)) => { + Err(e) => { tracing::warn!(session_id = %session_id_clone, error = %e, "SSE read error"); break; } @@ -1166,18 +1057,13 @@ fn handle_tool_part_update( } } -struct ParsedSseEvent { - event: Option, - activity: bool, -} - /// Parse an SSE event line into an OpenCodeEvent. 
fn parse_sse_event( data_str: &str, event_name: Option<&str>, session_id: &str, state: &mut SseState, -) -> ParsedSseEvent { +) -> Option { let json: serde_json::Value = match serde_json::from_str(data_str) { Ok(value) => value, Err(err) => { @@ -1192,10 +1078,7 @@ fn parse_sse_event( data_preview = %data_str.chars().take(200).collect::(), "Failed to parse OpenCode SSE JSON payload" ); - return ParsedSseEvent { - event: None, - activity: false, - }; + return None; } } } else { @@ -1204,10 +1087,7 @@ fn parse_sse_event( data_preview = %data_str.chars().take(200).collect::(), "Failed to parse OpenCode SSE JSON payload" ); - return ParsedSseEvent { - event: None, - activity: false, - }; + return None; } } }; @@ -1215,10 +1095,7 @@ fn parse_sse_event( let event_type = match json.get("type").and_then(|v| v.as_str()).or(event_name) { Some(event_type) => event_type, None => { - return ParsedSseEvent { - event: None, - activity: false, - } + return None } }; let props = json @@ -1256,17 +1133,11 @@ fn parse_sse_event( event_type = %event_type, "SKIPPING event - session ID mismatch" ); - return ParsedSseEvent { - event: None, - activity: false, - }; + return None; } } - // Treat most events as meaningful activity EXCEPT server heartbeats. - let activity = event_type != "server.heartbeat"; - - let event = match event_type { + match event_type { // OpenAI Responses-style streaming "response.output_text.delta" => { let delta = props @@ -1439,9 +1310,7 @@ fn parse_sse_event( ); None } - }; - - ParsedSseEvent { event, activity } + } } #[derive(Debug, Deserialize)]