diff --git a/.cursor/rules/project.md b/.cursor/rules/project.md index f751cc4..c872264 100644 --- a/.cursor/rules/project.md +++ b/.cursor/rules/project.md @@ -280,6 +280,32 @@ curl -sS http://127.0.0.1:3000/api/tasks \ - This is **not** multi-tenant; the JWT only proves “dashboard knows the password”. - The dashboard uses a **fetch-based SSE client** (instead of `EventSource`) so it can send auth headers. +## Dashboard Console (SSH + SFTP) + +The dashboard includes a **Console** page that can: +- open a **full-featured TTY** (colors, interactive programs) via WebSocket → PTY → `ssh` +- browse/upload/download files via **SFTP** + +### Backend endpoints +- `GET /api/console/ws` (WebSocket) +- `GET /api/fs/list?path=...` +- `POST /api/fs/upload?path=...` (multipart form-data) +- `GET /api/fs/download?path=...` +- `POST /api/fs/mkdir` +- `POST /api/fs/rm` + +### Auth nuance (WebSocket) +Browsers can't set an `Authorization` header for WebSockets, so the console uses a **WebSocket subprotocol**: +- client connects with protocols: `["openagent", "jwt."]` +- server validates the JWT from `Sec-WebSocket-Protocol` (only when `DEV_MODE=false`) + +### Required env vars +Set these on the backend: +- `CONSOLE_SSH_HOST` (e.g. `95.216.112.253`) +- `CONSOLE_SSH_PORT` (default `22`) +- `CONSOLE_SSH_USER` (default `root`) +- `CONSOLE_SSH_PRIVATE_KEY_B64` (preferred) or `CONSOLE_SSH_PRIVATE_KEY` + ## Dashboard package manager (Bun) The dashboard in `dashboard/` uses **Bun** (not npm/yarn/pnpm). 
diff --git a/.env.example b/.env.example index 01b8285..9150ea6 100644 --- a/.env.example +++ b/.env.example @@ -60,3 +60,18 @@ JWT_SECRET=change-me-to-a-long-random-string # JWT validity in days (default: 30) JWT_TTL_DAYS=30 +# ============================================================================= +# Dashboard Console / File Explorer (SSH) +# ============================================================================= +# These are used by the dashboard "Console" page to: +# - open an interactive root shell (WebSocket -> PTY -> ssh) +# - list/upload/download files (SFTP) +# +# Recommended: store the private key as base64 to avoid multiline env issues. +CONSOLE_SSH_HOST=95.216.112.253 +CONSOLE_SSH_PORT=22 +CONSOLE_SSH_USER=root +# base64(OpenSSH private key). Example: +# base64 -i ~/.ssh/agent.thomas.md | tr -d '\n' +CONSOLE_SSH_PRIVATE_KEY_B64= + diff --git a/Cargo.toml b/Cargo.toml index 9ddff14..0717e56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" # Web framework -axum = { version = "0.7", features = ["ws"] } +axum = { version = "0.7", features = ["ws", "multipart"] } tower-http = { version = "0.5", features = ["cors", "trace"] } # Serialization @@ -44,5 +44,11 @@ chrono = { version = "0.4", features = ["serde"] } # Auth (JWT) jsonwebtoken = "9" +# Remote console / file manager +base64 = "0.22" +bytes = "1" +portable-pty = "0.9" +tokio-util = { version = "0.7", features = ["io"] } + [dev-dependencies] tokio-test = "0.4" diff --git a/dashboard/bun.lock b/dashboard/bun.lock index c9d2a16..823eaef 100644 --- a/dashboard/bun.lock +++ b/dashboard/bun.lock @@ -15,6 +15,8 @@ "react-dom": "19.2.1", "recharts": "^3.6.0", "tailwind-merge": "^3.4.0", + "xterm": "^5.3.0", + "xterm-addon-fit": "^0.8.0", }, "devDependencies": { "@tailwindcss/postcss": "^4", @@ -931,6 +933,10 @@ "word-wrap": ["word-wrap@1.2.5", "", {}, 
"sha512-BN22B5eaMMI9UMtjrGd5g5eCYPpCPDUy0FJXbYsaT5zYxjFOckS53SQDE3pWkVoWpHXVb3BrYcEN4Twa55B5cA=="], + "xterm": ["xterm@5.3.0", "", {}, "sha512-8QqjlekLUFTrU6x7xck1MsPzPA571K5zNqWm0M0oroYEWVOptZ0+ubQSkQ3uxIEhcIHRujJy6emDWX4A7qyFzg=="], + + "xterm-addon-fit": ["xterm-addon-fit@0.8.0", "", { "peerDependencies": { "xterm": "^5.0.0" } }, "sha512-yj3Np7XlvxxhYF/EJ7p3KHaMt6OdwQ+HDu573Vx1lRXsVxOcnVJs51RgjZOouIZOczTsskaS+CpXspK81/DLqw=="], + "yallist": ["yallist@3.1.1", "", {}, "sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g=="], "yocto-queue": ["yocto-queue@0.1.0", "", {}, "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="], diff --git a/dashboard/package.json b/dashboard/package.json index 60515fb..74d2b33 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -18,6 +18,8 @@ "next": "16.0.10", "react": "19.2.1", "react-dom": "19.2.1", + "xterm": "^5.3.0", + "xterm-addon-fit": "^0.8.0", "recharts": "^3.6.0", "tailwind-merge": "^3.4.0" }, diff --git a/dashboard/public/favicon.svg b/dashboard/public/favicon.svg index fc3bfa1..a5af603 100644 --- a/dashboard/public/favicon.svg +++ b/dashboard/public/favicon.svg @@ -23,3 +23,4 @@ + diff --git a/dashboard/src/app/console/console-client.tsx b/dashboard/src/app/console/console-client.tsx new file mode 100644 index 0000000..5481220 --- /dev/null +++ b/dashboard/src/app/console/console-client.tsx @@ -0,0 +1,463 @@ +'use client'; + +import { useEffect, useMemo, useRef, useState } from 'react'; +import { Terminal as XTerm } from 'xterm'; +import { FitAddon } from 'xterm-addon-fit'; +import 'xterm/css/xterm.css'; + +import { authHeader, getValidJwt } from '@/lib/auth'; + +const API_BASE = process.env.NEXT_PUBLIC_API_URL || 'http://127.0.0.1:3000'; + +type FsEntry = { + name: string; + path: string; + kind: 'file' | 'dir' | 'link' | 'other' | string; + size: number; + mtime: number; +}; + +function formatBytes(n: number) { + if 
(!Number.isFinite(n)) return '-'; + if (n < 1024) return `${n} B`; + const units = ['KB', 'MB', 'GB', 'TB'] as const; + let v = n / 1024; + let i = 0; + while (v >= 1024 && i < units.length - 1) { + v /= 1024; + i += 1; + } + return `${v.toFixed(v >= 10 ? 0 : 1)} ${units[i]}`; +} + +async function listDir(path: string): Promise { + const res = await fetch(`${API_BASE}/api/fs/list?path=${encodeURIComponent(path)}`, { + headers: { ...authHeader() }, + }); + if (!res.ok) throw new Error(await res.text()); + return res.json(); +} + +async function mkdir(path: string): Promise { + const res = await fetch(`${API_BASE}/api/fs/mkdir`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...authHeader() }, + body: JSON.stringify({ path }), + }); + if (!res.ok) throw new Error(await res.text()); +} + +async function rm(path: string, recursive = false): Promise { + const res = await fetch(`${API_BASE}/api/fs/rm`, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...authHeader() }, + body: JSON.stringify({ path, recursive }), + }); + if (!res.ok) throw new Error(await res.text()); +} + +async function downloadFile(path: string) { + const res = await fetch(`${API_BASE}/api/fs/download?path=${encodeURIComponent(path)}`, { + headers: { ...authHeader() }, + }); + if (!res.ok) throw new Error(await res.text()); + const blob = await res.blob(); + const name = path.split('/').filter(Boolean).pop() ?? 
'download'; + const url = URL.createObjectURL(blob); + const a = document.createElement('a'); + a.href = url; + a.download = name; + document.body.appendChild(a); + a.click(); + a.remove(); + URL.revokeObjectURL(url); +} + +async function uploadFiles(dir: string, files: File[], onProgress?: (done: number, total: number) => void) { + let done = 0; + for (const f of files) { + await new Promise((resolve, reject) => { + const form = new FormData(); + form.append('file', f, f.name); + const xhr = new XMLHttpRequest(); + xhr.open('POST', `${API_BASE}/api/fs/upload?path=${encodeURIComponent(dir)}`, true); + const jwt = getValidJwt()?.token; + if (jwt) xhr.setRequestHeader('Authorization', `Bearer ${jwt}`); + xhr.onload = () => { + if (xhr.status >= 200 && xhr.status < 300) resolve(); + else reject(new Error(xhr.responseText || `Upload failed (${xhr.status})`)); + }; + xhr.onerror = () => reject(new Error('Upload failed (network error)')); + xhr.send(form); + }); + done += 1; + onProgress?.(done, files.length); + } +} + +export default function ConsoleClient() { + const termElRef = useRef(null); + const termRef = useRef(null); + const fitRef = useRef(null); + const wsRef = useRef(null); + + const [wsStatus, setWsStatus] = useState<'disconnected' | 'connecting' | 'connected' | 'error'>( + 'disconnected' + ); + const [wsError, setWsError] = useState(null); + + const [cwd, setCwd] = useState('/root'); + const [entries, setEntries] = useState([]); + const [fsLoading, setFsLoading] = useState(false); + const [fsError, setFsError] = useState(null); + const [selected, setSelected] = useState(null); + const [uploading, setUploading] = useState<{ done: number; total: number } | null>(null); + + const sortedEntries = useMemo(() => { + const dirs = entries.filter((e) => e.kind === 'dir').sort((a, b) => a.name.localeCompare(b.name)); + const files = entries.filter((e) => e.kind !== 'dir').sort((a, b) => a.name.localeCompare(b.name)); + return [...dirs, ...files]; + }, [entries]); + + 
async function refreshDir(path: string) { + setFsLoading(true); + setFsError(null); + try { + const data = await listDir(path); + setEntries(data); + setSelected(null); + } catch (e) { + setFsError(e instanceof Error ? e.message : String(e)); + } finally { + setFsLoading(false); + } + } + + useEffect(() => { + void refreshDir(cwd); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [cwd]); + + useEffect(() => { + if (!termElRef.current) return; + if (termRef.current) return; + + const term = new XTerm({ + fontFamily: + 'ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace', + fontSize: 13, + lineHeight: 1.25, + cursorBlink: true, + convertEol: true, + allowProposedApi: true, + theme: { + background: 'transparent', + }, + }); + const fit = new FitAddon(); + term.loadAddon(fit); + term.open(termElRef.current); + fit.fit(); + + termRef.current = term; + fitRef.current = fit; + + const onResize = () => { + fit.fit(); + const { cols, rows } = term; + const ws = wsRef.current; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ t: 'r', c: cols, r: rows })); + } + }; + window.addEventListener('resize', onResize); + term.onData((d) => { + const ws = wsRef.current; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ t: 'i', d })); + } + }); + + return () => { + window.removeEventListener('resize', onResize); + try { + term.dispose(); + } catch { + // ignore + } + termRef.current = null; + fitRef.current = null; + }; + }, []); + + useEffect(() => { + // Connect WS once terminal exists. + if (!termRef.current) return; + if (wsRef.current) return; + + setWsStatus('connecting'); + setWsError(null); + + const jwt = getValidJwt()?.token ?? null; + const proto = jwt ? (['openagent', `jwt.${jwt}`] as string[]) : (['openagent'] as string[]); + const u = new URL(`${API_BASE}/api/console/ws`); + u.protocol = u.protocol === 'https:' ? 
'wss:' : 'ws:'; + const wsUrl = u.toString(); + const ws = new WebSocket(wsUrl, proto); + wsRef.current = ws; + + ws.onopen = () => { + setWsStatus('connected'); + termRef.current?.writeln('\x1b[1;34mConnected.\x1b[0m'); + const fit = fitRef.current; + if (fit && termRef.current) { + fit.fit(); + ws.send(JSON.stringify({ t: 'r', c: termRef.current.cols, r: termRef.current.rows })); + } + }; + ws.onmessage = (evt) => { + termRef.current?.write(typeof evt.data === 'string' ? evt.data : ''); + }; + ws.onerror = () => { + setWsStatus('error'); + setWsError('WebSocket error'); + }; + ws.onclose = () => { + setWsStatus('disconnected'); + }; + + return () => { + try { + ws.close(); + } catch { + // ignore + } + wsRef.current = null; + }; + }, []); + + return ( +
+
+
+

Console

+

+ Root shell + remote file explorer (SFTP). Keep this behind your dashboard password. +

+
+
+ + {wsStatus} + + {wsError ? {wsError} : null} +
+
+ +
+ {/* Terminal */} +
+
+
TTY
+ +
+
+
+ + {/* File explorer */} +
+
+
Files
+
+ + +
+
+ +
+ + setCwd(e.target.value)} + onKeyDown={(e) => { + if (e.key === 'Enter') void refreshDir(cwd); + }} + /> +
+ +
{ + e.preventDefault(); + e.stopPropagation(); + }} + onDrop={async (e) => { + e.preventDefault(); + e.stopPropagation(); + const files = Array.from(e.dataTransfer.files || []); + if (files.length === 0) return; + setUploading({ done: 0, total: files.length }); + try { + await uploadFiles(cwd, files, (done, total) => setUploading({ done, total })); + await refreshDir(cwd); + } catch (err) { + setFsError(err instanceof Error ? err.message : String(err)); + } finally { + setUploading(null); + } + }} + > + Drag & drop to upload into {cwd} + {uploading ? ( + + ({uploading.done}/{uploading.total}) + + ) : null} +
+ + {fsError ? ( +
+ {fsError} +
+ ) : null} + +
+
+
+
+
Name
+
Size
+
Type
+
+
+ {fsLoading ? ( +
Loading…
+ ) : sortedEntries.length === 0 ? ( +
Empty
+ ) : ( + sortedEntries.map((e) => ( + + )) + )} +
+
+
+ +
+
+
Selection
+ {selected ? ( +
+
{selected.path}
+
+ Type: {selected.kind} +
+ {selected.kind === 'file' ? ( +
+ Size: {formatBytes(selected.size)} +
+ ) : null} +
+ {selected.kind === 'file' ? ( + + ) : null} + +
+
+ ) : ( +
Click a file/folder.
+ )} +
+
+
+
+
+
+ ); +} + + diff --git a/dashboard/src/app/console/console-wrapper.tsx b/dashboard/src/app/console/console-wrapper.tsx new file mode 100644 index 0000000..d801a77 --- /dev/null +++ b/dashboard/src/app/console/console-wrapper.tsx @@ -0,0 +1,13 @@ +'use client'; + +import dynamic from 'next/dynamic'; + +const ConsoleClient = dynamic(() => import('./console-client'), { + ssr: false, +}); + +export function ConsoleWrapper() { + return ; +} + + diff --git a/dashboard/src/app/console/page.tsx b/dashboard/src/app/console/page.tsx new file mode 100644 index 0000000..b2d131e --- /dev/null +++ b/dashboard/src/app/console/page.tsx @@ -0,0 +1,18 @@ +import { Suspense } from 'react'; +import { ConsoleWrapper } from './console-wrapper'; + +export default function ConsolePage() { + return ( + +
Loading console…
+
+ } + > + + + ); +} + + diff --git a/dashboard/src/app/control/control-client.tsx b/dashboard/src/app/control/control-client.tsx index 943afcd..50975dd 100644 --- a/dashboard/src/app/control/control-client.tsx +++ b/dashboard/src/app/control/control-client.tsx @@ -408,3 +408,4 @@ export default function ControlClient() { } + diff --git a/dashboard/src/components/auth-gate.tsx b/dashboard/src/components/auth-gate.tsx index 8362a7e..79665b5 100644 --- a/dashboard/src/components/auth-gate.tsx +++ b/dashboard/src/components/auth-gate.tsx @@ -115,3 +115,4 @@ export function AuthGate({ children }: { children: React.ReactNode }) { } + diff --git a/dashboard/src/components/sidebar.tsx b/dashboard/src/components/sidebar.tsx index 93f2bff..1d1d19a 100644 --- a/dashboard/src/components/sidebar.tsx +++ b/dashboard/src/components/sidebar.tsx @@ -8,6 +8,7 @@ import { MessageSquare, Network, History, + Terminal, Settings, } from 'lucide-react'; @@ -15,6 +16,7 @@ const navigation = [ { name: 'Overview', href: '/', icon: LayoutDashboard }, { name: 'Control', href: '/control', icon: MessageSquare }, { name: 'Agents', href: '/agents', icon: Network }, + { name: 'Console', href: '/console', icon: Terminal }, { name: 'History', href: '/history', icon: History }, { name: 'Settings', href: '/settings', icon: Settings }, ]; diff --git a/dashboard/src/lib/auth.ts b/dashboard/src/lib/auth.ts index 63f5000..2faaa9e 100644 --- a/dashboard/src/lib/auth.ts +++ b/dashboard/src/lib/auth.ts @@ -50,3 +50,4 @@ export function signalAuthRequired(): void { } + diff --git a/dashboard/src/lib/utils.ts b/dashboard/src/lib/utils.ts index 60d3628..f70d411 100644 --- a/dashboard/src/lib/utils.ts +++ b/dashboard/src/lib/utils.ts @@ -34,3 +34,4 @@ export function formatRelativeTime(date: Date): string { return date.toLocaleDateString(); } + diff --git a/migrations/002_task_outcomes.sql b/migrations/002_task_outcomes.sql index ea3d175..d12db77 100644 --- a/migrations/002_task_outcomes.sql +++ 
b/migrations/002_task_outcomes.sql @@ -237,3 +237,4 @@ AS $$ LIMIT 3; $$; + diff --git a/src/api/auth.rs b/src/api/auth.rs index aadd69a..0a394e6 100644 --- a/src/api/auth.rs +++ b/src/api/auth.rs @@ -21,6 +21,7 @@ use jsonwebtoken::{DecodingKey, EncodingKey, Header, Validation}; use super::types::{LoginRequest, LoginResponse}; use super::routes::AppState; +use crate::config::Config; #[derive(Debug, serde::Serialize, serde::Deserialize)] struct Claims { @@ -71,6 +72,21 @@ fn verify_jwt(token: &str, secret: &str) -> anyhow::Result { Ok(token_data.claims) } +/// Verify a JWT against the server config. +/// Returns true iff: +/// - auth is not required (dev mode), OR +/// - auth is required and the token is valid. +pub fn verify_token_for_config(token: &str, config: &Config) -> bool { + if !config.auth.auth_required(config.dev_mode) { + return true; + } + let secret = match config.auth.jwt_secret.as_deref() { + Some(s) => s, + None => return false, + }; + verify_jwt(token, secret).is_ok() +} + pub async fn login( State(state): State>, Json(req): Json, diff --git a/src/api/console.rs b/src/api/console.rs new file mode 100644 index 0000000..93ea44d --- /dev/null +++ b/src/api/console.rs @@ -0,0 +1,230 @@ +//! WebSocket-backed SSH console (PTY) for the dashboard. 
+ +use std::sync::Arc; + +use axum::{ + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, +}; +use futures::{SinkExt, StreamExt}; +use portable_pty::{native_pty_system, CommandBuilder, PtySize}; +use serde::Deserialize; +use tokio::sync::mpsc; + +use super::auth; +use super::routes::AppState; +use super::ssh_util::materialize_private_key; + +#[derive(Debug, Deserialize)] +#[serde(tag = "t")] +enum ClientMsg { + #[serde(rename = "i")] + Input { d: String }, + #[serde(rename = "r")] + Resize { c: u16, r: u16 }, +} + +fn extract_jwt_from_protocols(headers: &HeaderMap) -> Option { + let raw = headers + .get("sec-websocket-protocol") + .and_then(|v| v.to_str().ok())?; + // Client sends: ["openagent", "jwt."] + for part in raw.split(',').map(|s| s.trim()) { + if let Some(rest) = part.strip_prefix("jwt.") { + if !rest.is_empty() { + return Some(rest.to_string()); + } + } + } + None +} + +pub async fn console_ws( + ws: WebSocketUpgrade, + State(state): State>, + headers: HeaderMap, +) -> impl IntoResponse { + // Enforce auth in non-dev mode by taking JWT from Sec-WebSocket-Protocol. + if state.config.auth.auth_required(state.config.dev_mode) { + let token = match extract_jwt_from_protocols(&headers) { + Some(t) => t, + None => return (StatusCode::UNAUTHORIZED, "Missing websocket JWT").into_response(), + }; + if !auth::verify_token_for_config(&token, &state.config) { + return (StatusCode::UNAUTHORIZED, "Invalid or expired token").into_response(); + } + } + + // Select a stable subprotocol if client offered it. 
+ ws.protocols(["openagent"]).on_upgrade(move |socket| handle_console(socket, state)) +} + +async fn handle_console(mut socket: WebSocket, state: Arc) { + let cfg = state.config.console_ssh.clone(); + let key = match cfg.private_key.as_deref() { + Some(k) if !k.trim().is_empty() => k, + _ => { + let _ = socket + .send(Message::Text("Console SSH is not configured on the server.".into())) + .await; + let _ = socket.close().await; + return; + } + }; + + let key_file = match materialize_private_key(key).await { + Ok(k) => k, + Err(e) => { + let _ = socket + .send(Message::Text(format!("Failed to load SSH key: {}", e))) + .await; + let _ = socket.close().await; + return; + } + }; + + let pty_system = native_pty_system(); + let pair = match pty_system.openpty(PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + pixel_height: 0, + }) { + Ok(p) => p, + Err(e) => { + let _ = socket + .send(Message::Text(format!("Failed to open PTY: {}", e))) + .await; + let _ = socket.close().await; + return; + } + }; + + let mut cmd = CommandBuilder::new("ssh"); + cmd.arg("-i"); + cmd.arg(key_file.path()); + cmd.arg("-p"); + cmd.arg(cfg.port.to_string()); + cmd.arg("-o"); + cmd.arg("BatchMode=yes"); + cmd.arg("-o"); + cmd.arg("StrictHostKeyChecking=accept-new"); + cmd.arg("-o"); + cmd.arg(format!( + "UserKnownHostsFile={}", + std::env::temp_dir() + .join("open_agent_known_hosts") + .to_string_lossy() + )); + // Allocate PTY on the remote side too. 
+ cmd.arg("-tt"); + cmd.arg(format!("{}@{}", cfg.user, cfg.host)); + cmd.env("TERM", "xterm-256color"); + + let mut child = match pair.slave.spawn_command(cmd) { + Ok(c) => c, + Err(e) => { + let _ = socket + .send(Message::Text(format!("Failed to spawn ssh: {}", e))) + .await; + let _ = socket.close().await; + return; + } + }; + drop(pair.slave); + + let mut reader = match pair.master.try_clone_reader() { + Ok(r) => r, + Err(_) => { + let _ = socket.close().await; + let _ = child.kill(); + return; + } + }; + + let (to_pty_tx, mut to_pty_rx) = mpsc::unbounded_channel::(); + let (from_pty_tx, mut from_pty_rx) = mpsc::unbounded_channel::(); + + // Writer/resizer thread. + let master_for_writer = pair.master; + let mut writer = match master_for_writer.take_writer() { + Ok(w) => w, + Err(_) => { + let _ = socket.close().await; + let _ = child.kill(); + return; + } + }; + + let writer_task = tokio::task::spawn_blocking(move || { + use std::io::Write; + while let Some(msg) = to_pty_rx.blocking_recv() { + match msg { + ClientMsg::Input { d } => { + let _ = writer.write_all(d.as_bytes()); + let _ = writer.flush(); + } + ClientMsg::Resize { c, r } => { + let _ = master_for_writer.resize(PtySize { + rows: r, + cols: c, + pixel_width: 0, + pixel_height: 0, + }); + } + } + } + }); + + // Reader thread. + let reader_task = tokio::task::spawn_blocking(move || { + use std::io::Read; + let mut buf = [0u8; 8192]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + let s = String::from_utf8_lossy(&buf[..n]).to_string(); + let _ = from_pty_tx.send(s); + } + Err(_) => break, + } + } + }); + + let (mut ws_sender, mut ws_receiver) = socket.split(); + + // Pump PTY output to WS. 
+ let send_task = tokio::spawn(async move { + while let Some(chunk) = from_pty_rx.recv().await { + if ws_sender.send(Message::Text(chunk)).await.is_err() { + break; + } + } + }); + + // WS -> PTY + while let Some(Ok(msg)) = ws_receiver.next().await { + match msg { + Message::Text(t) => { + if let Ok(parsed) = serde_json::from_str::(&t) { + let _ = to_pty_tx.send(parsed); + } + } + Message::Binary(_) => {} + Message::Close(_) => break, + _ => {} + } + } + + // Cleanup + let _ = child.kill(); + drop(to_pty_tx); + let _ = writer_task.await; + let _ = reader_task.await; + let _ = send_task.await; +} + + + diff --git a/src/api/fs.rs b/src/api/fs.rs new file mode 100644 index 0000000..68416d2 --- /dev/null +++ b/src/api/fs.rs @@ -0,0 +1,241 @@ +//! Remote file explorer endpoints (list/upload/download) via SSH + SFTP (OpenSSH). +//! +//! Note: uploads/downloads use `sftp` for transfer performance; directory listing uses `ssh` to run a small +//! Python snippet that returns JSON (easier/safer than parsing `sftp ls` output). + +use std::sync::Arc; + +use axum::{ + body::Body, + extract::{Multipart, Query, State}, + http::{header, HeaderMap, StatusCode}, + response::{IntoResponse, Response}, + Json, +}; +use base64::Engine; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWriteExt; +use tokio_util::io::ReaderStream; + +use super::routes::AppState; +use super::ssh_util::{materialize_private_key, sftp_batch, ssh_exec}; + +#[derive(Debug, Deserialize)] +pub struct PathQuery { + pub path: String, +} + +#[derive(Debug, Deserialize)] +pub struct MkdirRequest { + pub path: String, +} + +#[derive(Debug, Deserialize)] +pub struct RmRequest { + pub path: String, + pub recursive: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct FsEntry { + pub name: String, + pub path: String, + pub kind: String, // file/dir/link/other + pub size: u64, + pub mtime: i64, +} + +fn python_list_script_b64() -> String { + // Reads sys.argv[1] as path; prints JSON list to stdout. 
+ let script = r#" +import os, sys, json, stat + +path = sys.argv[1] +out = [] +try: + with os.scandir(path) as it: + for e in it: + try: + st = e.stat(follow_symlinks=False) + mode = st.st_mode + if stat.S_ISDIR(mode): + kind = "dir" + elif stat.S_ISREG(mode): + kind = "file" + elif stat.S_ISLNK(mode): + kind = "link" + else: + kind = "other" + out.append({ + "name": e.name, + "path": os.path.join(path, e.name), + "kind": kind, + "size": int(st.st_size), + "mtime": int(st.st_mtime), + }) + except Exception: + continue +except FileNotFoundError: + out = [] + +print(json.dumps(out)) +"#; + base64::engine::general_purpose::STANDARD.encode(script.as_bytes()) +} + +async fn get_key_and_cfg(state: &Arc) -> Result<(crate::config::ConsoleSshConfig, super::ssh_util::TempKeyFile), (StatusCode, String)> { + let cfg = state.config.console_ssh.clone(); + let key = cfg + .private_key + .as_deref() + .ok_or_else(|| (StatusCode::SERVICE_UNAVAILABLE, "Console SSH not configured".to_string()))?; + let key_file = materialize_private_key(key) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + Ok((cfg, key_file)) +} + +pub async fn list( + State(state): State>, + Query(q): Query, +) -> Result>, (StatusCode, String)> { + let (cfg, key_file) = get_key_and_cfg(&state).await?; + + let b64 = python_list_script_b64(); + let code = format!("import base64,sys; exec(base64.b64decode('{}'))", b64); + let out = ssh_exec( + &cfg, + key_file.path(), + "python3", + &vec!["-c".into(), code, q.path.clone()], + ) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let parsed = serde_json::from_str::>(&out) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("parse error: {}", e)))?; + Ok(Json(parsed)) +} + +pub async fn mkdir( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let (cfg, key_file) = get_key_and_cfg(&state).await?; + ssh_exec(&cfg, key_file.path(), "mkdir", &vec!["-p".into(), req.path]) + 
.await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + Ok(Json(serde_json::json!({ "ok": true }))) +} + +pub async fn rm( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let (cfg, key_file) = get_key_and_cfg(&state).await?; + let recursive = req.recursive.unwrap_or(false); + let mut args = vec![]; + if recursive { + args.push("-rf".to_string()); + } else { + args.push("-f".to_string()); + } + args.push(req.path); + ssh_exec(&cfg, key_file.path(), "rm", &args) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + Ok(Json(serde_json::json!({ "ok": true }))) +} + +pub async fn download( + State(state): State>, + Query(q): Query, +) -> Result { + let (cfg, key_file) = get_key_and_cfg(&state).await?; + + let tmp = std::env::temp_dir().join(format!("open_agent_dl_{}", uuid::Uuid::new_v4())); + let batch = format!("get -p \"{}\" \"{}\"\n", q.path, tmp.to_string_lossy()); + sftp_batch(&cfg, key_file.path(), &batch) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let file = tokio::fs::File::open(&tmp) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + let stream = ReaderStream::new(file); + let body = Body::from_stream(stream); + + let filename = q.path.split('/').last().unwrap_or("download"); + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_DISPOSITION, + format!("attachment; filename=\"{}\"", filename) + .parse() + .unwrap(), + ); + headers.insert(header::CONTENT_TYPE, "application/octet-stream".parse().unwrap()); + + // Best-effort cleanup (delete after a short delay). 
+ let tmp_cleanup = tmp.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + let _ = tokio::fs::remove_file(tmp_cleanup).await; + }); + + Ok((headers, body).into_response()) +} + +pub async fn upload( + State(state): State>, + Query(q): Query, + mut multipart: Multipart, +) -> Result, (StatusCode, String)> { + let (cfg, key_file) = get_key_and_cfg(&state).await?; + + // Expect one file field. + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))? + { + let file_name = field.file_name().map(|s| s.to_string()).unwrap_or_else(|| "upload.bin".to_string()); + // Stream to temp file first (avoid buffering large uploads in memory). + let tmp = std::env::temp_dir().join(format!("open_agent_ul_{}", uuid::Uuid::new_v4())); + let mut f = tokio::fs::File::create(&tmp) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let mut field = field; + while let Some(chunk) = field + .chunk() + .await + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))? 
+ { + f.write_all(&chunk) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + } + f.flush() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let remote_path = if q.path.ends_with('/') { + format!("{}{}", q.path, file_name) + } else { + format!("{}/{}", q.path, file_name) + }; + + let batch = format!("put -p \"{}\" \"{}\"\n", tmp.to_string_lossy(), remote_path); + sftp_batch(&cfg, key_file.path(), &batch) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + let _ = tokio::fs::remove_file(tmp).await; + + return Ok(Json(serde_json::json!({ "ok": true, "path": q.path, "name": file_name }))); + } + + Err((StatusCode::BAD_REQUEST, "missing file".to_string())) +} + + + diff --git a/src/api/mod.rs b/src/api/mod.rs index 30bd5c9..147ac45 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -9,6 +9,9 @@ mod routes; mod auth; +mod console; +mod fs; +mod ssh_util; pub mod types; pub use routes::serve; diff --git a/src/api/routes.rs b/src/api/routes.rs index 7751aa2..18b5af0 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -31,6 +31,8 @@ use crate::tools::ToolRegistry; use super::types::*; use super::auth; +use super::console; +use super::fs; /// Shared application state. 
pub struct AppState { @@ -62,7 +64,9 @@ pub async fn serve(config: Config) -> anyhow::Result<()> { let public_routes = Router::new() .route("/api/health", get(health)) - .route("/api/auth/login", post(auth::login)); + .route("/api/auth/login", post(auth::login)) + // WebSocket console uses subprotocol-based auth (browser can't set Authorization header) + .route("/api/console/ws", get(console::console_ws)); let protected_routes = Router::new() .route("/api/stats", get(get_stats)) @@ -77,6 +81,12 @@ pub async fn serve(config: Config) -> anyhow::Result<()> { .route("/api/runs/:id/events", get(get_run_events)) .route("/api/runs/:id/tasks", get(get_run_tasks)) .route("/api/memory/search", get(search_memory)) + // Remote file explorer endpoints (use Authorization header) + .route("/api/fs/list", get(fs::list)) + .route("/api/fs/download", get(fs::download)) + .route("/api/fs/upload", post(fs::upload)) + .route("/api/fs/mkdir", post(fs::mkdir)) + .route("/api/fs/rm", post(fs::rm)) .layer(middleware::from_fn_with_state(Arc::clone(&state), auth::require_auth)); let app = Router::new() diff --git a/src/api/ssh_util.rs b/src/api/ssh_util.rs new file mode 100644 index 0000000..9dcef86 --- /dev/null +++ b/src/api/ssh_util.rs @@ -0,0 +1,126 @@ +//! SSH helpers for the dashboard console + file explorer. + +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use tokio::io::AsyncWriteExt; +use tokio::process::Command; +use uuid::Uuid; + +use crate::config::ConsoleSshConfig; + +/// A temporary SSH key file (best-effort cleanup on drop). 
+pub struct TempKeyFile { + path: PathBuf, +} + +impl TempKeyFile { + pub fn path(&self) -> &Path { + &self.path + } +} + +impl Drop for TempKeyFile { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } +} + +pub async fn materialize_private_key(private_key: &str) -> anyhow::Result { + let name = format!("open_agent_console_key_{}.key", Uuid::new_v4()); + let path = std::env::temp_dir().join(name); + let mut f = tokio::fs::File::create(&path).await?; + f.write_all(private_key.as_bytes()).await?; + f.flush().await?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perm = std::fs::Permissions::from_mode(0o600); + std::fs::set_permissions(&path, perm)?; + } + + Ok(TempKeyFile { path }) +} + +fn ssh_base_args(cfg: &ConsoleSshConfig, key_path: &Path) -> Vec { + vec![ + "-i".to_string(), + key_path.to_string_lossy().to_string(), + "-p".to_string(), + cfg.port.to_string(), + "-o".to_string(), + "BatchMode=yes".to_string(), + "-o".to_string(), + "StrictHostKeyChecking=accept-new".to_string(), + "-o".to_string(), + // Keep known_hosts separate from system to avoid permission issues. 
+        format!(
+            "UserKnownHostsFile={}",
+            std::env::temp_dir()
+                .join("open_agent_known_hosts")
+                .to_string_lossy()
+        ),
+    ]
+}
+
+pub async fn ssh_exec(cfg: &ConsoleSshConfig, key_path: &Path, remote_cmd: &str, args: &[String]) -> anyhow::Result<String> {
+    let mut cmd = Command::new("ssh");
+    for a in ssh_base_args(cfg, key_path) {
+        cmd.arg(a);
+    }
+
+    cmd.arg(format!("{}@{}", cfg.user, cfg.host));
+    cmd.arg("--");
+    cmd.arg(remote_cmd);
+    for a in args {
+        cmd.arg(a);
+    }
+
+    let out = tokio::time::timeout(Duration::from_secs(30), cmd.output()).await??;
+    if !out.status.success() {
+        return Err(anyhow::anyhow!(
+            "ssh failed (code {:?}): {}",
+            out.status.code(),
+            String::from_utf8_lossy(&out.stderr)
+        ));
+    }
+    Ok(String::from_utf8_lossy(&out.stdout).to_string())
+}
+
+pub async fn sftp_batch(cfg: &ConsoleSshConfig, key_path: &Path, batch: &str) -> anyhow::Result<()> {
+    let mut cmd = Command::new("sftp");
+    cmd.arg("-b").arg("-");
+    cmd.arg("-i").arg(key_path);
+    cmd.arg("-P").arg(cfg.port.to_string());
+    cmd.arg("-o").arg("BatchMode=yes");
+    cmd.arg("-o").arg("StrictHostKeyChecking=accept-new");
+    cmd.arg("-o").arg(format!(
+        "UserKnownHostsFile={}",
+        std::env::temp_dir()
+            .join("open_agent_known_hosts")
+            .to_string_lossy()
+    ));
+    cmd.arg(format!("{}@{}", cfg.user, cfg.host));
+
+    cmd.stdin(std::process::Stdio::piped());
+    cmd.stdout(std::process::Stdio::piped());
+    cmd.stderr(std::process::Stdio::piped());
+
+    let mut child = cmd.spawn()?;
+    if let Some(mut stdin) = child.stdin.take() {
+        stdin.write_all(batch.as_bytes()).await?;
+    }
+    let out = tokio::time::timeout(Duration::from_secs(120), child.wait_with_output()).await??;
+    if !out.status.success() {
+        return Err(anyhow::anyhow!(
+            "sftp failed (code {:?}): {}",
+            out.status.code(),
+            String::from_utf8_lossy(&out.stderr)
+        ));
+    }
+    Ok(())
+}
+
+
diff --git a/src/config.rs b/src/config.rs
index 9414eb6..206695d 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -7,6 +7,11 @@
 //! - `HOST` - Optional. Server host. Defaults to `127.0.0.1`.
 //! - `PORT` - Optional. Server port. Defaults to `3000`.
 //! - `MAX_ITERATIONS` - Optional. Maximum agent loop iterations. Defaults to `50`.
+//! - `CONSOLE_SSH_HOST` - Optional. Host for dashboard console/file explorer SSH (default: 127.0.0.1).
+//! - `CONSOLE_SSH_PORT` - Optional. SSH port (default: 22).
+//! - `CONSOLE_SSH_USER` - Optional. SSH user (default: root).
+//! - `CONSOLE_SSH_PRIVATE_KEY_B64` - Optional. Base64-encoded OpenSSH private key.
+//! - `CONSOLE_SSH_PRIVATE_KEY` - Optional. Raw (multiline) OpenSSH private key (fallback).
 //! - `SUPABASE_URL` - Optional. Supabase project URL for memory storage.
 //! - `SUPABASE_SERVICE_ROLE_KEY` - Optional. Service role key for Supabase.
 //! - `MEMORY_EMBED_MODEL` - Optional. Embedding model. Defaults to `openai/text-embedding-3-small`.
@@ -14,6 +19,7 @@
 
 use std::path::PathBuf;
 use thiserror::Error;
+use base64::Engine;
 
 #[derive(Debug, Error)]
 pub enum ConfigError {
@@ -89,10 +95,43 @@ pub struct Config {
     /// API auth configuration (dashboard login)
     pub auth: AuthConfig,
 
+    /// Remote console/file explorer SSH configuration (optional).
+    pub console_ssh: ConsoleSshConfig,
+
     /// Memory/storage configuration
     pub memory: MemoryConfig,
 }
 
+/// SSH configuration for the dashboard console + file explorer.
+#[derive(Debug, Clone)]
+pub struct ConsoleSshConfig {
+    /// Host to SSH into (default: 127.0.0.1)
+    pub host: String,
+    /// SSH port (default: 22)
+    pub port: u16,
+    /// SSH username (default: root)
+    pub user: String,
+    /// Private key (OpenSSH) used for auth (prefer *_B64 env)
+    pub private_key: Option<String>,
+}
+
+impl Default for ConsoleSshConfig {
+    fn default() -> Self {
+        Self {
+            host: "127.0.0.1".to_string(),
+            port: 22,
+            user: "root".to_string(),
+            private_key: None,
+        }
+    }
+}
+
+impl ConsoleSshConfig {
+    pub fn is_configured(&self) -> bool {
+        self.private_key.as_ref().map(|s| !s.trim().is_empty()).unwrap_or(false)
+    }
+}
+
 /// API auth configuration (single-tenant).
 #[derive(Debug, Clone)]
 pub struct AuthConfig {
@@ -189,6 +228,17 @@ impl Config {
             rerank_model: std::env::var("MEMORY_RERANK_MODEL").ok(),
             embed_dimension: 1536, // OpenAI text-embedding-3-small default
         };
+
+        let console_ssh = ConsoleSshConfig {
+            host: std::env::var("CONSOLE_SSH_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
+            port: std::env::var("CONSOLE_SSH_PORT")
+                .ok()
+                .map(|v| v.parse::<u16>().map_err(|e| ConfigError::InvalidValue("CONSOLE_SSH_PORT".to_string(), format!("{}", e))))
+                .transpose()?
+                .unwrap_or(22),
+            user: std::env::var("CONSOLE_SSH_USER").unwrap_or_else(|_| "root".to_string()),
+            private_key: read_private_key_from_env()?,
+        };
 
         Ok(Self {
             api_key,
@@ -199,6 +249,7 @@ impl Config {
             max_iterations,
             dev_mode,
             auth,
+            console_ssh,
             memory,
         })
     }
@@ -218,6 +269,7 @@ impl Config {
             max_iterations: 50,
             dev_mode: true,
             auth: AuthConfig::default(),
+            console_ssh: ConsoleSshConfig::default(),
             memory: MemoryConfig::default(),
         }
     }
@@ -231,3 +283,24 @@ fn parse_bool(value: &str) -> Result<bool, ConfigError> {
     }
 }
 
+fn read_private_key_from_env() -> Result<Option<String>, ConfigError> {
+    // Prefer base64 to avoid multiline env complications.
+    if let Ok(b64) = std::env::var("CONSOLE_SSH_PRIVATE_KEY_B64") {
+        if b64.trim().is_empty() {
+            return Ok(None);
+        }
+        let bytes = base64::engine::general_purpose::STANDARD
+            .decode(b64.trim().as_bytes())
+            .map_err(|e| ConfigError::InvalidValue("CONSOLE_SSH_PRIVATE_KEY_B64".to_string(), format!("{}", e)))?;
+        let s = String::from_utf8(bytes)
+            .map_err(|e| ConfigError::InvalidValue("CONSOLE_SSH_PRIVATE_KEY_B64".to_string(), format!("{}", e)))?;
+        return Ok(Some(s));
+    }
+
+    // Fallback: raw private key in env (EnvironmentFile can support multiline).
+ match std::env::var("CONSOLE_SSH_PRIVATE_KEY") { + Ok(s) if !s.trim().is_empty() => Ok(Some(s)), + _ => Ok(None), + } +} + diff --git a/src/memory/embed.rs b/src/memory/embed.rs index ee8155a..4014766 100644 --- a/src/memory/embed.rs +++ b/src/memory/embed.rs @@ -125,3 +125,4 @@ struct EmbeddingUsage { total_tokens: u32, } + diff --git a/src/memory/mod.rs b/src/memory/mod.rs index 979a3de..e3cae8d 100644 --- a/src/memory/mod.rs +++ b/src/memory/mod.rs @@ -95,3 +95,4 @@ pub struct MemorySystem { pub retriever: Arc, } +