docs: add new CLAUDE.md documentation files across various components and update Claude configuration.

This commit is contained in:
2026-01-20 03:16:01 +00:00
parent 217a867a8d
commit 00095896a5
21 changed files with 4576 additions and 63 deletions

View File

@@ -1,9 +1,9 @@
---
active: true
-iteration: 6
+iteration: 3
max_iterations: 0
completion_promise: null
started_at: "2026-01-19T15:47:26Z"
started_at: "2026-01-20T02:31:55Z"
---
Please refactor the disorganized modules in the client dir based on @docs/sprints/phase-ongoing/sprint-organization/ and iterate on them until the organization is complete, no type errors, no type suppressions, zero preexisting errors, quality tests passing, all linters passing, and all unit and integration tests passing
proceed with the plan, i have also documented a copy in @.claudectx/codefixes.md. please use your agents iteratively to manage context and speed, however you must review the accuracy and value of each doc before moving to the next

View File

@@ -0,0 +1,533 @@
# Tauri Rust Backend Development Guide
## Overview
The Tauri backend (`client/src-tauri/src/`) is the Rust layer bridging the React frontend with the Python gRPC server. It handles audio capture, IPC commands, state management, and event emission.
**Architecture**: Tauri 2.0 + Tokio async runtime + tonic gRPC client
---
## Module Structure
```
src/
├── commands/ # 60+ Tauri IPC command handlers
├── grpc/ # gRPC client + types + streaming
├── state/ # Thread-safe runtime state (AppState)
├── audio/ # Capture, playback, mixing, drift compensation
├── events/ # Event emission to TypeScript frontend
├── triggers/ # Trigger detection (audio activity, foreground app)
├── identity/ # Auth keychain storage
├── crypto/ # Audio encryption (AES-256-GCM)
├── cache/ # Memory caching
├── error/ # Error types and classification
├── config.rs # Environment-based configuration
├── constants.rs # Centralized constants (273 lines)
├── helpers.rs # Utilities (timestamps, IDs, formatting)
├── lib.rs # Tauri setup + command registration
└── main.rs # Entry point (thin wrapper)
```
---
## Key Architectural Patterns
### 1. State Management (`AppState`)
All mutable state is wrapped in `Arc<RwLock<T>>` for thread-safe sharing:
```rust
pub struct AppState {
pub grpc_client: RwLock<Option<GrpcClient>>,
pub recording_session: RwLock<Option<RecordingSession>>,
pub playback_position: RwLock<f64>,
pub transcript_segments: RwLock<Vec<Segment>>,
pub crypto_manager: RwLock<Option<CryptoManager>>,
pub identity_manager: RwLock<Option<IdentityManager>>,
pub preferences: RwLock<UserPreferences>,
// ...
}
```
**Key Pattern**: Use `parking_lot::RwLock` instead of `std::sync::RwLock` for performance.
**Lazy Initialization**: Crypto and identity managers defer keychain access until first use to avoid OS permission dialogs at app startup.
### 2. Command Pattern
Commands are async functions decorated with `#[tauri::command]`:
```rust
#[tauri::command(rename_all = "snake_case")]
pub async fn connect(
state: State<'_, Arc<AppState>>,
app: AppHandle,
server_url: Option<String>,
) -> Result<ServerInfo> {
let grpc_client = GrpcClient::connect(server_url).await?;
*state.grpc_client.write() = Some(grpc_client);
app.emit(event_names::CONNECTION_CHANGE, ConnectionChangeEvent { connected: true })?;
Ok(server_info)
}
```
**Registration**: All commands must be added to the `app_invoke_handler!` macro in `lib.rs`.
### 3. Event System
Events are emitted through a broadcast channel to the frontend:
```rust
pub enum AppEvent {
TranscriptUpdate(TranscriptUpdateEvent),
AudioLevel(AudioLevelEvent),
PlaybackPosition(PlaybackPositionEvent),
ConnectionChange(ConnectionChangeEvent),
MeetingDetected(MeetingDetectedEvent),
RecordingTimer(RecordingTimerEvent),
SummaryProgress(SummaryProgressEvent),
Error(ErrorEvent),
// ...
}
```
**Event Names**: Must match TypeScript `TauriEvents` constants in `client/src/api/tauri-constants.ts`.
### 4. Error Classification
All errors are classified for intelligent frontend handling:
```rust
pub fn classify(&self) -> ErrorClassification {
ErrorClassification {
grpc_status: Some(status_code),
category: "network".to_string(), // auth, validation, timeout, not_found, etc.
retryable: true,
}
}
```
---
## Command Categories
| Category | Count | Key Commands |
|----------|-------|--------------|
| Connection | 5 | `connect`, `disconnect`, `is_connected`, `get_server_info` |
| Identity | 8 | `get_current_user`, `switch_workspace`, `logout` |
| Projects | 11 | CRUD + members, `set_active_project` |
| Meeting | 5 | `create_meeting`, `list_meetings`, `stop_meeting` |
| Recording | 5 | `start_recording`, `stop_recording`, `send_audio_chunk` |
| Diarization | 5 | `refine_diarization`, `rename_speaker` |
| Audio | 14 | Device selection, test input/output, dual capture |
| Playback | 5 | `start_playback`, `pause_playback`, `seek` |
| Summary | 8 | `generate_summary`, templates, consent |
| Annotations | 5 | CRUD operations |
| Calendar | 7 | Events, OAuth, webhooks |
| OIDC | 8 | Provider management |
| Preferences | 4 | Get, save, sync |
| Triggers | 6 | Enable, snooze, status |
---
## gRPC Client (`grpc/`)
### Module Organization
```
grpc/
├── client/
│ ├── core.rs # Connection, ClientConfig, IdentityInterceptor
│ ├── meetings.rs # Meeting operations
│ ├── annotations.rs # Annotation CRUD
│ ├── calendar.rs # Calendar + OAuth
│ ├── converters.rs # Protobuf → domain types
│ └── ...
├── types/ # Domain types for frontend
├── streaming/ # Bidirectional audio streaming
└── noteflow.rs # Generated from proto (build.rs)
```
### Streaming Architecture
```rust
pub struct StreamManager {
tx: mpsc::Sender<AudioChunk>,
activity: Arc<AtomicBool>,
inactivity_timeout: Duration,
max_duration: Duration,
}
```
**Features**:
- Audio chunk buffering (channel capacity: 512)
- Backpressure signaling via `StreamHealth` event
- Inactivity timeout (5 min default)
- Max stream duration (4 hours default)
---
## Audio Subsystem (`audio/`)
### Capture (`capture.rs`)
- Uses `cpal` for cross-platform audio input
- RMS level callback for VU meter display
- Channel sender for streaming to gRPC
### Playback (`playback.rs`)
- Uses `rodio` with symphonia decoder
- Thread-based playback handle
- Position tracking via samples played
### Dual Capture (`mixer.rs`, `drift_compensation/`)
- Microphone + System Audio mixing
- Gain controls (0.0 to 1.0)
- Adaptive resampling for drift compensation
```rust
// Drift compensation detector
pub struct DriftDetector {
ema_alpha: f64, // 0.02
drift_threshold: f64, // 10 ppm
history: VecDeque<f64>,
}
```
---
## Recording Flow
```
start_recording()
├── Create RecordingSession
├── Start cpal audio capture
├── Start conversion task (async)
▼ (audio frames via callback)
send_audio_chunk()
├── Queue chunks for gRPC streaming
├── Apply drift compensation (if dual capture)
├── Encrypt if required
stop_recording()
├── Stop capture threads
├── Flush audio buffers
└── Close session
```
---
## Constants (`constants.rs`)
| Category | Constant | Value |
|----------|----------|-------|
| **Events** | EVENT_CHANNEL_CAPACITY | 100 |
| **gRPC** | CONNECTION_TIMEOUT | 5 seconds |
| **gRPC** | REQUEST_TIMEOUT | 300 seconds |
| **Audio** | DEFAULT_SAMPLE_RATE | 16000 Hz |
| **Audio** | BUFFER_SIZE_FRAMES | 1600 (100ms) |
| **Audio** | VU_METER_UPDATE_RATE | 20 Hz |
| **Recording** | TIMER_TICK | 1 second |
| **Streaming** | CHANNEL_CAPACITY | 512 |
| **Streaming** | INACTIVITY_TIMEOUT | 5 minutes |
| **Streaming** | MAX_DURATION | 4 hours |
| **Drift** | EMA_ALPHA | 0.02 |
| **Drift** | THRESHOLD_PPM | 10 |
---
## TypeScript Integration
### Command Invocation (TS → Rust)
```typescript
// client/src/api/tauri-adapter.ts
await invoke<ServerInfo>('connect', { server_url: 'localhost:50051' });
```
```rust
// src/commands/connection.rs
#[tauri::command(rename_all = "snake_case")]
pub async fn connect(...) -> Result<ServerInfo>
```
### Event Subscription (Rust → TS)
```rust
// Rust emits
app.emit(event_names::TRANSCRIPT_UPDATE, event)?;
```
```typescript
// TypeScript receives
listen('TRANSCRIPT_UPDATE', (event) => handleUpdate(event.payload));
```
### Proto Synchronization
When protobuf changes:
1. Update `src/noteflow/grpc/proto/noteflow.proto` (Python)
2. Run `npm run tauri:build` (regenerates `grpc/noteflow.rs`)
3. Update `src/grpc/types/*.rs` (Rust domain types)
4. Update `client/src/api/types/` (TypeScript types)
5. Update converters in `grpc/client/converters.rs`
---
## Adding New Commands
### 1. Implement Command
```rust
// src/commands/my_feature.rs
#[tauri::command(rename_all = "snake_case")]
pub async fn my_command(
state: State<'_, Arc<AppState>>,
app: AppHandle,
my_param: String,
) -> Result<MyResponse> {
// Implementation
}
```
### 2. Export from Module
```rust
// src/commands/mod.rs
pub mod my_feature;
pub use my_feature::my_command;
```
### 3. Register in lib.rs
```rust
// src/lib.rs
app_invoke_handler!(
// ... existing commands
my_command,
)
```
### 4. TypeScript Adapter
```typescript
// client/src/api/tauri-adapter.ts
async myCommand(myParam: string): Promise<MyResponse> {
return invoke<MyResponse>('my_command', { my_param: myParam });
}
```
### 5. Mock Adapter
```typescript
// client/src/api/mock-adapter.ts
async myCommand(myParam: string): Promise<MyResponse> {
return { /* mock response */ };
}
```
---
## Adding New Events
### 1. Add Event Variant
```rust
// src/events/mod.rs
pub enum AppEvent {
// ... existing
MyEvent(MyEventPayload),
}
```
### 2. Add Payload Struct
```rust
#[derive(Debug, Serialize, Clone)]
pub struct MyEventPayload {
pub data: String,
}
```
### 3. Add Event Name Constant
```rust
// src/events/event_names.rs
pub const MY_EVENT: &str = "MY_EVENT";
```
### 4. Update Dispatch Methods
```rust
fn event_name(&self) -> &'static str {
match self {
// ...
AppEvent::MyEvent(_) => event_names::MY_EVENT,
}
}
fn to_payload(&self) -> serde_json::Value {
match self {
// ...
AppEvent::MyEvent(e) => serde_json::to_value(e).unwrap(),
}
}
```
### 5. TypeScript Constants
```typescript
// client/src/api/tauri-constants.ts
export const MY_EVENT = 'MY_EVENT';
```
### 6. Update Contract Test
```typescript
// client/src/api/tauri-constants.test.ts
// Add MY_EVENT to expected events
```
---
## Forbidden Patterns
### ❌ Using `unwrap()` Without Justification
```rust
// WRONG
let value = some_option.unwrap();
// RIGHT
let value = some_option.ok_or(Error::NotFound)?;
```
### ❌ Direct Keychain Access at Startup
```rust
// WRONG: Triggers OS permission dialog at launch
fn new() -> Self {
let keychain = Keychain::new().get_key().unwrap();
}
// RIGHT: Lazy initialization
fn get_key(&self) -> Result<Key> {
let mut manager = self.crypto_manager.write();
if manager.is_none() {
*manager = Some(CryptoManager::new()?);
}
manager.as_ref().unwrap().get_key()
}
```
### ❌ Using `std::sync::RwLock`
```rust
// WRONG: Performance issues
use std::sync::RwLock;
// RIGHT: parking_lot is faster
use parking_lot::RwLock;
```
### ❌ Hardcoding Values
```rust
// WRONG
let timeout = Duration::from_secs(5);
// RIGHT
let timeout = Duration::from_secs(constants::CONNECTION_TIMEOUT_SECS);
```
### ❌ Bypassing Error Classification
```rust
// WRONG
Err(Error::Custom("Something failed".to_string()))
// RIGHT
Err(Error::MyOperation(MyOperationError::SpecificFailure))
// With classification implemented
```
---
## Code Quality Standards
### Clippy Configuration (`clippy.toml`)
```toml
cognitive-complexity-threshold = 25
too-many-lines-threshold = 100
too-many-arguments-threshold = 7
```
### Required Practices
- **Error Handling**: Always use `Result<T, Error>`, never panic
- **Documentation**: Doc comments for all public items
- **Naming**: `snake_case` commands match TypeScript adapter
- **Constants**: All magic numbers in `constants.rs`
- **Async**: Prefer `async/await` over raw futures
- **Logging**: Use `tracing` macros with appropriate levels
---
## Key Dependencies
| Crate | Purpose | Version |
|-------|---------|---------|
| `tauri` | App framework | 2.0 |
| `tokio` | Async runtime | 1.40 |
| `tonic` | gRPC client | 0.12 |
| `cpal` | Audio capture | 0.15 |
| `rodio` | Audio playback | 0.20 |
| `rubato` | Sample rate conversion | 0.16 |
| `aes-gcm` | Encryption | 0.10 |
| `parking_lot` | Sync primitives | 0.12 |
| `keyring` | Secure storage | 2.3 |
| `tracing` | Logging | 0.1 |
---
## Testing
### Unit Tests
```rust
#[cfg(test)]
mod tests {
#[test]
fn test_error_classification() {
let err = Error::Timeout("request timed out".into());
let classification = err.classify();
assert_eq!(classification.category, "timeout");
assert!(classification.retryable);
}
}
```
### Test Files
- `error/tests.rs` — Error classification
- `commands/*_tests.rs` — Command-specific tests
- `grpc/proto_compliance_tests.rs` — Proto contract verification
- `events/tests.rs` — Event serialization
---
## See Also
- `/client/CLAUDE.md` — Full client development guide
- `/client/src/api/types/` — TypeScript type definitions
- `/src/noteflow/grpc/proto/noteflow.proto` — gRPC schema (source of truth)

client/src/hooks/CLAUDE.md (new file, 424 lines)
View File

@@ -0,0 +1,424 @@
# React Hooks Development Guide
## Overview
The hooks layer (`client/src/hooks/`) provides reusable React hooks that encapsulate complex stateful logic, side effects, and business operations. Hooks use vanilla React patterns with the adapter interface for clean component integration.
**Architecture**: React hooks + `getAPI()` adapter pattern + vanilla state management
---
## Hook Catalog by Category
### Audio Hooks (`audio/`)
| Hook | File | Purpose |
|------|------|---------|
| `useAudioDevices` | `use-audio-devices.ts` | Device enumeration, selection, default tracking |
| `useAudioTesting` | `use-audio-testing.ts` | Input/output device testing with VU meter |
| `useAsrConfig` | `use-asr-config.ts` | ASR engine configuration |
| `useStreamingConfig` | `use-streaming-config.ts` | Streaming parameters |
**Support files**: `use-audio-devices.types.ts`, `use-audio-devices.helpers.ts`
### Auth Hooks (`auth/`)
| Hook | File | Purpose |
|------|------|---------|
| `useOAuthFlow` | `use-oauth-flow.ts` | OAuth authorization flow management |
| `useAuthFlow` | `use-auth-flow.ts` | OIDC authentication flow |
| `useOidcProviders` | `use-oidc-providers.ts` | OIDC provider configuration CRUD |
| `useSecureIntegrationSecrets` | `use-secure-integration-secrets.ts` | Secure secret storage |
| `useCloudConsent` | `use-cloud-consent.ts` | Cloud processing consent management |
| `useHuggingfaceToken` | `use-huggingface-token.ts` | HuggingFace API token management |
### Data Hooks (`data/`)
| Hook | File | Purpose |
|------|------|---------|
| `useProject` | `use-project.ts` | Active project state |
| `useProjectMembers` | `use-project-members.ts` | Project membership management |
| `useAsyncData` | `use-async-data.ts` | Generic async data fetching helper |
| `useGuardedMutation` | `use-guarded-mutation.ts` | Mutation with confirmation dialog |
### Processing Hooks (`processing/`)
| Hook | File | Purpose |
|------|------|---------|
| `useDiarization` | `use-diarization.ts` | Speaker diarization refinement |
| `usePostProcessing` | `use-post-processing.ts` | Post-meeting processing orchestration |
| `useEntityExtraction` | `use-entity-extraction.ts` | Named entity extraction trigger |
**Support files**: `events.ts` (event types), `state.ts` (state management)
### Recording Hooks (`recording/`)
| Hook | File | Purpose |
|------|------|---------|
| `useRecordingSession` | `use-recording-session.ts` | Recording session lifecycle |
| `useRecordingAppPolicy` | `use-recording-app-policy.ts` | App-specific recording rules |
### Sync Hooks (`sync/`)
| Hook | File | Purpose |
|------|------|---------|
| `useCalendarSync` | `use-calendar-sync.ts` | Calendar event synchronization |
| `useIntegrationSync` | `use-integration-sync.ts` | Integration state sync |
| `useIntegrationValidation` | `use-integration-validation.ts` | Integration health checks |
| `usePreferencesSync` | `use-preferences-sync.ts` | User preferences sync |
| `useWebhooks` | `use-webhooks.ts` | Webhook configuration CRUD |
| `useMeetingReminders` | `use-meeting-reminders.ts` | Calendar reminder notifications |
**Support files**: `sync-notifications.ts` (notification helpers)
### UI Hooks (`ui/`)
| Hook | File | Purpose |
|------|------|---------|
| `useToast` | `use-toast.ts` | Toast notification system |
| `useMobile` | `use-mobile.tsx` | Responsive breakpoint detection |
| `usePanelPreferences` | `use-panel-preferences.ts` | UI panel state persistence |
| `useAnimatedWords` | `use-animated-words.ts` | Word animation effects |
| `useRecordingPanels` | `use-recording-panels.ts` | Recording panel layout |
---
## Common Patterns
### 1. API Access via `getAPI()`
All hooks access the adapter through the `getAPI()` function:
```typescript
import { getAPI } from '@/api/interface';
export function useMyFeature() {
const performAction = useCallback(async () => {
const api = getAPI();
const result = await api.someMethod();
// Handle result...
}, []);
return { performAction };
}
```
**Important**: Call `getAPI()` inside callbacks/effects, not at hook initialization.
### 2. Mount Tracking for Async Safety
Prevent state updates on unmounted components:
```typescript
export function useMyHook() {
const isMountedRef = useRef(true);
useEffect(() => {
return () => {
isMountedRef.current = false;
};
}, []);
const fetchData = useCallback(async () => {
const data = await api.getData();
// Only update state if still mounted
if (isMountedRef.current) {
setData(data);
}
}, []);
return { fetchData };
}
```
### 3. Request ID Pattern (Superseding Operations)
For operations that might be superseded by newer requests:
```typescript
const requestIdRef = useRef(0);
const performOperation = useCallback(async () => {
const currentRequestId = ++requestIdRef.current;
const result = await api.longOperation();
// Bail if a newer request was initiated
if (currentRequestId !== requestIdRef.current) {
return;
}
setResult(result);
}, []);
```
### 4. Polling with Backoff
```typescript
const pollStatus = useCallback(async (
jobId: string,
initialInterval = 1000,
maxInterval = 30000,
) => {
let interval = initialInterval;
while (isMountedRef.current) {
const api = getAPI();
const status = await api.getJobStatus(jobId);
if (status.state === 'COMPLETED' || status.state === 'FAILED') {
return status;
}
await new Promise(resolve => setTimeout(resolve, interval));
interval = Math.min(interval * 1.5, maxInterval);
}
}, []);
```
### 5. Error State Management
```typescript
const [error, setError] = useState<string | null>(null);
const [isLoading, setIsLoading] = useState(false);
const doAction = useCallback(async () => {
setError(null);
setIsLoading(true);
try {
const api = getAPI();
await api.action();
} catch (err) {
if (isMountedRef.current) {
setError(extractErrorMessage(err));
}
} finally {
if (isMountedRef.current) {
setIsLoading(false);
}
}
}, []);
```
---
## Context Dependencies
### Connection State
```typescript
import { getConnectionState, subscribeConnectionState } from '@/api';
export function useMyHook() {
const [connected, setConnected] = useState(getConnectionState().connected);
useEffect(() => {
return subscribeConnectionState((state) => {
setConnected(state.connected);
});
}, []);
}
```
### Project Context
```typescript
import { useProjects } from '@/contexts/project-state';
export function useMyHook() {
const { activeProject, setActiveProject } = useProjects();
// ...
}
```
---
## Testing Hooks
### Test Setup
```typescript
import { act, renderHook, waitFor } from '@testing-library/react';
import { vi } from 'vitest';
// Mock getAPI
vi.mock('@/api/interface', () => ({
getAPI: vi.fn(() => ({
someMethod: vi.fn().mockResolvedValue({ data: 'test' }),
})),
}));
```
### Example Test
```typescript
describe('useMyHook', () => {
it('should fetch data', async () => {
const { result } = renderHook(() => useMyHook());
await act(async () => {
await result.current.fetchData();
});
expect(result.current.data).toBeDefined();
});
});
```
---
## Adding New Hooks
### 1. Choose the Right Category
| Category | Use When |
|----------|----------|
| `audio/` | Audio device or playback operations |
| `auth/` | Authentication, authorization, or consent |
| `data/` | CRUD operations on domain entities |
| `processing/` | Background processing or AI operations |
| `recording/` | Recording session management |
| `sync/` | Data synchronization across sources |
| `ui/` | UI state or user preferences |
### 2. Create Hook File
```typescript
// hooks/<category>/use-my-feature.ts
import { useCallback, useEffect, useRef, useState } from 'react';
import { getAPI } from '@/api/interface';
export interface UseMyFeatureOptions {
enabled?: boolean;
}
export function useMyFeature(options: UseMyFeatureOptions = {}) {
const { enabled = true } = options;
const isMountedRef = useRef(true);
useEffect(() => {
return () => {
isMountedRef.current = false;
};
}, []);
// Implementation...
return {
// Expose state and actions
};
}
```
### 3. Export from Index
```typescript
// hooks/<category>/index.ts
export * from './use-my-feature';
```
### 4. Add Tests
```typescript
// hooks/<category>/use-my-feature.test.ts
import { renderHook } from '@testing-library/react';
import { useMyFeature } from './use-my-feature';
describe('useMyFeature', () => {
it('should handle initial state', () => {
const { result } = renderHook(() => useMyFeature());
expect(result.current).toBeDefined();
});
});
```
---
## Forbidden Patterns
### ❌ Calling getAPI() at Module Level
```typescript
// WRONG: Called at module load time
const api = getAPI();
export function useMyHook() {
return api.getData();
}
// RIGHT: Call inside hooks/callbacks
export function useMyHook() {
const getData = useCallback(async () => {
const api = getAPI();
return api.getData();
}, []);
}
```
### ❌ Untracked Async State Updates
```typescript
// WRONG: Can update unmounted component
useEffect(() => {
fetchData().then(setData);
}, []);
// RIGHT: Track mount state
useEffect(() => {
let mounted = true;
fetchData().then(data => {
if (mounted) setData(data);
});
return () => { mounted = false; };
}, []);
```
### ❌ Inline Anonymous Functions in Dependencies
```typescript
// WRONG: New function identity every render
useEffect(() => {
doSomething();
}, [() => getData()]); // Bad dependency
// RIGHT: Use useCallback
const getData = useCallback(() => { /* ... */ }, [deps]);
useEffect(() => {
doSomething();
}, [getData]);
```
### ❌ Stale Closure Over State
```typescript
// WRONG: Captures stale count
const [count, setCount] = useState(0);
const increment = () => setCount(count + 1);
// RIGHT: Use functional update
const increment = useCallback(() => {
setCount(prev => prev + 1);
}, []);
```
---
## Key Files Reference
| File | Purpose |
|------|---------|
| `hooks/index.ts` | Central hook exports |
| `api/interface.ts` | `getAPI()` function and adapter interface |
| `api/` | Adapter implementations |
| `contexts/connection-state.ts` | Connection state management |
| `contexts/project-state.ts` | Project context |
---
## See Also
- `/client/CLAUDE.md` — Full client development guide
- `/client/src/api/` — API layer and adapter implementations
- `/client/src/contexts/` — React context implementations
- `/client/src/test/` — Test utilities and mocks

View File

@@ -320,7 +320,7 @@ export function useRecordingSession(
const runStop = async () => {
setRecordingState('stopping');
try {
-streamRef.current?.close();
+await streamRef.current?.close();
streamRef.current = null;
const api = shouldSimulate && !isConnected ? mockAPI : getAPI();
const stoppedMeeting = await api.stopMeeting(meeting.id);
@@ -402,7 +402,7 @@ export function useRecordingSession(
stream = ensureTranscriptionStream(await api.startTranscription(existingMeeting.id));
}
if (cancelled) {
-stream.close();
+await stream.close();
return;
}
streamRef.current = stream;
@@ -439,8 +439,8 @@ export function useRecordingSession(
// Cleanup on unmount
useEffect(() => {
-return () => {
-streamRef.current?.close();
+return async () => {
+await streamRef.current?.close();
};
}, []);

View File

@@ -60,6 +60,7 @@ export function useAnimatedWords(
// Split on whitespace while preserving structure
const words = text.split(/(\s+)/);
+let newWordCounter = 0;
return words
.map((word, index) => {
// Skip empty strings and pure whitespace
@@ -68,14 +69,12 @@ export function useAnimatedWords(
}
const shouldAnimate = !seenIndicesRef.current.has(index);
+let delay = 0;
-// Calculate delay only for words that should animate
-// Use the count of words before this one that also need animation
-const animatingWordsBefore = words
-.slice(0, index)
-.filter((w, i) => w && !/^\s+$/.test(w) && !seenIndicesRef.current.has(i)).length;
-const delay = shouldAnimate ? animatingWordsBefore * staggerDelay : 0;
+if (shouldAnimate) {
+delay = newWordCounter * staggerDelay;
+newWordCounter++;
+}
return {
word,

View File

@@ -140,8 +140,10 @@ export function clearClientLogs(): void {
}
// Flush logs on page unload to prevent data loss
+let beforeUnloadListener: (() => void) | null = null;
if (typeof window !== 'undefined') {
-window.addEventListener('beforeunload', flushLogs);
+beforeUnloadListener = () => flushLogs();
+window.addEventListener('beforeunload', beforeUnloadListener);
}
/**
@@ -155,4 +157,8 @@ export function _resetClientLogsForTesting(): void {
writeTimeout = null;
}
cachedLogs = null;
+if (beforeUnloadListener !== null && typeof window !== 'undefined') {
+window.removeEventListener('beforeunload', beforeUnloadListener);
+beforeUnloadListener = null;
+}
}

View File

@@ -239,10 +239,13 @@ export async function stopTauriEventBridge(): Promise<void> {
}
/**
-* Force reset the bridge state for testing.
+* Force reset of bridge state for testing.
* @internal
*/
export function _resetTauriEventBridgeForTesting(): void {
+for (const fn of unlistenFns) {
+fn();
+}
bridgeStarted = false;
startPromise = null;
unlistenFns = [];

View File

@@ -0,0 +1,278 @@
According to a document from **January 20, 2026**, Noteflow already has a pretty “agent-friendly” shape: you've got a clean service layer for meeting/segment access (including semantic search), explicit gating for RAG features (`rag_enabled`), local-vs-cloud consent controls, usage/observability plumbing, and UI patterns for “AI jobs” like summary + entity extraction. That combination makes **LangGraph** a good fit as an orchestration layer *without* forcing you into a monolithic “mega-agent.”
Below are early LangGraph design/structural decisions that tend to **buy runway** (growth) while still fitting Noteflow's architecture.
---
## 1) Treat the graph as an orchestration layer, not “the domain”
**Decision:** keep LangGraph in the *application service* boundary (or adjacent), and make nodes call existing Noteflow services/repos rather than embedding DB / gRPC / Tauri concerns directly in nodes.
Why this fits Noteflow:
* You already have service mixins that expose the primitives an agent needs (fetch segments, semantic search, etc.).
* Semantic search is already a first-class repository operation (`search_semantic`), so your “retrieve evidence” node can be a thin wrapper over that path.
LangGraph nuance:
* This keeps nodes idempotent and makes it much easier to checkpoint/resume safely later (interrupts + durable execution). ([LangChain Docs][1])
---
## 2) Separate input/output schemas from internal state **on day 1**
**Decision:** use LangGraph's **multiple schema** pattern:
* `InputState`: what your API accepts (question, meeting_id, options)
* `OutputState`: what your API returns (answer, citations, proposed actions)
* `InternalState`: everything else (retrieval hits, intermediate drafts, tool traces)
Why it matters:
* It prevents your internal “agent scratchpad” from becoming a de facto public API (which is one of the easiest ways to accidentally freeze your design).
* It makes it safe to add new internal capabilities later (rerankers, web-search traces, multi-meeting retrieval, etc.) without breaking clients.
This is explicitly supported in LangGraph's Graph API docs (“multiple schemas”, private state channels). ([LangChain Docs][2])
Noteflow alignment:
* You already do something similar for extensibility at the settings layer via `extensions` + `schema_version`, which reflects the same *spirit* of “reserve room for growth.”
---
## 3) Decide your **thread_id scheme + checkpoint store** early
**Decision:** pick a stable mapping for `thread_id` now, even if your first version doesnt “chat” much.
A practical Noteflow-oriented scheme:
* `thread_id = f"meeting:{meeting_id}:user:{user_id}:graph:meeting_qa:v1"`
Why this is foundational:
* In LangGraph, threads/checkpoints are the backbone for resuming, interrupts, time-travel, memory, and fault tolerance. ([LangChain Docs][3])
* If you choose `thread_id` haphazardly, you'll paint yourself into a corner when you later want “follow-up questions” or “resume where we left off”.
Noteflow alignment:
* You already propagate identity context (e.g., `user_id`, `workspace_id`) via gRPC interceptors—perfect inputs to the thread key.
Checkpoint store choice:
* If Noteflow is already leaning on Postgres for vector search, a Postgres-backed checkpointer is usually the cleanest “production-grade” choice; if you have local-only deployments, a SQLite-backed checkpointer can still work well. LangGraph's persistence model is explicitly built around compiling the graph with a checkpointer and using `thread_id` for lookup. ([LangChain Docs][3])
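To make this concrete, here is a minimal sketch combining the `thread_id` convention with a Postgres-backed checkpointer (assuming the `langgraph-checkpoint-postgres` package; `DB_URI`, `graph`, and the request variables are placeholders, not Noteflow's actual config):
```python
# Sketch only: thread_id convention + Postgres checkpointer.
from langgraph.checkpoint.postgres import PostgresSaver

def make_thread_id(meeting_id: str, user_id: str, *, graph_name: str = "meeting_qa", version: int = 1) -> str:
    # Stable, versioned key: follow-ups and resumes reuse the same thread.
    return f"meeting:{meeting_id}:user:{user_id}:graph:{graph_name}:v{version}"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # creates the checkpoint tables on first run
    app = graph.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": make_thread_id(meeting_id, user_id)}}
    result = app.invoke({"meeting_id": meeting_id, "question": question}, config)
```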
---
## 4) Keep graph state *small* and store references, not payloads
**Decision:** store **segment_ids** (and maybe tiny excerpts) in graph state, not whole transcripts.
Why:
* Checkpoints persist *the state*. If you shove huge transcript blobs into state, you will pay for it in serialization, storage, and latency.
* Noteflow already has a natural “reference system”: segments have IDs, and annotations already carry `segment_ids` in the protobuf mapping.
Noteflow nuance:
* There's even a `full_transcript` convenience property: great for UI, dangerous for agent prompts. Prefer retrieval + windowing instead of “join everything.”
---
## 5) Use reducers deliberately (especially for messages)
**Decision:** if you maintain conversation turns in state, do it the LangGraph way:
* Use a `messages` channel with the `add_messages` reducer, not a naive list append.
Why:
* LangGraph reducers govern how node updates merge into state; if you get this wrong early, concurrency + resumability become painful.
* For message history, LangGraph specifically recommends `add_messages` so updates can overwrite by ID (useful for human-in-the-loop edits) instead of only appending. ([LangChain Docs][2])
---
## 6) Plan for “writes” as a separate phase with human-in-the-loop gates
**Decision:** design your first graphs so that “agent suggestions” are outputs, and “agent writes” (creating annotations, modifying meeting artifacts) are a *second step* that can be approved.
Why:
* Noteflow has real side effects available (e.g., creating annotations with types like action items/decisions/notes/risks).
* If you later use LangGraph **interrupts**, you must treat side effects carefully: the docs explicitly warn that code before an interrupt runs again on resume, so side effects must be idempotent. ([LangChain Docs][1])
So, structurally:
* Graph A (“analyze”) produces: answer + citations + `proposed_annotations[]`
* Graph B (“apply”) performs writes *only after approval* (interrupt or explicit user action)
This keeps you safe today and unlocks richer automation later.
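A hedged sketch of what the approval gate in Graph B could look like, assuming LangGraph's `interrupt()` API (`annotation_tools` is a hypothetical Noteflow-facing adapter, not an existing API):
```python
# Sketch only. interrupt() pauses the graph and surfaces the payload to the
# caller; the node reruns from the top on resume, so nothing above the
# interrupt() call may have side effects.
from langgraph.types import interrupt

async def apply_annotations(state: dict) -> dict:
    decision = interrupt({"proposed_annotations": state["proposed_annotations"]})
    if not decision.get("approved"):
        return {"applied_ids": []}
    applied = [
        await annotation_tools.add_annotation(annotation)  # hypothetical adapter
        for annotation in state["proposed_annotations"]
    ]
    return {"applied_ids": applied}
```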
---
## 7) Make your “tools” match Noteflow ports/services, not LangChain tools directly
**Decision:** define a Noteflow-level tool/adapter interface and keep LangGraph nodes calling that interface.
Why it won't hinder growth:
* You can swap implementations (local vs cloud model, different embedder, different web search adapter, offline mode) without rewriting graphs.
* Noteflow already uses this pattern heavily in observability ports + sinks and provider selection logic; you can mirror it for “agent tools.”
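A minimal sketch of that boundary (the names are assumptions; only the shape matters, and it matches the `meeting_tools` parameter used in the scaffold later in this doc):
```python
# Sketch only: graph nodes depend on this Protocol, never on repos directly.
from typing import Protocol

class MeetingTools(Protocol):
    async def semantic_retrieve(
        self, *, meeting_id: str, question: str
    ) -> list[dict]: ...

    async def synthesize_answer(
        self, *, question: str, evidence: list[dict]
    ) -> tuple[str, list[dict]]: ...
```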
Key Noteflow nuance:
* Semantic search is not universally available: your in-memory repo path explicitly no-ops embeddings and returns empty semantic results (it requires pgvector). Your retrieval node should degrade gracefully (fallback to lexical/time-window search, or “no evidence found”).
---
## 8) Gate features with existing Noteflow switches and consent checks
**Decision:** tie agent capabilities to:
* `rag_enabled` at the project/effective-rules level
* “cloud consent required” checks before any cloud LLM/web call
Why:
* You already model `rag_enabled` in project settings and effective rules; use that as the feature flag for all RAG-style agent surfaces.
* You already have a consent pattern that asserts cloud consent before cloud summarization; agents should reuse the same shape so you don't build a parallel consent system.
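A hedged sketch of a single gate called before any agent run (`rules.rag_enabled` mirrors the existing flag; the consent accessor name is an assumption):
```python
# Sketch only: one chokepoint for feature-flag and consent checks.
async def ensure_agent_allowed(rules, consent_service, *, uses_cloud: bool) -> None:
    if not rules.rag_enabled:
        raise PermissionError("RAG features are disabled for this project")
    if uses_cloud and not await consent_service.has_cloud_consent():
        raise PermissionError("Cloud processing consent has not been granted")
```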
Hardware nuance:
* Your UI already educates users on local model fit (“likely exceeds available VRAM/RAM… consider smaller model or INT8…”). Design graphs to accept a “budget” and choose smaller models/shorter context automatically.
---
## 9) Stream progress in a way that matches Noteflow UX patterns
**Decision:** decide early what “agent progress” looks like in the UI and map LangGraph streaming to it.
Noteflow already has:
* Tauri events + toasts for `SUMMARY_PROGRESS` and `DIARIZATION_PROGRESS`. An “AGENT_PROGRESS” event fits naturally.
LangGraph supports multiple streaming modes (values/updates/messages, etc.). Pick the minimal one that matches your UI first (e.g., stream partial answer tokens OR discrete node-stage updates), and grow later. ([LangChain Docs][4])
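For discrete node-stage updates, a sketch of the mapping could be as small as this (`emit_agent_progress` is hypothetical; it would forward to the Tauri event layer):
```python
# Sketch only: "updates" mode yields one dict per step, keyed by the node
# that just finished; a natural fit for an AGENT_PROGRESS event.
async def run_with_progress(app, inputs: dict, config: dict) -> None:
    async for chunk in app.astream(inputs, config, stream_mode="updates"):
        for node_name, _update in chunk.items():
            await emit_agent_progress(stage=node_name)
```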
---
## 10) Make your first public API surface very small
**Decision:** start with *one* graph + *one* endpoint:
* “Ask this meeting a question, return an answer with citations to segments.”
Why this is the right “reach” early:
* It directly leverages your strongest primitives: segments + semantic search + citation anchors.
* It keeps compute predictable (important given your hardware sensitivity concerns).
* It naturally composes into future features (auto annotations, cross-meeting RAG, “compare meetings”, “what changed since last week”, etc.).
Noteflow UI already has an “AI actions” affordance on the meeting header (summary + entity extraction buttons). Adding “Ask” there, and/or a new “Ask” tab beside Summary/Transcript/Notes, is consistent with existing structure.
---
# A concrete LangGraph scaffold that fits Noteflow
This is intentionally “thin”: it assumes nodes call existing Noteflow services like `MeetingServiceSegmentsMixin.search_segments()` and return IDs + citations (not giant blobs).
```python
# src/noteflow/application/services/agent/meeting_qa_graph.py
from __future__ import annotations
from typing import Annotated, TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# If you use LangChain message objects:
from langchain_core.messages import AnyMessage
class MeetingQAInput(TypedDict):
meeting_id: str
question: str
class MeetingQAOutput(TypedDict):
answer: str
citations: list[dict] # e.g. [{"segment_id": 12, "start": 123.4, "end": 130.2}]
class MeetingQAState(MeetingQAInput):
# Keep internal state minimal & serializable
messages: Annotated[list[AnyMessage], add_messages]
retrieved_segment_ids: NotRequired[list[int]]
retrieved_segments: NotRequired[list[dict]]
answer: NotRequired[str]
citations: NotRequired[list[dict]]
def build_meeting_qa_graph(*, meeting_tools) -> object:
"""
meeting_tools is your Noteflow-facing adapter:
- semantic_retrieve(meeting_id, question) -> [{segment_id, text, start_time, end_time, score}, ...]
- synthesize_answer(question, evidence) -> (answer, citations)
"""
async def retrieve_node(state: MeetingQAState) -> dict:
evidence = await meeting_tools.semantic_retrieve(
meeting_id=state["meeting_id"],
question=state["question"],
)
return {
"retrieved_segments": evidence,
"retrieved_segment_ids": [e["segment_id"] for e in evidence],
}
async def answer_node(state: MeetingQAState) -> dict:
answer, citations = await meeting_tools.synthesize_answer(
question=state["question"],
evidence=state.get("retrieved_segments", []),
)
return {"answer": answer, "citations": citations}
g = StateGraph(MeetingQAState, input_schema=MeetingQAInput, output_schema=MeetingQAOutput)
g.add_node("retrieve", retrieve_node)
g.add_node("answer", answer_node)
g.add_edge(START, "retrieve")
g.add_edge("retrieve", "answer")
g.add_edge("answer", END)
return g.compile()
```
This scaffolding maps cleanly onto:
* Noteflow segment semantic search (your evidence node can call `search_segments()` which calls the repo's `search_semantic()` under the hood).
* Output citations anchored to `segment_id` (which already matches your annotation protobuf shape).
* LangGraph best practices around messages + reducers and schema separation. ([LangChain Docs][2])
---
## The “gotchas” I'd be mindful of in your specific app
1. **Semantic search availability**
* In-memory mode can't do embeddings/semantic retrieval, so your graph must tolerate “no evidence found” and still answer conservatively (or disable the feature).
2. **Embedding dimensionality constraints**
* Your DB semantic search assumes a stored embedding vector and computes cosine distance; that implies your query embedder must match what's stored (dimension + model family). Any future “multiple embedding models” plan is easiest if you decide that strategy early.
3. **Interrupt/resume semantics**
* If you adopt interrupts later for approval flows, remember: the node restarts from the beginning on resume, so avoid non-idempotent side effects before the interrupt (LangGraph calls this out explicitly). ([LangChain Docs][1])
4. **State bloat**
* Don't put `full_transcript` into graph state; store references and fetch-on-demand. Your own domain model makes this easy, since segments are already structured objects.
5. **UX expectations**
* Noteflow already signals “AI tasks completed/failed” via Tauri events; users will expect similar behavior from an agent. Plan an `AGENT_PROGRESS` event early so you don't retrofit later.
---
If you want to pick *one* early decision that's the highest leverage: **commit to (Input/Output/Internal) schemas + thread_id convention now**, even before you add checkpointing. It's the easiest way to avoid “API surface ossification” while still letting you ship an MVP meeting-QA agent quickly. ([LangChain Docs][2])
[1]: https://docs.langchain.com/oss/python/langgraph/interrupts "Interrupts - Docs by LangChain"
[2]: https://docs.langchain.com/oss/python/langgraph/graph-api "Graph API overview - Docs by LangChain"
[3]: https://docs.langchain.com/oss/python/langgraph/persistence "Persistence - Docs by LangChain"
[4]: https://docs.langchain.com/oss/python/langgraph/streaming "Streaming - Docs by LangChain"

View File

@@ -0,0 +1,379 @@
According to a document from **January 19, 2026**, NoteFlow is already organized around a fairly rich **gRPC service surface** (`NoteFlowService`) that covers meeting lifecycle + streaming transcription, diarization refinement, summaries, entity extraction, annotations, exports, OAuth/calendar, webhooks, etc. That shape strongly favors an agent framework that behaves like a **controlled workflow engine** (predictable branches, bounded tool use, resumable steps) rather than an unbounded “chat loop”.
Since you want to pursue **LangGraph**, I'd lean into it as the “workflow spine”: define a small, typed state machine that routes between (1) retrieval from your existing transcript/segment store, (2) optional web search, (3) answer synthesis with citations, and (4) *optional* safe actions (like creating annotations) only when explicitly requested. LangGraph is designed around graph/state/conditional edges in exactly this way. ([GitHub][1])
Below is a concrete brainstorm + scaffolding plan for “injecting an agent into noteflow” that stays mindful of hardware constraints and uses what you already have.
---
## Why this fits NoteFlow's direction
### You already have “RAG/Q&A” as a first-class concept
There's explicit support for **RAG enablement** at both the **project rules** level (`rag_enabled`) and in workspace settings messages. Even roles mention “run Q&A” for viewers. So: you're not bolting on an alien feature; this is aligned with the domain model.
### Your transcript store is already a vector-friendly knowledge base
Segments have timing + text structure suitable for grounding answers, and your SQL segment repository already supports **semantic search by embedding**, optionally scoped to a meeting.
The big gotcha (which informs the agent design): the in-memory repository explicitly **doesn't support semantic search**. So the agent needs a graceful fallback when running without a DB.
### You already have safe “action surfaces” the agent can call
Annotations have an established API surface (Add/Update/Delete/List) in the gRPC service, and types that map cleanly to agent outputs like decisions/action items/risks. This means the agent can start read-only, then “graduate” to proposing/creating structured artifacts without inventing new primitives.
---
## Recommended agent “reach” in phases
### Phase 1: Meeting Q&A with citations (MVP)
**Goal:** “Ask a question about a meeting (or across meetings) and get an answer grounded in segments.”
Reach:
* Retrieve top-K transcript segments via semantic search (DB mode) or keyword scan (no-DB mode).
* Produce an answer with **segment citations** (meeting_id + segment_id + timestamps).
* No mutations, no web.
Why this is feasible now:
* Segment semantic search exists and supports an optional meeting scope.
* Segment structures include timing, which you can turn into clickable citations in the UI.
### Phase 2: Cross-meeting knowledge (“recordings as a RAG store”)
**Goal:** Query across your entire recordings corpus.
Reach:
* Same retrieval tool, but omit meeting_id so it searches globally (still top-K bounded).
* Add lightweight “answer + supporting meetings list”.
Guardrails:
* Must be gated by `rag_enabled` and (optionally) role, since roles mention Q&A explicitly.
### Phase 3: Enrichment suggestions (still low-hardware)
**Goal:** Have the agent *suggest* structured outputs that already exist in your data model:
* Suggested **annotations**: decision, action item, risk, follow-up
* Suggested “key moments” with timestamps (just segments)
* Suggested summary refinement prompts (reusing your summary template flow)
This stays cheap because suggestions can be produced from the retrieved segments subset.
### Phase 4: Optional web cross-check node
Only when the user asks something external (“Is that metric true?”, “What's the latest spec?”), route to a web-search node and have a smaller model do “compare meeting claim vs web sources”.
This is where you'll want the “small model cross-referencer” you mentioned.
---
## The LangGraph workflow Id implement
Here's the graph as a controlled pipeline (not “agentic chaos”):
1. **Intent + scope detection**
Decide: meeting-scoped vs workspace-scoped vs external research.
2. **Retrieve** (RAG)
Pull segments from DB semantic search (or fallback).
3. **Optional web branch**
Only if intent requires it.
4. **Cross-reference / verify** (small model)
Check that citations actually support claims (and/or compare meeting claims vs web).
5. **Synthesize answer** (bigger model)
Generate final response with citations + (optional) structured suggestions.
LangGraph supports exactly this style of stateful, conditional routing. ([GitHub][1])
If you want a “plan/execute” style with a smaller model doing step selection, that's also a known LangGraph pattern. ([LangChain][2])
---
## The “tools” your agent should have in NoteFlow
Think of these as *internal* functions that your graph nodes can call. Keep them small and bounded.
### Retrieval tools
* `search_segments(meeting_id: Optional[str], query: str, k: int) -> list[SegmentHit]`
* DB mode: embed query + call `segments.search_semantic(...)`
* No-DB mode: keyword scan / recent segments
* `get_meeting(meeting_id)`, `list_meetings(...)`
You already have meeting service patterns for this.
### Enrichment/action tools (phase 3+)
* `list_annotations(meeting_id)` / `add_annotation(...)`
gRPC already exposes these actions.
* `generate_summary(meeting_id)`
Exists as a first-class RPC, with processing-status tracking already implemented around the summary step.
### Optional web tool (phase 4)
* `web_search(query) -> list[WebResult]`
Implement as a provider interface so it's easy to disable or swap.
---
## API surfaces to add
### Backend gRPC
You have a single `NoteFlowService` with many RPCs already. The cleanest addition is:
* `AskAssistant`: unary request/response for “ask + answer + citations”
* (Optional) `StreamAssistant`: streaming tokens/chunks for better UX
**Request fields** (suggested):
* `meeting_id` (optional): if present, meeting-scoped; else workspace/global scoped
* `question`
* `thread_id` or `conversation_id` (optional)
* `allow_web` (bool) and `allow_actions` (bool) for safety gating
* `top_k`, `time_range`, etc.
**Response fields**:
* `answer_markdown`
* `citations[]`: `{meeting_id, segment_id, start_time, end_time}`
* `suggested_annotations[]` (optional, phase 3)
This matches your existing style where RPCs return typed data objects (summary proto, annotations proto, etc.).
### Frontend API + UI
You already have a typed `API` interface in the client that includes `generateSummary`, `extractEntities`, `searchSegments`, etc. Add something like:
* `askAssistant(request): Promise<AskAssistantResponse>`
UI-wise, you already have a “Generate Summary” entry point in the meeting detail header props (`onGenerateSummary`, etc.). Add:
* `onAskAssistant` + an “Ask” button beside the Sparkles summary affordance.
Gate visibility behind:
* `rag_enabled` rules
* workspace settings `rag_enabled`
* role permissions (since Q&A is explicitly called out)
---
## Scaffolding: where to “inject” this in the server
You already group optional services into an `OptionalServicesConfig` to keep servicer init sane, and `build_servicer(...)` wires those services into `ServicesConfig(...)`.
So I'd add:
* `assistant_service: AssistantService | None = None` to that config group (same pattern as summarization/ner/calendar/webhook services).
* A new gRPC mixin module, similar to how summarization and meeting segments are broken out into mixins.
---
## Scaffolding: LangGraph graph + service wrapper
Below is a “starter skeleton” that matches your architecture constraints (async, bounded retrieval, clean separation). It's intentionally not tied to the deprecated helper APIs and sticks to the core `StateGraph` pattern. ([GitHub][1])
### 1) Define the state + citation types
```python
# src/noteflow/application/services/assistant/state.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, TypedDict
@dataclass(frozen=True)
class SegmentCitation:
meeting_id: str
segment_id: int
start_time: float
end_time: float
text: str
score: float | None = None
class AssistantState(TypedDict, total=False):
# request
request_id: str
meeting_id: str | None
question: str
# routing
intent: Literal["meeting_qa", "workspace_qa", "external_research"]
allow_web: bool
# retrieval
retrieval_query: str
citations: list[SegmentCitation]
# web
web_results: list[dict]
# drafting
draft_answer: str
final_answer: str
```
This is built to support timestamped citations, which map naturally to your segment structure (`segment_id`, `start_time`, `end_time`, `text`).
### 2) Build a retrieval helper that uses your DB semantic search when available
```python
# src/noteflow/application/services/assistant/retrieval.py
from __future__ import annotations
from typing import Protocol
from uuid import UUID
from noteflow.domain.ports.unit_of_work import UnitOfWork
class Embedder(Protocol):
async def embed(self, text: str) -> list[float]: ...
async def retrieve_segments(
uow: UnitOfWork,
embedder: Embedder,
*,
meeting_id: UUID | None,
query: str,
top_k: int = 8,
) -> list[tuple[object, float]]:
"""
Returns: list of (Segment, score) domain objects
"""
query_vec = await embedder.embed(query)
# DB-backed semantic search supports meeting scoping
# (signature supports meeting_id: MeetingId | None)
return await uow.segments.search_semantic(
query_embedding=query_vec,
meeting_id=meeting_id,
limit=top_k,
)
```
Then: if you're in memory mode, you'll need a fallback path because semantic search won't work without DB support.
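A hedged sketch of such a fallback, using naive keyword overlap (`uow.segments.list_for_meeting` is an assumed accessor, not a confirmed repository method):
```python
# Sketch only: in-memory fallback returning the same (Segment, score) shape
# as search_semantic(), so graph nodes don't care which path ran.
async def keyword_retrieve(uow, *, meeting_id, query: str, top_k: int = 8):
    segments = await uow.segments.list_for_meeting(meeting_id)
    terms = {t.lower() for t in query.split() if len(t) > 2}
    scored = [
        (seg, sum(term in seg.text.lower() for term in terms))
        for seg in segments
    ]
    hits = [(seg, float(score)) for seg, score in scored if score > 0]
    hits.sort(key=lambda pair: pair[1], reverse=True)
    return hits[:top_k]
```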
### 3) Create the LangGraph workflow
```python
# src/noteflow/application/services/assistant/graph.py
from __future__ import annotations
from langgraph.graph import StateGraph, START, END
from .state import AssistantState
def build_assistant_graph(
*,
classify_intent,
retrieve_node,
maybe_web_node,
verify_node,
synthesize_node,
):
g = StateGraph(AssistantState)
g.add_node("classify_intent", classify_intent)
g.add_node("retrieve", retrieve_node)
g.add_node("maybe_web", maybe_web_node)
g.add_node("verify", verify_node)
g.add_node("synthesize", synthesize_node)
g.add_edge(START, "classify_intent")
g.add_edge("classify_intent", "retrieve")
# simple conditional routing: only call web if intent says so AND allow_web
def route_after_retrieve(state: AssistantState) -> str:
if state.get("allow_web") and state.get("intent") == "external_research":
return "maybe_web"
return "verify"
g.add_conditional_edges("retrieve", route_after_retrieve, {
"maybe_web": "maybe_web",
"verify": "verify",
})
g.add_edge("maybe_web", "verify")
g.add_edge("verify", "synthesize")
g.add_edge("synthesize", END)
return g.compile()
```
### 4) Verification step: reuse your existing citation-verification concept
You already have a `SegmentCitationVerifier` pattern in summarization land to validate/filter citations. Reuse the same idea here:
* Small model (or heuristic) checks:
* each cited segment actually contains the supporting text
* remove invalid citations before final answer
* if “not enough evidence”, respond with “I don't know from the meeting” + ask for clarification (UX choice)
This directly attacks hallucinations without needing a giant model.
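A minimal heuristic sketch of the filter step (the `supporting_text` mapping is an assumption about what the synthesis step emits; a small model can replace the substring check later):
```python
# Sketch only: keep a citation only if its claimed supporting text actually
# appears in the cited segment. SegmentCitation is the dataclass defined above.
def filter_citations(
    citations: list[SegmentCitation],
    supporting_text: dict[int, str],  # segment_id -> quoted support
) -> list[SegmentCitation]:
    kept = []
    for citation in citations:
        quote = supporting_text.get(citation.segment_id, "")
        if quote and quote.lower() in citation.text.lower():
            kept.append(citation)
    return kept
```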
---
## Hardware + performance guardrails
### Make DB mode the “full RAG” mode
Your server config explicitly supports running without a DB (`database_url: ... If None, runs in-memory mode`). Since semantic search is not supported in memory:
* If DB available → semantic retrieval (best experience)
* If DB not available → fallback:
* last-N segments
* keyword match
* or “summary-only Q&A” (if a summary already exists)
### Use consent/remote settings patterns you already have
You already persist cloud consent and apply it into the summarization service settings on startup, and you have gRPC endpoints for grant/revoke/status.
If your “big model” is cloud-based:
* gate it behind the same consent model (or reuse it)
* fall back to local model/provider if not granted
### Track usage like you already do for summarization
Your summarization providers return tokens/latency, and you already aggregate usage events (provider/model/tokens) for analytics. The agent should emit the same kind of usage events per model call (see the sketch after this list):
* planner/verifier call
* synthesis call
* web calls (if any)
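A sketch of one way to standardize this (the sink's `record()` signature and the event fields are assumptions modeled on the summarization pattern):
```python
# Sketch only: wrap each model call so it emits exactly one usage event.
import time

async def tracked_call(usage_sink, *, stage: str, provider: str, model: str, call):
    started = time.monotonic()
    result, tokens = await call()  # call() returns (payload, token_count)
    await usage_sink.record(
        event="assistant_model_call",
        stage=stage,  # e.g. "planner", "verifier", "synthesis", "web"
        provider=provider,
        model=model,
        tokens=tokens,
        latency_ms=(time.monotonic() - started) * 1000.0,
    )
    return result
```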
---
## Minimal “MVP build order” I'd do next
1. **Backend internal service**
Add `AssistantService.ask(...)` that:
* retrieves segments (DB semantic if possible)
* produces answer with citations
2. **gRPC surface**
Add `AskAssistant` RPC alongside existing meeting operations.
3. **Client API + UI**
Add `askAssistant` to the client `API` interface and wire a button into `MeetingDetailHeader` next to summary actions.
4. **Rules/permissions gating**
Hide/disable unless `rag_enabled` is true.
5. **Optional: annotation suggestion**
Return `suggested_annotations[]` without writing. Later: add an “Apply” action that calls AddAnnotation.
---
If you want, I can take the next step and propose the exact proto shapes for `AskAssistantRequest/Response` in the same style as your existing proto messages (including how to represent citations as segment ranges), plus the server mixin layout to match how `GenerateSummary` and consent endpoints are structured.
[1]: https://github.com/langchain-ai/langgraph?utm_source=chatgpt.com "langchain-ai/langgraph: Build resilient language agents as ..."
[2]: https://langchain-ai.github.io/langgraph/tutorials/plan-and-execute/plan-and-execute/?utm_source=chatgpt.com "Plan-and-Execute"

View File

@@ -1,81 +1,333 @@
# Sprint 25+: LangGraph Migration
# Sprint 25-28: LangGraph Integration
> **Size**: XL (multi-sprint) | **Owner**: Backend | **Prerequisites**: Sprint 19, Sprint 21
> **Phase**: 5 - Platform Evolution
> **Size**: XL (Multi-Sprint) | **Owner**: Backend + Client | **Phase**: 5 - Platform Evolution
> **Total Effort**: ~4 sprints (25-28) | **Prerequisites**: Sprint 19 (Embeddings), Sprint 21 (MCP Config - optional)
---
## Validation Status (2025-12-31)
## Executive Summary
### 🚫 BLOCKED — Prerequisites Not Implemented
This initiative introduces LangGraph as an orchestration layer for AI workflows in NoteFlow, starting with a Meeting Q&A feature that leverages existing semantic search capabilities. The implementation is structured as four progressive sprints:
| Sprint | Focus | Deliverable |
|--------|-------|-------------|
| **25** | Foundation | LangGraph infrastructure + wrap existing summarization |
| **26** | Meeting Q&A MVP | Single-meeting Q&A with citations |
| **27** | Cross-Meeting RAG | Workspace-scoped retrieval + suggested annotations |
| **28** | Advanced Capabilities | Streaming, caching, guardrails, web search |
---
## Validation Status (2026-01-20)
### ✅ UNBLOCKED — Core Prerequisites Satisfied
| Prerequisite | Status | Impact |
|--------------|--------|--------|
| MCP configuration (Sprint 21) | ❌ Not implemented | Cannot configure tool sources |
| Usage events (Sprint 15 / OTel) | ❌ Not implemented | Cannot emit run metadata |
| Project scoping (Sprint 18) | ✅ Implemented | No longer blocks Sprint 21 |
| Semantic search (Sprint 19) | ✅ Implemented | `SegmentRepository.search_semantic` ready |
| Embeddings storage | ✅ Implemented | `SegmentModel.embedding` with pgvector |
| Project scoping (Sprint 18) | ✅ Implemented | RAG can be scoped to projects |
| RAG feature flags | ✅ Implemented | `rag_enabled` in project rules |
| Summarization service | ✅ Implemented | Pattern to follow for new AI services |
**Action required**: Complete Sprint 21 (MCP Config) and Sprint 15 (Usage Events) before starting.
### ⚠️ OPTIONAL — Non-Blocking Prerequisites
| Prerequisite | Status | Impact |
|--------------|--------|--------|
| MCP configuration (Sprint 21) | ❌ Not implemented | Tool sources hardcoded initially |
| Usage events (Sprint 15/OTel) | ⚠️ Partial | Use existing `UsageEventSink` pattern |
**Decision**: Proceed without MCP. Use hardcoded tool configuration initially; refactor when Sprint 21 completes.
---
## Objective
## Architecture Overview
Replace AI workflows with LangGraph after context sources/scoping exist. Enable sophisticated RAG, research, and Q&A capabilities.
### LangGraph as Orchestration Layer
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ gRPC API Surface │
│ AskAssistant | StreamAssistant | GetAssistantHistory │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AssistantService │ │
│ │ - ask(question, meeting_id?, options) │ │
│ │ - Manages graph compilation, checkpointing, usage tracking │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LangGraph Workflows │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ MeetingQA │ │ SummaryWrap │ │ Research │ │ │
│ │ │ Graph │ │ Graph │ │ Graph │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Infrastructure Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Retrieval │ │ Synthesis │ │ Verification │ │ Checkpointer │ │
│ │ Tools │ │ Providers │ │ Engine │ │ (Postgres) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### Key Design Decisions
| Decision | Rationale |
|----------|-----------|
| **Orchestration, not domain** | LangGraph lives in application layer; nodes call existing services |
| **Separate Input/Output/Internal schemas** | Prevents internal state from becoming API surface |
| **Reference-based state** | Store `segment_ids`, not full transcripts (checkpoint efficiency) |
| **thread_id convention** | `meeting:{meeting_id}:user:{user_id}:graph:{graph_name}:v{version}` (see helper sketch below) |
| **Postgres checkpointer** | Matches existing persistence infrastructure |
| **Consent-gated cloud calls** | Reuse existing cloud consent pattern from summarization |
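A tiny helper makes the thread_id convention concrete; the `build_thread_id` name is illustrative, not an existing API:
```python
from uuid import UUID

def build_thread_id(meeting_id: UUID, user_id: UUID, graph_name: str, version: int) -> str:
    """Render the documented thread_id convention for checkpointer lookups."""
    return f"meeting:{meeting_id}:user:{user_id}:graph:{graph_name}:v{version}"
```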
---
## What Already Exists
| Asset | Location | Implication |
|-------|----------|-------------|
| Summarization service | `application/services/summarization_service.py` | ✅ Wrap target |
| RAG retrieval (Sprint 19) | `SegmentModel.embedding` + cosine_distance | ✅ Context source ready |
| MCP configuration (Sprint 21) | — | ❌ Not implemented; non-blocking (see Validation Status) |
| Usage events (Sprint 15) | — | ⚠️ Partial; use existing `UsageEventSink` pattern |
---
## Sprint Breakdown
### [Sprint 25: LangGraph Foundation](./sprint-25-foundation/README.md)
**Objective**: Establish LangGraph infrastructure and wrap existing summarization as proof of pattern.
| Task | Effort | Deliverable |
|------|--------|-------------|
| Add `langgraph` + `langgraph-checkpoint-postgres` dependencies | S | `pyproject.toml` |
| Create state schemas (Input/Output/Internal) | M | `domain/ai/state.py` |
| Create retrieval tools adapter | M | `infrastructure/ai/tools/retrieval.py` |
| Create synthesis adapter | M | `infrastructure/ai/tools/synthesis.py` |
| Wrap summarization in LangGraph graph | M | `infrastructure/ai/graphs/summarization.py` |
| Create AssistantService shell | M | `application/services/assistant/` |
| Add checkpointer factory | S | `infrastructure/ai/checkpointer.py` (see sketch below) |
| Emit usage events from graph runs | M | Integration with `UsageEventSink` |
| Unit tests for graph nodes | M | `tests/infrastructure/ai/` |
**Quality Gates**:
- Existing summarization behavior unchanged
- `pytest tests/infrastructure/ai/` passes
- `make quality` passes
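A minimal, self-contained sketch of the checkpointer factory task above, assuming the `AsyncPostgresSaver` API from `langgraph-checkpoint-postgres`; the one-node graph and state fields are placeholders:
```python
from typing import TypedDict

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph, START, END

class State(TypedDict, total=False):
    question: str
    answer: str

def answer_node(state: State) -> dict:
    return {"answer": f"echo: {state['question']}"}  # placeholder node

builder = StateGraph(State)
builder.add_node("answer", answer_node)
builder.add_edge(START, "answer")
builder.add_edge("answer", END)

async def run_with_postgres_checkpointer(db_uri: str, thread_id: str) -> dict:
    # Checkpoints persist to Postgres, matching the existing persistence stack
    async with AsyncPostgresSaver.from_conn_string(db_uri) as checkpointer:
        await checkpointer.setup()  # idempotent: creates checkpoint tables on first run
        graph = builder.compile(checkpointer=checkpointer)
        return await graph.ainvoke(
            {"question": "ping"},
            config={"configurable": {"thread_id": thread_id}},
        )
```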
---
## Phase 1: Wrap Existing (Sprint 25)
| Task | Effort |
|------|--------|
| Wrap summarization in LangGraph graph | M |
| Keep outputs identical | S |
| Emit structured run metadata | M |
| Evaluation hooks integration | M |
---
### [Sprint 26: Meeting Q&A MVP](./sprint-26-meeting-qa/README.md)
**Objective**: Implement single-meeting Q&A with segment citations.
| Task | Effort | Deliverable |
|------|--------|-------------|
| Define MeetingQA graph (retrieve → verify → synthesize) | M | `infrastructure/ai/graphs/meeting_qa.py` (see sketch below) |
| Create citation verifier node | M | `infrastructure/ai/nodes/verification.py` |
| Add proto messages (`AskAssistantRequest/Response`) | M | `noteflow.proto` |
| Add gRPC mixin (`AssistantMixin`) | M | `grpc/_mixins/assistant.py` |
| Add Rust command (`ask_assistant`) | M | `commands/assistant.rs` |
| Add TypeScript adapter method | S | `tauri-adapter.ts` |
| Add types for citations | S | `api/types/assistant.ts` |
| Create Ask UI component (meeting detail) | M | `components/meeting/AskPanel.tsx` |
| Gate behind `rag_enabled` feature flag | S | Service + UI |
| Integration tests | M | `tests/grpc/test_assistant.py` |
**Quality Gates**:
- Q&A returns answers with valid segment citations
- Citations link to correct timestamps
- Feature hidden when `rag_enabled=false`
- `make quality` passes
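The MeetingQA topology from the task table above (retrieve → verify → synthesize) as a LangGraph sketch; node bodies are placeholders and the state fields are assumptions, not the final schema:
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class MeetingQAState(TypedDict, total=False):
    question: str
    meeting_id: str
    segment_ids: list[int]  # reference-based state: ids, never full transcripts
    answer: str

def retrieve(state: MeetingQAState) -> dict:
    # Placeholder: semantic search scoped to meeting_id
    return {"segment_ids": [1, 2, 3]}

def verify(state: MeetingQAState) -> dict:
    # Placeholder: drop segments that fail citation verification
    return {"segment_ids": state.get("segment_ids", [])}

def synthesize(state: MeetingQAState) -> dict:
    # Placeholder: prompt the LLM with verified segments only
    return {"answer": "..."}

builder = StateGraph(MeetingQAState)
builder.add_node("retrieve", retrieve)
builder.add_node("verify", verify)
builder.add_node("synthesize", synthesize)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "verify")
builder.add_edge("verify", "synthesize")
builder.add_edge("synthesize", END)
meeting_qa_graph = builder.compile()
```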
---
## Phase 2: Extend Capabilities (Sprint 26+)
| Task | Effort |
|------|--------|
| Research graphs with tool use | L |
| Multi-step Q&A with retrieval | L |
| Caching layer | M |
| Guardrails (content filtering) | M |
| Streaming responses | M |
---
### [Sprint 27: Cross-Meeting RAG](./sprint-27-cross-meeting/README.md)
**Objective**: Enable workspace-scoped Q&A and annotation suggestions.
| Task | Effort | Deliverable |
|------|--------|-------------|
| Extend retrieval to omit `meeting_id` filter | S | Tool update |
| Add workspace-scoped semantic search | M | `SegmentRepository.search_semantic_workspace` |
| Create WorkspaceQA graph variant | M | `infrastructure/ai/graphs/workspace_qa.py` |
| Add suggested annotations output | M | Graph output schema |
| Create "Apply Annotation" flow | M | UI + gRPC |
| Add conversation history support | M | State management |
| Implement thread_id scheme | M | Checkpointer integration |
| Follow-up question support | M | Graph state persistence |
**Quality Gates**:
- Cross-meeting queries return results from multiple meetings
- Suggested annotations can be applied
- Conversation history persists across requests
- `make quality` passes
---
## Deliverables
### Phase 1
- `src/noteflow/infrastructure/ai/graphs/summarization.py`
- Run metadata emission
- Evaluation hook wiring
### Phase 2
- `src/noteflow/infrastructure/ai/graphs/research.py`
- `src/noteflow/infrastructure/ai/graphs/qa.py`
- `src/noteflow/infrastructure/ai/cache.py`
- `src/noteflow/infrastructure/ai/guardrails.py`
---
### [Sprint 28: Advanced Capabilities](./sprint-28-advanced/README.md)
**Objective**: Production hardening with streaming, caching, guardrails, and optional web search.
| Task | Effort | Deliverable |
|------|--------|-------------|
| Implement streaming responses | L | `StreamAssistant` RPC |
| Add caching layer for embeddings | M | `infrastructure/ai/cache.py` |
| Implement content guardrails | M | `infrastructure/ai/guardrails.py` |
| Add optional web search node | M | `infrastructure/ai/nodes/web_search.py` |
| Create `AGENT_PROGRESS` Tauri event | S | Event emission |
| Implement interrupts for approval flows | L | LangGraph interrupts |
| Performance optimization (batching) | M | Retrieval batching |
| Comprehensive E2E tests | M | `client/e2e/assistant.spec.ts` |
**Quality Gates**:
- Streaming shows progressive answer generation
- Cache reduces redundant embedding calls
- Guardrails prevent harmful content
- Web search gated by `allow_web` flag
- `make quality` passes
---
## Post-Sprint
- Custom graph builder UI
- Graph sharing/marketplace
- Fine-tuning integration
---
## Detailed Implementation Guides
Each sprint has a dedicated README with:
1. **Current State Analysis** — What exists, what's missing
2. **Target Code** — Exact files to create/modify with line estimates
3. **Implementation Tasks** — Step-by-step with code snippets
4. **Test Plans** — Unit, integration, E2E test specifications
5. **Acceptance Criteria** — Functional and technical requirements
6. **Rollback Plan** — How to disable if issues arise
---
## File Structure (Final State)
```
src/noteflow/
├── domain/
│ └── ai/
│ ├── state.py # Input/Output/Internal state schemas
│ ├── citations.py # SegmentCitation value object
│ └── ports.py # AssistantPort protocol
├── application/
│ └── services/
│ └── assistant/
│ ├── __init__.py
│ ├── assistant_service.py # Main orchestration
│ ├── _graph_runner.py # Graph compilation/execution
│ └── _usage_tracker.py # Usage event emission
├── infrastructure/
│ └── ai/
│ ├── graphs/
│ │ ├── __init__.py
│ │ ├── summarization.py # Wrapped summarization
│ │ ├── meeting_qa.py # Single-meeting Q&A
│ │ └── workspace_qa.py # Cross-meeting Q&A
│ ├── nodes/
│ │ ├── __init__.py
│ │ ├── retrieval.py # Semantic search node
│ │ ├── synthesis.py # Answer generation node
│ │ ├── verification.py # Citation verification
│ │ └── web_search.py # Optional web node
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── retrieval.py # Tool adapters
│ │ └── synthesis.py
│ ├── checkpointer.py # Postgres checkpointer factory
│ ├── cache.py # Embedding cache
│ └── guardrails.py # Content filtering
└── grpc/
└── _mixins/
└── assistant.py # AskAssistant, StreamAssistant RPCs
client/
├── src/
│ ├── api/
│ │ └── types/
│ │ └── assistant.ts # Assistant types
│ ├── components/
│ │ └── meeting/
│ │ └── AskPanel.tsx # Q&A UI component
│ └── hooks/
│ └── use-assistant.ts # Assistant hook
└── src-tauri/
└── src/
└── commands/
└── assistant.rs # Tauri commands
```
---
## Proto Schema (Sprint 26)
```protobuf
// Assistant API
message SegmentCitation {
string meeting_id = 1;
int32 segment_id = 2;
float start_time = 3;
float end_time = 4;
string text = 5;
float score = 6;
}
message AskAssistantRequest {
string question = 1;
optional string meeting_id = 2; // If absent, workspace-scoped
optional string thread_id = 3; // For conversation continuity
bool allow_web = 4; // Gate web search
int32 top_k = 5; // Max segments to retrieve
}
message AskAssistantResponse {
string answer = 1;
repeated SegmentCitation citations = 2;
repeated SuggestedAnnotation suggested_annotations = 3;
string thread_id = 4;
}
message SuggestedAnnotation {
string text = 1;
AnnotationType type = 2;
repeated int32 segment_ids = 3;
}
// Add to NoteFlowService
rpc AskAssistant(AskAssistantRequest) returns (AskAssistantResponse);
rpc StreamAssistant(AskAssistantRequest) returns (stream AskAssistantResponse);
```
---
## Risk Mitigation
| Risk | Mitigation |
|------|------------|
| LangGraph API changes | Pin version, wrap in adapter layer |
| Checkpoint storage growth | Implement TTL, prune old threads |
| Hallucination | Citation verification node (see sketch below), conservative prompts |
| Performance (cold start) | Cache compiled graphs, warm embedder |
| Memory mode (no DB) | Graceful fallback to keyword search or disable feature |
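One way the citation-verification mitigation could look; `SegmentCitation` here is a simplified stand-in for the proto message, and substring matching is just one possible check:
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SegmentCitation:
    segment_id: int
    text: str

def verify_citations(
    citations: list[SegmentCitation],
    segments: dict[int, str],  # segment_id -> source text for this meeting
) -> list[SegmentCitation]:
    # Keep only citations whose segment exists and whose quote appears in the source
    verified: list[SegmentCitation] = []
    for citation in citations:
        source = segments.get(citation.segment_id)
        if source is not None and citation.text.casefold() in source.casefold():
            verified.append(citation)
    return verified
```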
---
## Success Metrics
| Metric | Sprint 26 Target | Sprint 28 Target |
|--------|------------------|------------------|
| Q&A latency (p95) | < 5s | < 3s |
| Citation accuracy | > 80% | > 90% |
| User adoption | 10% of active users | 30% of active users |
| Hallucination rate | < 20% | < 5% |
---
## References
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [LangGraph Persistence](https://docs.langchain.com/oss/python/langgraph/persistence)
- [LangGraph Streaming](https://docs.langchain.com/oss/python/langgraph/streaming)
- NoteFlow: `docs/sprints/phase-1-core-pipeline/sprint-1-ai-templates/README.md` (pattern reference)
- NoteFlow: `src/noteflow/application/services/summarization/` (service pattern)

View File

@@ -0,0 +1,135 @@
# Application Layer Development Guide
## Overview
The application layer contains **use cases and services** that orchestrate domain logic. Services coordinate between domain entities, infrastructure adapters, and external systems.
---
## Layer Responsibilities
| Concern | Application Layer | NOT Application Layer |
|---------|------------------|----------------------|
| Transaction management | Yes (via UnitOfWork) | - |
| Domain entity creation | Yes | - |
| Business rule validation | Yes (via domain) | - |
| External API calls | Delegate to infra | Direct calls |
| Database queries | Delegate to repos | Direct SQL |
| gRPC handling | No | gRPC mixins |
---
## Directory Structure
```
application/
├── services/ # Domain service implementations
│ ├── meeting_service.py
│ ├── annotation_service.py
│ ├── retention_service.py
│ ├── summarization_service.py
│ ├── ner_service.py
│ ├── calendar_service.py
│ ├── webhook_service.py
│ ├── export_service.py
│ ├── preferences_service.py
│ ├── identity_service.py
│ ├── project_service.py
│ └── recovery_service.py
├── ports/ # Service protocols (if needed)
└── __init__.py
```
---
## Service Pattern
### Constructor Injection
```python
class MeetingService:
def __init__(
self,
uow_factory: Callable[[], UnitOfWork],
asset_manager: AssetManager,
webhook_service: WebhookService | None = None,
) -> None:
self._uow_factory = uow_factory
self._asset_manager = asset_manager
self._webhook_service = webhook_service
```
### Use Case Methods
```python
async def create_meeting(
self,
params: MeetingCreateParams,
*,
created_by_id: UUID | None = None,
) -> Meeting:
"""Create a new meeting with optional project association."""
async with self._uow_factory() as uow:
meeting = Meeting.create(params, created_by_id=created_by_id)
created = await uow.meetings.create(meeting)
await uow.commit()
if self._webhook_service:
await self._webhook_service.fire_event(
WebhookEventType.MEETING_CREATED,
meeting_id=created.id,
)
return created
```
---
## Key Services
| Service | Purpose | Dependencies |
|---------|---------|--------------|
| **MeetingService** | Meeting CRUD, state transitions | UoW, AssetManager, WebhookService |
| **SummarizationService** | Generate/manage summaries | Providers, UoW, ConsentManager |
| **NerService** | Named entity extraction | NerEngine, UoW |
| **RetentionService** | Automated cleanup | UoW, settings |
| **RecoveryService** | Failed meeting recovery | UoW, AssetManager |
| **ExportService** | PDF/Markdown/HTML export | UoW, templates |
| **CalendarService** | Calendar sync | CalendarAdapter, OAuth |
| **WebhookService** | Event notification | WebhookExecutor, UoW |
---
## Transaction Management
Always use UnitOfWork context manager:
```python
async with self._uow_factory() as uow:
# All operations within transaction
meeting = await uow.meetings.get(meeting_id)
meeting = meeting.update(...)
await uow.meetings.update(meeting)
await uow.commit() # Explicit commit required
```
**Auto-rollback on exception** — UoW rolls back if exception occurs before commit.
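A short illustration (assumes a service method with `self._uow_factory` and a `meeting_id` in scope):
```python
async with self._uow_factory() as uow:
    meeting = await uow.meetings.get(meeting_id)
    await uow.meetings.update(meeting)
    raise RuntimeError("boom")  # __aexit__ rolls back; the update never persists
```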
---
## Forbidden Patterns
- **No direct database access** — Use repositories via UoW
- **No gRPC imports** — Application layer is transport-agnostic
- **No infrastructure details** — Don't import SQLAlchemy, httpx, etc.
- **No stateful services** — Services should be stateless; state in domain
---
## Adding a New Service
1. Create service class with constructor injection
2. Accept `uow_factory: Callable[[], UnitOfWork]` for data access
3. Define async use case methods
4. Wire up in `grpc/startup/services.py`
5. Add tests in `tests/application/services/`

View File

@@ -0,0 +1,526 @@
# Application Services Layer Development Guide
## Overview
The application services layer (`src/noteflow/application/services/`) orchestrates business operations between domain logic and infrastructure adapters. Services coordinate repositories, manage transactions, and enforce business rules.
**Architecture**: Clean Architecture use cases with mixin-based composition
---
## Service Catalog
### Core Services
| Service | Directory | Responsibility |
|---------|-----------|----------------|
| **MeetingService** | `meeting/` | Meeting CRUD, state transitions, segments, summaries, annotations |
| **IdentityService** | `identity/` | User/workspace context, defaults, membership |
| **ProjectService** | `project_service/` | Project CRUD, rules inheritance, role resolution |
| **ExportService** | `export/` | Transcript export (Markdown, HTML, PDF) |
### AI & Processing Services
| Service | Directory | Responsibility |
|---------|-----------|----------------|
| **SummarizationService** | `summarization/` | LLM coordination, provider selection, consent |
| **SummarizationTemplateService** | `summarization/` | Template CRUD and resolution |
| **NerService** | `ner/` | Named entity extraction, model lifecycle |
| **AsrConfigService** | `asr_config/` | ASR engine reconfiguration |
### Integration Services
| Service | Directory | Responsibility |
|---------|-----------|----------------|
| **CalendarService** | `calendar/` | OAuth flow, calendar event sync |
| **AuthService** | `auth/` | User authentication via OAuth |
| **WebhookService** | `webhooks/` | Webhook registration and delivery |
| **TriggerService** | `triggers/` | Trigger evaluation, rate limiting |
### Lifecycle Services
| Service | Directory | Responsibility |
|---------|-----------|----------------|
| **RecoveryService** | `recovery/` | Crash recovery for meetings and jobs |
| **RetentionService** | `retention/` | Automatic meeting deletion |
---
## Dependency Injection Patterns
### Pattern 1: Constructor-Based (Direct UnitOfWork)
```python
class MeetingService:
def __init__(self, uow: UnitOfWork) -> None:
self._uow = uow
# Usage
async with uow:
service = MeetingService(uow)
result = await service.create_meeting(title="Test")
```
**Use when**: Service lifetime matches transaction lifetime.
### Pattern 2: Factory Function Injection
```python
class AuthService:
def __init__(
self,
uow_factory: Callable[[], UnitOfWork],
settings: CalendarIntegrationSettings,
) -> None:
self._uow_factory = uow_factory
self._settings = settings
# Usage: each operation opens its own fresh UnitOfWork internally
async def handle_login():
    result = await auth_service.complete_login(code, state)
```
**Use when**: Service is long-lived, operations are independent.
### Pattern 3: Optional Dependency Overrides
```python
class _CalendarServiceDepsKwargs(TypedDict, total=False):
oauth_manager: OAuthManager
google_adapter: GoogleCalendarAdapter
outlook_adapter: OutlookCalendarAdapter
class CalendarService:
def __init__(
self,
uow_factory: Callable[[], UnitOfWork],
settings: Settings,
**kwargs: Unpack[_CalendarServiceDepsKwargs],
):
self._oauth_manager = kwargs.get("oauth_manager") or OAuthManager(settings)
```
**Use when**: Testing requires dependency substitution.
---
## Mixin Composition Pattern
Services use mixins to organize related behaviors:
```python
class ProjectService(
RuleInheritanceMixin, # Compute effective rules
ProjectRoleResolverMixin, # Role resolution
ProjectCrudMixin, # CRUD operations
ActiveProjectMixin, # Active project tracking
ProjectMembershipMixin, # Membership management
):
"""Combines 5 focused responsibilities into one service."""
```
### Mixin Guidelines
1. **Single responsibility**: Each mixin handles one concern
2. **Protocol parameters**: Accept protocols, not concrete UnitOfWork
3. **No shared state**: Methods receive dependencies as parameters
4. **Reusable**: Can be composed into different services
```python
class ProjectCrudMixin:
async def create_project(
self,
uow: ProjectCrudRepositoryProvider, # Protocol, not UnitOfWork
workspace_id: UUID,
name: str,
) -> Project:
...
```
---
## UnitOfWork Integration
### Capability-Based Access
Always check capability flags before accessing optional repositories:
```python
async def extract_entities(self, meeting_id: UUID) -> ExtractionResult:
if not self._uow.supports_entities:
logger.warning("NER disabled: database doesn't support entities")
return ExtractionResult(entities=[], cached=False, total_count=0)
# Now safe to access
existing = await self._uow.entities.fetch_by_meeting(meeting_id)
```
### Transaction Lifecycle
```python
async with uow:
meeting = await uow.meetings.get(meeting_id)
meeting.state = MeetingState.RECORDING
await uow.meetings.update(meeting)
# Auto-commit on success, rollback on exception
```
### Capability Flags Reference
| Flag | Repository | Required for |
|------|------------|--------------|
| `supports_annotations` | annotations | User annotations |
| `supports_entities` | entities | NER results |
| `supports_webhooks` | webhooks | Webhook config/delivery |
| `supports_integrations` | integrations | OAuth integrations |
| `supports_diarization_jobs` | diarization_jobs | Speaker diarization |
| `supports_preferences` | preferences | User preferences |
| `supports_usage_events` | usage_events | Analytics |
| `supports_projects` | projects | Multi-project mode |
| `supports_workspaces` | workspaces | Multi-tenant mode |
---
## Key Service Patterns
### MeetingService: State Machine Enforcement
```python
async def transition_state(self, meeting_id: UUID, new_state: MeetingState) -> Meeting:
meeting = await self._uow.meetings.get(meeting_id)
if not meeting.can_transition_to(new_state):
raise StateError(
error_code=ErrorCode.INVALID_STATE_TRANSITION,
current_state=meeting.state,
required_state=new_state,
)
meeting.state = new_state
await self._uow.meetings.update(meeting)
return meeting
```
### SummarizationService: Provider Selection with Consent
```python
async def summarize(self, meeting_id: UUID, options: SummarizationOptions) -> SummaryResult:
# Select provider based on mode and consent
provider = self._select_provider(options.mode)
if provider.requires_consent and not self._consent_granted:
raise ConsentRequiredError(consent_type="cloud_summarization")
# Generate summary with citation verification
result = await provider.summarize(segments, options)
if options.verify_citations:
result = self._verify_and_filter_citations(result, segments)
return result
```
### ProjectService: Rules Inheritance
```python
def get_effective_rules(
    self,
    workspace_settings: WorkspaceSettings,
    project_settings: ProjectSettings | None,
) -> EffectiveRules:
    """Resolve settings cascade: SYSTEM_DEFAULTS → Workspace → Project."""

    def first_set(*values: object) -> object:
        # Python has no ?? coalescing operator: take the first non-None value
        return next(v for v in values if v is not None)

    project = project_settings
    return EffectiveRules(
        export=first_set(
            project.export_rules if project else None,
            workspace_settings.export_rules,
            SYSTEM_DEFAULTS.export,
        ),
        trigger=first_set(
            project.trigger_rules if project else None,
            workspace_settings.trigger_rules,
            SYSTEM_DEFAULTS.trigger,
        ),
        rag_enabled=first_set(
            project.rag_enabled if project else None,
            workspace_settings.rag_enabled,
            SYSTEM_DEFAULTS.rag_enabled,
        ),
    )
```
### NerService: Model Lifecycle with Locking
```python
class NerService:
_model_load_lock: asyncio.Lock
async def ensure_ready(self) -> None:
async with self._model_load_lock:
if self._ner_port.is_ready():
return
await self._ner_port.load_model()
# Warmup with dummy text
await self._ner_port.extract("Test sentence for warmup.")
```
---
## Error Handling
### Service-Level Error Translation
```python
class AuthService:
async def complete_login(self, code: str, state: str) -> AuthResult:
try:
tokens = await self._token_exchanger.exchange(code, state)
return AuthResult(user=user, tokens=tokens)
except OAuthError as e:
logger.warning("auth_login_failed", error=str(e))
raise AuthServiceError(
message="OAuth login failed",
error_code=ErrorCode.PROVIDER_ERROR,
) from e
```
### Domain Error Mapping
Services raise domain errors; gRPC layer maps to gRPC status:
```python
# Service raises
raise NotFoundError(
resource_type="meeting",
resource_id=str(meeting_id),
error_code=ErrorCode.MEETING_NOT_FOUND,
)
# gRPC layer (interceptor) converts
abort(error.error_code.grpc_status, str(error))
```
---
## Testing Services
### Mock UnitOfWork Fixture
```python
@pytest.fixture
def mock_uow() -> MagicMock:
uow = MagicMock(spec=UnitOfWork)
uow.meetings = MagicMock(spec=MeetingRepository)
uow.segments = MagicMock(spec=SegmentRepository)
uow.supports_annotations = True
uow.supports_entities = True
uow.commit = AsyncMock()
uow.__aenter__ = AsyncMock(return_value=uow)
uow.__aexit__ = AsyncMock(return_value=None)
return uow
```
### Test Structure
```python
class TestMeetingServiceCreation:
async def test_create_meeting_success(self, mock_uow, sample_meeting):
mock_uow.meetings.create = AsyncMock(return_value=sample_meeting)
service = MeetingService(mock_uow)
result = await service.create_meeting(title="Test")
assert result.title == "Test"
mock_uow.meetings.create.assert_called_once()
mock_uow.commit.assert_called_once()
async def test_create_meeting_without_title_uses_default(self, mock_uow):
# Test that default title is generated
...
```
### Optional Dependency Override Testing
```python
async def test_calendar_service_with_mock_adapter(mock_uow_factory):
mock_google = MagicMock(spec=GoogleCalendarAdapter)
mock_google.fetch_events = AsyncMock(return_value=[sample_event])
service = CalendarService(
uow_factory=mock_uow_factory,
settings=test_settings,
google_adapter=mock_google, # Inject mock
)
events = await service.get_calendar_events(integration_id, time_range)
assert len(events) == 1
```
---
## Service-to-Repository Mapping
| Service | Core Repos | Optional Repos |
|---------|-----------|----------------|
| MeetingService | meetings, segments, summaries, assets | annotations |
| ProjectService | projects, project_memberships | — |
| IdentityService | workspaces, users, projects | — |
| ExportService | meetings, segments | — |
| SummarizationService | summaries | templates |
| NerService | — | entities |
| CalendarService | integrations | — |
| AuthService | integrations, users | — |
| WebhookService | webhooks | — |
| RecoveryService | meetings, diarization_jobs | — |
| RetentionService | meetings | — |
---
## Adding New Services
### 1. Create Service Directory
```
src/noteflow/application/services/my_service/
├── __init__.py # Public exports
├── service.py # Main service class
├── mixins/ # Optional: focused behaviors
├── protocols.py # Optional: repository protocols
└── helpers.py # Optional: helper functions
```
### 2. Define Service Class
```python
class MyService:
def __init__(self, uow: UnitOfWork) -> None:
self._uow = uow
async def my_operation(self, param: str) -> MyResult:
# Check capability if using optional repo
if not self._uow.supports_my_feature:
raise DatabaseRequiredError(feature="my_feature")
result = await self._uow.my_repo.do_something(param)
await self._uow.commit()
return result
```
### 3. Export from Module
```python
# __init__.py
from .service import MyService
__all__ = ["MyService"]
```
### 4. Wire in gRPC Startup
```python
# grpc/startup/services.py
def create_services(uow: UnitOfWork) -> ServiceContainer:
return ServiceContainer(
meeting_service=MeetingService(uow),
my_service=MyService(uow), # Add here
...
)
```
---
## Forbidden Patterns
### ❌ Direct Infrastructure Access
```python
# WRONG: Service accessing database directly
class MeetingService:
def __init__(self, db_connection: asyncpg.Connection):
self._conn = db_connection
# RIGHT: Access via repositories
class MeetingService:
def __init__(self, uow: UnitOfWork):
self._uow = uow
```
### ❌ Business Logic in Repositories
```python
# WRONG: Logic in repository
class MeetingRepository:
async def create_and_notify(self, meeting: Meeting) -> Meeting:
result = await self.create(meeting)
await self.webhook_service.notify(...) # Don't!
# RIGHT: Logic in service
class MeetingService:
async def create_meeting(self, title: str) -> Meeting:
meeting = await self._uow.meetings.create(...)
await self._webhook_service.notify(...)
```
### ❌ Skipping Capability Checks
```python
# WRONG: Assume optional feature exists
entities = await self._uow.entities.fetch_all()
# RIGHT: Check capability first
if self._uow.supports_entities:
entities = await self._uow.entities.fetch_all()
else:
entities = []
```
### ❌ God Services
```python
# WRONG: Single service does everything
class AppService:
async def create_meeting_and_summarize_and_export_and_webhook(self, ...):
... # 500 lines of mixed concerns
# RIGHT: Focused services with composition
class MeetingService: ...
class SummarizationService: ...
class ExportService: ...
class WebhookService: ...
```
---
## Observability
### Structured Logging
```python
logger.info(
"meeting_created",
meeting_id=str(meeting.id),
workspace_id=str(meeting.workspace_id),
title=meeting.title,
)
logger.debug(
"segment_batch_created",
meeting_id=str(meeting_id),
segment_count=len(segments),
)
```
### Usage Event Tracking
```python
# SummarizationService tracks LLM usage
usage_event = UsageEvent(
event_type="summarization",
meeting_id=meeting_id,
provider=provider_name,
tokens_used=result.tokens_used,
latency_ms=result.latency_ms,
)
await self._usage_sink.record(usage_event)
```
---
## See Also
- `/src/noteflow/domain/CLAUDE.md` — Domain layer patterns
- `/src/noteflow/infrastructure/CLAUDE.md` — Infrastructure adapters
- `/src/noteflow/grpc/startup/services.py` — Service initialization
- `/tests/application/` — Service test suites

View File

@@ -0,0 +1,579 @@
# Domain Layer Development Guide
## Overview
The domain layer (`src/noteflow/domain/`) contains pure business logic with no external dependencies. It defines entities, value objects, ports (interfaces), and business rules.
**Architecture**: Clean Architecture / Domain-Driven Design (DDD)
- No imports from infrastructure, application, or grpc layers
- Pure Python with dataclasses and protocols
- All validation in `__post_init__` methods
- Immutable value objects (frozen dataclasses)
---
## Entity Catalog
### Primary Aggregates
| Entity | File | Purpose | State Machine |
|--------|------|---------|---------------|
| **Meeting** | `entities/meeting.py` | Recording session aggregate root | CREATED → RECORDING → STOPPING → STOPPED → COMPLETED (ERROR from any) |
| **Segment** | `entities/segment.py` | Timestamped transcript chunk | Immutable |
| **Summary** | `entities/summary.py` | LLM-generated summary with evidence | Immutable |
| **Annotation** | `entities/annotation.py` | User-created marker | Immutable |
| **NamedEntity** | `entities/named_entity.py` | Extracted entity (person, org, etc.) | Immutable |
| **Project** | `entities/project.py` | Workspace sub-grouping | Configuration |
| **Integration** | `entities/integration.py` | OAuth connection state | DISCONNECTED ↔ CONNECTED ↔ ERROR |
### Identity Aggregates
| Entity | File | Purpose |
|--------|------|---------|
| **User** | `identity/entities.py` | Account identity |
| **Workspace** | `identity/entities.py` | Tenancy boundary |
| **WorkspaceMembership** | `identity/entities.py` | User-workspace role binding |
| **ProjectMembership** | `identity/entities.py` | User-project role binding |
---
## Meeting State Machine
```
UNSPECIFIED (initial, proto default)
CREATED
├──→ RECORDING ──→ STOPPING ──→ STOPPED ──→ COMPLETED
│ │ │ │ │
└─────────┴────────────┴───────────┴────────────┴──→ ERROR (terminal)
```
**State validation**:
```python
# Meeting.valid_transitions defines legal transitions
if not meeting.can_transition_to(new_state):
raise StateError(
error_code=ErrorCode.INVALID_STATE_TRANSITION,
current_state=meeting.state,
required_state=new_state,
)
```
---
## Evidence Linking Pattern
Summaries, annotations, and entities all link back to transcript segments via `segment_ids`:
```python
# Summary evidence
@dataclass(frozen=True)
class KeyPoint:
text: str
segment_ids: list[int] # References Segment.segment_id
def is_sourced(self) -> bool:
return len(self.segment_ids) > 0
# Annotation
@dataclass(frozen=True)
class Annotation:
segment_ids: list[int] # Evidence backref
# NamedEntity
@dataclass(frozen=True)
class NamedEntity:
segment_ids: list[int] # Where entity was found
```
**Invariant**: All `segment_ids` must reference valid segments within the same meeting.
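A sketch of how the invariant could be enforced; `validate_evidence` is a hypothetical helper, not an existing API:
```python
def validate_evidence(segment_ids: list[int], known_segment_ids: set[int]) -> None:
    # Hypothetical helper: reject evidence that points outside the meeting
    unknown = [sid for sid in segment_ids if sid not in known_segment_ids]
    if unknown:
        raise ValueError(f"segment_ids reference unknown segments: {unknown}")
```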
---
## Port Definitions
### Repository Ports (`ports/repositories/`)
| Port | Methods | Capability Flag |
|------|---------|-----------------|
| `MeetingRepository` | create, fetch, fetch_by_id, update, delete, exists | (core) |
| `SegmentRepository` | batch_create, fetch_all, fetch_by_ids, delete_all | (core) |
| `SummaryRepository` | create, fetch, update, delete, exists | (core) |
| `AssetRepository` | create, fetch, delete, list | (core) |
| `AnnotationRepository` | create, fetch_all, delete, delete_for_meeting | `supports_annotations` |
| `EntityRepository` | create, fetch_all, delete_for_meeting | `supports_entities` |
| `WebhookRepository` | create, fetch, update, delete, fetch_all | `supports_webhooks` |
| `IntegrationRepository` | create, fetch, update, delete, fetch_all | `supports_integrations` |
| `DiarizationJobRepository` | create, fetch, update, delete | `supports_diarization_jobs` |
| `PreferencesRepository` | get, set, delete | `supports_preferences` |
| `UsageEventRepository` | create, fetch_all, aggregate | `supports_usage_events` |
### UnitOfWork Protocol (`ports/unit_of_work.py`)
```python
class UnitOfWork(Protocol):
# Core repositories (always available)
meetings: MeetingRepository
segments: SegmentRepository
summaries: SummaryRepository
assets: AssetRepository
# Capability flags
supports_annotations: bool
supports_entities: bool
supports_webhooks: bool
# ... etc
# Lifecycle
async def __aenter__(self) -> Self: ...
async def __aexit__(self, *args) -> None: ...
async def commit(self) -> None: ...
async def rollback(self) -> None: ...
```
**Usage pattern**:
```python
async with uow:
if uow.supports_entities:
entities = await uow.entities.get_by_meeting(meeting_id)
# Auto-commit on success, rollback on exception
```
### Service Ports
| Port | File | Purpose |
|------|------|---------|
| `NerPort` | `ports/ner.py` | Named entity extraction |
| `CalendarPort` | `ports/calendar.py` | Calendar sync |
| `SummarizationProvider` | `summarization/ports.py` | LLM summarization |
---
## Value Objects (`value_objects.py`)
### Enums
| Enum | Values | Purpose |
|------|--------|---------|
| `MeetingState` | UNSPECIFIED, CREATED, RECORDING, STOPPING, STOPPED, COMPLETED, ERROR | Meeting lifecycle |
| `AnnotationType` | ACTION_ITEM, DECISION, NOTE, RISK | User annotation types |
| `ExportFormat` | MARKDOWN, HTML, PDF | Export outputs |
| `OAuthProvider` | GOOGLE, OUTLOOK | Calendar OAuth |
### Frozen Value Objects
```python
@dataclass(frozen=True)
class OAuthTokens:
access_token: str
refresh_token: str | None
token_type: str
expires_at: datetime
scope: str
def is_expired(self, buffer_seconds: int = 0) -> bool:
return datetime.now(UTC) >= self.expires_at - timedelta(seconds=buffer_seconds)
```
---
## Identity & Authorization
### Role Hierarchy
```
Workspace
├── WorkspaceMembership (user_id, role: WorkspaceRole)
│ └── OWNER > ADMIN > MEMBER > VIEWER
└── Project
└── ProjectMembership (user_id, role: ProjectRole)
└── ADMIN > EDITOR > VIEWER
```
### Permission Methods
```python
class WorkspaceRole(StrEnum):
OWNER = "owner"
ADMIN = "admin"
MEMBER = "member"
VIEWER = "viewer"
def can_write(self) -> bool:
return self in (WorkspaceRole.OWNER, WorkspaceRole.ADMIN, WorkspaceRole.MEMBER)
def can_admin(self) -> bool:
return self in (WorkspaceRole.OWNER, WorkspaceRole.ADMIN)
```
### OperationContext Pattern
```python
@dataclass(frozen=True)
class OperationContext:
user: UserContext
workspace: WorkspaceContext
project: ProjectContext | None
request_id: str | None = None
def can_write(self) -> bool:
return self.workspace.role.can_write()
def is_admin(self) -> bool:
return self.workspace.role.can_admin()
def can_write_project(self) -> bool:
# Workspace admin can always write to any project
if self.is_admin():
return True
if self.project is None:
return False
return self.project.role.can_write()
```
### Default Identity (Local-First Mode)
```python
DEFAULT_USER_ID = UUID("00000000-0000-0000-0000-000000000001")
DEFAULT_WORKSPACE_ID = UUID("00000000-0000-0000-0000-000000000001")
DEFAULT_PROJECT_ID = UUID("00000000-0000-0000-0000-000000000002")
```
---
## Rules Engine (`rules/`)
### Rule Modes (Progression Path)
| Mode | Status | Description |
|------|--------|-------------|
| `SIMPLE` | Implemented | Static key-value config (MVP) |
| `CONDITIONAL` | Future | If-then expressions |
| `EXPRESSION` | Future | Full expression language |
| `NATURAL` | Future | NLP → expression |
### Rule Types
```python
@RuleTypeRegistry.register
class ExportRuleType(RuleType):
name: ClassVar[str] = "export"
def evaluate(self, rule_config: dict, context: RuleContext) -> RuleResult:
# Return matched actions based on context
...
def get_schema(self) -> dict:
return {
"default_format": {"type": "string", "enum": ["markdown", "html", "pdf"]},
"include_audio": {"type": "boolean"},
"include_timestamps": {"type": "boolean"},
}
```
### Adding New Rule Types
1. Create class inheriting from `RuleType`
2. Set `name` class variable
3. Implement `evaluate()`, `validate_config()`, `get_schema()`
4. Register with `@RuleTypeRegistry.register` decorator
---
## Error Handling (`errors.py`)
### ErrorCode Enum
```python
class ErrorCode(Enum):
# Each has (code_string, grpc_status) pair
MEETING_NOT_FOUND = ("MEETING_NOT_FOUND", StatusCode.NOT_FOUND)
INVALID_MEETING_ID = ("INVALID_MEETING_ID", StatusCode.INVALID_ARGUMENT)
MEETING_ALREADY_STOPPED = ("MEETING_ALREADY_STOPPED", StatusCode.FAILED_PRECONDITION)
CONSENT_REQUIRED = ("CONSENT_REQUIRED", StatusCode.FAILED_PRECONDITION)
WORKSPACE_ACCESS_DENIED = ("WORKSPACE_ACCESS_DENIED", StatusCode.PERMISSION_DENIED)
PROVIDER_ERROR = ("PROVIDER_ERROR", StatusCode.INTERNAL)
# ... 29 total codes
```
### Exception Hierarchy
```python
DomainError (base)
NotFoundError (resource_type, resource_id)
ValidationError (field, reason)
StateError (current_state, required_state)
ProviderError (provider_name, operation, reason)
ConsentRequiredError (consent_type, operation)
DatabaseRequiredError (feature)
PermissionDeniedError (resource_type, resource_id)
CannotArchiveDefaultProjectError (project_id)
```
### Raising Domain Errors
```python
# Always use specific exceptions with error codes
raise NotFoundError(
resource_type="meeting",
resource_id=str(meeting_id),
error_code=ErrorCode.MEETING_NOT_FOUND,
)
# gRPC layer automatically maps error_code.grpc_status
```
---
## Settings Inheritance (`settings/`, `entities/project.py`)
### Cascade Resolution
```
SYSTEM_DEFAULTS
↓ (override with non-None)
Workspace.settings
↓ (override with non-None)
Project.settings
EffectiveRules (fully resolved, no None values)
```
### ExtensibleSettings Pattern
```python
class ExtensibleSettings:
extensions: dict[str, object] # Experimental features
schema_version: int = 1
def set_extension(self, key: str, value: object) -> None: ...
def has_extension(self, key: str) -> bool: ...
```
**Lifecycle**: Experimental feature in `extensions["key"]` → promoted to typed field after validation
---
## Webhooks (`webhooks/`)
### Event Types
```python
class WebhookEventType(StrEnum):
MEETING_COMPLETED = "meeting.completed"
SUMMARY_GENERATED = "summary.generated"
RECORDING_STARTED = "recording.started"
RECORDING_STOPPED = "recording.stopped"
ENTITIES_EXTRACTED = "entities.extracted" # Future
DIARIZATION_COMPLETED = "diarization.completed" # Future
```
### WebhookConfig
```python
@dataclass(frozen=True)
class WebhookConfig:
id: UUID
workspace_id: UUID
url: str
events: frozenset[WebhookEventType] # Subscribed events
secret: str | None # HMAC-SHA256 signing key
enabled: bool
timeout_ms: int = 30000
max_retries: int = 3
```
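A sketch of signing a delivery with the `secret`, assuming HMAC-SHA256 over the JSON body; the canonicalization and header name are assumptions:
```python
import hashlib
import hmac
import json

def sign_payload(secret: str, payload: dict[str, object]) -> str:
    # Hex-encoded HMAC-SHA256 over a canonical JSON encoding of the body
    body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode()
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

# Receivers recompute the digest and compare with hmac.compare_digest(),
# e.g. against a header such as X-NoteFlow-Signature (name is an assumption).
```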
---
## Triggers (`triggers/`)
### Signal → Decision Flow
```
TriggerSignal (source, weight, app_name, timestamp)
↓ (aggregate multiple signals)
TriggerDecision (action, confidence, signals)
TriggerAction: IGNORE (conf < 0.40) | NOTIFY (0.40-0.79) | AUTO_START (>= 0.80)
```
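A sketch of the aggregation step using the documented thresholds; summing weights and capping at 1.0 is an assumption about the aggregation strategy:
```python
from dataclasses import dataclass
from enum import StrEnum

class TriggerAction(StrEnum):
    IGNORE = "ignore"
    NOTIFY = "notify"
    AUTO_START = "auto_start"

@dataclass(frozen=True)
class TriggerSignal:
    source: str
    weight: float  # contribution in [0.0, 1.0]

def decide(signals: list[TriggerSignal]) -> TriggerAction:
    confidence = min(sum(s.weight for s in signals), 1.0)
    if confidence >= 0.80:
        return TriggerAction.AUTO_START
    if confidence >= 0.40:
        return TriggerAction.NOTIFY
    return TriggerAction.IGNORE
```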
### Trigger Sources
| Source | Implementation Status |
|--------|----------------------|
| `AUDIO_ACTIVITY` | Implemented |
| `FOREGROUND_APP` | Implemented |
| `CALENDAR` | Future |
---
## Key Design Patterns
### 1. Frozen Dataclasses for Immutability
```python
@dataclass(frozen=True)
class Segment:
segment_id: int
text: str
start_time: float
end_time: float
def __post_init__(self):
if self.start_time >= self.end_time:
raise ValueError("start_time must be < end_time")
```
### 2. Factory Methods for Complex Construction
```python
class Meeting:
@classmethod
def create(
cls,
workspace_id: UUID,
created_by_id: UUID,
title: str | None = None,
project_id: UUID | None = None,
) -> Meeting:
return cls(
id=uuid4(),
workspace_id=workspace_id,
created_by_id=created_by_id,
title=title or f"Meeting {datetime.now().strftime('%Y-%m-%d %H:%M')}",
project_id=project_id,
state=MeetingState.CREATED,
created_at=utc_now(),
updated_at=utc_now(),
version=1,
)
```
### 3. Protocol-Based Ports (Structural Subtyping)
```python
class NerPort(Protocol):
def extract(self, text: str) -> list[NamedEntity]: ...
def extract_from_segments(self, segments: list[tuple[int, str]]) -> list[NamedEntity]: ...
def is_ready(self) -> bool: ...
```
### 4. NewType for Distinct IDs
```python
from typing import NewType
MeetingId = NewType("MeetingId", UUID)
AnnotationId = NewType("AnnotationId", UUID)
```
---
## Forbidden Patterns
### Importing Infrastructure/Application
```python
# WRONG: Domain must be pure
from noteflow.infrastructure.persistence import SqlAlchemyUnitOfWork
from noteflow.application.services import MeetingService
# RIGHT: Use protocols from ports/
from noteflow.domain.ports.unit_of_work import UnitOfWork
```
### Mutable Entities
```python
# WRONG: Mutable state
@dataclass
class Segment:
text: str
segment.text = "new text" # Don't mutate!
# RIGHT: Create new instances or use frozen
@dataclass(frozen=True)
class Segment:
text: str
```
### Business Logic in Repositories
```python
# WRONG: Logic in repository
class MeetingRepository:
async def create_and_notify(self, meeting: Meeting) -> Meeting:
result = await self.create(meeting)
await self.webhook_service.notify(...) # Don't do this!
# RIGHT: Logic in application services
```
---
## Testing Domain Code
### Unit Test Entities Directly
```python
def test_meeting_state_transition():
meeting = Meeting.create(workspace_id=uuid4(), created_by_id=uuid4())
assert meeting.state == MeetingState.CREATED
assert meeting.can_transition_to(MeetingState.RECORDING)
assert not meeting.can_transition_to(MeetingState.COMPLETED)
def test_segment_invariant():
with pytest.raises(ValueError, match="start_time must be < end_time"):
Segment(segment_id=0, text="test", start_time=10.0, end_time=5.0, ...)
```
### Mock Ports with Protocol-Compatible Mocks
```python
@pytest.fixture
def mock_ner_port() -> NerPort:
mock = Mock(spec=NerPort)
mock.extract.return_value = [NamedEntity(...)]
mock.is_ready.return_value = True
return mock
```
---
## Directory Structure
```
domain/
├── entities/ # Aggregate roots and value objects
│ ├── meeting.py # Meeting, MeetingCreateParams
│ ├── segment.py # Segment, WordTiming
│ ├── summary.py # Summary, KeyPoint, ActionItem
│ ├── annotation.py # Annotation
│ ├── named_entity.py # NamedEntity, EntityCategory
│ ├── project.py # Project, ExportRules, TriggerRules
│ ├── integration.py # Integration, IntegrationType
│ └── processing.py # ProcessingStatus, ProcessingStepState
├── ports/ # Interface definitions
│ ├── repositories/ # Repository protocols
│ ├── unit_of_work.py # UnitOfWork protocol
│ ├── ner.py # NerPort
│ └── calendar.py # CalendarPort
├── identity/ # User/workspace/project
├── auth/ # OIDC configuration
├── webhooks/ # Webhook events and config
├── triggers/ # Trigger signals and decisions
├── rules/ # Rule engine
├── settings/ # ExtensibleSettings base
├── errors.py # ErrorCode, DomainError hierarchy
└── value_objects.py # MeetingState, AnnotationType, etc.
```
---
## See Also
- `/src/noteflow/CLAUDE.md` — Python backend standards
- `/src/noteflow/infrastructure/CLAUDE.md` — Infrastructure adapter patterns
- `/src/noteflow/application/services/` — Service layer (uses domain)

View File

@@ -0,0 +1,139 @@
# Domain Entities Development Guide
## Overview
The entities directory contains **aggregate roots, value objects, and domain entities** that form the core business model. All entities follow Domain-Driven Design (DDD) principles with immutable value objects and rich domain models.
---
## Entity Catalog
### Primary Aggregates (Transcript)
| Entity | File | Purpose |
|--------|------|---------|
| **Meeting** | `meeting.py` | Recording session aggregate root |
| **Segment** | `segment.py` | Timestamped transcript chunk with word-level timing |
| **Summary** | `summary.py` | LLM-generated summary with evidence linking |
| **Annotation** | `annotation.py` | User-created marker (action, decision, note, risk) |
| **NamedEntity** | `named_entity.py` | Extracted NER entity |
### Configuration Entities
| Entity | File | Purpose |
|--------|------|---------|
| **Project** | `project.py` | Meeting grouping with settings cascade |
| **Integration** | `integration.py` | OAuth/external service connection |
| **SummarizationTemplate** | `summarization_template.py` | Custom summary format |
### Processing Entities
| Entity | File | Purpose |
|--------|------|---------|
| **ProcessingStatus** | `processing.py` | Post-meeting workflow tracker |
---
## Key Patterns
### 1. Frozen Dataclasses for Immutability
```python
@dataclass(frozen=True)
class Segment:
segment_id: int
text: str
start_time: float
end_time: float
words: list[WordTiming]
speaker_id: str | None = None
```
### 2. Factory Methods
```python
class Meeting:
@classmethod
def create(
cls,
params: MeetingCreateParams,
*,
created_by_id: UUID | None = None,
) -> Meeting:
"""Encapsulates initialization logic."""
```
### 3. Evidence Linking (Critical)
All LLM outputs link back to source segments:
```python
@dataclass(frozen=True)
class KeyPoint:
text: str
segment_ids: list[int] # References Segment.segment_id
def is_sourced(self) -> bool:
return len(self.segment_ids) > 0
```
### 4. NewType for Distinct IDs
```python
MeetingId = NewType("MeetingId", UUID)
AnnotationId = NewType("AnnotationId", UUID)
```
---
## State Machines
### MeetingState Transitions
```
CREATED → RECORDING → STOPPING → STOPPED → COMPLETED
↓ ↓ ↓ ↓ ↓
ERROR ←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←
```
Use `can_transition_to()` method to validate transitions.
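A transition table consistent with the diagram; the authoritative mapping lives in `Meeting.valid_transitions`, so treat this as a sketch (the import path follows the documented layout):
```python
from noteflow.domain.value_objects import MeetingState  # path per documented layout

VALID_TRANSITIONS: dict[MeetingState, frozenset[MeetingState]] = {
    MeetingState.CREATED: frozenset({MeetingState.RECORDING, MeetingState.ERROR}),
    MeetingState.RECORDING: frozenset({MeetingState.STOPPING, MeetingState.ERROR}),
    MeetingState.STOPPING: frozenset({MeetingState.STOPPED, MeetingState.ERROR}),
    MeetingState.STOPPED: frozenset({MeetingState.COMPLETED, MeetingState.ERROR}),
    MeetingState.COMPLETED: frozenset({MeetingState.ERROR}),
    MeetingState.ERROR: frozenset(),  # terminal
}

def can_transition_to(current: MeetingState, new_state: MeetingState) -> bool:
    return new_state in VALID_TRANSITIONS.get(current, frozenset())
```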
### ProcessingStepState
```
PENDING → RUNNING → COMPLETED | FAILED | SKIPPED
```
---
## Settings Cascade
```
SYSTEM_DEFAULTS (EffectiveRules)
↓ (override if non-None)
Workspace.settings
↓ (override if non-None)
Project.settings
EffectiveRules (all fields have values)
```
---
## Forbidden Patterns
- **No infrastructure imports** in this directory
- **No mutable state** in value objects
- **No business logic in repositories**
- **No database models** (those live in `infrastructure/persistence/models/`)
---
## Adding a New Entity
1. Create dataclass with `frozen=True` for value objects
2. Add factory method if construction is complex
3. Define state enum if entity has lifecycle
4. Add NewType for ID if distinct from UUID
5. Export in `__init__.py`
6. Define repository protocol in `ports/repositories/`
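A skeleton tying the steps together; `Bookmark` is a hypothetical entity used only for illustration:
```python
from dataclasses import dataclass
from typing import NewType
from uuid import UUID, uuid4

BookmarkId = NewType("BookmarkId", UUID)  # step 4: distinct ID type

@dataclass(frozen=True)  # step 1: immutable value object
class Bookmark:
    id: BookmarkId
    meeting_id: UUID
    segment_ids: list[int]  # evidence linking, as in other entities
    label: str

    def __post_init__(self) -> None:
        if not self.label:
            raise ValueError("label must be non-empty")

    @classmethod  # step 2: factory method
    def create(cls, meeting_id: UUID, segment_ids: list[int], label: str) -> "Bookmark":
        return cls(
            id=BookmarkId(uuid4()),
            meeting_id=meeting_id,
            segment_ids=segment_ids,
            label=label,
        )
```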

View File

@@ -0,0 +1,101 @@
# Domain Ports Development Guide
## Overview
The ports directory defines **interface protocols** for external dependencies. These are contracts that infrastructure adapters implement. Domain code depends on ports, never on concrete implementations.
---
## Port Catalog
### Unit of Work (`unit_of_work.py`)
The central transaction management protocol with **capability flags**:
```python
class UnitOfWork(Protocol):
# Always available
meetings: MeetingRepository
segments: SegmentRepository
summaries: SummaryRepository
assets: AssetRepository
# Check capability before access
supports_annotations: bool
annotations: AnnotationRepository # Only if supports_annotations
supports_entities: bool
entities: EntityRepository # Only if supports_entities
supports_webhooks: bool
webhooks: WebhookRepository # Only if supports_webhooks
```
**Usage Pattern**:
```python
async with uow:
if uow.supports_entities:
entities = await uow.entities.get_by_meeting(meeting_id)
await uow.commit()
```
### Repository Protocols (`repositories/`)
| Module | Protocols |
|--------|-----------|
| `transcript.py` | MeetingRepository, SegmentRepository, SummaryRepository, AnnotationRepository |
| `asset.py` | AssetRepository |
| `background.py` | DiarizationJobRepository, PreferencesRepository |
| `external/` | EntityRepository, IntegrationRepository, WebhookRepository, UsageEventRepository |
| `identity/` | UserRepository, WorkspaceRepository, ProjectRepository, MembershipRepository |
### Service Ports
| Port | File | Purpose |
|------|------|---------|
| NerPort | `ner.py` | Named entity extraction |
| CalendarPort | `calendar.py` | Calendar event sync |
| DiarizationEngine | `diarization.py` | Speaker identification |
| SummarizationProvider | in `domain/summarization/ports.py` | LLM summarization |
---
## Repository Protocol Pattern
```python
from collections.abc import Sequence
from typing import Protocol, Unpack

class MeetingRepository(Protocol):
    async def create(self, meeting: Meeting) -> Meeting: ...
    async def get(self, meeting_id: MeetingId) -> Meeting | None: ...
    async def update(self, meeting: Meeting) -> Meeting: ...
    async def delete(self, meeting_id: MeetingId) -> bool: ...
    async def list_all(self, **kwargs: Unpack[MeetingListKwargs]) -> tuple[Sequence[Meeting], int]: ...
```
---
## Key Principles
1. **Protocol-based** (structural typing) — No inheritance required
2. **Async-first** — All I/O operations are async
3. **Capability flags** — Check before accessing optional repositories
4. **No implementation details** — Ports don't know about SQL, HTTP, etc.
---
## Forbidden Patterns
- **No concrete types** — Use Protocol, not ABC with implementation
- **No infrastructure imports** — Ports are pure interfaces
- **No business logic** — Ports define what, not how
- **No direct database types** — Use domain entities only
---
## Adding a New Port
1. Create Protocol class with method signatures
2. Add to appropriate module in `repositories/`
3. Add capability flag to UnitOfWork if optional
4. Implement in `infrastructure/` directory
5. Wire up in `grpc/startup/services.py`

View File

@@ -0,0 +1,151 @@
# Rules Engine Development Guide
## Overview
The rules engine enables **configurable behavior** through a pluggable rule type system. It supports progressive complexity from simple key-value configs to future expression-based rules.
---
## Rule Modes (MVP to Future)
| Mode | Status | Description |
|------|--------|-------------|
| **SIMPLE** | Implemented | Static key-value config |
| **CONDITIONAL** | Future | If-then conditions |
| **EXPRESSION** | Future | Full expression DSL |
| **NATURAL** | Future | NLP-transformed expression |
---
## Core Models (`models.py`)
### RuleAction
```python
@dataclass
class RuleAction:
action_type: str # "set_value", "notify", "auto_start"
params: dict[str, object]
```
### ConditionalRule
```python
from dataclasses import dataclass, field

@dataclass
class ConditionalRule:
    name: str
    mode: RuleMode = RuleMode.SIMPLE
    condition: str | None = None
    actions: list[RuleAction] = field(default_factory=list)  # mutable defaults need a factory
    priority: int = 0  # Higher = evaluated first
    enabled: bool = True
```
### RuleSet
```python
@dataclass
class RuleSet:
    rule_type: str  # "export", "trigger", "retention"
    rules: list[ConditionalRule] = field(default_factory=list)
    simple_defaults: dict[str, object] = field(default_factory=dict)

    def is_simple_mode(self) -> bool:
        return len(self.rules) == 0
```
---
## Registry (`registry.py`)
### RuleContext
Provides evaluation context:
```python
@dataclass
class RuleContext:
    meeting: Meeting | None = None
    calendar_event: object | None = None
    workspace: Workspace | None = None
    project: object | None = None
    user: User | None = None
    extra: dict[str, object] = field(default_factory=dict)
```
### RuleType (Abstract Base)
```python
class RuleType(ABC):
name: ClassVar[str]
version: ClassVar[int] = 1
@abstractmethod
def evaluate(self, rule_config: dict, context: RuleContext) -> RuleResult: ...
@abstractmethod
def validate_config(self, config: dict) -> list[str]: ...
def get_schema(self) -> dict: ... # JSON Schema for UI
```
### Registering New Rule Types
```python
@RuleTypeRegistry.register
class MyRuleType(RuleType):
name = "my_rule"
version = 1
def evaluate(self, config: dict, context: RuleContext) -> RuleResult:
# Return RuleResult with matched=True/False and actions
...
def validate_config(self, config: dict) -> list[str]:
# Return list of validation error strings
...
```
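`RuleResult` is used throughout but not shown; a plausible shape inferred from the `result.matched` and `result.actions` usage below (the exact definition in `models.py` may differ):
```python
from dataclasses import dataclass, field

@dataclass
class RuleResult:
    matched: bool
    actions: list[RuleAction] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)  # validation problems, if any
```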
---
## Built-in Rule Types (`builtin.py`)
### ExportRuleType
- **Name**: `export`
- **Config fields**: `default_format`, `include_audio`, `include_timestamps`, `template_id`
### TriggerRuleType
- **Name**: `trigger`
- **Config fields**: `auto_start_enabled`, `calendar_match_patterns`, `app_match_patterns`
---
## Evaluation Flow
```python
# 1. Get rule type from registry
rule_type = RuleTypeRegistry.get("export")
# 2. Build context
context = RuleContext(meeting=meeting, project=project)
# 3. Evaluate
result = rule_type.evaluate(rule_config, context)
# 4. Check result
if result.matched:
    for action in result.actions:
        apply_action(action)  # dispatch to the action handler (placeholder)
```
---
## Adding a New Rule Type
1. Create class extending `RuleType` in `builtin.py` or new file
2. Define `name` and `version` class variables
3. Implement `evaluate()`, `validate_config()`, `get_schema()`
4. Decorate with `@RuleTypeRegistry.register`
5. Add tests in `tests/domain/rules/`

View File

@@ -0,0 +1,130 @@
# ASR (Automatic Speech Recognition) Development Guide
## Overview
The ASR subsystem provides **speech-to-text transcription** using Whisper models. It supports multiple backends (CTranslate2, PyTorch, ROCm) with automatic device detection.
---
## Architecture
```
AsrEngine (Protocol)
├── FasterWhisperEngine (CTranslate2 - default)
├── PyTorchWhisperEngine (fallback)
└── RocmWhisperEngine (AMD GPU)
StreamingVad
└── EnergyVad (energy-based voice detection)
Segmenter (VAD-driven state machine)
└── IDLE → SPEECH → TRAILING
```
---
## Key Files
| File | Purpose |
|------|---------|
| `protocols.py` | `AsrEngine` Protocol definition |
| `engine.py` | `FasterWhisperEngine` implementation |
| `factory.py` | `create_asr_engine()` with device detection |
| `streaming_vad.py` | `EnergyVad`, `StreamingVad` |
| `segmenter/segmenter.py` | VAD-driven audio segmentation |
| `rocm_engine.py` | AMD ROCm support |
| `pytorch_engine.py` | PyTorch fallback |
---
## AsrEngine Protocol
```python
class AsrEngine(Protocol):
def load_model(self) -> None: ...
def transcribe(self, audio: NDArray) -> Iterator[AsrResult]: ...
def unload(self) -> None: ...
@property
def is_loaded(self) -> bool: ...
@property
def model_size(self) -> str: ...
@property
def device(self) -> str: ...
@property
def compute_type(self) -> str: ...
```
---
## Device Selection
```python
engine = create_asr_engine(
model_size="base", # tiny, base, small, medium, large
device="auto", # auto, cpu, cuda, rocm, mps
compute_type="int8" # int8, float16, float32
)
```
**"auto" resolution**: CUDA → ROCm → MPS → CPU
---
## Segmenter State Machine
```
IDLE → (speech detected) → SPEECH → (silence) → TRAILING → (timeout) → IDLE
Emit AudioSegment
```
**Configuration**:
- `min_speech_duration`: 0.3s
- `max_segment_duration`: 30s
- `trailing_silence`: 0.5s
---
## Async Pattern
```python
async def transcribe_async(self, audio: NDArray) -> list[AsrResult]:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self._sync_transcribe, audio)
```
Whisper is synchronous — wrap with `run_in_executor()` for async contexts.
---
## VAD (Voice Activity Detection)
**EnergyVad** uses energy threshold with hysteresis:
```python
class EnergyVad:
speech_threshold: float = 0.01
silence_threshold: float = 0.005
min_speech_frames: int = 3
min_silence_frames: int = 5
```
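A self-contained sketch of the hysteresis loop; the real implementation in `streaming_vad.py` differs in detail:
```python
import numpy as np
from numpy.typing import NDArray

class EnergyVadSketch:
    """Illustrative only: flip state after N consecutive confirming frames."""

    def __init__(self) -> None:
        self.speech_threshold = 0.01
        self.silence_threshold = 0.005
        self.min_speech_frames = 3
        self.min_silence_frames = 5
        self.is_speech = False
        self._run = 0  # consecutive frames supporting a state flip

    def process_frame(self, frame: NDArray[np.float32]) -> bool:
        energy = float(np.mean(frame * frame))  # mean-square frame energy
        crossing = (
            energy < self.silence_threshold if self.is_speech
            else energy > self.speech_threshold
        )
        self._run = self._run + 1 if crossing else 0
        needed = self.min_silence_frames if self.is_speech else self.min_speech_frames
        if self._run >= needed:
            self.is_speech = not self.is_speech
            self._run = 0
        return self.is_speech
```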
---
## Key Patterns
1. **Lazy loading** — Models loaded on first `transcribe()` call
2. **Device abstraction** — "auto" string resolved to actual backend
3. **Word-level timestamps** — `WordTiming` for fine-grained sync
4. **Fallback chain** — CTranslate2 → PyTorch if unavailable
---
## Adding a New Backend
1. Implement `AsrEngine` Protocol
2. Add to `factory.py` device resolution
3. Handle model loading/unloading lifecycle
4. Return `Iterator[AsrResult]` with word timings
5. Add integration tests in `tests/infrastructure/asr/`

View File

@@ -0,0 +1,133 @@
# Diarization Development Guide
## Overview
The diarization subsystem provides **speaker identification** using pyannote.audio. It supports both **streaming** (real-time) and **offline** (post-meeting refinement) modes.
---
## Architecture
```
DiarizationEngine (entry point)
├── Streaming Pipeline (diart) — real-time
└── Offline Pipeline (pyannote) — post-meeting
DiarizationSession (per-meeting state)
└── Isolates pipeline state for concurrent processing
SpeakerAssigner
└── Maps diarization turns to transcript segments
```
---
## Key Files
| File | Purpose |
|------|---------|
| `engine/engine.py` | `DiarizationEngine` main class |
| `session.py` | `DiarizationSession` per-meeting isolation |
| `audio_buffer.py` | `DiarizationAudioBuffer` chunk accumulation |
| `assigner.py` | `assign_speaker()` segment mapping |
| `dto.py` | `SpeakerTurn` dataclass |
| `engine/_streaming_mixin.py` | Real-time processing |
| `engine/_offline_mixin.py` | Post-meeting refinement |
---
## Session-per-Meeting Pattern
```python
# Each meeting gets isolated session
session = engine.create_session(meeting_id)
# Process audio chunks
for chunk in audio_stream:
turns = await session.process_chunk(chunk)
# Session maintains its own state
session.close()
```
**Why sessions?** Enables concurrent diarization without cross-meeting interference.
---
## Streaming vs Offline
| Mode | Pipeline | Latency | Accuracy | Use Case |
|------|----------|---------|----------|----------|
| **Streaming** | diart | ~500ms | Good | Real-time speaker labels |
| **Offline** | pyannote | Minutes | Better | Post-meeting refinement |
---
## Speaker Assignment
```python
speaker_id, confidence = assign_speaker(
segment_start=0.0,
segment_end=5.0,
turns=speaker_turns,
)
```
**Algorithm**: Max overlap duration wins. Confidence = overlap / segment duration.
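A self-contained sketch of the max-overlap rule; the `SpeakerTurn` field names here are assumptions about the DTO:
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SpeakerTurn:
    speaker_id: str
    start: float
    end: float

def assign_speaker(
    segment_start: float,
    segment_end: float,
    turns: list[SpeakerTurn],
) -> tuple[str | None, float]:
    # Accumulate overlap per speaker; the speaker with the most overlap wins
    overlaps: dict[str, float] = {}
    for turn in turns:
        overlap = min(segment_end, turn.end) - max(segment_start, turn.start)
        if overlap > 0:
            overlaps[turn.speaker_id] = overlaps.get(turn.speaker_id, 0.0) + overlap
    if not overlaps:
        return None, 0.0
    speaker, best = max(overlaps.items(), key=lambda item: item[1])
    duration = segment_end - segment_start
    return speaker, (best / duration if duration > 0 else 0.0)
```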
---
## Turn Management
Sessions maintain bounded turn history:
```python
# Pruning limits
MAX_TURNS = 5000
MAX_TURN_AGE_SECONDS = 900 # 15 minutes
```
**Crash Recovery**: `StreamingDiarizationTurnModel` persisted immediately to DB.
---
## Configuration
```python
DiarizationEngine(
hf_token="hf_...", # HuggingFace API token
device="cuda", # cuda, cpu
streaming_latency=0.5, # Streaming window
min_speakers=1,
max_speakers=10,
)
```
---
## Key Patterns
1. **Session isolation** — Shared models, isolated state per meeting
2. **Crash resilience** — Turns persisted to DB immediately
3. **Bounded memory** — Turn pruning by count and age
4. **Async wrappers** — Model inference in thread pool
---
## Job Lifecycle
```
PENDING → RUNNING → COMPLETED | FAILED
```
Jobs tracked in `DiarizationJobRepository` for async refinement.
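A status enum sketch matching that lifecycle; the enum name and string values are assumptions based on the diagram:

```python
from enum import Enum


class DiarizationJobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
```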
---
## Adding Speaker Features
1. Modify `SpeakerTurn` DTO if new fields needed
2. Update `session.py` turn processing
3. Update `assigner.py` assignment logic
4. Persist changes via `diarization_jobs` repository
5. Add tests in `tests/infrastructure/diarization/`


@@ -0,0 +1,145 @@
# NER (Named Entity Recognition) Development Guide
## Overview
The NER subsystem provides **named entity extraction** from transcript text. It uses spaCy with configurable model sizes and supports batch processing with segment tracking.
---
## Architecture
```
NerEngine (main entry point)
└── spaCy pipeline (en_core_web_*)

Category Mapping:
spaCy types → Domain EntityCategory
```
---
## Key Files
| File | Purpose |
|------|---------|
| `engine.py` | `NerEngine` implementation |
| Future: `backends/` | Backend abstraction (GLiNER, etc.) |
| Future: `post_processing.py` | Normalization, deduplication |
| Future: `mapper.py` | Raw → Domain entity conversion |
---
## NerEngine
```python
class NerEngine:
    def extract(self, text: str) -> list[NamedEntity]: ...

    def extract_from_segments(
        self,
        segments: list[tuple[int, str]],
    ) -> list[NamedEntity]: ...

    async def extract_async(self, text: str) -> list[NamedEntity]: ...
```
---
## Category Mapping
| spaCy Type | Domain Category |
|------------|-----------------|
| PERSON | PERSON |
| ORG | COMPANY |
| GPE, LOC, FAC | LOCATION |
| DATE, TIME | DATE |
| PRODUCT, WORK_OF_ART | PRODUCT |
| Others | OTHER |
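In code this is typically a plain lookup table; a sketch matching the table above, where the dict and function names are illustrative (only `EntityCategory` is confirmed by the domain layer):

```python
# Hypothetical mapping table; EntityCategory comes from
# domain/entities/named_entity.py.
SPACY_TO_CATEGORY: dict[str, EntityCategory] = {
    "PERSON": EntityCategory.PERSON,
    "ORG": EntityCategory.COMPANY,
    "GPE": EntityCategory.LOCATION,
    "LOC": EntityCategory.LOCATION,
    "FAC": EntityCategory.LOCATION,
    "DATE": EntityCategory.DATE,
    "TIME": EntityCategory.DATE,
    "PRODUCT": EntityCategory.PRODUCT,
    "WORK_OF_ART": EntityCategory.PRODUCT,
}


def map_category(spacy_label: str) -> EntityCategory:
    return SPACY_TO_CATEGORY.get(spacy_label, EntityCategory.OTHER)
```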
---
## Model Options
```python
engine = NerEngine(model_size="sm") # sm, md, lg, trf
```
| Size | Model | Speed | Accuracy |
|------|-------|-------|----------|
| sm | en_core_web_sm | Fast | Basic |
| md | en_core_web_md | Medium | Good |
| lg | en_core_web_lg | Slow | Better |
| trf | en_core_web_trf | Slowest | Best |
**Fallback**: If the transformer model is unavailable, the engine falls back to the small model.
---
## Segment Tracking
```python
# Input: List of (segment_id, text) tuples
segments = [(1, "John spoke about Microsoft"), (2, "Jane mentioned Google")]
# Output: Entities with segment_ids populated
entities = engine.extract_from_segments(segments)
# Entity("John", PERSON, segment_ids=[1])
# Entity("Microsoft", COMPANY, segment_ids=[1])
# Entity("Jane", PERSON, segment_ids=[2])
# Entity("Google", COMPANY, segment_ids=[2])
```
---
## Deduplication
Entities with the same normalized text are merged:
```python
# "JOHN" and "John" → single entity with combined segment_ids
```
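A dedup sketch using a case-insensitive key; it assumes entities expose mutable `text` and `segment_ids` fields (field names and mutability are assumptions):

```python
def deduplicate(entities: list["NamedEntity"]) -> list["NamedEntity"]:
    # Illustrative: merge segment_ids for entities whose normalized text matches.
    merged: dict[str, "NamedEntity"] = {}
    for entity in entities:
        key = entity.text.casefold()
        if key in merged:
            merged[key].segment_ids.extend(entity.segment_ids)
        else:
            merged[key] = entity
    return list(merged.values())
```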
---
## Async Pattern
```python
async def extract_async(self, text: str) -> list[NamedEntity]:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.extract, text)
```
spaCy is synchronous — wrap for async contexts.
---
## Key Patterns
1. **Lazy model loading** — Model loaded on first `extract()` call
2. **Segment tracking** — Each entity knows which segments it came from
3. **Case-insensitive deduplication** — Normalized text key
4. **Async wrappers** — Thread pool for blocking inference
---
## Future: Backend Abstraction
Planned architecture for swappable backends:
```python
class NerBackend(Protocol):
    def extract(self, text: str) -> list[RawEntity]: ...


class NerEngine:
    def __init__(self, backend: NerBackend, mapper: NerMapper): ...
```
Enables switching to GLiNER or other backends via config.
---
## Adding Entity Categories
1. Add to `EntityCategory` enum in `domain/entities/named_entity.py`
2. Update category mapping in `engine.py`
3. Add post-processing rules if needed
4. Update tests in `tests/infrastructure/ner/`


@@ -0,0 +1,162 @@
# Summarization Development Guide
## Overview
The summarization subsystem provides **LLM-powered summary generation** with multiple provider backends. It supports consent tracking for cloud providers and evidence linking via segment citations.
---
## Architecture
```
SummarizationProvider (Protocol)
├── CloudSummarizer (OpenAI/Anthropic)
├── OllamaSummarizer (local LLM)
└── MockSummarizer (testing)

SummarizationService (application layer)
└── Manages providers, consent, templates

CitationVerifier
└── Validates segment_ids in summaries
```
---
## Key Files
| File | Purpose |
|------|---------|
| `factory.py` | `create_summarization_service()` |
| `cloud_provider/cloud_provider.py` | `CloudSummarizer` |
| `ollama_provider.py` | `OllamaSummarizer` |
| `mock_provider.py` | `MockSummarizer` |
| `_parsing.py` | Response parsing utilities |
| `citation_verifier.py` | `SegmentCitationVerifier` |
| `template_renderer.py` | Template customization |
---
## Provider Protocol
```python
class SummarizationProvider(Protocol):
    async def summarize(
        self,
        segments: list[Segment],
        context: SummarizationContext,
    ) -> Summary: ...

    @property
    def requires_consent(self) -> bool: ...

    @property
    def is_available(self) -> bool: ...
```
---
## Provider Selection
```python
service = create_summarization_service(
    config=SummarizationServiceFactoryConfig(
        preferred_provider=ProviderType.AUTO,  # AUTO, LOCAL, CLOUD, MOCK
        ollama_enabled=True,
        cloud_enabled=True,
    )
)
```
**Fallback chain**: LOCAL (Ollama) → CLOUD (OpenAI/Anthropic) → MOCK
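Resolution can be sketched as a first-available scan over that ordering; the function name and provider-list construction are illustrative:

```python
def resolve_provider(
    providers: list["SummarizationProvider"],
) -> "SummarizationProvider":
    # providers ordered LOCAL -> CLOUD -> MOCK; pick the first ready one.
    for provider in providers:
        if provider.is_available:
            return provider
    raise RuntimeError("No summarization provider available")
```

The MOCK provider at the end of the chain acts as the last resort, so resolution degrades gracefully rather than failing outright.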
---
## Cloud Backends
```python
class CloudBackend(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
```
Configuration via environment:
- `NOTEFLOW_OPENAI_API_KEY`
- `NOTEFLOW_ANTHROPIC_API_KEY`
---
## Consent Workflow
Cloud providers require explicit user consent:
```python
if provider.requires_consent:
    if not consent_manager.has_consent(user_id):
        raise ConsentRequiredError("Cloud summarization requires consent")
```
Consent tracked via `ConsentManager` in gRPC mixins.
---
## Citation Verification
```python
verifier = SegmentCitationVerifier()
result = verifier.verify(summary, valid_segment_ids)
if not result.is_valid:
    # Filter out invalid citations
    summary = verifier.filter_invalid_citations(summary, result)
```
---
## Template System
```python
template = SummarizationTemplate(
    tone=Tone.PROFESSIONAL,       # professional, casual, technical
    format=Format.BULLET_POINTS,  # bullet_points, narrative, structured
    verbosity=Verbosity.BALANCED, # minimal, balanced, detailed
)
```
---
## Summary Structure
```python
@dataclass(frozen=True)
class Summary:
    meeting_id: MeetingId
    executive_summary: str
    key_points: list[KeyPoint]      # Each with segment_ids
    action_items: list[ActionItem]  # Each with segment_ids
    provider_name: str
    model_name: str
    tokens_used: int
    latency_ms: int
```
---
## Key Patterns
1. **Provider abstraction** — Switch backends without code changes
2. **Fallback chain** — Graceful degradation if preferred unavailable
3. **Citation linking** — Segment IDs enable evidence verification
4. **Consent tracking** — Required for cloud data transfer
5. **Availability checking** — Providers report ready status
---
## Adding a New Provider
1. Implement the `SummarizationProvider` Protocol (see the skeleton after this list)
2. Add to `factory.py` provider resolution
3. Set `requires_consent` appropriately
4. Implement `is_available` check
5. Handle rate limits and errors gracefully
6. Add integration tests in `tests/infrastructure/summarization/`
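A hypothetical provider skeleton against the Protocol above; the constructor, `_call_backend()` helper, and error type are assumptions for illustration:

```python
class MyProvider:
    """Sketch of a new summarization provider."""

    def __init__(self, api_key: str | None = None) -> None:
        self._api_key = api_key

    @property
    def requires_consent(self) -> bool:
        return True  # set False for purely local providers

    @property
    def is_available(self) -> bool:
        return self._api_key is not None

    async def summarize(
        self,
        segments: list["Segment"],
        context: "SummarizationContext",
    ) -> "Summary":
        try:
            return await self._call_backend(segments, context)  # hypothetical helper
        except TimeoutError as exc:
            # Error type name is illustrative; surface failures gracefully.
            raise ProviderUnavailableError(str(exc)) from exc
```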

tests/CLAUDE.md Normal file

@@ -0,0 +1,438 @@
# Python Tests Development Guide
## Overview
The test suite (`tests/`) provides comprehensive coverage for the NoteFlow Python backend. Tests are organized to mirror the source structure and enforce quality gates through baseline comparison.
**Architecture**: pytest + pytest-asyncio + testcontainers for integration
---
## Directory Structure
```
tests/
├── application/       # Service and use case tests
├── benchmarks/        # Performance benchmarking tests
├── config/            # Configuration tests
├── domain/            # Domain entity and value object tests
├── fixtures/          # Test data files (audio samples)
│   └── audio/         # Audio fixture files
├── grpc/              # gRPC servicer and streaming tests
├── infrastructure/    # Infrastructure adapter tests
│   ├── asr/           # Speech-to-text engine tests
│   ├── audio/         # Audio processing tests
│   ├── auth/          # Authentication/OAuth tests
│   ├── calendar/      # Calendar integration tests
│   ├── diarization/   # Speaker diarization tests
│   ├── export/        # Export functionality tests
│   ├── gpu/           # GPU hardware detection tests
│   ├── metrics/       # Metrics and observability tests
│   ├── ner/           # Named Entity Recognition tests
│   ├── observability/ # OpenTelemetry instrumentation tests
│   ├── persistence/   # Database ORM tests
│   ├── security/      # Encryption/crypto tests
│   ├── summarization/ # Summary generation tests
│   ├── triggers/      # Window trigger detection tests
│   └── webhooks/      # Webhook delivery tests
├── integration/       # Full system integration tests (PostgreSQL)
├── quality/           # Code quality and static analysis tests
│   ├── _detectors/    # Quality rule detector implementations
│   └── baselines.json # Frozen violation baseline
├── stress/            # Stress, concurrency, and fuzz tests
└── conftest.py        # Root-level shared fixtures
```
---
## Running Tests
```bash
# All tests
pytest
# Skip slow tests (model loading)
pytest -m "not slow"
# Integration tests only
pytest -m integration
# Quality gate checks
pytest tests/quality/
# Stress/fuzz tests
pytest tests/stress/
# Specific test file
pytest tests/domain/test_meeting.py
# With coverage
pytest --cov=src/noteflow --cov-report=html
```
---
## Pytest Markers
| Marker | Purpose |
|--------|---------|
| `@pytest.mark.slow` | Model loading, GPU operations |
| `@pytest.mark.integration` | External services (PostgreSQL) |
| `@pytest.mark.stress` | Stress and concurrency tests |
Usage:
```python
@pytest.mark.slow
@pytest.mark.parametrize("model_size", ["tiny", "base"])
def test_asr_model_loading(model_size: str) -> None:
    ...
```
---
## Key Fixtures
### Root Fixtures (`conftest.py`)
| Fixture | Scope | Purpose |
|---------|-------|---------|
| `mock_optional_extras` | session | Mocks openai, anthropic, ollama (collection time) |
| `reset_context_vars` | function | Isolates logging context variables |
| `mock_uow` | function | Full UnitOfWork mock with all repos |
| `meeting_id` | function | MeetingId (UUID-based) |
| `sample_meeting` | function | Meeting in CREATED state |
| `recording_meeting` | function | Meeting in RECORDING state |
| `sample_rate` | function | 16000 (DEFAULT_SAMPLE_RATE) |
| `crypto` | function | AesGcmCryptoBox with InMemoryKeyStore |
| `meetings_dir` | function | Temporary meetings directory |
| `webhook_config` | function | WebhookConfig for MEETING_COMPLETED |
| `mock_grpc_context` | function | Mock gRPC ServicerContext |
| `mock_asr_engine` | function | Mock ASR engine |
| `memory_servicer` | function | In-memory NoteFlowServicer |
**Utility Functions** (not fixtures):
- `approx_float(expected, rel, abs)` — Type-safe pytest.approx wrapper
- `approx_sequence(expected, rel, abs)` — Float sequence comparison
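Typical usage, assuming these wrappers mirror `pytest.approx` semantics; the call sites (`compute_ratio`, `normalize`, `samples`) are hypothetical:

```python
# approx_float: single float with relative tolerance.
assert compute_ratio() == approx_float(0.333, rel=1e-2)

# approx_sequence: element-wise comparison for float sequences.
assert normalize(samples) == approx_sequence([0.0, 0.5, 1.0], abs=1e-6)
```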
### Integration Fixtures (`integration/conftest.py`)
| Fixture | Purpose |
|---------|---------|
| `session_factory` | PostgreSQL async_sessionmaker (testcontainers) |
| `session` | Individual AsyncSession with rollback |
| `persisted_meeting` | Created and persisted Meeting |
| `stopped_meeting_with_segments` | Meeting in STOPPED state with speaker segments |
| `audio_fixture_path` | Path to sample_discord.wav |
| `audio_samples` | Normalized float32 array |
### Quality Fixtures (`quality/`)
| Fixture | Purpose |
|---------|---------|
| `baselines.json` | Frozen violation counts by rule |
| `_baseline.py` | Baseline comparison utilities |
| `_detectors/` | AST-based detection implementations |
---
## Testing Patterns
### 1. Declarative Tests (No Loops)
**CRITICAL**: Test functions must NOT contain loops. Use parametrize:
```python
# ✅ CORRECT: Parametrized test
@pytest.mark.parametrize(
    ("text", "expected_category"),
    [
        ("John Smith", EntityCategory.PERSON),
        ("Apple Inc.", EntityCategory.COMPANY),
        ("Python", EntityCategory.TECHNICAL),
    ],
)
def test_extract_entity_category(text: str, expected_category: EntityCategory) -> None:
    result = extract_entity(text)
    assert result.category == expected_category


# ❌ WRONG: Loop in test
def test_extract_entity_categories() -> None:
    test_cases = [("John Smith", EntityCategory.PERSON), ...]
    for text, expected in test_cases:  # FORBIDDEN
        result = extract_entity(text)
        assert result.category == expected
```
### 2. Async Testing
```python
# Async fixtures
@pytest.fixture
async def persisted_meeting(session: AsyncSession) -> Meeting:
    meeting = Meeting.create(...)
    session.add(MeetingModel.from_entity(meeting))
    await session.commit()
    return meeting


# Async tests (auto-marked by pytest-asyncio)
async def test_fetch_meeting(uow: UnitOfWork, persisted_meeting: Meeting) -> None:
    result = await uow.meetings.get(persisted_meeting.id)
    assert result is not None
```
### 3. Mock Strategies
**Repository Mocks**:
```python
@pytest.fixture
def mock_meetings_repo() -> AsyncMock:
    repo = AsyncMock(spec=MeetingRepository)
    repo.get = AsyncMock(return_value=None)
    repo.create = AsyncMock()
    return repo
```
**Service Mocks with Side Effects**:
```python
@pytest.fixture
def mock_executor(captured_payloads: list) -> AsyncMock:
    async def capture_delivery(config, event_type, payload):
        captured_payloads.append(payload)
        return WebhookDelivery(status_code=200, ...)

    executor = AsyncMock(spec=WebhookExecutor)
    executor.deliver = AsyncMock(side_effect=capture_delivery)
    return executor
```
### 4. Quality Gate Pattern
Tests in `tests/quality/` use baseline comparison:
```python
def test_no_high_complexity_functions() -> None:
    violations = collect_high_complexity(parse_errors=[])
    assert_no_new_violations("high_complexity", violations)
```
**Behavior**:
- Fails if NEW violations are introduced
- Passes if violations match or decrease from baseline
- Baseline is frozen in `baselines.json`
### 5. Integration Test Setup
```python
@pytest.fixture(scope="session")
async def session_factory() -> AsyncGenerator[async_sessionmaker, None]:
_, database_url = get_or_create_container() # testcontainers
engine = create_test_engine(database_url)
async with engine.begin() as conn:
await initialize_test_schema(conn)
yield create_test_session_factory(engine)
await cleanup_test_schema(conn)
await engine.dispose()
```
---
## Quality Tests (`tests/quality/`)
### Quality Rules
| Test File | Rule | Description |
|-----------|------|-------------|
| `test_code_smells.py` | high_complexity | Cyclomatic complexity > threshold |
| `test_code_smells.py` | god_class | Classes with too many methods |
| `test_code_smells.py` | deep_nesting | Nesting depth > 7 levels |
| `test_code_smells.py` | long_method | Methods > 50 lines |
| `test_test_smells.py` | test_loop | Loops in test assertions |
| `test_test_smells.py` | test_conditional | Conditionals in assertions |
| `test_magic_values.py` | magic_number | Unextracted numeric constants |
| `test_duplicate_code.py` | code_duplication | Repeated code blocks |
| `test_stale_code.py` | dead_code | Unreachable code |
| `test_unnecessary_wrappers.py` | thin_wrapper | Unnecessary wrapper functions |
| `test_decentralized_helpers.py` | helper_sprawl | Unconsolidated helpers |
| `test_baseline_self.py` | baseline_integrity | Baseline file validation |
### Baseline System
```python
# _baseline.py
@dataclass
class Violation:
    rule: str
    relative_path: str
    identifier: str
    detail: str | None = None

    @property
    def stable_id(self) -> str:
        """Unique identifier: rule|relative_path|identifier[|detail]"""
        ...


def assert_no_new_violations(rule: str, violations: list[Violation]) -> None:
    """Compares current violations against frozen baseline."""
    baseline = load_baseline()
    result = compare_violations(baseline[rule], violations)
    assert result.passed, f"New violations: {result.new_violations}"
```
### Running Quality Checks
```bash
# All quality tests
pytest tests/quality/
# Specific rule
pytest tests/quality/test_code_smells.py
# Update baseline (REQUIRES APPROVAL)
# Do NOT do this without explicit permission
```
---
## Fixture Scope Guidelines
| Scope | Use When |
|-------|----------|
| `session` | Expensive setup (DB containers, ML models) |
| `module` | Shared across test class (NER engine) |
| `function` | Per-test isolation (default) |
| `autouse=True` | Auto-inject into all tests |
---
## Adding New Tests
### 1. Choose Location
Mirror the source structure:
- `src/noteflow/domain/entities/meeting.py``tests/domain/test_meeting.py`
- `src/noteflow/infrastructure/ner/engine.py``tests/infrastructure/ner/test_engine.py`
### 2. Use Existing Fixtures
Check `conftest.py` files for reusable fixtures before creating new ones.
### 3. Follow Patterns
```python
# tests/<module>/test_my_feature.py
import pytest

from noteflow.<module> import MyFeature


class TestMyFeature:
    """Tests for MyFeature class."""

    def test_basic_operation(self, sample_meeting: Meeting) -> None:
        """Test basic operation with sample meeting."""
        feature = MyFeature()
        result = feature.process(sample_meeting)
        assert result.success is True

    @pytest.mark.parametrize(
        ("input_value", "expected"),
        [
            ("valid", True),
            ("invalid", False),
        ],
    )
    def test_validation(self, input_value: str, expected: bool) -> None:
        """Test validation with various inputs."""
        result = MyFeature.validate(input_value)
        assert result == expected

    @pytest.mark.slow
    def test_with_model_loading(self) -> None:
        """Test requiring ML model loading."""
        ...
```
### 4. Add Fixtures to conftest.py
```python
# tests/<module>/conftest.py
import pytest

from noteflow.<module> import MyFeature


@pytest.fixture
def my_feature() -> MyFeature:
    """Configured MyFeature instance."""
    return MyFeature(config=test_config)
```
---
## Forbidden Patterns
### ❌ Loops in Test Assertions
```python
# WRONG
def test_items() -> None:
    for item in items:
        assert item.valid


# RIGHT: Use parametrize
@pytest.mark.parametrize("item", items)
def test_item_valid(item: Item) -> None:
    assert item.valid
```
### ❌ Conditionals in Assertions
```python
# WRONG
def test_result() -> None:
    if condition:
        assert result == expected_a
    else:
        assert result == expected_b


# RIGHT: Separate tests or parametrize
```
### ❌ Modifying Quality Baselines
**NEVER** add entries to `baselines.json` without explicit approval. Fix the actual code violation instead.
### ❌ Using print() for Debugging
```python
# WRONG
def test_something() -> None:
    print(f"Debug: {result}")  # Will be captured, not shown


# RIGHT: Use pytest's capsys or assert messages
def test_something(capsys) -> None:
    ...
    captured = capsys.readouterr()
```
---
## Key Files Reference
| File | Purpose |
|------|---------|
| `conftest.py` | Root fixtures (524 lines) |
| `quality/baselines.json` | Frozen violation baseline |
| `quality/_baseline.py` | Baseline comparison utilities |
| `quality/_detectors/` | AST-based detectors |
| `integration/conftest.py` | PostgreSQL testcontainers setup |
| `stress/conftest.py` | Stress test fixtures |
| `fixtures/audio/` | Audio sample files |
---
## See Also
- `/src/noteflow/CLAUDE.md` — Python backend standards
- `/pyproject.toml` — pytest configuration
- `/support/` — Test support utilities (non-test code)