feat: new mission completion, tooling & stream fix

This commit is contained in:
Thomas Marchand
2025-12-17 17:03:47 +00:00
parent e0747396d9
commit 68c43c6c16
23 changed files with 654 additions and 221 deletions

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View File

@@ -9,6 +9,7 @@
/* Begin PBXBuildFile section */
02DB7F25245D03FF72DD8E2E /* ControlView.swift in Sources */ = {isa = PBXBuildFile; fileRef = A84519FDE8FC75084938B292 /* ControlView.swift */; };
03176DF3878C25A0B557462C /* ToolUIOptionListView.swift in Sources */ = {isa = PBXBuildFile; fileRef = A4D419C8490A0C5FC4DCDF20 /* ToolUIOptionListView.swift */; };
04A1B2C3D4E5F67890123456 /* ControlSessionManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78901234567890ABCDEF1234 /* ControlSessionManager.swift */; };
0620B298DEF91DFCAE050DAC /* Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 66A48A20D2178760301256C9 /* Assets.xcassets */; };
0B5E1A6153270BFF21A54C23 /* TerminalState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 52DDF35DB8CD7D70F3CFC4A6 /* TerminalState.swift */; };
1BBE749F3758FD704D1BFA0B /* ToolUIDataTableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 45213C3E550D451EDC566CDE /* ToolUIDataTableView.swift */; };
@@ -34,6 +35,7 @@
/* Begin PBXFileReference section */
02CBD2029F8CF6751AD7C4E2 /* ToolUIView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToolUIView.swift; sourceTree = "<group>"; };
78901234567890ABCDEF1234 /* ControlSessionManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ControlSessionManager.swift; sourceTree = "<group>"; };
0AC6317C4EAD4DB9A8190209 /* TerminalView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TerminalView.swift; sourceTree = "<group>"; };
139C740B7D55C13F3B167EF3 /* OpenAgentDashboardApp.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OpenAgentDashboardApp.swift; sourceTree = "<group>"; };
2B9834D4EE32058824F9DF00 /* LoadingView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LoadingView.swift; sourceTree = "<group>"; };
@@ -184,6 +186,7 @@
children = (
CD8D224B6758B664864F3987 /* ANSIParser.swift */,
CBC90C32FEF604E025FFBF78 /* APIService.swift */,
78901234567890ABCDEF1234 /* ControlSessionManager.swift */,
3729F39FBF53046124D05BC1 /* NavigationState.swift */,
52DDF35DB8CD7D70F3CFC4A6 /* TerminalState.swift */,
);
@@ -264,6 +267,7 @@
D64972881E36894950658708 /* APIService.swift in Sources */,
CA70EC5A864C3D007D42E781 /* ChatMessage.swift in Sources */,
AA02567226057045DDD61CB1 /* ContentView.swift in Sources */,
04A1B2C3D4E5F67890123456 /* ControlSessionManager.swift in Sources */,
02DB7F25245D03FF72DD8E2E /* ControlView.swift in Sources */,
5152C5313CD5AC01276D0AE6 /* FileEntry.swift in Sources */,
6B87076797C9DFA01E24CC76 /* FilesView.swift in Sources */,

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.5 KiB

After

Width:  |  Height:  |  Size: 3.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.6 KiB

After

Width:  |  Height:  |  Size: 4.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.0 KiB

After

Width:  |  Height:  |  Size: 5.0 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 5.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 484 B

After

Width:  |  Height:  |  Size: 482 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 765 B

After

Width:  |  Height:  |  Size: 761 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.7 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.7 KiB

After

Width:  |  Height:  |  Size: 1.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.2 KiB

After

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.3 KiB

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.5 KiB

After

Width:  |  Height:  |  Size: 2.5 KiB

View File

@@ -9,6 +9,14 @@ import SwiftUI
@main
struct OpenAgentDashboardApp: App {
init() {
// Start the control session manager early so it connects immediately
// and maintains connection across tab switches
Task { @MainActor in
ControlSessionManager.shared.start()
}
}
var body: some Scene {
WindowGroup {
ContentView()

View File

@@ -0,0 +1,386 @@
//
// ControlSessionManager.swift
// OpenAgentDashboard
//
// Manages the SSE stream connection for the control session.
// Handles auto-reconnection, state recovery, and graceful error handling.
//
import Foundation
import Observation
/// Owns the single server-sent-events (SSE) connection to the control
/// endpoint and exposes the resulting chat/run state to the UI.
///
/// The manager is a main-actor singleton so that the stream keeps running
/// (and `messages` keeps accumulating) independently of any one view's
/// lifetime. It reconnects with exponential backoff when the stream drops and
/// re-syncs the current mission after every successful (re)connect.
@MainActor
@Observable
final class ControlSessionManager {
    static let shared = ControlSessionManager()

    // MARK: - State

    /// Messages shown in the conversation, in display order.
    var messages: [ChatMessage] = []
    /// Latest run state reported by a `status` event.
    var runState: ControlRunState = .idle
    /// Latest queue length reported by a `status` event.
    var queueLength: Int = 0
    /// Mission whose history is currently loaded into `messages`.
    var currentMission: Mission?
    /// True while `loadMission(id:)` or `loadCurrentMission()` is in flight.
    var isLoading: Bool = false

    // MARK: - Private

    /// Task running the active SSE stream; `nil` when no stream was started.
    private var streamTask: Task<Void, Never>?
    /// Task waiting out the backoff delay before the next connection attempt.
    private var reconnectTask: Task<Void, Never>?
    private var isConnected = false
    /// Number of reconnects scheduled since the last successful connection.
    private var reconnectAttempts = 0
    /// Upper bound, in seconds, for the exponential backoff delay.
    private let maxReconnectDelay: TimeInterval = 30
    private let api = APIService.shared

    private init() {}

    // MARK: - Public API

    /// Start the streaming connection. Call once on app launch.
    ///
    /// Does nothing if a stream task already exists, so repeated calls are safe.
    func start() {
        guard streamTask == nil else { return }
        connect()
    }

    /// Stop the streaming connection and cancel any pending reconnect.
    func stop() {
        streamTask?.cancel()
        streamTask = nil
        reconnectTask?.cancel()
        reconnectTask = nil
        isConnected = false
    }

    /// Load a specific mission by ID.
    ///
    /// Fetches the mission list and switches to the entry whose `id` matches.
    /// If no mission matches, or the request fails, the current state is kept.
    func loadMission(id: String) async {
        isLoading = true
        defer { isLoading = false }

        do {
            let missions = try await api.listMissions()
            if let mission = missions.first(where: { $0.id == id }) {
                switchToMission(mission)
                HapticService.success()
            }
        } catch {
            print("Failed to load mission \(id): \(error)")
        }
    }

    /// Load the current mission from the server.
    ///
    /// Leaves the state untouched when the server reports no current mission
    /// or the request fails.
    func loadCurrentMission() async {
        isLoading = true
        defer { isLoading = false }

        do {
            if let mission = try await api.getCurrentMission() {
                switchToMission(mission)
            }
        } catch {
            print("Failed to load current mission: \(error)")
        }
    }

    /// Create a new mission and make it current, clearing the message list.
    func createNewMission() async {
        do {
            let mission = try await api.createMission()
            currentMission = mission
            messages = []
            HapticService.success()
        } catch {
            print("Failed to create mission: \(error)")
            HapticService.error()
        }
    }

    /// Set the status of the current mission.
    ///
    /// The local copy is only updated after the server call succeeds.
    /// Does nothing when no mission is loaded.
    func setMissionStatus(_ status: MissionStatus) async {
        guard let mission = currentMission else { return }

        do {
            try await api.setMissionStatus(id: mission.id, status: status)
            currentMission?.status = status
            HapticService.success()
        } catch {
            print("Failed to set status: \(error)")
            HapticService.error()
        }
    }

    /// Send a message to the agent.
    ///
    /// Leading/trailing whitespace is trimmed; blank input is ignored. The
    /// message is not appended locally here: it is added to `messages` when
    /// the corresponding `user_message` stream event arrives.
    func sendMessage(content: String) async {
        let trimmed = content.trimmingCharacters(in: .whitespacesAndNewlines)
        guard !trimmed.isEmpty else { return }

        HapticService.lightTap()

        do {
            _ = try await api.sendMessage(content: trimmed)
        } catch {
            print("Failed to send message: \(error)")
            HapticService.error()
        }
    }

    /// Cancel the current run.
    func cancelRun() async {
        do {
            try await api.cancelControl()
            HapticService.success()
        } catch {
            print("Failed to cancel: \(error)")
            HapticService.error()
        }
    }

    // MARK: - Private Helpers

    /// Make `mission` current and rebuild `messages` from its history.
    ///
    /// History entries carry no ids of their own, so each message gets a
    /// synthetic id of the form `<mission id>-<index>`.
    private func switchToMission(_ mission: Mission) {
        currentMission = mission
        messages = mission.history.enumerated().map { index, entry in
            ChatMessage(
                id: "\(mission.id)-\(index)",
                type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
                content: entry.content
            )
        }
    }

    /// Open the SSE stream and process events until it ends or fails.
    ///
    /// Any failure (bad URL, non-2xx status, stream error, normal end of
    /// stream) schedules a reconnect unless the task was cancelled.
    private func connect() {
        streamTask = Task { [weak self] in
            guard let self = self else { return }

            guard let url = URL(string: "\(api.baseURL)/api/control/stream") else {
                scheduleReconnect()
                return
            }

            var request = URLRequest(url: url)
            request.setValue("text/event-stream", forHTTPHeaderField: "Accept")

            if api.isAuthenticated, let token = UserDefaults.standard.string(forKey: "jwt_token") {
                request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
            }

            do {
                let (stream, response) = try await URLSession.shared.bytes(for: request)

                // Check for HTTP errors
                if let httpResponse = response as? HTTPURLResponse {
                    guard (200...299).contains(httpResponse.statusCode) else {
                        scheduleReconnect()
                        return
                    }
                }

                // Successfully connected
                isConnected = true
                reconnectAttempts = 0

                // Sync mission state on reconnect
                await syncMissionState()

                // Accumulate raw bytes and decode a whole event at once.
                // Decoding byte-by-byte would drop every byte of a multi-byte
                // UTF-8 sequence, silently stripping all non-ASCII text.
                let lineFeed: UInt8 = 0x0A
                var buffer = Data()

                for try await byte in stream {
                    guard !Task.isCancelled else { break }

                    // A blank line (two consecutive LFs) ends an SSE event.
                    let endsEvent = byte == lineFeed && buffer.last == lineFeed
                    buffer.append(byte)
                    guard endsEvent else { continue }

                    // Lossy decoding keeps the event usable even if the
                    // server sends an invalid byte sequence.
                    let eventString = String(decoding: buffer.dropLast(2), as: UTF8.self)
                    buffer.removeAll(keepingCapacity: true)
                    parseAndHandleEvent(eventString)
                }

                // Stream ended normally or was cancelled
                isConnected = false
                if !Task.isCancelled {
                    scheduleReconnect()
                }
            } catch {
                isConnected = false
                if !Task.isCancelled {
                    // Don't show transient stream errors to users
                    print("Stream error: \(error.localizedDescription)")
                    scheduleReconnect()
                }
            }
        }
    }

    /// Schedule a new connection attempt after an exponential backoff delay.
    ///
    /// Skipped when the calling task has been cancelled (i.e. after `stop()`).
    private func scheduleReconnect() {
        guard !Task.isCancelled else { return }

        reconnectTask?.cancel()
        reconnectTask = Task { [weak self] in
            guard let self = self else { return }

            // Exponential backoff: 1s, 2s, 4s, 8s, ... up to maxReconnectDelay
            let delay = min(pow(2.0, Double(reconnectAttempts)), maxReconnectDelay)
            reconnectAttempts += 1

            try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))

            // `stop()` may have cancelled us while sleeping.
            guard !Task.isCancelled else { return }

            streamTask = nil
            connect()
        }
    }

    /// Sync mission state after reconnecting to recover any missed messages.
    private func syncMissionState() async {
        guard let mission = currentMission else {
            // No mission loaded, try to load current
            await loadCurrentMission()
            return
        }

        // Refresh the current mission to get any messages we missed
        do {
            if let refreshed = try await api.getCurrentMission(), refreshed.id == mission.id {
                // Only update messages if the server has more than we do locally
                // This preserves any streaming messages we're currently receiving
                let serverMessageCount = refreshed.history.count
                let localPersistentCount = messages.filter { !$0.isThinking }.count

                if serverMessageCount > localPersistentCount {
                    switchToMission(refreshed)
                }
            }
        } catch {
            print("Failed to sync mission state: \(error)")
        }
    }

    /// Parse one raw SSE event (the text between two blank lines) and dispatch it.
    ///
    /// Only `event:` and `data:` fields are interpreted; the event type
    /// defaults to `"message"`. Events without data, or whose data is not a
    /// JSON object, are dropped.
    private func parseAndHandleEvent(_ eventString: String) {
        var eventType = "message"
        var dataLines: [String] = []

        for line in eventString.split(separator: "\n", omittingEmptySubsequences: false) {
            let lineStr = String(line)
            if lineStr.hasPrefix("event:") {
                eventType = String(lineStr.dropFirst(6)).trimmingCharacters(in: .whitespaces)
            } else if lineStr.hasPrefix("data:") {
                dataLines.append(String(lineStr.dropFirst(5)).trimmingCharacters(in: .whitespaces))
            }
            // Ignore SSE comments (lines starting with :)
        }

        // Per the SSE format, multiple data lines form one payload joined by
        // newlines; joining without a separator could fuse adjacent tokens.
        let dataString = dataLines.joined(separator: "\n")
        guard !dataString.isEmpty else { return }

        do {
            guard let data = dataString.data(using: .utf8),
                  let json = try JSONSerialization.jsonObject(with: data) as? [String: Any] else {
                return
            }
            handleEvent(type: eventType, data: json)
        } catch {
            // Silently ignore parse errors - could be partial data or keepalive
        }
    }

    /// Apply a decoded stream event to the observable state.
    ///
    /// Unknown event types are ignored.
    private func handleEvent(type: String, data: [String: Any]) {
        switch type {
        case "status":
            if let state = data["state"] as? String {
                runState = ControlRunState(rawValue: state) ?? .idle
            }
            if let queue = data["queue_len"] as? Int {
                queueLength = queue
            }

        case "user_message":
            if let content = data["content"] as? String,
               let id = data["id"] as? String {
                // Avoid duplicates (might already have this from mission history)
                if !messages.contains(where: { $0.id == id }) {
                    let message = ChatMessage(id: id, type: .user, content: content)
                    messages.append(message)
                }
            }

        case "assistant_message":
            if let content = data["content"] as? String,
               let id = data["id"] as? String {
                let success = data["success"] as? Bool ?? true
                let costCents = data["cost_cents"] as? Int ?? 0
                let model = data["model"] as? String

                // Remove any incomplete thinking messages
                messages.removeAll { $0.isThinking && !$0.thinkingDone }

                // Avoid duplicates
                if !messages.contains(where: { $0.id == id }) {
                    let message = ChatMessage(
                        id: id,
                        type: .assistant(success: success, costCents: costCents, model: model),
                        content: content
                    )
                    messages.append(message)
                }
            }

        case "thinking":
            if let content = data["content"] as? String {
                let done = data["done"] as? Bool ?? false

                // Find existing thinking message or create new
                if let index = messages.lastIndex(where: { $0.isThinking && !$0.thinkingDone }) {
                    messages[index].content += "\n\n---\n\n" + content
                    if done {
                        messages[index] = ChatMessage(
                            id: messages[index].id,
                            type: .thinking(done: true, startTime: Date()),
                            content: messages[index].content
                        )
                    }
                } else if !done {
                    let message = ChatMessage(
                        id: "thinking-\(Date().timeIntervalSince1970)",
                        type: .thinking(done: false, startTime: Date()),
                        content: content
                    )
                    messages.append(message)
                }
            }

        case "error":
            // Only show actual agent errors, not stream connection errors
            if let errorMessage = data["message"] as? String,
               !errorMessage.contains("Stream connection") {
                let message = ChatMessage(
                    id: "error-\(Date().timeIntervalSince1970)",
                    type: .error,
                    content: errorMessage
                )
                messages.append(message)
            }

        case "tool_call":
            if let toolCallId = data["tool_call_id"] as? String,
               let name = data["name"] as? String,
               let args = data["args"] as? [String: Any] {
                // Avoid duplicates, consistent with user/assistant messages
                guard !messages.contains(where: { $0.id == toolCallId }) else { break }

                // Parse UI tool calls
                if let toolUI = ToolUIContent.parse(name: name, args: args) {
                    let message = ChatMessage(
                        id: toolCallId,
                        type: .toolUI(name: name),
                        content: "",
                        toolUI: toolUI
                    )
                    messages.append(message)
                }
            }

        default:
            break
        }
    }
}

View File

@@ -8,22 +8,23 @@
import SwiftUI
struct ControlView: View {
@State private var messages: [ChatMessage] = []
@State private var inputText = ""
@State private var runState: ControlRunState = .idle
@State private var queueLength = 0
@State private var currentMission: Mission?
@State private var isLoading = true
@State private var streamTask: Task<Void, Never>?
@State private var showMissionMenu = false
@State private var shouldScrollToBottom = false
@State private var lastMessageCount = 0
@FocusState private var isInputFocused: Bool
private let api = APIService.shared
private let session = ControlSessionManager.shared
private let nav = NavigationState.shared
private let bottomAnchorId = "bottom-anchor"
// Convenience accessors for session state
private var messages: [ChatMessage] { session.messages }
private var runState: ControlRunState { session.runState }
private var queueLength: Int { session.queueLength }
private var currentMission: Mission? { session.currentMission }
private var isLoading: Bool { session.isLoading }
var body: some View {
ZStack {
// Background with subtle accent glow
@@ -67,7 +68,7 @@ struct ControlView: View {
ToolbarItem(placement: .topBarTrailing) {
Menu {
Button {
Task { await createNewMission() }
Task { await session.createNewMission() }
} label: {
Label("New Mission", systemImage: "plus")
}
@@ -76,20 +77,20 @@ struct ControlView: View {
Divider()
Button {
Task { await setMissionStatus(.completed) }
Task { await session.setMissionStatus(.completed) }
} label: {
Label("Mark Complete", systemImage: "checkmark.circle")
}
Button(role: .destructive) {
Task { await setMissionStatus(.failed) }
Task { await session.setMissionStatus(.failed) }
} label: {
Label("Mark Failed", systemImage: "xmark.circle")
}
if mission.status != .active {
Button {
Task { await setMissionStatus(.active) }
Task { await session.setMissionStatus(.active) }
} label: {
Label("Reactivate", systemImage: "arrow.clockwise")
}
@@ -102,25 +103,31 @@ struct ControlView: View {
}
}
.task {
// Start the session manager (idempotent)
session.start()
// Check if we're being opened with a specific mission from History
if let pendingId = nav.consumePendingMission() {
await loadMission(id: pendingId)
} else {
await loadCurrentMission()
await session.loadMission(id: pendingId)
} else if session.currentMission == nil {
await session.loadCurrentMission()
}
startStreaming()
}
.onChange(of: nav.pendingMissionId) { _, newId in
// Handle navigation from History while Control is already visible
if let missionId = newId {
nav.pendingMissionId = nil
Task {
await loadMission(id: missionId)
await session.loadMission(id: missionId)
}
}
}
.onDisappear {
streamTask?.cancel()
.onChange(of: messages.count) { oldCount, newCount in
// Trigger scroll when messages are added
if newCount > lastMessageCount {
shouldScrollToBottom = true
lastMessageCount = newCount
}
}
}
@@ -183,9 +190,6 @@ struct ControlView: View {
// Dismiss keyboard when tapping on messages area
isInputFocused = false
}
.onChange(of: messages.count) { _, _ in
scrollToBottom(proxy: proxy)
}
.onChange(of: shouldScrollToBottom) { _, shouldScroll in
if shouldScroll {
scrollToBottom(proxy: proxy)
@@ -299,7 +303,7 @@ struct ControlView: View {
// Send/Stop button
Button {
if runState != .idle {
Task { await cancelRun() }
Task { await session.cancelRun() }
} else {
sendMessage()
}
@@ -325,205 +329,14 @@ struct ControlView: View {
// MARK: - Actions
private func loadCurrentMission() async {
isLoading = true
defer { isLoading = false }
do {
if let mission = try await api.getCurrentMission() {
currentMission = mission
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
// Scroll to bottom after loading
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
shouldScrollToBottom = true
}
}
} catch {
print("Failed to load mission: \(error)")
}
}
private func loadMission(id: String) async {
isLoading = true
defer { isLoading = false }
do {
let missions = try await api.listMissions()
if let mission = missions.first(where: { $0.id == id }) {
currentMission = mission
messages = mission.history.enumerated().map { index, entry in
ChatMessage(
id: "\(mission.id)-\(index)",
type: entry.isUser ? .user : .assistant(success: true, costCents: 0, model: nil),
content: entry.content
)
}
HapticService.success()
// Scroll to bottom after loading
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
shouldScrollToBottom = true
}
}
} catch {
print("Failed to load mission: \(error)")
}
}
private func createNewMission() async {
do {
let mission = try await api.createMission()
currentMission = mission
messages = []
HapticService.success()
} catch {
print("Failed to create mission: \(error)")
HapticService.error()
}
}
private func setMissionStatus(_ status: MissionStatus) async {
guard let mission = currentMission else { return }
do {
try await api.setMissionStatus(id: mission.id, status: status)
currentMission?.status = status
HapticService.success()
} catch {
print("Failed to set status: \(error)")
HapticService.error()
}
}
private func sendMessage() {
let content = inputText.trimmingCharacters(in: .whitespacesAndNewlines)
guard !content.isEmpty else { return }
inputText = ""
HapticService.lightTap()
Task {
do {
let _ = try await api.sendMessage(content: content)
} catch {
print("Failed to send message: \(error)")
HapticService.error()
}
}
}
private func cancelRun() async {
do {
try await api.cancelControl()
HapticService.success()
} catch {
print("Failed to cancel: \(error)")
HapticService.error()
}
}
private func startStreaming() {
streamTask = api.streamControl { eventType, data in
Task { @MainActor in
handleStreamEvent(type: eventType, data: data)
}
}
}
private func handleStreamEvent(type: String, data: [String: Any]) {
switch type {
case "status":
if let state = data["state"] as? String {
runState = ControlRunState(rawValue: state) ?? .idle
}
if let queue = data["queue_len"] as? Int {
queueLength = queue
}
case "user_message":
if let content = data["content"] as? String,
let id = data["id"] as? String {
let message = ChatMessage(id: id, type: .user, content: content)
messages.append(message)
}
case "assistant_message":
if let content = data["content"] as? String,
let id = data["id"] as? String {
let success = data["success"] as? Bool ?? true
let costCents = data["cost_cents"] as? Int ?? 0
let model = data["model"] as? String
// Remove any incomplete thinking messages
messages.removeAll { $0.isThinking && !$0.thinkingDone }
let message = ChatMessage(
id: id,
type: .assistant(success: success, costCents: costCents, model: model),
content: content
)
messages.append(message)
}
case "thinking":
if let content = data["content"] as? String {
let done = data["done"] as? Bool ?? false
// Find existing thinking message or create new
if let index = messages.lastIndex(where: { $0.isThinking && !$0.thinkingDone }) {
messages[index].content += "\n\n---\n\n" + content
if done {
messages[index] = ChatMessage(
id: messages[index].id,
type: .thinking(done: true, startTime: Date()),
content: messages[index].content
)
}
} else if !done {
let message = ChatMessage(
id: "thinking-\(Date().timeIntervalSince1970)",
type: .thinking(done: false, startTime: Date()),
content: content
)
messages.append(message)
}
}
case "error":
if let errorMessage = data["message"] as? String {
let message = ChatMessage(
id: "error-\(Date().timeIntervalSince1970)",
type: .error,
content: errorMessage
)
messages.append(message)
}
case "tool_call":
if let toolCallId = data["tool_call_id"] as? String,
let name = data["name"] as? String,
let args = data["args"] as? [String: Any] {
// Parse UI tool calls
if let toolUI = ToolUIContent.parse(name: name, args: args) {
let message = ChatMessage(
id: toolCallId,
type: .toolUI(name: name),
content: "",
toolUI: toolUI
)
messages.append(message)
}
}
default:
break
await session.sendMessage(content: content)
}
}
}

View File

@@ -30,10 +30,10 @@ function createBrainSvg(size) {
const scaleY = contentSize / 1186;
const scale = Math.min(scaleX, scaleY);
// Center the brain
// Center the brain with slight rightward shift for visual centering
const scaledWidth = 1568 * scale;
const scaledHeight = 1186 * scale;
const offsetX = (size - scaledWidth) / 2;
const offsetX = (size - scaledWidth) / 2 + (size * 0.03); // Shift 3% right for visual centering
const offsetY = (size - scaledHeight) / 2;
return `<?xml version="1.0" encoding="UTF-8"?>

View File

@@ -8,6 +8,7 @@ use crate::config::Config;
use crate::llm::LlmClient;
use crate::memory::MemorySystem;
use crate::tools::ToolRegistry;
use crate::tools::mission::MissionControl;
use tokio::sync::broadcast;
/// Shared context passed to all agents during execution.
@@ -59,6 +60,9 @@ pub struct AgentContext {
/// Benchmark registry for task-aware model selection.
pub benchmarks: Option<SharedBenchmarkRegistry>,
/// Mission control for allowing the agent to complete/fail missions.
pub mission_control: Option<MissionControl>,
}
impl AgentContext {
@@ -84,6 +88,7 @@ impl AgentContext {
control_status: None,
cancel_token: None,
benchmarks: None,
mission_control: None,
}
}
@@ -110,6 +115,7 @@ impl AgentContext {
control_status: None,
cancel_token: None,
benchmarks: None,
mission_control: None,
}
}
@@ -132,6 +138,7 @@ impl AgentContext {
control_status: self.control_status.clone(),
cancel_token: self.cancel_token.clone(),
benchmarks: self.benchmarks.clone(),
mission_control: self.mission_control.clone(),
}
}

View File

@@ -106,6 +106,12 @@ pub enum AgentEvent {
Error {
message: String,
},
/// Mission status changed (by agent or user)
MissionStatusChanged {
mission_id: Uuid,
status: MissionStatus,
summary: Option<String>,
},
}
impl AgentEvent {
@@ -118,6 +124,7 @@ impl AgentEvent {
AgentEvent::ToolCall { .. } => "tool_call",
AgentEvent::ToolResult { .. } => "tool_result",
AgentEvent::Error { .. } => "error",
AgentEvent::MissionStatusChanged { .. } => "mission_status_changed",
}
}
}
@@ -544,6 +551,9 @@ pub fn spawn_control_session(
queue_len: 0,
}));
let current_mission = Arc::new(RwLock::new(None));
// Channel for agent-initiated mission control commands
let (mission_cmd_tx, mission_cmd_rx) = mpsc::channel::<crate::tools::mission::MissionControlCommand>(64);
let state = ControlState {
cmd_tx,
@@ -559,6 +569,8 @@ pub fn spawn_control_session(
memory,
benchmarks,
cmd_rx,
mission_cmd_rx,
mission_cmd_tx,
events_tx,
tool_hub,
status,
@@ -574,6 +586,8 @@ async fn control_actor_loop(
memory: Option<MemorySystem>,
benchmarks: crate::budget::SharedBenchmarkRegistry,
mut cmd_rx: mpsc::Receiver<ControlCommand>,
mut mission_cmd_rx: mpsc::Receiver<crate::tools::mission::MissionControlCommand>,
mission_cmd_tx: mpsc::Sender<crate::tools::mission::MissionControlCommand>,
events_tx: broadcast::Sender<AgentEvent>,
tool_hub: Arc<FrontendToolHub>,
status: Arc<RwLock<ControlStatus>>,
@@ -706,6 +720,10 @@ async fn control_actor_loop(
let cancel = CancellationToken::new();
let pricing = Arc::clone(&pricing);
let hist_snapshot = history.clone();
let mission_ctrl = crate::tools::mission::MissionControl {
current_mission_id: Arc::clone(&current_mission),
cmd_tx: mission_cmd_tx.clone(),
};
running_cancel = Some(cancel.clone());
running = Some(tokio::spawn(async move {
let result = run_single_control_turn(
@@ -720,6 +738,7 @@ async fn control_actor_loop(
cancel,
hist_snapshot,
msg.clone(),
Some(mission_ctrl),
)
.await;
(mid, msg, result)
@@ -782,6 +801,13 @@ async fn control_actor_loop(
if let Some(mem) = &memory {
let result = mem.supabase.update_mission_status(id, &new_status.to_string()).await
.map_err(|e| e.to_string());
if result.is_ok() {
let _ = events_tx.send(AgentEvent::MissionStatusChanged {
mission_id: id,
status: new_status,
summary: None,
});
}
let _ = respond.send(result);
} else {
let _ = respond.send(Err("Memory not configured".to_string()));
@@ -789,6 +815,32 @@ async fn control_actor_loop(
}
}
}
// Handle agent-initiated mission status changes (from complete_mission tool)
mission_cmd = mission_cmd_rx.recv() => {
if let Some(cmd) = mission_cmd {
match cmd {
crate::tools::mission::MissionControlCommand::SetStatus { status, summary } => {
let mission_id = current_mission.read().await.clone();
if let Some(id) = mission_id {
let new_status = match status {
crate::tools::mission::MissionStatusValue::Completed => MissionStatus::Completed,
crate::tools::mission::MissionStatusValue::Failed => MissionStatus::Failed,
};
if let Some(mem) = &memory {
if let Ok(()) = mem.supabase.update_mission_status(id, &new_status.to_string()).await {
let _ = events_tx.send(AgentEvent::MissionStatusChanged {
mission_id: id,
status: new_status,
summary,
});
tracing::info!("Mission {} marked as {} by agent", id, new_status);
}
}
}
}
}
}
}
finished = async {
match &mut running {
Some(handle) => Some(handle.await),
@@ -835,6 +887,10 @@ async fn control_actor_loop(
let cancel = CancellationToken::new();
let pricing = Arc::clone(&pricing);
let hist_snapshot = history.clone();
let mission_ctrl = crate::tools::mission::MissionControl {
current_mission_id: Arc::clone(&current_mission),
cmd_tx: mission_cmd_tx.clone(),
};
running_cancel = Some(cancel.clone());
running = Some(tokio::spawn(async move {
let result = run_single_control_turn(
@@ -849,6 +905,7 @@ async fn control_actor_loop(
cancel,
hist_snapshot,
msg.clone(),
Some(mission_ctrl),
)
.await;
(mid, msg, result)
@@ -919,6 +976,7 @@ async fn run_single_control_turn(
cancel: CancellationToken,
history: Vec<(String, String)>,
user_message: String,
mission_control: Option<crate::tools::mission::MissionControl>,
) -> crate::agents::AgentResult {
// Build a task prompt that includes conversation context with size limits.
// This prevents context overflow when history gets large.
@@ -937,7 +995,7 @@ async fn run_single_control_turn(
convo.push_str(&history_context);
convo.push_str("User:\n");
convo.push_str(&user_message);
convo.push_str("\n\nInstructions:\n- Continue the conversation helpfully.\n- You may use tools to gather information or make changes.\n- When appropriate, use Tool UI tools (ui_*) for structured output or to ask for user selections.\n- For large data processing tasks (>10KB), use run_command to execute Python scripts rather than processing inline.\n");
convo.push_str("\n\nInstructions:\n- Continue the conversation helpfully.\n- You may use tools to gather information or make changes.\n- When appropriate, use Tool UI tools (ui_*) for structured output or to ask for user selections.\n- For large data processing tasks (>10KB), use run_command to execute Python scripts rather than processing inline.\n- When you have fully completed the user's goal or determined it cannot be completed, use the complete_mission tool to mark the mission status.\n");
let budget = Budget::new(1000);
let verification = VerificationCriteria::None;
@@ -951,7 +1009,7 @@ async fn run_single_control_turn(
// Context for agent execution.
let llm = Arc::new(OpenRouterClient::new(config.api_key.clone()));
let tools = ToolRegistry::new();
let tools = ToolRegistry::with_mission_control(mission_control.clone());
let mut ctx = AgentContext::with_memory(
config.clone(),
llm,
@@ -960,6 +1018,7 @@ async fn run_single_control_turn(
config.working_dir.clone(),
memory,
);
ctx.mission_control = mission_control;
ctx.control_events = Some(events_tx);
ctx.frontend_tool_hub = Some(tool_hub);
ctx.control_status = Some(status);

143
src/tools/mission.rs Normal file
View File

@@ -0,0 +1,143 @@
//! Mission control tool - allows the agent to complete or fail the current mission.
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::{json, Value};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use super::Tool;
/// Command sent by the mission tool to the control session.
///
/// Values of this type travel over the `mpsc` channel whose sending half is
/// stored in `MissionControl::cmd_tx`.
#[derive(Debug, Clone)]
pub enum MissionControlCommand {
/// Request that the current mission be given a final status.
SetStatus {
/// Outcome to record for the mission.
status: MissionStatusValue,
/// Optional free-text explanation of the outcome.
summary: Option<String>,
},
}
/// Final outcome an agent may assign to a mission.
///
/// A reduced counterpart of `api::control::MissionStatus`, limited to the two
/// states the tool is allowed to set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissionStatusValue {
    /// The mission's goal was achieved.
    Completed,
    /// The mission could not be completed.
    Failed,
}

/// Renders the status as its lowercase wire name (`completed` / `failed`).
impl std::fmt::Display for MissionStatusValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            MissionStatusValue::Completed => "completed",
            MissionStatusValue::Failed => "failed",
        };
        f.write_str(label)
    }
}
/// Shared state for mission control, passed to the tool.
///
/// Cloning copies the `Arc` handle and the channel sender, so every clone
/// refers to the same mission slot and feeds the same channel.
#[derive(Clone)]
pub struct MissionControl {
/// Id of the mission currently in progress, or `None` when there is none.
pub current_mission_id: Arc<RwLock<Option<Uuid>>>,
/// Sending half of the channel used to submit `MissionControlCommand`s.
pub cmd_tx: mpsc::Sender<MissionControlCommand>,
}
/// Tool that allows the agent to mark the current mission as completed or failed.
///
/// Without a [`MissionControl`] handle the tool is still callable but reports
/// that the mission status was not changed.
#[derive(Default)]
pub struct CompleteMission {
    /// Handle to the control session, or `None` when mission control is unavailable.
    pub control: Option<MissionControl>,
}

impl CompleteMission {
    /// Creates a tool with no mission control attached (same as [`Default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a tool that forwards status changes through `control`.
    pub fn with_control(control: MissionControl) -> Self {
        Self {
            control: Some(control),
        }
    }
}
/// Arguments accepted by the `complete_mission` tool, deserialized from the
/// JSON value passed to `execute`.
#[derive(Debug, Deserialize)]
struct CompleteMissionArgs {
/// Status: "completed" or "failed"
status: String,
/// Optional summary explaining the outcome
summary: Option<String>,
}
#[async_trait]
impl Tool for CompleteMission {
fn name(&self) -> &str {
"complete_mission"
}
fn description(&self) -> &str {
"Mark the current mission as completed or failed. Use this when you have finished the user's goal or when you cannot complete it. The user can still reopen or change the mission status later."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["completed", "failed"],
"description": "The final status of the mission. Use 'completed' when the goal has been achieved, 'failed' when it cannot be completed."
},
"summary": {
"type": "string",
"description": "Optional summary explaining the outcome (e.g., what was accomplished or why it failed)."
}
},
"required": ["status"]
})
}
async fn execute(&self, args: Value, _working_dir: &Path) -> anyhow::Result<String> {
let args: CompleteMissionArgs = serde_json::from_value(args)
.map_err(|e| anyhow::anyhow!("Invalid arguments: {}", e))?;
let status = match args.status.to_lowercase().as_str() {
"completed" => MissionStatusValue::Completed,
"failed" => MissionStatusValue::Failed,
other => {
return Err(anyhow::anyhow!(
"Invalid status '{}'. Must be 'completed' or 'failed'.",
other
))
}
};
let Some(control) = &self.control else {
return Ok("Mission control not available in this context. The mission status was not changed.".to_string());
};
// Check if there's a current mission
let mission_id = control.current_mission_id.read().await.clone();
if mission_id.is_none() {
return Ok("No active mission to complete. Start a mission first.".to_string());
}
// Send the command
control
.cmd_tx
.send(MissionControlCommand::SetStatus {
status,
summary: args.summary.clone(),
})
.await
.map_err(|_| anyhow::anyhow!("Failed to send mission control command"))?;
let summary_msg = args
.summary
.map(|s| format!(" Summary: {}", s))
.unwrap_or_default();
Ok(format!(
"Mission marked as {}.{}",
status, summary_msg
))
}
}

View File

@@ -12,6 +12,7 @@ mod directory;
mod file_ops;
mod git;
mod index;
pub mod mission;
mod search;
mod storage;
mod terminal;
@@ -61,6 +62,11 @@ pub struct ToolRegistry {
impl ToolRegistry {
/// Create a new registry with all default tools.
pub fn new() -> Self {
Self::with_mission_control(None)
}
/// Create a new registry with all default tools and optional mission control.
pub fn with_mission_control(mission_control: Option<mission::MissionControl>) -> Self {
let mut tools: HashMap<String, Arc<dyn Tool>> = HashMap::new();
// File operations
@@ -129,6 +135,13 @@ impl ToolRegistry {
tools.insert("desktop_scroll".to_string(), Arc::new(desktop::Scroll));
}
// Mission control (allows agent to complete/fail missions)
let mission_tool: Arc<dyn Tool> = match mission_control {
Some(ctrl) => Arc::new(mission::CompleteMission::with_control(ctrl)),
None => Arc::new(mission::CompleteMission::new()),
};
tools.insert("complete_mission".to_string(), mission_tool);
Self { tools }
}