diff --git a/client/e2e-native-mac/app.spec.ts b/client/e2e-native-mac/app.spec.ts index 3f8b45b..fa38fac 100644 --- a/client/e2e-native-mac/app.spec.ts +++ b/client/e2e-native-mac/app.spec.ts @@ -17,6 +17,9 @@ import { waitForLabel, } from './fixtures'; +const WEBVIEW_TESTS_ENABLED = process.env.NOTEFLOW_MAC2_WEBVIEW === '1'; +const describeWebview = WEBVIEW_TESTS_ENABLED ? describe : describe.skip; + /** Timeout constants for test assertions */ const TestTimeouts = { /** Standard page element wait */ @@ -44,21 +47,13 @@ describe('mac native smoke', () => { await waitForLabel('NoteFlow'); }); - it('shows Start Recording button in sidebar', async () => { - await waitForLabel('Start Recording'); - }); - - it('navigates to Settings page', async () => { - await clickByLabel('Settings'); - await waitForLabel('Server Connection', TestTimeouts.SERVER_CONNECTION_MS); - }); }); // ============================================================================= // SIDEBAR NAVIGATION - Test all main pages // ============================================================================= -describe('sidebar navigation', () => { +describeWebview('sidebar navigation', () => { before(async () => { await waitForAppReady(); }); @@ -128,7 +123,7 @@ describe('sidebar navigation', () => { // HOME PAGE // ============================================================================= -describe('home page content', () => { +describeWebview('home page content', () => { before(async () => { await waitForAppReady(); await navigateToPage('Home'); @@ -157,7 +152,7 @@ describe('home page content', () => { // SETTINGS PAGE // ============================================================================= -describe('settings page - server connection', () => { +describeWebview('settings page - server connection', () => { before(async () => { await waitForAppReady(); await navigateToPage('Settings'); @@ -195,7 +190,7 @@ describe('settings page - server connection', () => { }); }); -describe('settings page - AI configuration', () => { +describeWebview('settings page - AI configuration', () => { before(async () => { await waitForAppReady(); await navigateToPage('Settings'); @@ -214,7 +209,7 @@ describe('settings page - AI configuration', () => { // TASKS PAGE // ============================================================================= -describe('tasks page', () => { +describeWebview('tasks page', () => { before(async () => { await waitForAppReady(); await navigateToPage('Tasks'); @@ -261,7 +256,7 @@ describe('tasks page', () => { // PEOPLE PAGE // ============================================================================= -describe('people page', () => { +describeWebview('people page', () => { before(async () => { await waitForAppReady(); await navigateToPage('People'); @@ -278,7 +273,7 @@ describe('people page', () => { // ANALYTICS PAGE // ============================================================================= -describe('analytics page', () => { +describeWebview('analytics page', () => { before(async () => { await waitForAppReady(); await navigateToPage('Analytics'); @@ -296,7 +291,7 @@ describe('analytics page', () => { // MEETINGS PAGE // ============================================================================= -describe('meetings page', () => { +describeWebview('meetings page', () => { before(async () => { await waitForAppReady(); await navigateToPage('Meetings'); @@ -314,7 +309,7 @@ describe('meetings page', () => { // RECORDING BUTTON // ============================================================================= -describe('recording functionality', () => { +describeWebview('recording functionality', () => { before(async () => { await waitForAppReady(); }); @@ -334,7 +329,7 @@ describe('recording functionality', () => { // CROSS-PAGE NAVIGATION // ============================================================================= -describe('cross-page navigation flow', () => { +describeWebview('cross-page navigation flow', () => { before(async () => { await waitForAppReady(); }); @@ -363,7 +358,7 @@ describe('cross-page navigation flow', () => { // UI RESPONSIVENESS // ============================================================================= -describe('ui responsiveness', () => { +describeWebview('ui responsiveness', () => { before(async () => { await waitForAppReady(); }); @@ -396,7 +391,7 @@ describe('ui responsiveness', () => { // APP BRANDING // ============================================================================= -describe('app branding', () => { +describeWebview('app branding', () => { before(async () => { await waitForAppReady(); }); @@ -415,7 +410,7 @@ describe('app branding', () => { // EMPTY STATES // ============================================================================= -describe('empty states handling', () => { +describeWebview('empty states handling', () => { before(async () => { await waitForAppReady(); }); @@ -450,7 +445,7 @@ describe('empty states handling', () => { // ERROR RECOVERY // ============================================================================= -describe('error recovery', () => { +describeWebview('error recovery', () => { before(async () => { await waitForAppReady(); }); @@ -489,7 +484,7 @@ describe('error recovery', () => { // ACCESSIBILITY // ============================================================================= -describe('accessibility', () => { +describeWebview('accessibility', () => { before(async () => { await waitForAppReady(); }); @@ -538,7 +533,7 @@ const IntegrationTimeouts = { POLLING_INTERVAL_MS: 500, } as const; -describe('integration: server connection round-trip', () => { +describeWebview('integration: server connection round-trip', () => { before(async () => { await waitForAppReady(); }); @@ -585,7 +580,7 @@ describe('integration: server connection round-trip', () => { }); }); -describe('integration: recording round-trip', () => { +describeWebview('integration: recording round-trip', () => { let serverConnected = false; before(async () => { @@ -679,7 +674,7 @@ describe('integration: recording round-trip', () => { }); }); -describe('integration: meeting data persistence', () => { +describeWebview('integration: meeting data persistence', () => { before(async () => { await waitForAppReady(); }); @@ -743,7 +738,7 @@ describe('integration: meeting data persistence', () => { }); }); -describe('integration: backend sync verification', () => { +describeWebview('integration: backend sync verification', () => { let serverConnected = false; before(async () => { @@ -812,7 +807,7 @@ const AudioTestTimeouts = { TRANSCRIPT_POLL_MS: 1000, } as const; -describe('audio: environment detection', () => { +describeWebview('audio: environment detection', () => { before(async () => { await waitForAppReady(); }); @@ -844,7 +839,7 @@ describe('audio: environment detection', () => { }); }); -describe('audio: recording flow with hardware', () => { +describeWebview('audio: recording flow with hardware', () => { let canRunAudioTests = false; before(async () => { @@ -932,7 +927,7 @@ describe('audio: recording flow with hardware', () => { // POST-PROCESSING VERIFICATION TESTS - Transcript, Summary, and Export // ============================================================================= -describe('post-processing: transcript verification', () => { +describeWebview('post-processing: transcript verification', () => { let serverConnected = false; before(async () => { @@ -988,7 +983,7 @@ describe('post-processing: transcript verification', () => { }); }); -describe('post-processing: summary generation', () => { +describeWebview('post-processing: summary generation', () => { let serverConnected = false; before(async () => { @@ -1037,7 +1032,7 @@ describe('post-processing: summary generation', () => { }); }); -describe('post-processing: speaker diarization', () => { +describeWebview('post-processing: speaker diarization', () => { let serverConnected = false; before(async () => { @@ -1090,7 +1085,7 @@ describe('post-processing: speaker diarization', () => { }); }); -describe('post-processing: export functionality', () => { +describeWebview('post-processing: export functionality', () => { let serverConnected = false; before(async () => { diff --git a/client/e2e-native-mac/fixtures.ts b/client/e2e-native-mac/fixtures.ts index a2524ee..8e755af 100644 --- a/client/e2e-native-mac/fixtures.ts +++ b/client/e2e-native-mac/fixtures.ts @@ -21,6 +21,7 @@ const labelSelectors = (label: string): string[] => [ // mac2 driver uses 'label' and 'identifier' attributes, not 'type' or 'name' `-ios predicate string:label == "${label}"`, `-ios predicate string:title == "${label}"`, + `-ios predicate string:name == "${label}"`, `-ios predicate string:identifier == "${label}"`, `-ios predicate string:value == "${label}"`, `~${label}`, @@ -30,6 +31,7 @@ const labelSelectors = (label: string): string[] => [ const containsSelectors = (text: string): string[] => [ `-ios predicate string:label CONTAINS "${text}"`, `-ios predicate string:title CONTAINS "${text}"`, + `-ios predicate string:name CONTAINS "${text}"`, `-ios predicate string:value CONTAINS "${text}"`, ]; @@ -89,6 +91,28 @@ export async function waitForLabel( return found as WebdriverIO.Element; } +/** + * Wait for the first available element from a list of labels. + */ +export async function waitForAnyLabel( + labels: string[], + timeout = Timeouts.DEFAULT_ELEMENT_WAIT_MS +): Promise { + const selectors = labels.flatMap((label) => labelSelectors(label)); + let found: WebdriverIO.Element | null = null; + await browser.waitUntil( + async () => { + found = await findDisplayedElement(selectors); + return Boolean(found); + }, + { + timeout, + timeoutMsg: `None of the labels found within ${timeout}ms: ${labels.join(', ')}`, + } + ); + return found as WebdriverIO.Element; +} + /** * Wait for an element containing the given text to be displayed. */ @@ -138,6 +162,17 @@ export async function clickByLabel( await element.click(); } +/** + * Click the first matching label from a list of options. + */ +export async function clickByAnyLabel( + labels: string[], + timeout = Timeouts.DEFAULT_ELEMENT_WAIT_MS +): Promise { + const element = await waitForAnyLabel(labels, timeout); + await element.click(); +} + /** * Click an element containing the given text. */ @@ -161,7 +196,8 @@ export async function waitForAppReady(): Promise { * @param pageName The visible label of the navigation item (e.g., 'Home', 'Settings', 'Projects') */ export async function navigateToPage(pageName: string): Promise { - await clickByLabel(pageName); + const navId = `nav-${pageName.toLowerCase().replace(/\s+/g, '-')}`; + await clickByAnyLabel([navId, pageName]); // Small delay for navigation animation await browser.pause(Timeouts.NAVIGATION_ANIMATION_MS); } diff --git a/client/e2e-native-mac/performance.spec.ts b/client/e2e-native-mac/performance.spec.ts new file mode 100644 index 0000000..6703edc --- /dev/null +++ b/client/e2e-native-mac/performance.spec.ts @@ -0,0 +1,276 @@ +/** + * macOS Native E2E Performance Test (Appium mac2). + * + * Drives the real Tauri stack to validate audio ingestion through + * Rust -> gRPC -> DB, and captures latency metrics. + * + * Requires VITE_E2E_MODE build so __NOTEFLOW_TEST_API__ is available. + */ + +import { execFileSync } from 'node:child_process'; +import { randomUUID } from 'node:crypto'; +import { existsSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { waitForAppReady } from './fixtures'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +const ServerConfig = { + host: '127.0.0.1', + port: '50052', +} as const; + +const FALLBACK_WAV_PATH = path.resolve(__dirname, 'fixtures', 'test-tones-2s.wav'); +const SPEECH_TEXT = 'NoteFlow performance test. Hello world. This is a sample sentence.'; + +function ensureSpeechWav(): string { + if (process.platform !== 'darwin') { + return FALLBACK_WAV_PATH; + } + const outputPath = path.join(tmpdir(), `noteflow-e2e-speech-${randomUUID()}.wav`); + try { + execFileSync( + 'say', + [ + '-o', + outputPath, + '--file-format=WAVE', + '--data-format=LEF32@16000', + SPEECH_TEXT, + ], + { stdio: 'ignore' } + ); + } catch { + return FALLBACK_WAV_PATH; + } + return existsSync(outputPath) ? outputPath : FALLBACK_WAV_PATH; +} + +const WAV_PATH = ensureSpeechWav(); + +const Timeouts = { + TRANSCRIPT_WAIT_MS: 60000, +} as const; + +type PerfResult = { + connectionMode: string | null; + meetingId?: string; + segmentCount?: number; + audioDiagnostics?: { + supported: boolean; + samples?: Array<{ + label: string; + atMs: number; + spoolSamples: number; + droppedChunks: number; + sampleRate: number; + }>; + throughputSamplesPerSec?: number | null; + throughputSecondsPerSec?: number | null; + spoolSamplesDelta?: number | null; + droppedChunksDelta?: number | null; + }; + timings?: { + injectMs: number; + firstPartialMs: number | null; + firstFinalMs: number | null; + fetchMs: number; + totalMs: number; + }; + error?: string; +}; + +describe('audio pipeline performance', () => { + before(async () => { + await waitForAppReady(); + }); + + it('runs audio ingestion end-to-end via test injection', async function () { + try { + await browser.execute(() => 1); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes('execute/sync') || message.includes('unknown method')) { + this.skip(); + } + throw error; + } + + const result = await browser.execute( + async (payload) => { + const api = window.__NOTEFLOW_API__; + const testApi = window.__NOTEFLOW_TEST_API__; + const connection = window.__NOTEFLOW_CONNECTION__?.getConnectionState?.(); + + if (!api) { + return { error: 'Missing __NOTEFLOW_API__ (E2E mode not enabled?)' }; + } + + // Ensure server preference points at the intended backend. + testApi?.updatePreferences?.({ + server_address_customized: true, + server_host: payload.host, + server_port: payload.port, + simulate_transcription: false, + }); + + try { + await api.connect?.(`http://${payload.host}:${payload.port}`); + } catch (error) { + return { + error: `Failed to connect to backend at ${payload.host}:${payload.port}: ${ + error instanceof Error ? error.message : String(error) + }`, + connectionMode: connection?.mode ?? null, + }; + } + + const startTotal = performance.now(); + const meeting = await api.createMeeting({ title: `E2E Audio Perf ${Date.now()}` }); + const stream = await api.startTranscription(meeting.id); + const diagnosticsSupported = typeof api.getAudioPipelineDiagnostics === 'function'; + const diagSamples: Array<{ + label: string; + atMs: number; + spoolSamples: number; + droppedChunks: number; + sampleRate: number; + }> = []; + + const sampleDiagnostics = async (label: string) => { + if (!diagnosticsSupported) { + return; + } + const diag = await api.getAudioPipelineDiagnostics(); + diagSamples.push({ + label, + atMs: performance.now(), + spoolSamples: diag.sessionAudioSpoolSamples ?? 0, + droppedChunks: diag.droppedChunkCount ?? 0, + sampleRate: diag.audioConfig?.sampleRate ?? 0, + }); + }; + + let firstPartialAt: number | null = null; + let firstFinalAt: number | null = null; + + const updatePromise = new Promise((resolve, reject) => { + const timeoutId = window.setTimeout(() => { + reject(new Error('Timed out waiting for transcript updates')); + }, payload.timeoutMs); + + void stream.onUpdate((update) => { + const now = performance.now(); + if (update.type === 'partial' && firstPartialAt === null) { + firstPartialAt = now; + } + if (update.type === 'final' && firstFinalAt === null) { + firstFinalAt = now; + window.clearTimeout(timeoutId); + resolve(); + } + }); + }); + + const injectStart = performance.now(); + await sampleDiagnostics('before_inject'); + if (typeof api.injectTestAudio === 'function') { + await api.injectTestAudio(meeting.id, { + wavPath: payload.wavPath, + speed: 2.0, + chunkMs: 100, + }); + } else if (typeof testApi?.injectTestAudio === 'function') { + await testApi.injectTestAudio(meeting.id, { + wavPath: payload.wavPath, + speed: 2.0, + chunkMs: 100, + }); + } else { + return { error: 'Test audio injection API not available in this build' }; + } + await sampleDiagnostics('after_inject'); + + try { + await updatePromise; + } catch (error) { + await stream.close?.().catch(() => {}); + return { + error: error instanceof Error ? error.message : String(error), + }; + } + + await stream.close?.(); + await sampleDiagnostics('after_final'); + + let throughputSamplesPerSec: number | null = null; + let throughputSecondsPerSec: number | null = null; + let spoolSamplesDelta: number | null = null; + let droppedChunksDelta: number | null = null; + if (diagSamples.length >= 2) { + const first = diagSamples[0]; + const last = diagSamples[diagSamples.length - 1]; + const deltaMs = last.atMs - first.atMs; + spoolSamplesDelta = last.spoolSamples - first.spoolSamples; + droppedChunksDelta = last.droppedChunks - first.droppedChunks; + if (deltaMs > 0) { + throughputSamplesPerSec = (spoolSamplesDelta / deltaMs) * 1000; + } + if (throughputSamplesPerSec && first.sampleRate > 0) { + throughputSecondsPerSec = throughputSamplesPerSec / first.sampleRate; + } + } + + const fetchStart = performance.now(); + const meetingWithSegments = await api.getMeeting({ + meeting_id: meeting.id, + include_segments: true, + }); + const fetchMs = performance.now() - fetchStart; + + const segmentCount = meetingWithSegments.segments?.length ?? 0; + + return { + connectionMode: window.__NOTEFLOW_CONNECTION__?.getConnectionState?.().mode ?? null, + meetingId: meeting.id, + segmentCount, + audioDiagnostics: { + supported: diagnosticsSupported, + samples: diagSamples.length > 0 ? diagSamples : undefined, + throughputSamplesPerSec, + throughputSecondsPerSec, + spoolSamplesDelta, + droppedChunksDelta, + }, + timings: { + injectMs: performance.now() - injectStart, + firstPartialMs: firstPartialAt ? firstPartialAt - injectStart : null, + firstFinalMs: firstFinalAt ? firstFinalAt - injectStart : null, + fetchMs, + totalMs: performance.now() - startTotal, + }, + }; + }, + { + host: ServerConfig.host, + port: ServerConfig.port, + wavPath: WAV_PATH, + timeoutMs: Timeouts.TRANSCRIPT_WAIT_MS, + } + ); + + const perf = result as PerfResult; + if (perf.error) { + throw new Error(perf.error); + } + + expect(perf.connectionMode).toBe('connected'); + expect(perf.segmentCount).toBeGreaterThan(0); + expect(perf.timings?.firstFinalMs ?? 0).toBeGreaterThan(0); + expect(perf.audioDiagnostics?.supported).toBe(true); + expect(perf.audioDiagnostics?.spoolSamplesDelta ?? 0).toBeGreaterThan(0); + }); +}); diff --git a/client/e2e-native/observability.spec.ts b/client/e2e-native/observability.spec.ts index 83c2c27..c206902 100644 --- a/client/e2e-native/observability.spec.ts +++ b/client/e2e-native/observability.spec.ts @@ -137,5 +137,31 @@ describe('Observability', () => { expect(Array.isArray(result.history)).toBe(true); } }); + + it('should measure metrics roundtrip latency', async () => { + const result = await browser.execute(async () => { + const api = window.__NOTEFLOW_API__; + if (!api?.getPerformanceMetrics) { + return { success: false, error: 'API unavailable' }; + } + + const samples: number[] = []; + for (let i = 0; i < 5; i++) { + const start = performance.now(); + await api.getPerformanceMetrics({ history_limit: 1 }); + samples.push(performance.now() - start); + } + + const total = samples.reduce((sum, value) => sum + value, 0); + const avg = samples.length > 0 ? total / samples.length : 0; + return { success: true, samples, avg }; + }); + + expect(result).toBeDefined(); + if (result.success) { + expect(result.avg).toBeGreaterThan(0); + expect(Array.isArray(result.samples)).toBe(true); + } + }); }); }); diff --git a/client/package.json b/client/package.json index 93deba3..98f6b75 100644 --- a/client/package.json +++ b/client/package.json @@ -19,6 +19,7 @@ "tauri:dev": "tauri dev", "tauri:dev:remote": "tauri dev --config src-tauri/tauri.conf.dev.json", "tauri:build": "tauri build", + "tauri:build:mac": "tauri build --config src-tauri/tauri.conf.mac.json", "test": "vitest run", "test:watch": "vitest", "test:rs": "cd src-tauri && cargo test", diff --git a/client/src-tauri/src/commands/diagnostics.rs b/client/src-tauri/src/commands/diagnostics.rs index 75a4b4b..17eb7aa 100644 --- a/client/src-tauri/src/commands/diagnostics.rs +++ b/client/src-tauri/src/commands/diagnostics.rs @@ -6,7 +6,10 @@ use serde::Serialize; use tauri::State; use crate::error::Result; +use crate::constants::collections as collection_constants; +use crate::state::AudioConfig; use crate::state::AppState; +use crate::commands::recording::get_dropped_chunk_count; /// Diagnostic result for connection chain testing. #[derive(Debug, Serialize)] @@ -30,6 +33,41 @@ pub struct ConnectionDiagnostics { pub steps: Vec, } +/// Audio configuration snapshot for diagnostics. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AudioConfigDiagnostics { + pub input_device_id: Option, + pub output_device_id: Option, + pub system_device_id: Option, + pub dual_capture_enabled: bool, + pub mic_gain: f32, + pub system_gain: f32, + pub sample_rate: u32, + pub channels: u16, +} + +/// Audio pipeline diagnostics snapshot. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AudioPipelineDiagnostics { + pub recording: bool, + pub recording_meeting_id: Option, + pub elapsed_seconds: u32, + pub current_db_level: f32, + pub current_level_normalized: f32, + pub playback_sample_rate: u32, + pub playback_duration: f64, + pub playback_position: f64, + pub session_audio_buffer_samples: usize, + pub session_audio_buffer_chunks: usize, + pub session_audio_spool_samples: u64, + pub session_audio_spool_chunks: usize, + pub buffer_max_samples: usize, + pub dropped_chunk_count: u32, + pub audio_config: AudioConfigDiagnostics, +} + /// Server information for diagnostics. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] @@ -173,3 +211,42 @@ pub async fn run_connection_diagnostics( steps, }) } + +/// Get a snapshot of audio pipeline state for diagnostics and profiling. +#[tauri::command(rename_all = "snake_case")] +pub async fn get_audio_pipeline_diagnostics( + state: State<'_, Arc>, +) -> Result { + let audio_config: AudioConfig = state.audio_config.read().clone(); + let buffer_samples = *state.session_audio_buffer_samples.read(); + let buffer_chunks = state.session_audio_buffer.read().len(); + let spool_samples = *state.session_audio_spool_samples.read(); + let spool_chunks = state.session_audio_spool.read().len(); + + Ok(AudioPipelineDiagnostics { + recording: state.is_recording(), + recording_meeting_id: state.recording_meeting_id(), + elapsed_seconds: *state.elapsed_seconds.read(), + current_db_level: *state.current_db_level.read(), + current_level_normalized: *state.current_level_normalized.read(), + playback_sample_rate: *state.playback_sample_rate.read(), + playback_duration: *state.playback_duration.read(), + playback_position: *state.playback_position.read(), + session_audio_buffer_samples: buffer_samples, + session_audio_buffer_chunks: buffer_chunks, + session_audio_spool_samples: spool_samples, + session_audio_spool_chunks: spool_chunks, + buffer_max_samples: collection_constants::MAX_SESSION_AUDIO_SAMPLES, + dropped_chunk_count: get_dropped_chunk_count(), + audio_config: AudioConfigDiagnostics { + input_device_id: audio_config.input_device_id, + output_device_id: audio_config.output_device_id, + system_device_id: audio_config.system_device_id, + dual_capture_enabled: audio_config.dual_capture_enabled, + mic_gain: audio_config.mic_gain, + system_gain: audio_config.system_gain, + sample_rate: audio_config.sample_rate, + channels: audio_config.channels, + }, + }) +} diff --git a/client/src-tauri/src/commands/recording/capture.rs b/client/src-tauri/src/commands/recording/capture.rs index a2f4feb..e91dc05 100644 --- a/client/src-tauri/src/commands/recording/capture.rs +++ b/client/src-tauri/src/commands/recording/capture.rs @@ -66,6 +66,10 @@ impl DroppedChunkTracker { (count, should_emit) } + fn total_dropped(&self) -> u32 { + self.total_dropped.load(AtomicOrdering::Relaxed) + } + /// Reset the drop counter (called when recording starts). pub fn reset(&self) { self.total_dropped.store(0, AtomicOrdering::Relaxed); @@ -81,6 +85,11 @@ pub fn reset_dropped_chunk_tracker() { DROPPED_CHUNK_TRACKER.reset(); } +/// Read the total number of dropped audio chunks. +pub fn get_dropped_chunk_count() -> u32 { + DROPPED_CHUNK_TRACKER.total_dropped() +} + /// Context for processing captured audio buffer. struct CaptureProcessContext<'a> { state: &'a AppState, diff --git a/client/src-tauri/src/commands/recording/mod.rs b/client/src-tauri/src/commands/recording/mod.rs index 5bea8ce..2dc7206 100644 --- a/client/src-tauri/src/commands/recording/mod.rs +++ b/client/src-tauri/src/commands/recording/mod.rs @@ -21,4 +21,5 @@ mod tests; pub use device::decode_input_device_id; pub use session::{send_audio_chunk, start_recording, stop_recording}; pub(crate) use session::{AudioProcessingInput, emit_error, process_audio_samples}; +pub(crate) use capture::get_dropped_chunk_count; pub use stream_state::{get_stream_state, reset_stream_state}; diff --git a/client/src-tauri/src/lib.rs b/client/src-tauri/src/lib.rs index 49aa2e5..c21f53c 100644 --- a/client/src-tauri/src/lib.rs +++ b/client/src-tauri/src/lib.rs @@ -191,8 +191,9 @@ macro_rules! app_invoke_handler { commands::get_huggingface_token_status, commands::delete_huggingface_token, commands::validate_huggingface_token, - // Diagnostics (1 command) + // Diagnostics (2 commands) commands::run_connection_diagnostics, + commands::get_audio_pipeline_diagnostics, // Shell (1 command) commands::open_url, // E2E Testing (3 commands) diff --git a/client/src-tauri/tauri.conf.json b/client/src-tauri/tauri.conf.json index 34d93cc..4a29f5a 100644 --- a/client/src-tauri/tauri.conf.json +++ b/client/src-tauri/tauri.conf.json @@ -4,7 +4,6 @@ "version": "0.1.0", "identifier": "com.noteflow.desktop", "build": { - "devUrl": "http://localhost:5173", "frontendDist": "../dist" }, "app": { diff --git a/client/src-tauri/tauri.conf.mac.json b/client/src-tauri/tauri.conf.mac.json new file mode 100644 index 0000000..c46640e --- /dev/null +++ b/client/src-tauri/tauri.conf.mac.json @@ -0,0 +1,44 @@ +{ + "$schema": "https://schema.tauri.app/config/2", + "productName": "NoteFlow", + "version": "0.1.0", + "identifier": "com.noteflow.desktop", + "build": { + "frontendDist": "../dist" + }, + "app": { + "windows": [ + { + "label": "main", + "title": "NoteFlow", + "width": 1024, + "height": 768, + "minWidth": 800, + "minHeight": 600, + "resizable": true, + "fullscreen": false, + "center": true + } + ], + "security": { + "csp": "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; font-src 'self' https://fonts.gstatic.com; connect-src 'self' https://fonts.googleapis.com https://fonts.gstatic.com", + "capabilities": ["default"] + } + }, + "bundle": { + "active": true, + "targets": ["app", "dmg"], + "icon": ["icons/icon.png", "icons/icon.ico"] + }, + "plugins": { + "shell": { + "open": true + }, + "fs": {}, + "deep-link": { + "desktop": { + "schemes": ["noteflow"] + } + } + } +} diff --git a/client/src/api/adapters/cached/observability.ts b/client/src/api/adapters/cached/observability.ts index 60e15f1..c14ec39 100644 --- a/client/src/api/adapters/cached/observability.ts +++ b/client/src/api/adapters/cached/observability.ts @@ -1,6 +1,7 @@ import { emptyResponses } from '../../core/helpers'; import type { NoteFlowAPI } from '../../interface'; import type { + AudioPipelineDiagnostics, ConnectionDiagnostics, GetPerformanceMetricsRequest, GetPerformanceMetricsResponse, @@ -11,7 +12,11 @@ import type { type CachedObservabilityAPI = Pick< NoteFlowAPI, - 'getUserIntegrations' | 'getRecentLogs' | 'getPerformanceMetrics' | 'runConnectionDiagnostics' + | 'getUserIntegrations' + | 'getRecentLogs' + | 'getPerformanceMetrics' + | 'runConnectionDiagnostics' + | 'getAudioPipelineDiagnostics' >; export const cachedObservabilityAPI: CachedObservabilityAPI = { @@ -59,4 +64,32 @@ export const cachedObservabilityAPI: CachedObservabilityAPI = { ], }; }, + async getAudioPipelineDiagnostics(): Promise { + return { + recording: false, + recordingMeetingId: null, + elapsedSeconds: 0, + currentDbLevel: 0, + currentLevelNormalized: 0, + playbackSampleRate: 0, + playbackDuration: 0, + playbackPosition: 0, + sessionAudioBufferSamples: 0, + sessionAudioBufferChunks: 0, + sessionAudioSpoolSamples: 0, + sessionAudioSpoolChunks: 0, + bufferMaxSamples: 0, + droppedChunkCount: 0, + audioConfig: { + inputDeviceId: null, + outputDeviceId: null, + systemDeviceId: null, + dualCaptureEnabled: false, + micGain: 0, + systemGain: 0, + sampleRate: 0, + channels: 0, + }, + }; + }, }; diff --git a/client/src/api/adapters/mock/index.ts b/client/src/api/adapters/mock/index.ts index db3ee3d..ad096b3 100644 --- a/client/src/api/adapters/mock/index.ts +++ b/client/src/api/adapters/mock/index.ts @@ -24,6 +24,7 @@ import type { CompleteAuthLoginResponse, CompleteCalendarAuthResponse, ConnectionDiagnostics, + AudioPipelineDiagnostics, CreateMeetingRequest, CreateProjectRequest, CreateTaskRequest, @@ -153,6 +154,7 @@ const projects: Map = new Map(); const projectMemberships: Map = new Map(); const activeProjectsByWorkspace: Map = new Map(); const TEMPLATE_MUTATION_DELAY_MS = 120; +const DEFAULT_SAMPLE_RATE_HZ = 16000; const oidcProviders: Map = new Map(); const summarizationTemplates: Map = new Map(); const summarizationTemplateVersions: Map = new Map(); @@ -1302,7 +1304,7 @@ export const mockAPI: NoteFlowAPI = { return { chunksSent: 20, durationSeconds: 2.0, - sampleRate: 16000, + sampleRate: DEFAULT_SAMPLE_RATE_HZ, }; }, async injectTestTone( @@ -1314,7 +1316,7 @@ export const mockAPI: NoteFlowAPI = { return { chunksSent: Math.max(1, Math.floor(durationSeconds * 10)), durationSeconds, - sampleRate: sampleRate ?? 16000, + sampleRate: sampleRate ?? DEFAULT_SAMPLE_RATE_HZ, }; }, async listInstalledApps(_options?: ListInstalledAppsRequest): Promise { @@ -1753,6 +1755,36 @@ export const mockAPI: NoteFlowAPI = { }; }, + async getAudioPipelineDiagnostics(): Promise { + await delay(50); + return { + recording: false, + recordingMeetingId: null, + elapsedSeconds: 0, + currentDbLevel: -60, + currentLevelNormalized: 0, + playbackSampleRate: DEFAULT_SAMPLE_RATE_HZ, + playbackDuration: 0, + playbackPosition: 0, + sessionAudioBufferSamples: 0, + sessionAudioBufferChunks: 0, + sessionAudioSpoolSamples: 0, + sessionAudioSpoolChunks: 0, + bufferMaxSamples: 0, + droppedChunkCount: 0, + audioConfig: { + inputDeviceId: null, + outputDeviceId: null, + systemDeviceId: null, + dualCaptureEnabled: false, + micGain: 1, + systemGain: 1, + sampleRate: DEFAULT_SAMPLE_RATE_HZ, + channels: 1, + }, + }; + }, + // --- OIDC Provider Management (Sprint 17) --- async registerOidcProvider(request: RegisterOidcProviderRequest): Promise { diff --git a/client/src/api/adapters/tauri/constants.ts b/client/src/api/adapters/tauri/constants.ts index 64412af..b260bfd 100644 --- a/client/src/api/adapters/tauri/constants.ts +++ b/client/src/api/adapters/tauri/constants.ts @@ -128,6 +128,7 @@ export const TauriCommands = { LIST_OIDC_PRESETS: 'list_oidc_presets', // Diagnostics RUN_CONNECTION_DIAGNOSTICS: 'run_connection_diagnostics', + GET_AUDIO_PIPELINE_DIAGNOSTICS: 'get_audio_pipeline_diagnostics', // Shell OPEN_URL: 'open_url', // ASR Configuration (Sprint 19) diff --git a/client/src/api/adapters/tauri/sections/observability.ts b/client/src/api/adapters/tauri/sections/observability.ts index 22696e1..bb0e73b 100644 --- a/client/src/api/adapters/tauri/sections/observability.ts +++ b/client/src/api/adapters/tauri/sections/observability.ts @@ -1,4 +1,5 @@ import type { + AudioPipelineDiagnostics, ConnectionDiagnostics, GetPerformanceMetricsRequest, GetPerformanceMetricsResponse, @@ -11,7 +12,7 @@ import type { TauriInvoke } from '../types'; export function createObservabilityApi(invoke: TauriInvoke): Pick< NoteFlowAPI, - 'getRecentLogs' | 'getPerformanceMetrics' | 'runConnectionDiagnostics' + 'getRecentLogs' | 'getPerformanceMetrics' | 'runConnectionDiagnostics' | 'getAudioPipelineDiagnostics' > { return { async getRecentLogs(request?: GetRecentLogsRequest): Promise { @@ -31,5 +32,8 @@ export function createObservabilityApi(invoke: TauriInvoke): Pick< async runConnectionDiagnostics(): Promise { return invoke(TauriCommands.RUN_CONNECTION_DIAGNOSTICS); }, + async getAudioPipelineDiagnostics(): Promise { + return invoke(TauriCommands.GET_AUDIO_PIPELINE_DIAGNOSTICS); + }, }; } diff --git a/client/src/api/interface.ts b/client/src/api/interface.ts index 8541726..f9bc8aa 100644 --- a/client/src/api/interface.ts +++ b/client/src/api/interface.ts @@ -32,6 +32,7 @@ import type { CancelDiarizationResult, CompleteCalendarAuthResponse, ConnectionDiagnostics, + AudioPipelineDiagnostics, StreamStateInfo, CreateMeetingRequest, CreateProjectRequest, @@ -898,6 +899,11 @@ export interface NoteFlowAPI { */ runConnectionDiagnostics(): Promise; + /** + * Get current audio pipeline diagnostics from the desktop client. + */ + getAudioPipelineDiagnostics(): Promise; + // --- OIDC Provider Management (Sprint 17) --- /** diff --git a/client/src/api/interfaces/domains.ts b/client/src/api/interfaces/domains.ts index f9cdc77..ff61a3c 100644 --- a/client/src/api/interfaces/domains.ts +++ b/client/src/api/interfaces/domains.ts @@ -110,6 +110,7 @@ import type { GetPerformanceMetricsRequest, GetPerformanceMetricsResponse, ConnectionDiagnostics, + AudioPipelineDiagnostics, RegisterOidcProviderRequest, OidcProviderApi, ListOidcProvidersRequest, @@ -356,6 +357,7 @@ export interface ObservabilityAPI { getRecentLogs(request?: GetRecentLogsRequest): Promise; getPerformanceMetrics(request: GetPerformanceMetricsRequest): Promise; runConnectionDiagnostics(): Promise; + getAudioPipelineDiagnostics(): Promise; } /** diff --git a/client/src/api/types/diagnostics.ts b/client/src/api/types/diagnostics.ts index 6cb44b6..29e2101 100644 --- a/client/src/api/types/diagnostics.ts +++ b/client/src/api/types/diagnostics.ts @@ -53,3 +53,34 @@ export interface ConnectionDiagnostics { /** Detailed step-by-step results. */ steps: DiagnosticStep[]; } + +/** Audio configuration snapshot for diagnostics. */ +export interface AudioConfigDiagnostics { + inputDeviceId: string | null; + outputDeviceId: string | null; + systemDeviceId: string | null; + dualCaptureEnabled: boolean; + micGain: number; + systemGain: number; + sampleRate: number; + channels: number; +} + +/** Audio pipeline diagnostics snapshot. */ +export interface AudioPipelineDiagnostics { + recording: boolean; + recordingMeetingId: string | null; + elapsedSeconds: number; + currentDbLevel: number; + currentLevelNormalized: number; + playbackSampleRate: number; + playbackDuration: number; + playbackPosition: number; + sessionAudioBufferSamples: number; + sessionAudioBufferChunks: number; + sessionAudioSpoolSamples: number; + sessionAudioSpoolChunks: number; + bufferMaxSamples: number; + droppedChunkCount: number; + audioConfig: AudioConfigDiagnostics; +} diff --git a/client/src/components/layout/app-sidebar.tsx b/client/src/components/layout/app-sidebar.tsx index 00bf296..b6e8273 100644 --- a/client/src/components/layout/app-sidebar.tsx +++ b/client/src/components/layout/app-sidebar.tsx @@ -94,6 +94,10 @@ export function AppSidebar({ onStartRecording, isRecording }: AppSidebarProps) { variant={isRecording ? 'recording' : 'glow'} size={collapsed ? 'icon-lg' : 'lg'} onClick={onStartRecording} + aria-label={isRecording ? 'Go to Recording' : 'Start Recording'} + title={isRecording ? 'Go to Recording' : 'Start Recording'} + id="start-recording" + data-testid="start-recording" className={cn('w-full', collapsed && 'px-0')} > @@ -106,18 +110,22 @@ export function AppSidebar({ onStartRecording, isRecording }: AppSidebarProps) { {navItems.map((item) => { const isActive = location.pathname === item.path; return ( - -
- - {!collapsed && {item.label}} -
+ + + {!collapsed && {item.label}} ); })} diff --git a/client/src/lib/observability/messages.ts b/client/src/lib/observability/messages.ts index c5babdb..e10dad0 100644 --- a/client/src/lib/observability/messages.ts +++ b/client/src/lib/observability/messages.ts @@ -303,10 +303,11 @@ const MESSAGE_TEMPLATES: Record = { 'server shutting down': () => 'Server is shutting down', }; -/** - * Get the lowercase keys for case-insensitive matching. - */ -const MESSAGE_KEYS = Object.keys(MESSAGE_TEMPLATES).map((k) => k.toLowerCase()); +const MESSAGE_ENTRIES = Object.entries(MESSAGE_TEMPLATES).map(([key, transformer]) => ({ + key, + keyLower: key.toLowerCase(), + transformer, +})); /** * Transform a technical log message into a friendly, human-readable version. @@ -319,12 +320,9 @@ export function toFriendlyMessage(message: string, details: Record { it('logs when override storage fails', () => { const setItemSpy = vi - .spyOn(Storage.prototype, 'setItem') + .spyOn(localStorage, 'setItem') .mockImplementation(() => { throw new Error('fail'); }); @@ -218,7 +218,7 @@ describe('preferences storage', () => { it('logs when saving preferences fails', () => { const setItemSpy = vi - .spyOn(Storage.prototype, 'setItem') + .spyOn(localStorage, 'setItem') .mockImplementation(() => { throw new Error('fail'); }); @@ -261,7 +261,7 @@ describe('preferences storage', () => { expect(localStorage.getItem(MODEL_CATALOG_CACHE_KEY)).toBeNull(); const removeSpy = vi - .spyOn(Storage.prototype, 'removeItem') + .spyOn(localStorage, 'removeItem') .mockImplementation(() => { throw new Error('fail'); }); diff --git a/client/src/lib/storage/utils.test.ts b/client/src/lib/storage/utils.test.ts index 07af7ab..d085cce 100644 --- a/client/src/lib/storage/utils.test.ts +++ b/client/src/lib/storage/utils.test.ts @@ -35,7 +35,7 @@ describe('storage utils', () => { }); it('logs and returns false on write errors', () => { - vi.spyOn(Storage.prototype, 'setItem').mockImplementation(() => { + vi.spyOn(localStorage, 'setItem').mockImplementation(() => { throw new Error('boom'); }); @@ -49,7 +49,7 @@ describe('storage utils', () => { localStorage.setItem('remove', '1'); expect(removeStorage('remove', 'ctx')).toBe(true); - vi.spyOn(Storage.prototype, 'removeItem').mockImplementation(() => { + vi.spyOn(localStorage, 'removeItem').mockImplementation(() => { throw new Error('boom'); }); @@ -79,7 +79,7 @@ describe('storage utils', () => { }); it('logs raw write errors', () => { - vi.spyOn(Storage.prototype, 'setItem').mockImplementation(() => { + vi.spyOn(localStorage, 'setItem').mockImplementation(() => { throw new Error('boom'); }); diff --git a/client/src/lib/storage/utils.ts b/client/src/lib/storage/utils.ts index f902ee7..79aa4dd 100644 --- a/client/src/lib/storage/utils.ts +++ b/client/src/lib/storage/utils.ts @@ -94,7 +94,13 @@ export function clearStorageByPrefix(prefix: string, context?: string): number { return 0; } try { - const keys = Object.keys(localStorage).filter((key) => key.startsWith(prefix)); + const keys: string[] = []; + for (let i = 0; i < localStorage.length; i++) { + const key = localStorage.key(i); + if (key?.startsWith(prefix)) { + keys.push(key); + } + } for (const key of keys) { localStorage.removeItem(key); } diff --git a/compose.yaml.deprecated b/compose.yaml.deprecated index 8f7d7fa..8423ac6 100644 --- a/compose.yaml.deprecated +++ b/compose.yaml.deprecated @@ -104,42 +104,43 @@ services: # CPU-only server (default, cross-platform) # Build: docker buildx bake server - # server: - # container_name: noteflow-server - # image: noteflow-server:latest - # build: - # context: . - # dockerfile: docker/server.Dockerfile - # target: server - # restart: unless-stopped - # ports: - # - "50051:50051" - # extra_hosts: - # - "host.docker.internal:host-gateway" - # env_file: - # - .env - # environment: - # NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow} - # NOTEFLOW_REDIS_URL: redis://redis:6379/0 - # NOTEFLOW_QDRANT_URL: http://qdrant:6333 - # NOTEFLOW_LOG_FORMAT: console - # # Force CPU device when running CPU-only container - # NOTEFLOW_ASR_DEVICE: cpu - # volumes: - # - .:/workspace - # - server_venv:/workspace/.venv - # depends_on: - # db: - # condition: service_healthy - # redis: - # condition: service_healthy - # qdrant: - # condition: service_healthy - # networks: - # - noteflow-net - # profiles: - # - server - # - full + server: + container_name: noteflow-server + image: noteflow-server:latest + build: + context: . + dockerfile: docker/server.Dockerfile + target: server + restart: unless-stopped + tty: true + ports: + - "50051:50051" + extra_hosts: + - "host.docker.internal:host-gateway" + env_file: + - .env + environment: + NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow} + NOTEFLOW_REDIS_URL: redis://redis:6379/0 + NOTEFLOW_QDRANT_URL: http://qdrant:6333 + NOTEFLOW_LOG_FORMAT: console + # Force CPU device when running CPU-only container + NOTEFLOW_ASR_DEVICE: cpu + volumes: + - .:/workspace + - server_venv:/workspace/.venv + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + qdrant: + condition: service_healthy + networks: + - noteflow-net + profiles: + - server + - full # GPU-enabled server (NVIDIA CUDA) # Build: docker buildx bake server-gpu @@ -248,59 +249,59 @@ services: # GPU-enabled dev server (AMD ROCm with hot reload) # Build: docker buildx bake server-rocm-dev - server-rocm-dev: - container_name: noteflow-server-dev - image: git.baked.rocks/vasceannie/noteflow-server-rocm-dev:latest - build: - context: . - dockerfile: docker/Dockerfile.rocm - target: server-dev - args: - ROCM_VERSION: ${ROCM_VERSION:-6.4.1} - ROCM_PYTORCH_RELEASE: ${ROCM_PYTORCH_RELEASE:-2.6.0} - SPACY_MODEL_URL: ${SPACY_MODEL_URL:-https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl} - restart: unless-stopped - ports: - - "50051:50051" - extra_hosts: - - "host.docker.internal:host-gateway" - env_file: - - .env - environment: - NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow} - NOTEFLOW_REDIS_URL: redis://redis:6379/0 - NOTEFLOW_QDRANT_URL: http://qdrant:6333 - NOTEFLOW_LOG_FORMAT: console - NOTEFLOW_ASR_DEVICE: rocm - NOTEFLOW_DIARIZATION_DEVICE: auto - NOTEFLOW_FEATURE_ROCM_ENABLED: "true" - volumes: - - .:/workspace - devices: - - /dev/kfd - - /dev/dri - group_add: - - ${VIDEO_GID:-44} - - ${RENDER_GID:-993} - security_opt: - - seccomp=unconfined - tty: true - ulimits: - memlock: -1 - stack: 67108864 - depends_on: - db: - condition: service_healthy - redis: - condition: service_healthy - qdrant: - condition: service_healthy - networks: - - noteflow-net - profiles: - - server-rocm-dev - - full-gpu - - gpu + # server-rocm-dev: + # container_name: noteflow-server-dev + # image: git.baked.rocks/vasceannie/noteflow-server-rocm-dev:latest + # build: + # context: . + # dockerfile: docker/Dockerfile.rocm + # target: server-dev + # args: + # ROCM_VERSION: ${ROCM_VERSION:-6.4.1} + # ROCM_PYTORCH_RELEASE: ${ROCM_PYTORCH_RELEASE:-2.6.0} + # SPACY_MODEL_URL: ${SPACY_MODEL_URL:-https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl} + # restart: unless-stopped + # ports: + # - "50051:50051" + # extra_hosts: + # - "host.docker.internal:host-gateway" + # env_file: + # - .env + # environment: + # NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow} + # NOTEFLOW_REDIS_URL: redis://redis:6379/0 + # NOTEFLOW_QDRANT_URL: http://qdrant:6333 + # NOTEFLOW_LOG_FORMAT: console + # NOTEFLOW_ASR_DEVICE: rocm + # NOTEFLOW_DIARIZATION_DEVICE: auto + # NOTEFLOW_FEATURE_ROCM_ENABLED: "true" + # volumes: + # - .:/workspace + # devices: + # - /dev/kfd + # - /dev/dri + # group_add: + # - ${VIDEO_GID:-44} + # - ${RENDER_GID:-993} + # security_opt: + # - seccomp=unconfined + # tty: true + # ulimits: + # memlock: -1 + # stack: 67108864 + # depends_on: + # db: + # condition: service_healthy + # redis: + # condition: service_healthy + # qdrant: + # condition: service_healthy + # networks: + # - noteflow-net + # profiles: + # - server-rocm-dev + # - full-gpu + # - gpu # server-full: # container_name: noteflow-server-full diff --git a/scripts/ab_streaming_harness.py b/scripts/ab_streaming_harness.py index d256099..2558102 100644 --- a/scripts/ab_streaming_harness.py +++ b/scripts/ab_streaming_harness.py @@ -12,6 +12,7 @@ import wave from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Iterable, Protocol, cast +from uuid import uuid4 import numpy as np from numpy.typing import NDArray @@ -67,6 +68,7 @@ class AudioCase: class StreamingStats: first_partial_at: float | None = None first_final_at: float | None = None + first_segment_persisted_at: float | None = None partial_count: int = 0 final_count: int = 0 @@ -79,11 +81,40 @@ class RunResult: wall_time_s: float first_partial_latency_s: float | None first_final_latency_s: float | None + first_segment_persisted_latency_s: float | None transcript: str wer: float | None segments: int +class _RequestIdStub: + def __init__(self, stub: object, request_id: str) -> None: + self._stub = stub + self._metadata: tuple[tuple[str, str], ...] = (("x-request-id", request_id),) + + def __getattr__(self, name: str) -> object: + attr = getattr(self._stub, name) + if not callable(attr): + return attr + + def _wrapped(*args: object, **kwargs: object) -> object: + metadata = cast("tuple[tuple[str, str], ...] | None", kwargs.pop("metadata", None)) + if metadata is not None: + kwargs["metadata"] = metadata + self._metadata + else: + kwargs["metadata"] = self._metadata + return attr(*args, **kwargs) + + return _wrapped + + +def _attach_request_id(client: NoteFlowClient, request_id: str | None = None) -> str: + rid = request_id or str(uuid4()) + stub = client.require_connection() + setattr(client, "_stub", _RequestIdStub(stub, rid)) + return rid + + class StreamingConfigStub(Protocol): def GetStreamingConfiguration( self, @@ -167,6 +198,40 @@ def _chunk_audio( yield audio[start : start + chunk_size] +def _poll_first_segment_persisted( + server_address: str, + meeting_id: str, + interval_s: float, + stats: StreamingStats, + lock: threading.Lock, + stop_event: threading.Event, +) -> None: + if interval_s <= 0: + return + + poll_client = NoteFlowClient(server_address=server_address) + if not poll_client.connect(): + return + _attach_request_id(poll_client) + + try: + while not stop_event.is_set(): + try: + segments = poll_client.get_meeting_segments(meeting_id) + except Exception: + time.sleep(interval_s) + continue + if segments: + now = time.time() + with lock: + if stats.first_segment_persisted_at is None: + stats.first_segment_persisted_at = now + break + time.sleep(interval_s) + finally: + poll_client.disconnect() + + def _get_streaming_config(stub: StreamingConfigStub) -> dict[str, float]: response = stub.GetStreamingConfiguration(noteflow_pb2.GetStreamingConfigurationRequest()) config = response.configuration @@ -233,6 +298,7 @@ def _run_streaming_case( chunk_ms: int, realtime: bool, final_wait_seconds: float, + segment_poll_ms: int, ) -> RunResult: stub = client.require_connection() _apply_streaming_config(stub, config) @@ -244,6 +310,8 @@ def _run_streaming_case( stats = StreamingStats() lock = threading.Lock() + poll_stop = threading.Event() + poll_thread: threading.Thread | None = None def on_transcript(segment: TranscriptSegment) -> None: now = time.time() @@ -258,6 +326,21 @@ def _run_streaming_case( stats.first_partial_at = now client.on_transcript = on_transcript + if segment_poll_ms > 0: + poll_thread = threading.Thread( + target=_poll_first_segment_persisted, + args=( + client.server_address, + meeting.id, + segment_poll_ms / 1000.0, + stats, + lock, + poll_stop, + ), + daemon=True, + ) + poll_thread.start() + if not client.start_streaming(meeting.id): raise RuntimeError("Failed to start streaming") @@ -280,6 +363,9 @@ def _run_streaming_case( time.sleep(final_wait_seconds) client.stop_streaming() client.stop_meeting(meeting.id) + poll_stop.set() + if poll_thread is not None: + poll_thread.join(timeout=2.0) segments = client.get_meeting_segments(meeting.id) transcript = " ".join(seg.text.strip() for seg in segments if seg.text.strip()) @@ -292,6 +378,11 @@ def _run_streaming_case( first_final_latency = ( (stats.first_final_at - start_time) if stats.first_final_at else None ) + first_segment_latency = ( + (stats.first_segment_persisted_at - start_time) + if stats.first_segment_persisted_at + else None + ) wer = _word_error_rate(case.reference, transcript) if case.reference else None return RunResult( @@ -301,6 +392,7 @@ def _run_streaming_case( wall_time_s=end_time - start_time, first_partial_latency_s=first_partial_latency, first_final_latency_s=first_final_latency, + first_segment_persisted_latency_s=first_segment_latency, transcript=transcript, wer=wer, segments=len(segments), @@ -346,6 +438,10 @@ def _print_results(results: list[RunResult]) -> None: print(f" wall_time_s: {result.wall_time_s:.2f}") print(f" first_partial_latency: {_format_latency(result.first_partial_latency_s)}") print(f" first_final_latency: {_format_latency(result.first_final_latency_s)}") + print( + " first_segment_persisted_latency: " + f"{_format_latency(result.first_segment_persisted_latency_s)}" + ) print(f" segments: {result.segments}") if result.wer is not None: print(f" WER: {result.wer:.3f}") @@ -388,6 +484,12 @@ def main() -> None: parser.add_argument("--chunk-ms", type=int, default=200) parser.add_argument("--realtime", action="store_true") parser.add_argument("--final-wait", type=float, default=2.0) + parser.add_argument( + "--segment-poll-ms", + type=int, + default=0, + help="Poll for persisted segments to measure DB ingestion latency.", + ) args = parser.parse_args() configure_logging(LoggingConfig(level="INFO")) @@ -412,6 +514,7 @@ def main() -> None: if not client.connect(): raise RuntimeError(f"Unable to connect to server at {args.server}") + _attach_request_id(client) stub = cast(StreamingConfigStub, client.require_connection()) original_config = _get_streaming_config(stub) @@ -427,6 +530,7 @@ def main() -> None: args.chunk_ms, args.realtime, args.final_wait, + args.segment_poll_ms, ) ) results.append( @@ -438,6 +542,7 @@ def main() -> None: args.chunk_ms, args.realtime, args.final_wait, + args.segment_poll_ms, ) ) finally: diff --git a/scripts/profile_comprehensive.py b/scripts/profile_comprehensive.py index d8223d2..d632faa 100644 --- a/scripts/profile_comprehensive.py +++ b/scripts/profile_comprehensive.py @@ -24,11 +24,14 @@ import asyncio import cProfile import gc import io +import os import pstats import sys import time +import tempfile from contextlib import asynccontextmanager from dataclasses import dataclass, field +from pathlib import Path from typing import TYPE_CHECKING, cast from uuid import uuid4 @@ -48,6 +51,13 @@ CHUNKS_PER_SECOND = SAMPLE_RATE // CHUNK_SIZE BYTES_PER_KB = 1024 BYTES_PER_MB = 1024 * 1024 LINUX_RSS_KB_MULTIPLIER = 1024 # resource.ru_maxrss returns KB on Linux +DEFAULT_DB_SEGMENTS = 200 +DEFAULT_CONVERTER_SEGMENTS = 200 +DEFAULT_OBSERVABILITY_SAMPLES = 200 +DEFAULT_METRICS_SAMPLES = 60 +DEFAULT_ASR_SEGMENTS = 200 +DEFAULT_VOICE_PROFILE_SAMPLES = 200 +WORDS_PER_SEGMENT = 4 AudioChunk = NDArray[np.float32] @@ -316,6 +326,220 @@ def benchmark_proto_operations(num_meetings: int = 200) -> BenchmarkResult: ) +def benchmark_grpc_segment_converters(num_segments: int = DEFAULT_CONVERTER_SEGMENTS) -> BenchmarkResult: + """Benchmark gRPC segment converter performance.""" + from noteflow.domain.entities.segment import Segment, WordTiming + from noteflow.grpc.mixins.converters import segment_to_proto_update + + meeting_id = str(uuid4()) + segments = [ + Segment( + segment_id=i, + text="Segment benchmark text", + start_time=float(i), + end_time=float(i + 1), + words=[ + WordTiming(word="hello", start_time=0.0, end_time=0.25, probability=0.95), + WordTiming(word="world", start_time=0.25, end_time=0.5, probability=0.92), + WordTiming(word="from", start_time=0.5, end_time=0.75, probability=0.9), + WordTiming(word="noteflow", start_time=0.75, end_time=1.0, probability=0.93), + ], + ) + for i in range(num_segments) + ] + + start = time.perf_counter() + for segment in segments: + _ = segment_to_proto_update(meeting_id, segment) + elapsed = time.perf_counter() - start + + return BenchmarkResult( + name="gRPC Segment → Proto", + duration_ms=elapsed * 1000, + items_processed=num_segments, + per_item_ms=(elapsed * 1000) / num_segments, + extra={"words_per_segment": WORDS_PER_SEGMENT}, + ) + + +def benchmark_asr_segment_build( + num_segments: int = DEFAULT_ASR_SEGMENTS, +) -> BenchmarkResult: + """Benchmark ASR result to Segment conversion.""" + from uuid import UUID + + from noteflow.domain.value_objects import AudioSource, MeetingId + from noteflow.grpc.mixins.converters import SegmentBuildParams, create_segment_from_asr + from noteflow.infrastructure.asr.dto import AsrResult, WordTiming + + meeting_id = MeetingId(UUID("00000000-0000-0000-0000-000000000002")) + words = ( + WordTiming(word="hello", start=0.0, end=0.25, probability=0.95), + WordTiming(word="world", start=0.25, end=0.5, probability=0.92), + WordTiming(word="from", start=0.5, end=0.75, probability=0.9), + WordTiming(word="noteflow", start=0.75, end=1.0, probability=0.93), + ) + result_template = AsrResult( + text="Benchmark segment text", + start=0.0, + end=1.0, + words=words, + language="en", + language_probability=0.98, + avg_logprob=-0.2, + no_speech_prob=0.01, + ) + + start = time.perf_counter() + for i in range(num_segments): + params = SegmentBuildParams( + meeting_id=meeting_id, + segment_id=i, + segment_start_time=float(i), + audio_source=AudioSource.MIC, + ) + _ = create_segment_from_asr(params, result_template) + elapsed = time.perf_counter() - start + + return BenchmarkResult( + name="ASR Result → Segment", + duration_ms=elapsed * 1000, + items_processed=num_segments, + per_item_ms=(elapsed * 1000) / num_segments, + extra={"words_per_segment": WORDS_PER_SEGMENT}, + ) + + +def _generate_embedding_pairs( + samples: int, +) -> tuple[list[list[float]], list[list[float]]]: + from noteflow.application.services.voice_profile.service import EMBEDDING_DIM + + rng = np.random.default_rng(42) + base = rng.standard_normal((samples, EMBEDDING_DIM)).astype(np.float32) + noise = rng.standard_normal((samples, EMBEDDING_DIM)).astype(np.float32) * 0.01 + base_list = [row.tolist() for row in base] + noisy_list = [row.tolist() for row in (base + noise)] + return base_list, noisy_list + + +def benchmark_voice_profile_similarity( + samples: int = DEFAULT_VOICE_PROFILE_SAMPLES, +) -> BenchmarkResult: + """Benchmark cosine similarity for voice profile matching.""" + from noteflow.application.services.voice_profile.service import cosine_similarity + + existing, new = _generate_embedding_pairs(samples) + start = time.perf_counter() + for idx in range(samples): + cosine_similarity(existing[idx], new[idx]) + elapsed = time.perf_counter() - start + return BenchmarkResult( + name="Voice Profile Similarity", + duration_ms=elapsed * 1000, + items_processed=samples, + per_item_ms=(elapsed * 1000) / samples, + ) + + +def benchmark_voice_profile_merge( + samples: int = DEFAULT_VOICE_PROFILE_SAMPLES, +) -> BenchmarkResult: + """Benchmark merge_embeddings for voice profile updates.""" + from noteflow.application.services.voice_profile.service import merge_embeddings + + existing, new = _generate_embedding_pairs(samples) + existing_count = 3 + start = time.perf_counter() + for idx in range(samples): + merge_embeddings(existing[idx], new[idx], existing_count) + elapsed = time.perf_counter() - start + return BenchmarkResult( + name="Voice Profile Merge", + duration_ms=elapsed * 1000, + items_processed=samples, + per_item_ms=(elapsed * 1000) / samples, + ) + + +def benchmark_observability_converters( + num_entries: int = DEFAULT_OBSERVABILITY_SAMPLES, +) -> list[BenchmarkResult]: + """Benchmark log and metrics converter performance.""" + from datetime import UTC, datetime + + from noteflow.grpc.mixins.converters import log_entry_to_proto, metrics_to_proto + from noteflow.infrastructure.logging.log_buffer import LogEntry + from noteflow.infrastructure.metrics.collector import PerformanceMetrics + + metrics = PerformanceMetrics( + timestamp=time.time(), + cpu_percent=25.0, + memory_percent=60.0, + memory_mb=8000.0, + disk_percent=40.0, + network_bytes_sent=1024, + network_bytes_recv=2048, + process_memory_mb=512.0, + active_connections=8, + ) + log_entry = LogEntry( + timestamp=datetime.now(tz=UTC), + level="info", + source="benchmark", + message="Segment persisted", + details={"meeting_id": "benchmark"}, + trace_id="trace", + span_id="span", + event_type="segment.added", + operation_id="op", + entity_id="entity", + ) + + start = time.perf_counter() + for _ in range(num_entries): + _ = metrics_to_proto(metrics) + metrics_elapsed = time.perf_counter() - start + + start = time.perf_counter() + for _ in range(num_entries): + _ = log_entry_to_proto(log_entry) + logs_elapsed = time.perf_counter() - start + + return [ + BenchmarkResult( + name="gRPC Metrics → Proto", + duration_ms=metrics_elapsed * 1000, + items_processed=num_entries, + per_item_ms=(metrics_elapsed * 1000) / num_entries, + ), + BenchmarkResult( + name="gRPC Log → Proto", + duration_ms=logs_elapsed * 1000, + items_processed=num_entries, + per_item_ms=(logs_elapsed * 1000) / num_entries, + ), + ] + + +def benchmark_metrics_collection(samples: int = DEFAULT_METRICS_SAMPLES) -> BenchmarkResult: + """Benchmark MetricsCollector.collect_now overhead.""" + from noteflow.infrastructure.metrics.collector import MetricsCollector + + collector = MetricsCollector(history_size=samples) + start = time.perf_counter() + for _ in range(samples): + collector.collect_now() + elapsed = time.perf_counter() - start + + return BenchmarkResult( + name="Metrics Collect", + duration_ms=elapsed * 1000, + items_processed=samples, + per_item_ms=(elapsed * 1000) / samples, + ) + + async def benchmark_async_overhead(iterations: int = 1000) -> BenchmarkResult: """Benchmark async context manager overhead.""" @@ -370,6 +594,67 @@ async def benchmark_grpc_simulation(num_requests: int = 100) -> BenchmarkResult: ) +async def benchmark_db_roundtrip( + database_url: str, + segment_count: int = DEFAULT_DB_SEGMENTS, +) -> list[BenchmarkResult]: + """Benchmark database insert and retrieval for segments.""" + from noteflow.domain.entities import Meeting, Segment + from noteflow.infrastructure.persistence.database import create_engine_and_session_factory + from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork + + meeting = Meeting.create(title="Benchmark Meeting") + segments = [ + Segment( + segment_id=i, + text="Benchmark segment text", + start_time=float(i), + end_time=float(i + 1), + ) + for i in range(segment_count) + ] + + engine, session_factory = create_engine_and_session_factory(database_url, pool_size=5) + temp_dir = tempfile.TemporaryDirectory() + meetings_dir = Path(temp_dir.name) + + try: + async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow: + start = time.perf_counter() + await uow.meetings.create(meeting) + await uow.segments.add_batch(meeting.id, segments) + await uow.commit() + insert_elapsed = time.perf_counter() - start + + async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow: + start = time.perf_counter() + _ = await uow.meetings.get(meeting.id) + _ = await uow.segments.get_by_meeting(meeting.id) + fetch_elapsed = time.perf_counter() - start + + async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow: + await uow.meetings.delete(meeting.id) + await uow.commit() + finally: + await engine.dispose() + temp_dir.cleanup() + + return [ + BenchmarkResult( + name="DB Insert + Batch", + duration_ms=insert_elapsed * 1000, + items_processed=segment_count, + per_item_ms=(insert_elapsed * 1000) / segment_count, + ), + BenchmarkResult( + name="DB Fetch Segments", + duration_ms=fetch_elapsed * 1000, + items_processed=segment_count, + per_item_ms=(fetch_elapsed * 1000) / segment_count, + ), + ] + + def benchmark_import_times() -> list[BenchmarkResult]: """Measure import times for key modules.""" results: list[BenchmarkResult] = [] @@ -462,6 +747,9 @@ async def main( enable_profile: bool = False, verbose: bool = False, enable_memory: bool = False, + database_url: str | None = None, + enable_db: bool = False, + db_segments: int = DEFAULT_DB_SEGMENTS, ) -> None: """Run all benchmarks.""" print("=" * 70) @@ -511,6 +799,38 @@ async def main( else: results.append(benchmark_proto_operations(200)) + # gRPC converters + print("Benchmarking gRPC converters (segments/logs/metrics)...") + results.append(benchmark_grpc_segment_converters(DEFAULT_CONVERTER_SEGMENTS)) + results.extend(benchmark_observability_converters(DEFAULT_OBSERVABILITY_SAMPLES)) + + # ASR segment build conversion + print("Benchmarking ASR segment conversion...") + if enable_memory: + mem_result, _ = run_with_memory_tracking(benchmark_asr_segment_build, DEFAULT_ASR_SEGMENTS) + results.append(mem_result) + else: + results.append(benchmark_asr_segment_build(DEFAULT_ASR_SEGMENTS)) + + # Voice profile operations + print("Benchmarking voice profile operations...") + if enable_memory: + mem_result, _ = run_with_memory_tracking( + benchmark_voice_profile_similarity, DEFAULT_VOICE_PROFILE_SAMPLES + ) + results.append(mem_result) + mem_result, _ = run_with_memory_tracking( + benchmark_voice_profile_merge, DEFAULT_VOICE_PROFILE_SAMPLES + ) + results.append(mem_result) + else: + results.append(benchmark_voice_profile_similarity(DEFAULT_VOICE_PROFILE_SAMPLES)) + results.append(benchmark_voice_profile_merge(DEFAULT_VOICE_PROFILE_SAMPLES)) + + # Metrics collection overhead + print("Benchmarking metrics collection (60 samples)...") + results.append(benchmark_metrics_collection(DEFAULT_METRICS_SAMPLES)) + # Async overhead print("Benchmarking async context overhead (1000 iterations)...") results.append(await benchmark_async_overhead(1000)) @@ -519,6 +839,15 @@ async def main( print("Benchmarking gRPC simulation (100 concurrent)...") results.append(await benchmark_grpc_simulation(100)) + # Database round-trip (optional) + if enable_db: + resolved_db_url = database_url or os.environ.get("NOTEFLOW_DATABASE_URL", "") + if resolved_db_url: + print(f"Benchmarking database round-trip ({db_segments} segments)...") + results.extend(await benchmark_db_roundtrip(resolved_db_url, db_segments)) + else: + print("Skipping DB benchmark (no database URL provided).") + # Summary print() print("=" * 70) @@ -580,10 +909,29 @@ if __name__ == "__main__": parser.add_argument( "--memory", action="store_true", help="Enable RSS and GC memory profiling" ) + parser.add_argument( + "--db", + action="store_true", + help="Enable database round-trip benchmarking (requires NOTEFLOW_DATABASE_URL)", + ) + parser.add_argument( + "--db-url", + default=os.environ.get("NOTEFLOW_DATABASE_URL", ""), + help="Database URL for benchmarking (defaults to NOTEFLOW_DATABASE_URL).", + ) + parser.add_argument( + "--db-segments", + type=int, + default=DEFAULT_DB_SEGMENTS, + help="Number of segments for DB benchmark.", + ) args = parser.parse_args() asyncio.run(main( enable_profile=args.profile, verbose=args.verbose, enable_memory=args.memory, + database_url=args.db_url, + enable_db=args.db, + db_segments=args.db_segments, )) diff --git a/src/noteflow/application/services/voice_profile/service.py b/src/noteflow/application/services/voice_profile/service.py index 4120fc1..3f3bb52 100644 --- a/src/noteflow/application/services/voice_profile/service.py +++ b/src/noteflow/application/services/voice_profile/service.py @@ -188,7 +188,8 @@ def merge_embeddings( existing_arr = np.array(existing, dtype=np.float32) new_arr = np.array(new, dtype=np.float32) merged = (1.0 - alpha) * existing_arr + alpha * new_arr - merged_normalized = merged / (np.linalg.norm(merged) + EMBEDDING_NORM_EPSILON) + norm = float(np.sqrt(np.dot(merged, merged))) + merged_normalized = merged / (norm + EMBEDDING_NORM_EPSILON) return merged_normalized.tolist() @@ -196,9 +197,9 @@ def cosine_similarity(a: list[float], b: list[float]) -> float: """Compute cosine similarity between two embeddings.""" a_arr = np.array(a, dtype=np.float32) b_arr = np.array(b, dtype=np.float32) - dot = np.dot(a_arr, b_arr) - norm_a = np.linalg.norm(a_arr) - norm_b = np.linalg.norm(b_arr) + dot = float(np.dot(a_arr, b_arr)) + norm_a = float(np.sqrt(np.dot(a_arr, a_arr))) + norm_b = float(np.sqrt(np.dot(b_arr, b_arr))) if norm_a < EMBEDDING_NORM_EPSILON or norm_b < EMBEDDING_NORM_EPSILON: return 0.0 return float(dot / (norm_a * norm_b)) diff --git a/src/noteflow/grpc/client.py b/src/noteflow/grpc/client.py index 4c4ff6b..178b561 100644 --- a/src/noteflow/grpc/client.py +++ b/src/noteflow/grpc/client.py @@ -4,7 +4,8 @@ from __future__ import annotations import queue import threading -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING, Final, Protocol, cast +from uuid import uuid4 import grpc @@ -17,6 +18,7 @@ from noteflow.grpc.client_mixins import ( MeetingClientMixin, StreamingClientMixin, ) +from noteflow.grpc.client_mixins.protocols import NoteFlowServiceStubProtocol from noteflow.grpc.config.config import STREAMING_CONFIG from noteflow.grpc.types import ( AnnotationInfo, @@ -28,13 +30,16 @@ from noteflow.grpc.types import ( TranscriptSegment, ) from noteflow.grpc.proto import noteflow_pb2, noteflow_pb2_grpc +from noteflow.grpc.interceptors.identity import METADATA_REQUEST_ID from noteflow.infrastructure.logging import get_logger if TYPE_CHECKING: import numpy as np from numpy.typing import NDArray - from noteflow.grpc.client_mixins.protocols import ClientHost, NoteFlowServiceStubProtocol + from noteflow.grpc.client_mixins.protocols import ( + ClientHost, + ) logger = get_logger(__name__) @@ -53,6 +58,56 @@ __all__ = [ DEFAULT_SERVER: Final[str] = "localhost:50051" CHUNK_TIMEOUT: Final[float] = 0.1 # Timeout for getting chunks from queue +MetadataType = tuple[tuple[str, str | bytes], ...] + + +def _attach_request_id(metadata: MetadataType | None) -> MetadataType: + if metadata is None: + return ((METADATA_REQUEST_ID, str(uuid4())),) + if any(key == METADATA_REQUEST_ID for key, _ in metadata): + return metadata + return (*metadata, (METADATA_REQUEST_ID, str(uuid4()))) + + +class _GrpcCallable(Protocol): + def __call__(self, *args: object, **kwargs: object) -> object: ... + def future(self, *args: object, **kwargs: object) -> object: ... + def with_call(self, *args: object, **kwargs: object) -> object: ... + + +class _RequestIdCallWrapper: + def __init__(self, call: _GrpcCallable) -> None: + self._call = call + + def __call__(self, *args: object, **kwargs: object) -> object: + metadata = kwargs.pop("metadata", None) + kwargs["metadata"] = _attach_request_id(metadata) + return self._call(*args, **kwargs) + + def future(self, *args: object, **kwargs: object) -> object: + metadata = kwargs.pop("metadata", None) + kwargs["metadata"] = _attach_request_id(metadata) + return self._call.future(*args, **kwargs) + + def with_call(self, *args: object, **kwargs: object) -> object: + metadata = kwargs.pop("metadata", None) + kwargs["metadata"] = _attach_request_id(metadata) + return self._call.with_call(*args, **kwargs) + + def __getattr__(self, name: str) -> object: + return getattr(self._call, name) + + +class _RequestIdStubProxy: + def __init__(self, stub: NoteFlowServiceStubProtocol) -> None: + self._stub = stub + + def __getattr__(self, name: str) -> object: + attr = getattr(self._stub, name) + if callable(attr): + return _RequestIdCallWrapper(attr) + return attr + class NoteFlowClient( MeetingClientMixin, @@ -150,7 +205,9 @@ class NoteFlowClient( # Wait for channel to be ready channel_ready_future(self._channel).result(timeout=timeout) - self._stub = noteflow_pb2_grpc.NoteFlowServiceStub(self._channel) + raw_stub = noteflow_pb2_grpc.NoteFlowServiceStub(self._channel) + # Proxy uses dynamic dispatch; cast keeps Protocol typing without duplicating methods. + self._stub = cast(NoteFlowServiceStubProtocol, _RequestIdStubProxy(raw_stub)) self._connected = True logger.info("Connected to server at %s", self._server_address) diff --git a/src/noteflow/grpc/client_mixins/protocols.py b/src/noteflow/grpc/client_mixins/protocols.py index b7af25f..5f92371 100644 --- a/src/noteflow/grpc/client_mixins/protocols.py +++ b/src/noteflow/grpc/client_mixins/protocols.py @@ -83,30 +83,107 @@ class ProtoServerInfoResponse(Protocol): gpu_vram_available_bytes: int def HasField(self, field_name: str) -> bool: ... +MetadataType = tuple[tuple[str, str | bytes], ...] + class NoteFlowServiceStubProtocol(Protocol): """Protocol for the gRPC service stub used by the client mixins.""" - def AddAnnotation(self, request: object) -> ProtoAnnotation: ... - def GetAnnotation(self, request: object) -> ProtoAnnotation: ... - def ListAnnotations(self, request: object) -> ProtoListAnnotationsResponse: ... - def UpdateAnnotation(self, request: object) -> ProtoAnnotation: ... - def DeleteAnnotation(self, request: object) -> ProtoDeleteAnnotationResponse: ... + def AddAnnotation( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoAnnotation: ... + def GetAnnotation( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoAnnotation: ... + def ListAnnotations( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoListAnnotationsResponse: ... + def UpdateAnnotation( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoAnnotation: ... + def DeleteAnnotation( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoDeleteAnnotationResponse: ... - def CreateMeeting(self, request: object) -> ProtoMeeting: ... - def StopMeeting(self, request: object) -> ProtoMeeting: ... - def GetMeeting(self, request: object) -> ProtoMeeting: ... - def ListMeetings(self, request: object) -> ProtoListMeetingsResponse: ... + def CreateMeeting( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoMeeting: ... + def StopMeeting( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoMeeting: ... + def GetMeeting( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoMeeting: ... + def ListMeetings( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoListMeetingsResponse: ... - def ExportTranscript(self, request: object) -> ProtoExportTranscriptResponse: ... + def ExportTranscript( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoExportTranscriptResponse: ... - def RefineSpeakerDiarization(self, request: object) -> ProtoDiarizationJobStatus: ... - def GetDiarizationJobStatus(self, request: object) -> ProtoDiarizationJobStatus: ... - def RenameSpeaker(self, request: object) -> ProtoRenameSpeakerResponse: ... + def RefineSpeakerDiarization( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoDiarizationJobStatus: ... + def GetDiarizationJobStatus( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoDiarizationJobStatus: ... + def RenameSpeaker( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoRenameSpeakerResponse: ... - def StreamTranscription(self, request_iterator: Iterable[object]) -> Iterable[ProtoTranscriptUpdate]: ... + def StreamTranscription( + self, + request_iterator: Iterable[object], + *, + metadata: MetadataType | None = None, + ) -> Iterable[ProtoTranscriptUpdate]: ... - def GetServerInfo(self, request: object) -> ProtoServerInfoResponse: ... + def GetServerInfo( + self, + request: object, + *, + metadata: MetadataType | None = None, + ) -> ProtoServerInfoResponse: ... class ClientHost(Protocol): diff --git a/src/noteflow/grpc/mixins/streaming/_asr.py b/src/noteflow/grpc/mixins/streaming/_asr.py index 219fd35..ee9fb8c 100644 --- a/src/noteflow/grpc/mixins/streaming/_asr.py +++ b/src/noteflow/grpc/mixins/streaming/_asr.py @@ -220,7 +220,8 @@ async def _build_segments_from_results( Returns: List of (segment, update) tuples for yielding to client. """ - segments_to_add: list[tuple[Segment, noteflow_pb2.TranscriptUpdate]] = [] + segments: list[Segment] = [] + updates: list[noteflow_pb2.TranscriptUpdate] = [] for result in results: # Skip results with empty or whitespace-only text if not result.text or not result.text.strip(): @@ -242,9 +243,14 @@ async def _build_segments_from_results( ) segment = create_segment_from_asr(build_params, result) _assign_speaker_if_available(ctx.host, ctx.meeting_id_str, segment) - await ctx.repo.segments.add(ctx.meeting_db_id, segment) - segments_to_add.append((segment, segment_to_proto_update(ctx.meeting_id_str, segment))) - return segments_to_add + segments.append(segment) + updates.append(segment_to_proto_update(ctx.meeting_id_str, segment)) + + if not segments: + return [] + + await ctx.repo.segments.add_batch(ctx.meeting_db_id, segments) + return list(zip(segments, updates, strict=True)) def _assign_speaker_if_available(host: ServicerHost, meeting_id: str, segment: Segment) -> None: diff --git a/src/noteflow/infrastructure/asr/__init__.py b/src/noteflow/infrastructure/asr/__init__.py index 848a24d..ded1824 100644 --- a/src/noteflow/infrastructure/asr/__init__.py +++ b/src/noteflow/infrastructure/asr/__init__.py @@ -3,6 +3,10 @@ Provides speech-to-text transcription using faster-whisper. """ +from __future__ import annotations + +from typing import TYPE_CHECKING + from noteflow.infrastructure.asr.dto import ( AsrResult, PartialUpdate, @@ -10,7 +14,6 @@ from noteflow.infrastructure.asr.dto import ( VadEventType, WordTiming, ) -from noteflow.infrastructure.asr.engine import FasterWhisperEngine, VALID_MODEL_SIZES from noteflow.infrastructure.asr.protocols import AsrEngine from noteflow.infrastructure.asr.segmenter import ( AudioSegment, @@ -25,6 +28,9 @@ from noteflow.infrastructure.asr.streaming_vad import ( VadEngine, ) +if TYPE_CHECKING: + from noteflow.infrastructure.asr.engine import FasterWhisperEngine, VALID_MODEL_SIZES + __all__ = [ "AsrEngine", "AsrResult", @@ -43,3 +49,12 @@ __all__ = [ "VadEventType", "WordTiming", ] + + +def __getattr__(name: str) -> object: + """Lazy-load heavy ASR engine exports to avoid import cycles.""" + if name in {"FasterWhisperEngine", "VALID_MODEL_SIZES"}: + from noteflow.infrastructure.asr import engine as _engine + + return getattr(_engine, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/noteflow/infrastructure/audio/levels.py b/src/noteflow/infrastructure/audio/levels.py index 6862bfd..7ddca6f 100644 --- a/src/noteflow/infrastructure/audio/levels.py +++ b/src/noteflow/infrastructure/audio/levels.py @@ -19,7 +19,11 @@ def compute_rms(frames: NDArray[np.float32]) -> float: Returns: RMS level as float (0.0 for empty array). """ - return 0.0 if len(frames) == 0 else float(np.sqrt(np.mean(np.square(frames)))) + if len(frames) == 0: + return 0.0 + # Use dot product to avoid allocating a squared array. + mean_square = float(np.dot(frames, frames)) / len(frames) + return math.sqrt(mean_square) class RmsLevelProvider: diff --git a/src/noteflow/infrastructure/metrics/collector.py b/src/noteflow/infrastructure/metrics/collector.py index 7c9c9fe..313302c 100644 --- a/src/noteflow/infrastructure/metrics/collector.py +++ b/src/noteflow/infrastructure/metrics/collector.py @@ -6,6 +6,7 @@ import asyncio import time from collections import deque from dataclasses import dataclass +from typing import Final import psutil @@ -49,6 +50,8 @@ class MetricsCollector: """ DEFAULT_HISTORY_SIZE = 60 # 1 minute of data at 1s intervals + ACTIVE_CONNECTIONS_REFRESH_SECONDS: Final[float] = 2.0 + DISK_USAGE_REFRESH_SECONDS: Final[float] = 10.0 def __init__(self, history_size: int = DEFAULT_HISTORY_SIZE) -> None: """Initialize the metrics collector. @@ -61,6 +64,10 @@ class MetricsCollector: self._last_net_io = psutil.net_io_counters() self._running = False self._task: asyncio.Task[None] | None = None + self._last_connections_check = 0.0 + self._cached_connections = 0 + self._last_disk_check = 0.0 + self._cached_disk_percent = 0.0 def collect_now(self) -> PerformanceMetrics: """Collect current system metrics. @@ -89,10 +96,18 @@ class MetricsCollector: def _collect_disk_percent(self) -> float: """Collect root filesystem disk usage percentage.""" + now = time.monotonic() + if self._last_disk_check and ( + now - self._last_disk_check + < self.DISK_USAGE_REFRESH_SECONDS + ): + return self._cached_disk_percent try: - return psutil.disk_usage("/").percent + self._cached_disk_percent = psutil.disk_usage("/").percent except OSError: - return 0.0 + self._cached_disk_percent = 0.0 + self._last_disk_check = now + return self._cached_disk_percent def _collect_network_deltas(self) -> tuple[int, int]: """Collect network I/O deltas since last call.""" @@ -111,10 +126,18 @@ class MetricsCollector: def _collect_connection_count(self) -> int: """Collect count of active network connections.""" + now = time.monotonic() + if self._last_connections_check and ( + now - self._last_connections_check + < self.ACTIVE_CONNECTIONS_REFRESH_SECONDS + ): + return self._cached_connections try: - return len(psutil.net_connections(kind="inet")) - except psutil.AccessDenied: - return 0 + self._cached_connections = len(psutil.net_connections(kind="inet")) + except (psutil.AccessDenied, OSError): + self._cached_connections = 0 + self._last_connections_check = now + return self._cached_connections def get_history(self, limit: int = 60) -> list[PerformanceMetrics]: """Get recent metrics history. diff --git a/tests/benchmarks/test_hot_paths.py b/tests/benchmarks/test_hot_paths.py index 523f2f2..8ad9c81 100644 --- a/tests/benchmarks/test_hot_paths.py +++ b/tests/benchmarks/test_hot_paths.py @@ -12,13 +12,28 @@ from __future__ import annotations import numpy as np import pytest +from typing import TYPE_CHECKING +from uuid import UUID from numpy.typing import NDArray from pytest_benchmark.fixture import BenchmarkFixture +from noteflow.application.services.voice_profile.service import ( + EMBEDDING_DIM, + cosine_similarity, + merge_embeddings, +) from noteflow.config.constants import DEFAULT_SAMPLE_RATE +from noteflow.domain.entities.segment import Segment, WordTiming +from noteflow.domain.value_objects import AudioSource, MeetingId, SpeakerRole from noteflow.infrastructure.asr.segmenter import AudioSegment, Segmenter, SegmenterConfig from noteflow.infrastructure.asr.streaming_vad import EnergyVad, StreamingVad from noteflow.infrastructure.audio.levels import RmsLevelProvider, compute_rms +from noteflow.infrastructure.logging.log_buffer import LogEntry +from noteflow.infrastructure.metrics.collector import PerformanceMetrics + +if TYPE_CHECKING: + from noteflow.grpc.mixins.converters import SegmentBuildParams + from noteflow.infrastructure.asr.dto import AsrResult def _run_benchmark(benchmark: BenchmarkFixture, func: object, *args: object) -> object: @@ -102,6 +117,15 @@ def benchmark_array_list( return cast(list[NDArray[np.float32]], _run_benchmark(benchmark, func, *args)) +def benchmark_float_list( + benchmark: BenchmarkFixture, func: object, *args: object +) -> list[float]: + """Run benchmark for functions returning list of floats.""" + from typing import cast + + return cast(list[float], _run_benchmark(benchmark, func, *args)) + + # Standard audio chunk size (100ms at 16kHz) CHUNK_SIZE = 1600 SAMPLE_RATE = DEFAULT_SAMPLE_RATE @@ -109,6 +133,12 @@ SAMPLE_RATE = DEFAULT_SAMPLE_RATE TYPICAL_PARTIAL_CHUNKS = 20 # dB floor for silence detection DB_FLOOR = -60 +MEETING_UUID = UUID("00000000-0000-0000-0000-000000000001") +MEETING_ID = MeetingId(MEETING_UUID) +ASR_SEGMENT_ID = 7 +SEGMENT_START_OFFSET = 1.25 +VOICE_EMBEDDING_NOISE = 0.01 +VOICE_EMBEDDING_EXISTING_COUNT = 3 @pytest.fixture @@ -153,6 +183,113 @@ def rms_provider() -> RmsLevelProvider: return RmsLevelProvider() +@pytest.fixture +def segment_with_words() -> Segment: + """Create a segment with word timings for converter benchmarks.""" + words = [ + WordTiming(word="hello", start_time=0.0, end_time=0.25, probability=0.95), + WordTiming(word="world", start_time=0.25, end_time=0.5, probability=0.92), + WordTiming(word="from", start_time=0.5, end_time=0.7, probability=0.9), + WordTiming(word="noteflow", start_time=0.7, end_time=1.0, probability=0.93), + ] + return Segment( + segment_id=42, + text="hello world from noteflow", + start_time=0.0, + end_time=1.0, + words=words, + ) + + +@pytest.fixture +def asr_result() -> AsrResult: + """Create an ASR result for segment build benchmarks.""" + from noteflow.infrastructure.asr.dto import AsrResult, WordTiming as AsrWordTiming + + words = ( + AsrWordTiming(word="hello", start=0.0, end=0.25, probability=0.95), + AsrWordTiming(word="world", start=0.25, end=0.5, probability=0.92), + AsrWordTiming(word="from", start=0.5, end=0.7, probability=0.9), + AsrWordTiming(word="noteflow", start=0.7, end=1.0, probability=0.93), + ) + return AsrResult( + text="hello world from noteflow", + start=0.0, + end=1.0, + words=words, + language="en", + language_probability=0.98, + avg_logprob=-0.2, + no_speech_prob=0.01, + ) + + +@pytest.fixture +def segment_build_params() -> SegmentBuildParams: + """Create segment build parameters for ASR conversion benchmarks.""" + from noteflow.grpc.mixins.converters import SegmentBuildParams + + return SegmentBuildParams( + meeting_id=MEETING_ID, + segment_id=ASR_SEGMENT_ID, + segment_start_time=SEGMENT_START_OFFSET, + audio_source=AudioSource.MIC, + ) + + +@pytest.fixture +def voice_embedding_pair() -> tuple[list[float], list[float]]: + """Create two similar embeddings for voice profile benchmarks.""" + rng = np.random.default_rng(42) + base = rng.standard_normal(EMBEDDING_DIM).astype(np.float32) + noise = rng.standard_normal(EMBEDDING_DIM).astype(np.float32) * VOICE_EMBEDDING_NOISE + return base.tolist(), (base + noise).tolist() + + +@pytest.fixture +def voice_embedding_merge_inputs( + voice_embedding_pair: tuple[list[float], list[float]], +) -> tuple[list[float], list[float], int]: + """Create inputs for merge_embeddings benchmark.""" + existing, new = voice_embedding_pair + return existing, new, VOICE_EMBEDDING_EXISTING_COUNT + + +@pytest.fixture +def performance_metrics() -> PerformanceMetrics: + """Create sample metrics for converter benchmarks.""" + return PerformanceMetrics( + timestamp=1_700_000_000.0, + cpu_percent=23.5, + memory_percent=61.2, + memory_mb=8192.0, + disk_percent=44.0, + network_bytes_sent=120_000, + network_bytes_recv=98_000, + process_memory_mb=512.0, + active_connections=12, + ) + + +@pytest.fixture +def log_entry() -> LogEntry: + """Create a sample log entry for converter benchmarks.""" + from datetime import UTC, datetime + + return LogEntry( + timestamp=datetime.now(tz=UTC), + level="info", + source="bench", + message="Segment persisted", + details={"meeting_id": "test"}, + trace_id="trace", + span_id="span", + event_type="segment.added", + operation_id="op", + entity_id="entity", + ) + + class TestComputeRmsBenchmark: """Benchmark tests for RMS computation (called 36,000x/hour).""" @@ -466,3 +603,111 @@ class TestPartialBufferComparisonBenchmark: result = benchmark_array_list(benchmark, new_pattern_cycles) assert len(result) == 10, "Should have 10 results" + + +class TestAsrSegmentBuildBenchmarks: + """Benchmark ASR-to-segment conversion path.""" + + def test_create_segment_from_asr( + self, + benchmark: BenchmarkFixture, + asr_result: "AsrResult", + segment_build_params: "SegmentBuildParams", + ) -> None: + """Benchmark create_segment_from_asr conversion.""" + from noteflow.grpc.mixins.converters import create_segment_from_asr + + result = typed_benchmark( + benchmark, + Segment, + create_segment_from_asr, + segment_build_params, + asr_result, + ) + assert result.segment_id == ASR_SEGMENT_ID, "Segment ID should match build params" + assert result.start_time == SEGMENT_START_OFFSET, "Start time should include offset" + assert result.audio_source == AudioSource.MIC, "Audio source should be preserved" + assert result.speaker_role == SpeakerRole.USER, "Speaker role should map from MIC" + + +class TestGrpcConverterBenchmarks: + """Benchmark gRPC converter hot paths.""" + + def test_segment_to_proto_update( + self, + benchmark: BenchmarkFixture, + segment_with_words: Segment, + ) -> None: + """Benchmark segment_to_proto_update conversion.""" + from noteflow.grpc.mixins.converters import segment_to_proto_update + from noteflow.grpc.proto import noteflow_pb2 + + result = typed_benchmark( + benchmark, + noteflow_pb2.TranscriptUpdate, + segment_to_proto_update, + "meeting_id", + segment_with_words, + ) + assert result.segment.segment_id == segment_with_words.segment_id, "segment_id should match" + + def test_metrics_to_proto( + self, + benchmark: BenchmarkFixture, + performance_metrics: PerformanceMetrics, + ) -> None: + """Benchmark metrics_to_proto conversion.""" + from noteflow.grpc.mixins.converters import metrics_to_proto + from noteflow.grpc.proto import noteflow_pb2 + + result = typed_benchmark( + benchmark, + noteflow_pb2.PerformanceMetricsPoint, + metrics_to_proto, + performance_metrics, + ) + assert result.cpu_percent >= 0, "CPU percent should be non-negative" + + def test_log_entry_to_proto( + self, + benchmark: BenchmarkFixture, + log_entry: LogEntry, + ) -> None: + """Benchmark log_entry_to_proto conversion.""" + from noteflow.grpc.mixins.converters import log_entry_to_proto + from noteflow.grpc.proto import noteflow_pb2 + + result = typed_benchmark( + benchmark, + noteflow_pb2.LogEntryProto, + log_entry_to_proto, + log_entry, + ) + assert result.message, "Log message should be populated" + + +class TestVoiceProfileBenchmarks: + """Benchmark voice profile similarity and merge operations.""" + + def test_cosine_similarity( + self, + benchmark: BenchmarkFixture, + voice_embedding_pair: tuple[list[float], list[float]], + ) -> None: + """Benchmark cosine similarity for voice profile embeddings.""" + existing, new = voice_embedding_pair + result = typed_benchmark(benchmark, float, cosine_similarity, existing, new) + assert 0.0 <= result <= 1.0, "Similarity should be normalized" + assert result > 0.8, "Similar embeddings should yield high similarity" + + def test_merge_embeddings( + self, + benchmark: BenchmarkFixture, + voice_embedding_merge_inputs: tuple[list[float], list[float], int], + ) -> None: + """Benchmark merge_embeddings for voice profile updates.""" + existing, new, count = voice_embedding_merge_inputs + result = benchmark_float_list(benchmark, merge_embeddings, existing, new, count) + assert len(result) == EMBEDDING_DIM, "Merged embedding should preserve dimension" + norm = float(np.linalg.norm(np.array(result, dtype=np.float32))) + assert 0.99 <= norm <= 1.01, "Merged embedding should remain normalized"