Remove SSE inactivity timeout - let OpenCode handle all timeouts
Open Agent now acts as a pure pass-through frontend to OpenCode. We no longer impose any timeouts on the SSE event stream. Changes: - Remove SSE_INACTIVITY_TIMEOUT (180s) and TOOL_STATUS_CHECK_INTERVAL (30s) - Remove tool tracking for timeout extension - Simplify SSE loop to blocking read without timeout - Document timeout philosophy in module docs and CLAUDE.md This ensures long-running tools complete naturally and avoids timeout mismatches between Open Agent and OpenCode. Users can still abort missions manually via the dashboard.
This commit is contained in:
@@ -28,6 +28,21 @@ MCPs can be global because and run as child processes on the host or workspace (
|
||||
- Keep the backend a thin orchestrator: **Start Mission → Stream Events → Store Logs**.
|
||||
- Avoid embedding provider-specific logic in the backend. Provider auth is managed via OpenCode config + dashboard flows.
|
||||
|
||||
## Timeout Philosophy
|
||||
|
||||
Open Agent is a **pure pass-through frontend** to OpenCode. We intentionally do NOT impose any timeouts on the SSE event stream from OpenCode. All timeout handling is delegated to OpenCode, which manages tool execution timeouts internally.
|
||||
|
||||
**Why no timeouts in Open Agent?**
|
||||
- Long-running tools (vision analysis, large file operations, web scraping) should complete naturally
|
||||
- Users can abort missions manually via the dashboard if needed
|
||||
- Avoids artificial timeout mismatches between Open Agent and OpenCode
|
||||
- OpenCode remains the single source of truth for execution limits
|
||||
|
||||
**What this means:**
|
||||
- The SSE stream in `src/opencode/mod.rs` runs indefinitely until OpenCode sends a `MessageComplete` event or closes the connection
|
||||
- The only timeout applied is for initial HTTP connection establishment (`DEFAULT_REQUEST_TIMEOUT`)
|
||||
- If a mission appears stuck, check OpenCode logs first—any timeout errors originate from OpenCode or downstream clients (like Conductor), not Open Agent
|
||||
|
||||
## Common Entry Points
|
||||
|
||||
- `src/api/routes.rs` – API routing and server startup.
|
||||
|
||||
@@ -2,6 +2,21 @@
|
||||
//!
|
||||
//! Provides the OpenCode HTTP API client needed to run tasks via an external
|
||||
//! OpenCode server, with real-time event streaming.
|
||||
//!
|
||||
//! ## Timeout Philosophy
|
||||
//!
|
||||
//! Open Agent acts as a **pure pass-through frontend** to OpenCode. We intentionally
|
||||
//! do NOT impose any timeouts on the SSE event stream. All timeout handling is
|
||||
//! delegated to OpenCode, which manages tool execution timeouts internally.
|
||||
//!
|
||||
//! This design ensures:
|
||||
//! - Long-running tools (vision analysis, large file operations) complete naturally
|
||||
//! - Users can abort missions manually via the dashboard if needed
|
||||
//! - No artificial timeout mismatches between Open Agent and OpenCode
|
||||
//! - OpenCode remains the single source of truth for execution limits
|
||||
//!
|
||||
//! The only timeout we apply is `DEFAULT_REQUEST_TIMEOUT` for initial HTTP connections,
|
||||
//! not for ongoing SSE streaming.
|
||||
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -10,16 +25,9 @@ use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Default timeout for OpenCode HTTP requests (5 minutes).
|
||||
/// Reduced from 10 minutes since we now have SSE inactivity detection.
|
||||
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// Maximum time to wait for SSE activity before checking tool status.
|
||||
/// If tools are still running, we extend the timeout; otherwise we disconnect.
|
||||
const SSE_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(180); // 3 minutes
|
||||
|
||||
/// How often to check if tools are still running when SSE is inactive.
|
||||
const TOOL_STATUS_CHECK_INTERVAL: Duration = Duration::from_secs(30);
|
||||
/// Default timeout for OpenCode HTTP requests (10 minutes).
|
||||
/// This is just for the initial HTTP connection - SSE streaming has no timeout.
|
||||
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
|
||||
|
||||
/// Number of retries for transient network failures.
|
||||
const NETWORK_RETRY_COUNT: u32 = 3;
|
||||
@@ -223,109 +231,17 @@ impl OpenCodeClient {
|
||||
|
||||
tracing::warn!(session_id = %session_id_clone, "SSE curl process started, reading lines");
|
||||
|
||||
let mut last_activity = std::time::Instant::now();
|
||||
// Track running tool call IDs to extend timeout while tools execute
|
||||
let mut running_tools: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
// Track last time we logged about running tools to avoid spam
|
||||
let mut last_tool_log: Option<std::time::Instant> = None;
|
||||
|
||||
// No timeout - we let OpenCode handle all timeouts internally.
|
||||
// The SSE stream will run until OpenCode sends MessageComplete or closes the connection.
|
||||
loop {
|
||||
line.clear();
|
||||
|
||||
// Apply inactivity timeout based on meaningful SSE activity (not just bytes).
|
||||
// If tools are currently running, we use a longer effective timeout.
|
||||
let idle = last_activity.elapsed();
|
||||
let effective_timeout = if running_tools.is_empty() {
|
||||
SSE_INACTIVITY_TIMEOUT
|
||||
} else {
|
||||
// When tools are running, extend timeout significantly (10 minutes)
|
||||
// Long-running tools like vision analysis can take several minutes
|
||||
Duration::from_secs(600)
|
||||
};
|
||||
|
||||
if idle >= effective_timeout {
|
||||
let idle_secs = idle.as_secs();
|
||||
tracing::error!(
|
||||
session_id = %session_id_clone,
|
||||
idle_secs = idle_secs,
|
||||
event_count = event_count,
|
||||
running_tools = running_tools.len(),
|
||||
"SSE inactivity timeout - OpenCode stopped sending meaningful events"
|
||||
);
|
||||
let _ = event_tx
|
||||
.send(OpenCodeEvent::Error {
|
||||
message: format!(
|
||||
"OpenCode SSE stream inactive for {} seconds - possible internal timeout or crash",
|
||||
idle_secs
|
||||
),
|
||||
})
|
||||
.await;
|
||||
let _ = child.kill().await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Log every 30 seconds when tools are running but SSE is quiet
|
||||
if !running_tools.is_empty() && idle >= TOOL_STATUS_CHECK_INTERVAL {
|
||||
let should_log = match last_tool_log {
|
||||
None => true,
|
||||
Some(last) => last.elapsed() >= Duration::from_secs(30),
|
||||
};
|
||||
if should_log {
|
||||
tracing::info!(
|
||||
session_id = %session_id_clone,
|
||||
idle_secs = idle.as_secs(),
|
||||
running_tools = ?running_tools,
|
||||
"SSE quiet but tools still running - extending timeout"
|
||||
);
|
||||
last_tool_log = Some(std::time::Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
let remaining = effective_timeout.saturating_sub(idle);
|
||||
let read_result =
|
||||
tokio::time::timeout(remaining, reader.read_line(&mut line)).await;
|
||||
|
||||
match read_result {
|
||||
Err(_timeout) => {
|
||||
// Double-check: if tools are running, reset activity and continue waiting
|
||||
if !running_tools.is_empty() {
|
||||
tracing::warn!(
|
||||
session_id = %session_id_clone,
|
||||
idle_secs = last_activity.elapsed().as_secs(),
|
||||
running_tools = ?running_tools,
|
||||
"Read timeout but tools still running - resetting activity timer"
|
||||
);
|
||||
// Reset activity so we don't immediately timeout on next iteration
|
||||
last_activity = std::time::Instant::now();
|
||||
continue;
|
||||
}
|
||||
let idle_secs = last_activity.elapsed().as_secs();
|
||||
tracing::error!(
|
||||
session_id = %session_id_clone,
|
||||
idle_secs = idle_secs,
|
||||
event_count = event_count,
|
||||
"SSE inactivity timeout - OpenCode stopped sending events"
|
||||
);
|
||||
// Send a timeout error event so the caller knows what happened
|
||||
let _ = event_tx
|
||||
.send(OpenCodeEvent::Error {
|
||||
message: format!(
|
||||
"OpenCode SSE stream inactive for {} seconds - possible internal timeout or crash",
|
||||
idle_secs
|
||||
),
|
||||
})
|
||||
.await;
|
||||
let _ = child.kill().await;
|
||||
return;
|
||||
}
|
||||
Ok(Ok(0)) => {
|
||||
match reader.read_line(&mut line).await {
|
||||
Ok(0) => {
|
||||
tracing::debug!(session_id = %session_id_clone, "SSE curl stdout closed");
|
||||
break;
|
||||
}
|
||||
Ok(Ok(_)) => {
|
||||
// Note: We only update last_activity when we emit a meaningful event,
|
||||
// not on every line read. This prevents heartbeat events from
|
||||
// resetting the inactivity timeout.
|
||||
Ok(_) => {
|
||||
let trimmed = line.trim_end();
|
||||
|
||||
if trimmed.is_empty() {
|
||||
@@ -346,32 +262,7 @@ impl OpenCodeClient {
|
||||
&mut sse_state,
|
||||
);
|
||||
|
||||
if parsed.activity {
|
||||
last_activity = std::time::Instant::now();
|
||||
}
|
||||
|
||||
if let Some(ref event) = parsed.event {
|
||||
// Track running tools to extend timeout while they execute
|
||||
match event {
|
||||
OpenCodeEvent::ToolCall { tool_call_id, name, .. } => {
|
||||
tracing::debug!(
|
||||
tool_call_id = %tool_call_id,
|
||||
tool_name = %name,
|
||||
"Tool started - tracking for timeout extension"
|
||||
);
|
||||
running_tools.insert(tool_call_id.clone());
|
||||
}
|
||||
OpenCodeEvent::ToolResult { tool_call_id, name, .. } => {
|
||||
tracing::debug!(
|
||||
tool_call_id = %tool_call_id,
|
||||
tool_name = %name,
|
||||
"Tool completed - removing from tracking"
|
||||
);
|
||||
running_tools.remove(tool_call_id);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if let Some(ref event) = parsed {
|
||||
event_count += 1;
|
||||
let is_complete =
|
||||
matches!(event, OpenCodeEvent::MessageComplete { .. });
|
||||
@@ -415,7 +306,7 @@ impl OpenCodeClient {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
Err(e) => {
|
||||
tracing::warn!(session_id = %session_id_clone, error = %e, "SSE read error");
|
||||
break;
|
||||
}
|
||||
@@ -1166,18 +1057,13 @@ fn handle_tool_part_update(
|
||||
}
|
||||
}
|
||||
|
||||
struct ParsedSseEvent {
|
||||
event: Option<OpenCodeEvent>,
|
||||
activity: bool,
|
||||
}
|
||||
|
||||
/// Parse an SSE event line into an OpenCodeEvent.
|
||||
fn parse_sse_event(
|
||||
data_str: &str,
|
||||
event_name: Option<&str>,
|
||||
session_id: &str,
|
||||
state: &mut SseState,
|
||||
) -> ParsedSseEvent {
|
||||
) -> Option<OpenCodeEvent> {
|
||||
let json: serde_json::Value = match serde_json::from_str(data_str) {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
@@ -1192,10 +1078,7 @@ fn parse_sse_event(
|
||||
data_preview = %data_str.chars().take(200).collect::<String>(),
|
||||
"Failed to parse OpenCode SSE JSON payload"
|
||||
);
|
||||
return ParsedSseEvent {
|
||||
event: None,
|
||||
activity: false,
|
||||
};
|
||||
return None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1204,10 +1087,7 @@ fn parse_sse_event(
|
||||
data_preview = %data_str.chars().take(200).collect::<String>(),
|
||||
"Failed to parse OpenCode SSE JSON payload"
|
||||
);
|
||||
return ParsedSseEvent {
|
||||
event: None,
|
||||
activity: false,
|
||||
};
|
||||
return None;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -1215,10 +1095,7 @@ fn parse_sse_event(
|
||||
let event_type = match json.get("type").and_then(|v| v.as_str()).or(event_name) {
|
||||
Some(event_type) => event_type,
|
||||
None => {
|
||||
return ParsedSseEvent {
|
||||
event: None,
|
||||
activity: false,
|
||||
}
|
||||
return None
|
||||
}
|
||||
};
|
||||
let props = json
|
||||
@@ -1256,17 +1133,11 @@ fn parse_sse_event(
|
||||
event_type = %event_type,
|
||||
"SKIPPING event - session ID mismatch"
|
||||
);
|
||||
return ParsedSseEvent {
|
||||
event: None,
|
||||
activity: false,
|
||||
};
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
// Treat most events as meaningful activity EXCEPT server heartbeats.
|
||||
let activity = event_type != "server.heartbeat";
|
||||
|
||||
let event = match event_type {
|
||||
match event_type {
|
||||
// OpenAI Responses-style streaming
|
||||
"response.output_text.delta" => {
|
||||
let delta = props
|
||||
@@ -1439,9 +1310,7 @@ fn parse_sse_event(
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
ParsedSseEvent { event, activity }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
||||
Reference in New Issue
Block a user