Files
noteflow/client/src-tauri/tests/grpc_integration.rs
Travis Vasceannie 2641a9fc03
Some checks failed
CI / test-python (push) Failing after 17m22s
CI / test-rust (push) Has been cancelled
CI / test-typescript (push) Has been cancelled
optimization
2026-01-25 01:40:14 +00:00

2194 lines
80 KiB
Rust

//! gRPC Integration Tests
//!
//! Tests the actual connection between Rust client and Python gRPC server.
//! Run with: make e2e-grpc
//! Or: NOTEFLOW_INTEGRATION=1 cargo test --test grpc_integration -- --ignored --nocapture
//! Requires: gRPC server running on localhost:50051
//! Optional: Set NOTEFLOW_WORKSPACE_ID to a valid workspace UUID for webhook tests.
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::File;
use std::sync::Mutex;
use serde_json::{json, Value};
static STREAMING_TEST_LOCK: Mutex<()> = Mutex::new(());
const TARGET_SAMPLE_RATE_HZ: u32 = 16000;
const CHUNK_SAMPLES: usize = 1600;
const CHUNK_BYTES: usize = CHUNK_SAMPLES * 4;
const SUMMARY_TIMEOUT_SECS: u64 = 90;
const MAX_EXTREME_STREAM_REPEATS: usize = 8;
/// Check if integration tests should run
fn should_run_integration_tests() -> bool {
env::var("NOTEFLOW_INTEGRATION")
.map(|v| v == "1")
.unwrap_or(false)
}
/// Get the gRPC server URL
fn get_server_url() -> String {
env::var("NOTEFLOW_GRPC_URL").unwrap_or_else(|_| "http://localhost:50051".to_string())
}
/// Get workspace id for webhook tests.
fn get_workspace_id() -> Option<String> {
env::var("NOTEFLOW_WORKSPACE_ID")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
fn parse_env_usize(name: &str, default: usize, min: usize, max: usize) -> usize {
env::var(name)
.ok()
.and_then(|value| value.parse::<usize>().ok())
.map(|value| value.clamp(min, max))
.unwrap_or(default)
}
mod integration {
use super::*;
use noteflow_lib::grpc::GrpcClient;
use noteflow_lib::identity::IdentityManager;
use std::sync::Arc;
struct LoadedAudio {
path: std::path::PathBuf,
samples: Vec<f32>,
}
fn new_client(endpoint: impl Into<String>) -> GrpcClient {
let identity = Arc::new(IdentityManager::new());
GrpcClient::new(endpoint, identity)
}
struct CloudConfigBackup {
ai_config: Option<String>,
consent: Option<bool>,
}
async fn configure_cloud_summary_provider(
client: &GrpcClient,
model: &str,
base_url: &str,
api_key: &str,
) -> CloudConfigBackup {
println!(
"Configuring cloud summary provider (model={}, base_url={})",
model, base_url
);
let existing = client
.get_preferences(Some(vec!["ai_config".to_string()]))
.await
.expect("Failed to fetch ai_config preference");
let original_ai_config = existing.preferences.get("ai_config").cloned();
let consent_status = client
.get_cloud_consent_status()
.await
.expect("Failed to fetch cloud consent status");
let mut ai_config_value = original_ai_config
.as_deref()
.and_then(|raw| serde_json::from_str::<Value>(raw).ok())
.unwrap_or_else(|| json!({}));
if !ai_config_value.is_object() {
ai_config_value = json!({});
}
let summary_entry = ai_config_value
.as_object_mut()
.expect("ai_config should be an object")
.entry("summary".to_string())
.or_insert_with(|| json!({}));
if !summary_entry.is_object() {
*summary_entry = json!({});
}
let summary_obj = summary_entry
.as_object_mut()
.expect("summary config should be an object");
summary_obj.insert("provider".to_string(), json!("openai"));
summary_obj.insert("base_url".to_string(), json!(base_url));
summary_obj.insert("api_key".to_string(), json!(api_key));
summary_obj.insert("selected_model".to_string(), json!(model));
summary_obj.insert("model".to_string(), json!(model));
summary_obj.insert("test_status".to_string(), json!("success"));
let mut prefs_update = HashMap::new();
prefs_update.insert("ai_config".to_string(), ai_config_value.to_string());
client
.set_preferences(prefs_update, None, None, true)
.await
.expect("Failed to update cloud preferences");
if !consent_status.transcription_consent {
client
.grant_cloud_consent()
.await
.expect("Failed to grant cloud consent");
}
CloudConfigBackup {
ai_config: original_ai_config,
consent: Some(consent_status.transcription_consent),
}
}
async fn restore_cloud_summary_provider(client: &GrpcClient, backup: CloudConfigBackup) {
let mut prefs_restore = HashMap::new();
if let Some(previous) = backup.ai_config {
prefs_restore.insert("ai_config".to_string(), previous);
}
if !prefs_restore.is_empty() {
if let Err(error) = client
.set_preferences(prefs_restore, None, None, true)
.await
{
println!("⚠ Failed to restore ai_config preference: {}", error);
}
}
if let Some(previous_consent) = backup.consent {
if !previous_consent {
if let Err(error) = client.revoke_cloud_consent().await {
println!("⚠ Failed to restore cloud consent: {}", error);
}
}
}
}
fn repeat_samples(samples: &[f32], repeats: usize) -> Vec<f32> {
let repeat_count = repeats.clamp(1, MAX_EXTREME_STREAM_REPEATS);
let capacity = samples.len().saturating_mul(repeat_count);
let mut extended = Vec::with_capacity(capacity);
for _ in 0..repeat_count {
extended.extend_from_slice(samples);
}
extended
}
fn load_sample_audio(max_seconds: Option<f64>) -> Option<LoadedAudio> {
// 2. Load and decode the sample audio file using symphonia directly
println!("\n=== STEP 2: Load Sample Audio File ===");
let audio_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.parent()
.unwrap()
.join("tests/fixtures/sample_discord.m4a");
if !audio_path.exists() {
println!("⚠ Sample audio file not found at {:?}", audio_path);
println!(" Skipping audio streaming portion of test");
return None;
}
// Use symphonia directly for decoding (more control over file handling)
use symphonia::core::audio::SampleBuffer;
use symphonia::core::codecs::DecoderOptions;
use symphonia::core::formats::FormatOptions;
use symphonia::core::io::MediaSourceStream;
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
let file = File::open(&audio_path).expect("Failed to open audio file");
let mss = MediaSourceStream::new(Box::new(file), Default::default());
let mut hint = Hint::new();
hint.with_extension("m4a");
let probed = symphonia::default::get_probe()
.format(
&hint,
mss,
&FormatOptions::default(),
&MetadataOptions::default(),
)
.expect("Failed to probe audio format");
let mut format = probed.format;
// Get the default audio track
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL)
.expect("No audio tracks found");
let track_id = track.id;
let source_rate = track.codec_params.sample_rate.unwrap_or(48000);
let source_channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2);
println!(
"✓ Loaded audio: sample_rate={}, channels={}",
source_rate, source_channels
);
// Create a decoder for the track
let mut audio_decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &DecoderOptions::default())
.expect("Failed to create decoder");
// Decode all samples
let mut samples: Vec<f32> = Vec::new();
loop {
match format.next_packet() {
Ok(packet) => {
if packet.track_id() != track_id {
continue;
}
match audio_decoder.decode(&packet) {
Ok(decoded) => {
let spec = *decoded.spec();
let mut sample_buf =
SampleBuffer::<f32>::new(decoded.capacity() as u64, spec);
sample_buf.copy_interleaved_ref(decoded);
samples.extend(sample_buf.samples());
}
Err(e) => {
println!(" Decode error: {:?}", e);
continue;
}
}
}
Err(symphonia::core::errors::Error::IoError(ref e))
if e.kind() == std::io::ErrorKind::UnexpectedEof =>
{
break
}
Err(e) => {
println!(" Format error: {:?}", e);
break;
}
}
}
println!(
"✓ Decoded {} samples ({:.2}s at {}Hz)",
samples.len(),
samples.len() as f64 / source_rate as f64 / source_channels as f64,
source_rate
);
// Convert to mono if stereo
let mono_samples: Vec<f32> = if source_channels == 2 {
samples
.chunks(2)
.map(|chunk| (chunk[0] + chunk.get(1).copied().unwrap_or(0.0)) / 2.0)
.collect()
} else {
samples
};
// Simple resampling to 16kHz (linear interpolation)
let resampled: Vec<f32> = if source_rate != TARGET_SAMPLE_RATE_HZ {
let ratio = source_rate as f64 / TARGET_SAMPLE_RATE_HZ as f64;
let output_len = (mono_samples.len() as f64 / ratio) as usize;
(0..output_len)
.map(|i| {
let src_idx = i as f64 * ratio;
let idx0 = src_idx.floor() as usize;
let idx1 = (idx0 + 1).min(mono_samples.len() - 1);
let frac = (src_idx - idx0 as f64) as f32;
mono_samples[idx0] * (1.0 - frac) + mono_samples[idx1] * frac
})
.collect()
} else {
mono_samples
};
let mut trimmed = resampled;
if let Some(limit_seconds) = max_seconds {
let max_samples = (limit_seconds * TARGET_SAMPLE_RATE_HZ as f64).round() as usize;
if trimmed.len() > max_samples {
trimmed.truncate(max_samples);
println!(
"✓ Trimmed to {:.2}s of audio",
trimmed.len() as f64 / TARGET_SAMPLE_RATE_HZ as f64
);
}
}
println!(
"✓ Resampled to {} samples ({:.2}s at 16kHz)",
trimmed.len(),
trimmed.len() as f64 / TARGET_SAMPLE_RATE_HZ as f64
);
Some(LoadedAudio {
path: audio_path,
samples: trimmed,
})
}
fn samples_to_chunks(samples: &[f32]) -> Vec<Vec<u8>> {
let audio_bytes: Vec<u8> = samples.iter().flat_map(|s| s.to_le_bytes()).collect();
audio_bytes
.chunks(CHUNK_BYTES)
.map(|chunk| chunk.to_vec())
.collect()
}
#[test]
#[ignore = "integration test; requires running server"]
fn server_is_reachable() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use tonic::transport::Channel;
let result = Channel::from_shared(url.clone())
.unwrap()
.connect_timeout(std::time::Duration::from_secs(5))
.connect()
.await;
assert!(
result.is_ok(),
"Failed to connect to gRPC server at {}: {:?}",
url,
result.err()
);
println!("Successfully connected to gRPC server at {}", url);
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn get_server_info_returns_valid_response() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
let connect_result = client.connect(Some(url.clone())).await;
assert!(
connect_result.is_ok(),
"Failed to connect: {:?}",
connect_result.err()
);
let info = client.get_server_info().await;
assert!(info.is_ok(), "Failed to get server info: {:?}", info.err());
let info = info.unwrap();
println!("Server version: {}", info.version);
println!("ASR model: {}", info.asr_model);
println!("ASR ready: {}", info.asr_ready);
assert!(
!info.version.is_empty(),
"Server version should not be empty"
);
assert!(!info.asr_model.is_empty(), "ASR model should not be empty");
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn list_meetings_returns_valid_response() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let identity = Arc::new(IdentityManager::new());
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = GrpcClient::new(&url, identity);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// list_meetings(states, limit, offset, sort_order, project_id, project_ids, include_segments)
let result = client.list_meetings(vec![], 10, 0, 0, None, vec![], false).await;
assert!(
result.is_ok(),
"Failed to list meetings: {:?}",
result.err()
);
let response = result.unwrap();
println!(
"Found {} meetings (total: {})",
response.meetings.len(),
response.total_count
);
assert!(
response.total_count >= 0,
"Total count should be non-negative"
);
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn create_and_delete_meeting_roundtrip() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// Create a test meeting
let title = format!(
"Integration Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let create_result = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await;
assert!(
create_result.is_ok(),
"Failed to create meeting: {:?}",
create_result.err()
);
let meeting = create_result.unwrap();
assert!(!meeting.id.is_empty(), "Meeting ID should not be empty");
assert_eq!(meeting.title, title);
println!("Created meeting: {} ({})", meeting.title, meeting.id);
// Delete the meeting
let delete_result = client.delete_meeting(&meeting.id).await;
assert!(
delete_result.is_ok(),
"Failed to delete meeting: {:?}",
delete_result.err()
);
println!("Deleted meeting: {}", meeting.id);
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn webhook_crud_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let workspace_id = match get_workspace_id() {
Some(id) => id,
None => {
eprintln!("Skipping webhook test (set NOTEFLOW_WORKSPACE_ID to run).");
return;
}
};
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use noteflow_lib::grpc::types::webhooks::RegisterWebhookRequest;
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// Create a test webhook
let name = format!(
"Integration Test Webhook {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let webhook_url = format!("https://example.com/webhook/{}", uuid::Uuid::new_v4());
let create_result = client
.register_webhook(RegisterWebhookRequest {
workspace_id,
url: webhook_url.clone(),
events: vec!["meeting.completed".to_string()],
name: Some(name.clone()),
secret: None,
timeout_ms: None,
max_retries: None,
})
.await;
assert!(
create_result.is_ok(),
"Failed to create webhook: {:?}",
create_result.err()
);
let webhook = create_result.unwrap();
assert!(!webhook.id.is_empty(), "Webhook ID should not be empty");
println!("Created webhook: {} ({})", webhook.name, webhook.id);
// List webhooks
let list_result = client.list_webhooks(false).await;
assert!(
list_result.is_ok(),
"Failed to list webhooks: {:?}",
list_result.err()
);
let list = list_result.unwrap();
assert!(
list.webhooks.iter().any(|w| w.id == webhook.id),
"Created webhook should be in list"
);
// Delete the webhook
let delete_result = client.delete_webhook(&webhook.id).await;
assert!(
delete_result.is_ok(),
"Failed to delete webhook: {:?}",
delete_result.err()
);
println!("Deleted webhook: {}", webhook.id);
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn cloud_consent_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// Get initial consent status
let status_result = client.get_cloud_consent_status().await;
assert!(
status_result.is_ok(),
"Failed to get consent status: {:?}",
status_result.err()
);
println!("Initial consent status: {:?}", status_result.unwrap());
// Grant consent
let grant_result = client.grant_cloud_consent().await;
assert!(
grant_result.is_ok(),
"Failed to grant consent: {:?}",
grant_result.err()
);
let status_after_grant = client.get_cloud_consent_status().await.unwrap();
assert!(status_after_grant.transcription_consent, "Transcription consent should be granted");
println!("Consent after grant: {:?}", status_after_grant);
// Revoke consent
let revoke_result = client.revoke_cloud_consent().await;
assert!(
revoke_result.is_ok(),
"Failed to revoke consent: {:?}",
revoke_result.err()
);
let status_after_revoke = client.get_cloud_consent_status().await.unwrap();
assert!(!status_after_revoke.transcription_consent, "Transcription consent should be revoked");
println!("Consent after revoke: {:?}", status_after_revoke);
});
}
/// GAP-006: Test that connect(None) uses cached endpoint
/// This verifies the auto-connect behavior for recording bootstrapping
#[test]
#[ignore = "integration test; requires running server"]
fn connect_with_none_uses_cached_endpoint() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// Create client with URL (simulating app startup)
let client = new_client(&url);
// Verify not connected initially
assert!(
!client.is_connected(),
"Client should not be connected initially"
);
// Connect with None - should use cached endpoint from constructor
let connect_result = client.connect(None).await;
assert!(
connect_result.is_ok(),
"connect(None) should succeed using cached endpoint: {:?}",
connect_result.err()
);
// Verify now connected
assert!(
client.is_connected(),
"Client should be connected after connect(None)"
);
// Verify server URL is still the original
assert_eq!(
client.server_url(),
url,
"Server URL should remain unchanged"
);
println!(
"GAP-006: connect(None) successfully used cached endpoint: {}",
url
);
});
}
/// GAP-006: Test that operations fail gracefully when server is unreachable
#[test]
#[ignore = "integration test; requires running server"]
fn connect_fails_gracefully_with_invalid_server() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// Create client with invalid URL
let invalid_url = "http://invalid-host-that-does-not-exist:99999";
let client = new_client(invalid_url);
// Verify not connected
assert!(!client.is_connected(), "Client should not be connected");
// Connect should fail with error
let connect_result = client.connect(None).await;
assert!(
connect_result.is_err(),
"connect() should fail for unreachable server"
);
// Verify still not connected
assert!(
!client.is_connected(),
"Client should remain disconnected after failed connect"
);
println!("GAP-006: connect() correctly returns error for unreachable server");
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn full_meeting_lifecycle() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// 1. CREATE meeting
let title = format!(
"E2E Lifecycle Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
println!("\n=== STEP 1: Create Meeting ===");
let meeting = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
println!("✓ Created meeting: {} (ID: {})", meeting.title, meeting.id);
assert!(!meeting.id.is_empty());
assert_eq!(meeting.title, title);
// 2. GET meeting
println!("\n=== STEP 2: Get Meeting ===");
let retrieved = client
.get_meeting(&meeting.id, true, true)
.await
.expect("Failed to get meeting");
println!(
"✓ Retrieved meeting: {} (state: {:?})",
retrieved.title, retrieved.state
);
assert_eq!(retrieved.id, meeting.id);
// 3. ADD annotation
println!("\n=== STEP 3: Add Annotation ===");
let annotation = client
.add_annotation(
&meeting.id,
1, // ActionItem
"Test action item from E2E test",
0.0,
10.0,
vec![],
)
.await
.expect("Failed to add annotation");
println!(
"✓ Added annotation: {} (ID: {})",
annotation.text, annotation.id
);
assert!(!annotation.id.is_empty());
// 4. LIST annotations
println!("\n=== STEP 4: List Annotations ===");
let annotations = client
.list_annotations(&meeting.id, 0.0, f64::MAX)
.await
.expect("Failed to list annotations");
println!("✓ Listed {} annotations", annotations.len());
assert!(annotations.iter().any(|a| a.id == annotation.id));
// 5. UPDATE annotation
println!("\n=== STEP 5: Update Annotation ===");
let updated = client
.update_annotation(
&annotation.id,
None,
Some("Updated action item text".to_string()),
None,
None,
None,
)
.await
.expect("Failed to update annotation");
println!("✓ Updated annotation: {}", updated.text);
assert_eq!(updated.text, "Updated action item text");
// 6. EXPORT transcript (markdown)
println!("\n=== STEP 6: Export Transcript (Markdown) ===");
let export_md = client
.export_transcript(&meeting.id, 1)
.await
.expect("Failed to export markdown");
println!(
"✓ Exported markdown ({} bytes, extension: {})",
export_md.content.len(),
export_md.file_extension
);
assert!(!export_md.file_extension.is_empty());
// 7. EXPORT transcript (HTML)
println!("\n=== STEP 7: Export Transcript (HTML) ===");
let export_html = client
.export_transcript(&meeting.id, 2)
.await
.expect("Failed to export HTML");
println!(
"✓ Exported HTML ({} bytes, extension: {})",
export_html.content.len(),
export_html.file_extension
);
// 8. DELETE annotation
println!("\n=== STEP 8: Delete Annotation ===");
let deleted = client
.delete_annotation(&annotation.id)
.await
.expect("Failed to delete annotation");
println!("✓ Deleted annotation: {}", deleted);
assert!(deleted);
// 9. STOP meeting (skip if never started recording)
println!("\n=== STEP 9: Stop Meeting ===");
match client.stop_meeting(&meeting.id).await {
Ok(stopped) => println!("✓ Stopped meeting (state: {:?})", stopped.state),
Err(e) => {
// Meeting was never started, so it can't be stopped - this is OK
println!("✓ Meeting not in recording state (skipped stop): {}", e);
}
}
// 10. DELETE meeting
println!("\n=== STEP 10: Delete Meeting ===");
let deleted = client
.delete_meeting(&meeting.id)
.await
.expect("Failed to delete meeting");
println!("✓ Deleted meeting: {}", deleted);
assert!(deleted);
println!("\n=== ALL LIFECYCLE STEPS PASSED ===\n");
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn preferences_roundtrip() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
// Get current preferences
println!("\n=== Get Preferences ===");
let prefs = client
.get_preferences(None)
.await
.expect("Failed to get preferences");
println!("✓ Retrieved preferences");
println!(
" - keys: {:?}",
prefs.preferences.keys().collect::<Vec<_>>()
);
println!(" - etag: {:?}", prefs.etag);
// Set a test preference (value must be valid JSON)
println!("\n=== Set Preferences ===");
let mut test_prefs = HashMap::new();
// Preference values must be JSON - wrap string in quotes
test_prefs.insert("test_e2e_key".to_string(), "\"test_value\"".to_string());
let result = client
.set_preferences(test_prefs, None, None, true)
.await
.expect("Failed to set preferences");
println!(
"✓ Set preferences: success={}, conflict={}",
result.success, result.conflict
);
// Verify change
let updated = client
.get_preferences(Some(vec!["test_e2e_key".to_string()]))
.await
.expect("Failed to get updated preferences");
println!("✓ Verified preferences update");
println!(
" - test_e2e_key: {:?}",
updated.preferences.get("test_e2e_key")
);
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn diarization_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
println!("\n=== Diarization Status ===");
println!(" - diarization_enabled: {}", info.diarization_enabled);
println!(" - diarization_ready: {}", info.diarization_ready);
// Get active diarization jobs
println!("\n=== Active Diarization Jobs ===");
let jobs = client
.get_active_diarization_jobs()
.await
.expect("Failed to get diarization jobs");
println!("✓ Retrieved {} active diarization jobs", jobs.len());
// If diarization is ready, we could test refine_speakers
// but that requires a meeting with audio, which is complex to set up
if info.diarization_ready {
println!("✓ Diarization engine is ready for processing");
} else {
println!("⚠ Diarization engine is not ready (this is OK for basic tests)");
}
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn diarization_refinement_smoke() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let _lock = STREAMING_TEST_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use async_stream::stream;
use noteflow_lib::grpc::noteflow as pb;
use noteflow_lib::grpc::types::enums::JobStatus;
use tokio_stream::StreamExt;
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
assert!(
info.diarization_enabled,
"Diarization must be enabled for the refinement smoke test"
);
let title = format!(
"Diarization Smoke Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let meeting = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
let audio_fixture = match load_sample_audio(Some(10.0)) {
Some(loaded) => loaded,
None => {
let _ = client.delete_meeting(&meeting.id).await;
panic!("Sample audio fixture missing for diarization smoke test");
}
};
let chunks = samples_to_chunks(&audio_fixture.samples);
assert!(!chunks.is_empty(), "Expected audio chunks for diarization test");
let meeting_id = meeting.id.clone();
let outbound = stream! {
for (i, chunk_data) in chunks.into_iter().enumerate() {
let timestamp = (i * CHUNK_SAMPLES) as f64 / TARGET_SAMPLE_RATE_HZ as f64;
yield pb::AudioChunk {
meeting_id: meeting_id.clone(),
audio_data: chunk_data,
timestamp,
sample_rate: TARGET_SAMPLE_RATE_HZ as i32,
channels: 1,
chunk_sequence: (i + 1) as i64,
audio_source: pb::AudioSource::Unspecified as i32,
};
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
let mut grpc_client = client.get_client().expect("Failed to get gRPC client");
let response = grpc_client
.stream_transcription(tonic::Request::new(outbound))
.await
.expect("Failed to start stream");
let mut inbound = response.into_inner();
let _ = tokio::time::timeout(std::time::Duration::from_secs(20), async {
while let Some(result) = inbound.next().await {
if result.is_err() {
break;
}
}
})
.await;
let stopped = client
.stop_meeting(&meeting.id)
.await
.expect("Failed to stop meeting");
assert!(
matches!(stopped.state, noteflow_lib::grpc::types::enums::MeetingState::Stopped),
"Meeting should be stopped before refinement"
);
let mut final_meeting = None;
let poll_start = std::time::Instant::now();
let poll_timeout = std::time::Duration::from_secs(20);
while poll_start.elapsed() < poll_timeout {
let candidate = client
.get_meeting(&meeting.id, true, true)
.await
.expect("Failed to get meeting");
if !candidate.segments.is_empty() {
final_meeting = Some(candidate);
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
let final_meeting = final_meeting.unwrap_or_else(|| {
panic!("Expected transcript segments before diarization refinement")
});
assert!(
!final_meeting.segments.is_empty(),
"Expected transcript segments before refinement"
);
println!("\n=== Diarization Refinement Smoke Test ===");
let mut status = client
.refine_speaker_diarization(&meeting.id, 0)
.await
.expect("Failed to start diarization refinement");
let job_start = std::time::Instant::now();
let job_timeout = std::time::Duration::from_secs(180);
while matches!(status.status, JobStatus::Queued | JobStatus::Running)
&& job_start.elapsed() < job_timeout
{
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
status = client
.get_diarization_job_status(&status.job_id)
.await
.expect("Failed to poll diarization job");
}
assert!(
matches!(status.status, JobStatus::Completed),
"Diarization did not complete: {}",
status.error_message
);
assert!(
!status.speaker_ids.is_empty(),
"Diarization returned no speaker IDs"
);
let deleted = client
.delete_meeting(&meeting.id)
.await
.expect("Failed to delete meeting");
assert!(deleted, "Meeting deletion failed");
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn user_integrations_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
println!("\n=== User Integrations ===");
let result = client.get_user_integrations().await;
match result {
Ok(integrations) => {
println!(
"✓ Retrieved {} integrations",
integrations.integrations.len()
);
for integration in &integrations.integrations {
println!(
" - {} ({}): status={}",
integration.name, integration.id, integration.status
);
}
}
Err(e) => {
println!("⚠ Could not get integrations: {}", e);
println!(" (This may be OK if no integrations are configured)");
}
}
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn calendar_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
println!("\n=== Calendar Providers ===");
let result = client.get_calendar_providers().await;
match result {
Ok(providers) => {
println!(
"✓ Retrieved {} calendar providers",
providers.providers.len()
);
for provider in &providers.providers {
println!(
" - {}: authenticated={}",
provider.name, provider.is_authenticated
);
}
}
Err(e) => {
println!("⚠ Calendar providers not available: {}", e);
println!(" (This is OK if calendar feature is disabled)");
}
}
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn observability_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
println!("\n=== Performance Metrics ===");
let metrics = client
.get_performance_metrics(Some(10))
.await
.expect("Failed to get performance metrics");
println!("✓ Retrieved performance metrics");
println!(" - cpu_percent: {:.1}%", metrics.current.cpu_percent);
println!(" - memory_percent: {:.1}%", metrics.current.memory_percent);
println!(
" - active_connections: {}",
metrics.current.active_connections
);
println!(" - history_points: {}", metrics.history.len());
println!("\n=== Recent Logs ===");
let logs = client
.get_recent_logs(Some(10), None, None)
.await
.expect("Failed to get recent logs");
println!("✓ Retrieved {} recent log entries", logs.logs.len());
});
}
#[test]
#[ignore = "integration test; requires running server"]
fn project_operations() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
// Default workspace ID (matches identity interceptor)
let workspace_id = "00000000-0000-0000-0000-000000000001";
rt.block_on(async {
let client = new_client(&url);
client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
println!("\n=== List Projects ===");
let projects = client
.list_projects(workspace_id, false, 100, 0)
.await
.expect("Failed to list projects");
println!("✓ Retrieved {} projects", projects.projects.len());
println!("\n=== Get Active Project ===");
let result = client.get_active_project(workspace_id).await;
match result {
Ok(active) => {
println!(
"✓ Active project: {} ({})",
active.project.name, active.project.id
);
}
Err(e) => {
// InvalidInput error means no active project is set - this is OK
println!("✓ No active project set ({})", e);
}
}
});
}
/// E2E test: Stream real audio from sample file and verify transcription
/// This test requires the ASR engine to be ready on the server
#[test]
#[ignore = "integration test; requires running server"]
fn real_audio_streaming_e2e() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let _lock = STREAMING_TEST_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use async_stream::stream;
use noteflow_lib::grpc::noteflow as pb;
use noteflow_lib::grpc::types::enums::JobStatus;
use tokio_stream::StreamExt;
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
let cloud_model = env::var("NOTEFLOW_CLOUD_LLM_MODEL").ok();
let cloud_base_url = env::var("NOTEFLOW_CLOUD_LLM_BASE_URL").ok();
let cloud_api_key = env::var("NOTEFLOW_CLOUD_LLM_API_KEY").ok();
let mut original_ai_config: Option<String> = None;
let mut original_cloud_consent: Option<bool> = None;
println!("\n=== Real Audio Streaming E2E Test ===\n");
println!(
"Server info: version={}, asr_model={}, asr_ready={}",
info.version, info.asr_model, info.asr_ready
);
// Check if ASR is ready
if !info.asr_ready {
println!("⚠ ASR engine not ready, skipping audio streaming test");
println!(" (This test requires the Whisper model to be loaded)");
return;
}
// 1. Create a test meeting
println!("\n=== STEP 1: Create Meeting for Audio Test ===");
let title = format!(
"Audio E2E Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let meeting = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
println!("✓ Created meeting: {} (ID: {})", meeting.title, meeting.id);
let audio_fixture = match load_sample_audio(None) {
Some(loaded) => loaded,
None => {
let _ = client.delete_meeting(&meeting.id).await;
return;
}
};
// 3. Stream audio to server
println!("\n=== STEP 3: Stream Audio to Server ===");
let meeting_id = meeting.id.clone();
// Convert f32 samples to bytes (little-endian f32)
let chunks = samples_to_chunks(&audio_fixture.samples);
let total_chunks = chunks.len();
println!(
" Sending {} chunks ({} bytes each, {:.1}ms per chunk)",
total_chunks, CHUNK_BYTES, 100.0
);
// Create the outbound audio stream
let outbound = stream! {
for (i, chunk_data) in chunks.into_iter().enumerate() {
let timestamp = (i * CHUNK_SAMPLES) as f64 / TARGET_SAMPLE_RATE_HZ as f64;
yield pb::AudioChunk {
meeting_id: meeting_id.clone(),
audio_data: chunk_data,
timestamp,
sample_rate: TARGET_SAMPLE_RATE_HZ as i32,
channels: 1,
chunk_sequence: (i + 1) as i64,
audio_source: pb::AudioSource::Unspecified as i32,
};
// Small delay to simulate real-time streaming
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
// Get the raw gRPC client and start streaming
let mut grpc_client = client.get_client().expect("Failed to get gRPC client");
let response = match grpc_client
.stream_transcription(tonic::Request::new(outbound))
.await
{
Ok(r) => r,
Err(e) => {
println!("\n⚠ Failed to start stream: {}", e);
println!(" This may indicate a server configuration issue.");
println!(" Audio decoding and preparation worked correctly.\n");
// Cleanup
let _ = client.delete_meeting(&meeting.id).await;
// Test summary for what we verified
println!("=== PARTIAL AUDIO STREAMING TEST SUMMARY ===");
println!(
" ✓ Audio file decoded: {} samples",
audio_fixture.samples.len()
);
println!(" ✓ Audio resampled to 16kHz mono");
println!("{} chunks prepared for streaming", total_chunks);
println!(" ✗ Stream failed: server configuration issue");
println!("=============================================\n");
// Test still passes for audio decoding verification
assert!(total_chunks > 0, "Should have prepared audio chunks");
return;
}
};
let mut inbound = response.into_inner();
// 4. Collect transcript updates
println!("\n=== STEP 4: Collect Transcript Updates ===");
let mut transcript_updates: Vec<pb::TranscriptUpdate> = Vec::new();
let mut final_segments = 0;
let mut partial_count = 0;
// Collect updates with timeout
let collect_timeout = tokio::time::Duration::from_secs(30);
let collect_result = tokio::time::timeout(collect_timeout, async {
while let Some(result) = inbound.next().await {
match result {
Ok(update) => {
let update_type = update.update_type;
if update_type == 2 {
// Final segment
final_segments += 1;
if let Some(ref segment) = update.segment {
println!(
" [FINAL] Segment {}: \"{}\"",
segment.segment_id,
segment.text.chars().take(50).collect::<String>()
);
}
} else if update_type == 1 {
// Partial
partial_count += 1;
if partial_count % 5 == 0 {
println!(" [partial] {} updates received...", partial_count);
}
}
transcript_updates.push(update);
}
Err(e) => {
println!(" Stream error: {}", e);
break;
}
}
}
})
.await;
match collect_result {
Ok(()) => println!(" Stream ended normally"),
Err(_) => println!(
" Collection timed out after {}s",
collect_timeout.as_secs()
),
}
println!(
"✓ Collected {} transcript updates ({} partials, {} finals)",
transcript_updates.len(),
partial_count,
final_segments
);
// 5. Stop meeting if it's recording
println!("\n=== STEP 5: Stop Meeting ===");
match client.stop_meeting(&meeting.id).await {
Ok(stopped) => println!("✓ Stopped meeting (state: {:?})", stopped.state),
Err(e) => println!("✓ Meeting not in recording state: {}", e),
}
// 6. Verify results (poll for segments)
println!("\n=== STEP 6: Verify Results ===");
let mut final_meeting = None;
let poll_start = std::time::Instant::now();
let poll_timeout = std::time::Duration::from_secs(20);
while poll_start.elapsed() < poll_timeout {
let candidate = client
.get_meeting(&meeting.id, true, true)
.await
.expect("Failed to get meeting");
if !candidate.segments.is_empty() {
final_meeting = Some(candidate);
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
let final_meeting = if let Some(meeting) = final_meeting {
meeting
} else {
client
.get_meeting(&meeting.id, true, true)
.await
.expect("Failed to get meeting")
};
println!(" Meeting state: {:?}", final_meeting.state);
println!(" Total segments: {}", final_meeting.segments.len());
println!(" Duration: {:.2}s", final_meeting.duration_seconds);
// Print first few transcribed segments
for (i, segment) in final_meeting.segments.iter().take(3).enumerate() {
println!(
" Segment {}: \"{}...\"",
i + 1,
segment.text.chars().take(60).collect::<String>()
);
}
assert!(
!transcript_updates.is_empty(),
"Expected transcript updates from stream"
);
assert!(
!final_meeting.segments.is_empty(),
"Expected transcript segments after streaming audio"
);
// Reconnect after streaming to avoid keepalive/ping issues on long sessions
let post_stream_client = new_client(&url);
let post_stream_info = post_stream_client
.connect(Some(url.clone()))
.await
.expect("Failed to reconnect after streaming");
if let (Some(model), Some(base_url), Some(api_key)) = (
cloud_model.clone(),
cloud_base_url.clone(),
cloud_api_key.clone(),
)
{
let backup = configure_cloud_summary_provider(
&post_stream_client,
&model,
&base_url,
&api_key,
)
.await;
original_ai_config = backup.ai_config;
original_cloud_consent = backup.consent;
}
// 7. Generate summary
println!("\n=== STEP 7: Generate Summary ===");
let mut summary = match tokio::time::timeout(
std::time::Duration::from_secs(SUMMARY_TIMEOUT_SECS),
post_stream_client.generate_summary(&meeting.id, false, None),
)
.await
{
Ok(result) => result.expect("Failed to generate summary"),
Err(_) => {
panic!(
"Summary generation timed out after {} seconds",
SUMMARY_TIMEOUT_SECS
);
}
};
println!(
"✓ Summary generated ({} key points, {} action items)",
summary.key_points.len(),
summary.action_items.len()
);
let summary_loops = parse_env_usize("NOTEFLOW_CLOUD_SUMMARY_LOOPS", 1, 1, 10);
if summary_loops > 1
&& cloud_model.is_some()
&& cloud_base_url.is_some()
&& cloud_api_key.is_some()
{
for iteration in 2..=summary_loops {
let loop_summary = match tokio::time::timeout(
std::time::Duration::from_secs(SUMMARY_TIMEOUT_SECS),
post_stream_client.generate_summary(&meeting.id, false, None),
)
.await
{
Ok(result) => result.expect("Failed to generate summary"),
Err(_) => {
panic!(
"Summary generation timed out after {} seconds",
SUMMARY_TIMEOUT_SECS
);
}
};
println!(
"✓ Summary loop {} generated ({} key points, {} action items)",
iteration,
loop_summary.key_points.len(),
loop_summary.action_items.len()
);
assert!(
!loop_summary.executive_summary.trim().is_empty(),
"Summary loop should not be empty"
);
summary = loop_summary;
}
}
assert!(
!summary.executive_summary.trim().is_empty(),
"Summary should not be empty"
);
assert!(
!summary.key_points.is_empty(),
"Summary should include key points"
);
const ACTION_KEYWORDS: [&str; 15] = [
"todo",
"action",
"will",
"should",
"must",
"need to",
"let's",
"lets",
"follow up",
"next step",
"next steps",
"schedule",
"send",
"share",
"review",
];
let transcript_has_tasks = final_meeting.segments.iter().any(|segment| {
let text = segment.text.to_lowercase();
ACTION_KEYWORDS.iter().any(|keyword| text.contains(keyword))
});
if transcript_has_tasks {
assert!(
!summary.action_items.is_empty(),
"Summary should include action items for task extraction"
);
} else {
println!("No action keywords detected; skipping action item expectation.");
}
if let Some(expected_model) = cloud_model.clone() {
let expected = format!("openai/{expected_model}");
assert_eq!(
summary.model_version, expected,
"Expected cloud summary model version {}, got {}",
expected, summary.model_version
);
}
let segment_id_set: HashSet<i32> = final_meeting
.segments
.iter()
.map(|segment| segment.segment_id)
.collect();
for key_point in &summary.key_points {
assert!(
!key_point.segment_ids.is_empty(),
"Key point missing segment_ids: {}",
key_point.text
);
assert!(
key_point
.segment_ids
.iter()
.all(|segment_id| segment_id_set.contains(segment_id)),
"Key point references unknown segment_ids: {}",
key_point.text
);
}
for action_item in &summary.action_items {
assert!(
!action_item.segment_ids.is_empty(),
"Action item missing segment_ids: {}",
action_item.text
);
assert!(
action_item
.segment_ids
.iter()
.all(|segment_id| segment_id_set.contains(segment_id)),
"Action item references unknown segment_ids: {}",
action_item.text
);
}
// 8. Extract entities (NER)
println!("\n=== STEP 8: Extract Entities ===");
let entities = post_stream_client
.extract_entities(&meeting.id, true)
.await
.expect("Failed to extract entities");
println!(
"✓ Extracted {} entities (cached={})",
entities.entities.len(),
entities.cached
);
assert!(
!entities.entities.is_empty(),
"Expected at least one extracted entity"
);
for entity in &entities.entities {
assert!(
!entity.segment_ids.is_empty(),
"Entity missing segment_ids: {}",
entity.text
);
assert!(
entity
.segment_ids
.iter()
.all(|segment_id| segment_id_set.contains(segment_id)),
"Entity references unknown segment_ids: {}",
entity.text
);
}
// 9. Diarization refinement (optional)
if post_stream_info.diarization_enabled {
println!("\n=== STEP 9: Diarization Refinement ===");
let mut status = post_stream_client
.refine_speaker_diarization(&meeting.id, 0)
.await
.expect("Failed to start diarization refinement");
let job_start = std::time::Instant::now();
let job_timeout = std::time::Duration::from_secs(180);
while matches!(status.status, JobStatus::Queued | JobStatus::Running)
&& job_start.elapsed() < job_timeout
{
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
status = post_stream_client
.get_diarization_job_status(&status.job_id)
.await
.expect("Failed to poll diarization job");
}
assert!(
matches!(status.status, JobStatus::Completed),
"Diarization did not complete: {}",
status.error_message
);
assert!(
!status.speaker_ids.is_empty(),
"Diarization returned no speaker IDs"
);
} else {
println!("⚠ Diarization disabled (skipping refinement)");
}
// 10. Cleanup
println!("\n=== STEP 10: Cleanup ===");
let deleted = post_stream_client
.delete_meeting(&meeting.id)
.await
.expect("Failed to delete meeting");
println!("✓ Deleted meeting: {}", deleted);
restore_cloud_summary_provider(
&post_stream_client,
CloudConfigBackup {
ai_config: original_ai_config,
consent: original_cloud_consent,
},
)
.await;
// Final summary
println!("\n=== AUDIO STREAMING E2E TEST SUMMARY ===");
println!(
" Audio file: {:?}",
audio_fixture.path.file_name().unwrap()
);
println!(" Chunks sent: {}", total_chunks);
println!(" Updates received: {}", transcript_updates.len());
println!(" Partial updates: {}", partial_count);
println!(" Final segments: {}", final_segments);
println!(" Stored segments: {}", final_meeting.segments.len());
println!(" Summary key points: {}", summary.key_points.len());
println!("==========================================\n");
// Test passes if we successfully streamed audio and received responses
assert!(total_chunks > 0, "Should have sent audio chunks");
});
}
/// Edge case: stream format changes mid-stream should be rejected.
#[test]
#[ignore = "integration test; requires running server"]
fn streaming_rejects_format_change_mid_stream() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let _lock = STREAMING_TEST_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use async_stream::stream;
use noteflow_lib::grpc::noteflow as pb;
use tokio_stream::StreamExt;
use tonic::Code;
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
if !info.asr_ready {
println!("⚠ ASR engine not ready, skipping format-change test");
return;
}
let meeting = client
.create_meeting(Some("Format Change Edge Case".to_string()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
let meeting_id = meeting.id.clone();
let meeting_id_for_stream = meeting_id.clone();
let samples = vec![0.0_f32; 1600];
let base_audio: Vec<u8> = samples.iter().flat_map(|s| s.to_le_bytes()).collect();
let first_chunk = base_audio.clone();
let second_chunk = base_audio;
let outbound = stream! {
yield pb::AudioChunk {
meeting_id: meeting_id_for_stream.clone(),
audio_data: first_chunk,
timestamp: 0.0,
sample_rate: 16000,
channels: 1,
chunk_sequence: 1,
audio_source: pb::AudioSource::Unspecified as i32,
};
yield pb::AudioChunk {
meeting_id: meeting_id_for_stream.clone(),
audio_data: second_chunk,
timestamp: 0.1,
sample_rate: 44100,
channels: 1,
chunk_sequence: 2,
audio_source: pb::AudioSource::Unspecified as i32,
};
};
let mut grpc_client = client.get_client().expect("Failed to get gRPC client");
let response = grpc_client
.stream_transcription(tonic::Request::new(outbound))
.await
.expect("Failed to start stream");
let mut inbound = response.into_inner();
let status = tokio::time::timeout(std::time::Duration::from_secs(5), async {
while let Some(result) = inbound.next().await {
if let Err(status) = result {
return Some(status);
}
}
None
})
.await
.expect("Timed out waiting for stream error");
let status = status.expect("Expected stream to error due to format change");
assert_eq!(
status.code(),
Code::InvalidArgument,
"Expected InvalidArgument, got: {}",
status
);
let _ = client.delete_meeting(&meeting_id).await;
});
}
/// Cloud summary retries across reconnects with jittered backoff.
#[test]
#[ignore = "integration test; requires running server"]
fn cloud_summary_reconnect_with_jitter() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
let cloud_model = match env::var("NOTEFLOW_CLOUD_LLM_MODEL") {
Ok(value) => value,
Err(_) => {
eprintln!("Skipping (set NOTEFLOW_CLOUD_LLM_MODEL).");
return;
}
};
let cloud_base_url = match env::var("NOTEFLOW_CLOUD_LLM_BASE_URL") {
Ok(value) => value,
Err(_) => {
eprintln!("Skipping (set NOTEFLOW_CLOUD_LLM_BASE_URL).");
return;
}
};
let cloud_api_key = match env::var("NOTEFLOW_CLOUD_LLM_API_KEY") {
Ok(value) => value,
Err(_) => {
eprintln!("Skipping (set NOTEFLOW_CLOUD_LLM_API_KEY).");
return;
}
};
let _lock = STREAMING_TEST_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use async_stream::stream;
use noteflow_lib::grpc::noteflow as pb;
use tokio_stream::StreamExt;
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
if !info.asr_ready {
println!("⚠ ASR engine not ready, skipping jitter test");
return;
}
let title = format!(
"Cloud Jitter Summary Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let meeting = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
let audio_fixture = match load_sample_audio(Some(20.0)) {
Some(loaded) => loaded,
None => {
let _ = client.delete_meeting(&meeting.id).await;
return;
}
};
let chunks = samples_to_chunks(&audio_fixture.samples);
let meeting_id = meeting.id.clone();
let outbound = stream! {
for (i, chunk_data) in chunks.into_iter().enumerate() {
let timestamp = (i * CHUNK_SAMPLES) as f64 / TARGET_SAMPLE_RATE_HZ as f64;
yield pb::AudioChunk {
meeting_id: meeting_id.clone(),
audio_data: chunk_data,
timestamp,
sample_rate: TARGET_SAMPLE_RATE_HZ as i32,
channels: 1,
chunk_sequence: (i + 1) as i64,
audio_source: pb::AudioSource::Unspecified as i32,
};
tokio::time::sleep(tokio::time::Duration::from_millis(8)).await;
}
};
let mut grpc_client = client.get_client().expect("Failed to get gRPC client");
let response = grpc_client
.stream_transcription(tonic::Request::new(outbound))
.await
.expect("Failed to start stream");
let mut inbound = response.into_inner();
let collect_timeout = tokio::time::Duration::from_secs(20);
let _ = tokio::time::timeout(collect_timeout, async {
while let Some(result) = inbound.next().await {
if result.is_err() {
break;
}
}
})
.await;
let _ = client.stop_meeting(&meeting.id).await;
let backup = configure_cloud_summary_provider(
&client,
&cloud_model,
&cloud_base_url,
&cloud_api_key,
)
.await;
let max_attempts = parse_env_usize("NOTEFLOW_CLOUD_JITTER_ATTEMPTS", 3, 1, 6);
let mut last_error: Option<String> = None;
let mut summary = None;
for attempt in 0..max_attempts {
let jitter_ms = 150_u64
.saturating_mul(attempt as u64 + 1)
.saturating_add(((attempt * 97) as u64) % 120);
tokio::time::sleep(tokio::time::Duration::from_millis(jitter_ms)).await;
let reconnect_client = new_client(&url);
match reconnect_client.connect(Some(url.clone())).await {
Ok(_) => {
match tokio::time::timeout(
std::time::Duration::from_secs(SUMMARY_TIMEOUT_SECS),
reconnect_client.generate_summary(&meeting.id, false, None),
)
.await
{
Ok(result) => {
summary = Some(result.expect("Failed to generate summary"));
break;
}
Err(_) => {
last_error = Some(format!(
"Summary generation timed out after {} seconds",
SUMMARY_TIMEOUT_SECS
));
}
}
}
Err(error) => {
last_error = Some(format!("Reconnect attempt failed: {}", error));
}
}
}
let summary = summary.unwrap_or_else(|| {
panic!(
"Cloud summary retry failed: {}",
last_error.unwrap_or_else(|| "unknown error".to_string())
)
});
println!(
"✓ Summary generated after reconnect ({} key points, {} action items)",
summary.key_points.len(),
summary.action_items.len()
);
assert!(
!summary.executive_summary.trim().is_empty(),
"Summary should not be empty"
);
let _ = client.delete_meeting(&meeting.id).await;
restore_cloud_summary_provider(&client, backup).await;
});
}
/// Extreme-duration audio streaming to exercise long sessions.
#[test]
#[ignore = "integration test; requires running server"]
fn real_audio_streaming_extreme_duration() {
if !should_run_integration_tests() {
eprintln!("Skipping (set NOTEFLOW_INTEGRATION=1 to run).");
return;
}
if env::var("NOTEFLOW_EXTREME_STREAM").ok().as_deref() != Some("1") {
eprintln!("Skipping (set NOTEFLOW_EXTREME_STREAM=1 to run).");
return;
}
let _lock = STREAMING_TEST_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let url = get_server_url();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use async_stream::stream;
use noteflow_lib::grpc::noteflow as pb;
use tokio_stream::StreamExt;
let client = new_client(&url);
let info = client
.connect(Some(url.clone()))
.await
.expect("Failed to connect");
if !info.asr_ready {
println!("⚠ ASR engine not ready, skipping extreme duration test");
return;
}
let repeat_count = parse_env_usize("NOTEFLOW_EXTREME_AUDIO_REPEATS", 3, 2, 8);
let title = format!(
"Extreme Duration Audio Test {}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let meeting = client
.create_meeting(Some(title.clone()), HashMap::new(), None)
.await
.expect("Failed to create meeting");
let audio_fixture = match load_sample_audio(None) {
Some(loaded) => loaded,
None => {
let _ = client.delete_meeting(&meeting.id).await;
return;
}
};
let extended_samples = repeat_samples(&audio_fixture.samples, repeat_count);
println!(
"✓ Extended audio to {:.2}s ({} repeats)",
extended_samples.len() as f64 / TARGET_SAMPLE_RATE_HZ as f64,
repeat_count
);
let chunks = samples_to_chunks(&extended_samples);
let meeting_id = meeting.id.clone();
let outbound = stream! {
for (i, chunk_data) in chunks.into_iter().enumerate() {
let timestamp = (i * CHUNK_SAMPLES) as f64 / TARGET_SAMPLE_RATE_HZ as f64;
yield pb::AudioChunk {
meeting_id: meeting_id.clone(),
audio_data: chunk_data,
timestamp,
sample_rate: TARGET_SAMPLE_RATE_HZ as i32,
channels: 1,
chunk_sequence: (i + 1) as i64,
audio_source: pb::AudioSource::Unspecified as i32,
};
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
let mut grpc_client = client.get_client().expect("Failed to get gRPC client");
let response = grpc_client
.stream_transcription(tonic::Request::new(outbound))
.await
.expect("Failed to start stream");
let mut inbound = response.into_inner();
let collect_timeout = tokio::time::Duration::from_secs(
40 + (repeat_count as u64 * 25),
);
let _ = tokio::time::timeout(collect_timeout, async {
while let Some(result) = inbound.next().await {
if result.is_err() {
break;
}
}
})
.await;
let _ = client.stop_meeting(&meeting.id).await;
let final_meeting = client
.get_meeting(&meeting.id, true, true)
.await
.expect("Failed to get meeting");
println!(
"✓ Extreme duration meeting stored {} segments (duration {:.2}s)",
final_meeting.segments.len(),
final_meeting.duration_seconds
);
assert!(
!final_meeting.segments.is_empty(),
"Expected segments after extreme duration stream"
);
let _ = client.delete_meeting(&meeting.id).await;
});
}
}