feat: add structured logging for persistence and export operations

- Introduced logging for CRUD operations in repositories to enhance visibility into database interactions.
- Implemented timing logs for BaseRepository and UnitOfWork lifecycle events to track performance.
- Added export logging for size and duration without exposing sensitive content.
- Promoted logging levels for specific operations to improve clarity and reduce noise in logs.
- Established a framework for consistent logging practices across persistence and export functionalities.
This commit is contained in:
2026-01-14 01:18:44 +00:00
parent 5713abd07b
commit 0c1dbb362f
28 changed files with 4650 additions and 6144 deletions

2
client

Submodule client updated: c1ebe8cb53...2a2449be30

View File

@@ -0,0 +1,168 @@
# Sprint: Logging Gap Remediation (P1 - Runtime/Inputs)
> **Size**: M | **Owner**: Platform | **Prerequisites**: log_timing + get_logger already in place
> **Phase**: Observability - Runtime Diagnostics
---
## Open Issues & Prerequisites
> ⚠️ **Review Date**: 2026-01-03 — Verification complete, scope needs owner/priority confirmation.
### Blocking Issues
| ID | Issue | Status | Resolution |
|----|-------|--------|------------|
| **B1** | Log level policy for invalid input (warn vs info vs debug) | ✅ | WARN with redaction |
| **B2** | PII redaction rules for UUIDs and URLs in logs | Pending | Align with security guidance |
### Design Gaps to Address
| ID | Gap | Resolution |
|----|-----|------------|
| G1 | Stub-missing logs could be noisy in gRPC client mixins | Add rate-limited or once-per-session logging |
| G2 | Timing vs. count metrics for long-running CPU tasks | Standardize on `log_timing` + optional result_count |
### Prerequisite Verification
| Prerequisite | Status | Notes |
|--------------|--------|-------|
| `log_timing` helper available | ✅ | `src/noteflow/infrastructure/logging/timing.py` |
| `log_state_transition` available | ✅ | `src/noteflow/infrastructure/logging/transitions.py` |
---
## Validation Status (2026-01-03)
### RESOLVED SINCE TRIAGE
| Component | Status | Notes |
|-----------|--------|-------|
| Ollama availability logging | Resolved | `src/noteflow/infrastructure/summarization/ollama_provider.py` uses `log_timing` |
| Cloud LLM API timing/logging | Resolved | `src/noteflow/infrastructure/summarization/cloud_provider.py` uses `log_timing` |
| Google Calendar request timing | Resolved | `src/noteflow/infrastructure/calendar/google_adapter.py` uses `log_timing` |
| OAuth refresh timing | Resolved | `src/noteflow/infrastructure/calendar/oauth_manager.py` uses `log_timing` |
| Webhook delivery start/finish | Resolved | `src/noteflow/infrastructure/webhooks/executor.py` info logs |
| Database engine + migrations | Resolved | `src/noteflow/infrastructure/persistence/database.py` info logs |
| Diarization full timing | Resolved | `src/noteflow/infrastructure/diarization/engine.py` uses `log_timing` |
| Diarization job timeout logging | Resolved | `src/noteflow/grpc/_mixins/diarization/_status.py` |
| Meeting state transitions | Resolved | `src/noteflow/application/services/meeting_service.py` |
| Streaming cleanup | Resolved | `src/noteflow/grpc/_mixins/streaming/_cleanup.py` |
### NOT IMPLEMENTED
| Component | Status | Notes |
|-----------|--------|-------|
| NER warmup timing/logs | Not implemented | `src/noteflow/application/services/ner_service.py` uses `run_in_executor` without logs |
| ASR `transcribe_async` timing | Not implemented | `src/noteflow/infrastructure/asr/engine.py` lacks duration/RTF logs |
| Segmenter state transitions | Not implemented | `src/noteflow/infrastructure/asr/segmenter.py` no transition logs |
| Silent UUID parsing (workspace) | Not implemented | `src/noteflow/grpc/_mixins/meeting.py` returns None on ValueError |
| Silent meeting-id parsing | Not implemented | `src/noteflow/grpc/_mixins/converters/_id_parsing.py` returns None on ValueError |
| Silent calendar datetime parsing | Not implemented | `src/noteflow/infrastructure/triggers/calendar.py` returns None on ValueError |
| Settings fallback logging | Not implemented | `_get_llm_settings`, `_get_webhook_settings`, `diarization_job_ttl_seconds` |
| gRPC client stub missing logs | Not implemented | `src/noteflow/grpc/_client_mixins/*.py` return None silently |
| Rust gRPC connection tracing | Not implemented | `client/src-tauri/src/grpc/client/core.rs` no start/finish timing |
**Downstream impact**: Runtime visibility gaps for user-facing latency, failure diagnosis, and client connection issues.
---
## Objective
Close remaining high-impact logging gaps for runtime operations and input validation to reduce debugging time and improve failure diagnosis across Python gRPC services and the Tauri client.
---
## Key Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| **Timing utility** | Use `log_timing` | Consistent duration metrics and structured fields |
| **Invalid input logging** | Warn-level with redaction | Catch client errors without leaking sensitive data |
| **Stub-missing logging** | Rate-limited (once per client instance) | Avoid log spam while preserving visibility |
---
## What Already Exists
| Asset | Location | Implication |
|-------|----------|-------------|
| `log_timing` helper | `src/noteflow/infrastructure/logging/timing.py` | Use for executor + network timing |
| `log_state_transition` | `src/noteflow/infrastructure/logging/transitions.py` | Reuse for state-machine transitions |
| Existing log_timing usage | `ollama_provider.py`, `cloud_provider.py`, `google_adapter.py` | Follow established patterns |
---
## Scope
| Task | Effort | Notes |
|------|--------|-------|
| **Application Layer** | | |
| Add NER warmup + extraction timing logs | S | Use `log_timing` around `run_in_executor` |
| **Infrastructure Layer** | | |
| Add ASR `transcribe_async` duration + RTF logging | M | Include audio duration and model size |
| Add segmenter state transition logs | S | Use `log_state_transition` or structured info logs |
| Add settings fallback warning logs | S | `_get_llm_settings`, `_get_webhook_settings`, `diarization_job_ttl_seconds` |
| **API Layer** | | |
| Log invalid workspace UUID parsing (WARN + redaction) | S | `src/noteflow/grpc/_mixins/meeting.py` |
| Log invalid meeting_id parsing (WARN + redaction) | S | `src/noteflow/grpc/_mixins/converters/_id_parsing.py` |
| Log calendar datetime parse failures (WARN + redaction) | S | `src/noteflow/infrastructure/triggers/calendar.py` |
| gRPC client mixins log missing stub (rate-limited) | S | `src/noteflow/grpc/_client_mixins/*.py` |
| **Client Layer** | | |
| Add tracing for gRPC connect attempts | S | `client/src-tauri/src/grpc/client/core.rs` |
**Total Effort**: M (2-4 hours)
---
## Deliverables
### Backend
**Application Layer**:
- [ ] `src/noteflow/application/services/ner_service.py` — add warmup/extraction timing logs
**Infrastructure Layer**:
- [ ] `src/noteflow/infrastructure/asr/engine.py` — log transcription duration + RTF
- [ ] `src/noteflow/infrastructure/asr/segmenter.py` — log state transitions
- [ ] `src/noteflow/infrastructure/summarization/cloud_provider.py` — log settings fallback
- [ ] `src/noteflow/infrastructure/webhooks/executor.py` — log settings fallback
**API Layer**:
- [ ] `src/noteflow/grpc/_mixins/meeting.py` — log invalid workspace UUID parse (WARN + redaction)
- [ ] `src/noteflow/grpc/_mixins/converters/_id_parsing.py` — log invalid meeting_id parse (WARN + redaction)
- [ ] `src/noteflow/infrastructure/triggers/calendar.py` — log datetime parse errors (WARN + redaction)
- [ ] `src/noteflow/grpc/_client_mixins/*.py` — log missing stub (rate-limited)
- [ ] `src/noteflow/grpc/_mixins/diarization_job.py` — log settings fallback
### Client
- [ ] `client/src-tauri/src/grpc/client/core.rs` — log connection attempt duration + endpoint
---
## Test Strategy
### Core test cases
- **Application**: `caplog` validates NER warmup logs appear when lazy-load path is taken
- **Infrastructure**: `caplog` validates ASR timing log fields include duration and audio length
- **API**: invalid UUID parsing emits warning and aborts/returns safely
- **Client**: basic unit test or log snapshot for connection start/failure paths
---
## Quality Gates
- [ ] Added logs use structured fields and follow existing logging patterns
- [ ] No new `# type: ignore` or `Any` introduced
- [ ] Targeted tests for new logging paths where practical
- [ ] `ruff check` + `mypy` pass (backend)
- [ ] `npm run lint:rs` pass (client)
---
## Post-Sprint
- [ ] Evaluate if logging should be sampled for high-frequency segmenter transitions
- [ ] Consider centralized log suppression for repeated invalid client inputs

View File

@@ -0,0 +1,144 @@
# Sprint: Logging Gap Remediation (P2 - Persistence/Exports)
> **Size**: L | **Owner**: Platform | **Prerequisites**: P1 logging gaps resolved
> **Phase**: Observability - Data & Lifecycle
---
## Open Issues & Prerequisites
> ⚠️ **Review Date**: 2026-01-03 — Verification complete, scope needs prioritization.
### Blocking Issues
| ID | Issue | Status | Resolution |
|----|-------|--------|------------|
| **B1** | Log volume for repository CRUD operations | Pending | Decide sampling/level policy |
| **B2** | Sensitive data in repository logs | Pending | Redaction and field allowlist |
### Design Gaps to Address
| ID | Gap | Resolution |
|----|-----|------------|
| G1 | Consistent DB timing strategy across BaseRepository and UoW | Add `log_timing` helpers or per-method timing |
| G2 | Export logs should include size without dumping content | Log byte count + segment count only |
### Prerequisite Verification
| Prerequisite | Status | Notes |
|--------------|--------|-------|
| Logging helpers available | ✅ | `log_timing`, `get_logger` |
| State transition logger | ✅ | `log_state_transition` |
---
## Validation Status (2026-01-03)
### PARTIALLY IMPLEMENTED
| Component | Status | Notes |
|-----------|--------|-------|
| DB migrations lifecycle logs | Partial | Migration start/end logged; repo/UoW still silent |
| Audio writer open logging | Partial | Open/flush errors logged, but thread lifecycle unlogged |
### NOT IMPLEMENTED
| Component | Status | Notes |
|-----------|--------|-------|
| BaseRepository query timing | Not implemented | `src/noteflow/infrastructure/persistence/repositories/_base.py` |
| UnitOfWork lifecycle logs | Not implemented | `src/noteflow/infrastructure/persistence/unit_of_work.py` |
| Repository CRUD logging | Not implemented | `meeting_repo.py`, `segment_repo.py`, `summary_repo.py`, etc. |
| Asset deletion no-op logging | Not implemented | `src/noteflow/infrastructure/persistence/repositories/asset_repo.py` |
| Export timing/logging | Not implemented | `pdf.py`, `markdown.py`, `html.py` |
| Diarization session close log level | Not implemented | `src/noteflow/infrastructure/diarization/session.py` uses debug |
| Background task lifecycle logs | Not implemented | `src/noteflow/grpc/_mixins/diarization/_jobs.py` task creation missing |
**Downstream impact**: Limited visibility into DB performance, export latency, and lifecycle cleanup.
---
## Objective
Add structured logging for persistence, export, and lifecycle operations so DB performance issues and long-running exports are diagnosable without ad-hoc debugging.
---
## Key Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| **Repository logging level** | INFO for mutations, DEBUG for reads | Avoid log noise while capturing state changes |
| **Timing strategy** | `log_timing` around DB write batches | Consistent duration metrics without per-row spam |
| **Export logging** | Log sizes and durations only | Avoid dumping user content |
---
## What Already Exists
| Asset | Location | Implication |
|-------|----------|-------------|
| Migration logging | `src/noteflow/infrastructure/persistence/database.py` | Reuse for DB lifecycle logs |
| Log helpers | `src/noteflow/infrastructure/logging/*` | Standardize on structured logging |
---
## Scope
| Task | Effort | Notes |
|------|--------|-------|
| **Infrastructure Layer** | | |
| Add BaseRepository timing wrappers | M | `_execute_*` methods emit duration |
| Add UnitOfWork lifecycle logs | S | __aenter__/commit/rollback/exit |
| Add CRUD mutation logs in repositories | L | Create/Update/Delete summary logs |
| Add asset deletion no-op log | S | log when directory missing |
| Add export timing logs | M | PDF/Markdown/HTML export duration + size |
| Promote diarization session close to INFO | S | `session.py` |
| Log diarization job task creation | S | `grpc/_mixins/diarization/_jobs.py` |
| Add audio flush thread lifecycle logs | S | `infrastructure/audio/writer.py` |
**Total Effort**: L (4-8 hours)
---
## Deliverables
### Backend
**Infrastructure Layer**:
- [ ] `src/noteflow/infrastructure/persistence/repositories/_base.py` — timing logs for DB operations
- [ ] `src/noteflow/infrastructure/persistence/unit_of_work.py` — session/commit/rollback logs
- [ ] `src/noteflow/infrastructure/persistence/repositories/*_repo.py` — mutation logging
- [ ] `src/noteflow/infrastructure/persistence/repositories/asset_repo.py` — no-op delete log
- [ ] `src/noteflow/infrastructure/export/pdf.py` — duration + byte-size log
- [ ] `src/noteflow/infrastructure/export/markdown.py` — export count log
- [ ] `src/noteflow/infrastructure/export/html.py` — export count log
- [ ] `src/noteflow/infrastructure/diarization/session.py` — info-level close log
- [ ] `src/noteflow/grpc/_mixins/diarization/_jobs.py` — background task creation log
- [ ] `src/noteflow/infrastructure/audio/writer.py` — flush thread lifecycle logs
---
## Test Strategy
### Core test cases
- **Repositories**: `caplog` validates mutation logging for create/update/delete
- **UnitOfWork**: log emitted on commit/rollback paths
- **Exports**: ensure logs include duration and output size (bytes/segments)
- **Lifecycle**: diarization session close emits info log
---
## Quality Gates
- [ ] Logging includes structured fields and avoids payload content
- [ ] No new `# type: ignore` or `Any` introduced
- [ ] `pytest` passes for touched modules
- [ ] `ruff check` + `mypy` pass
---
## Post-Sprint
- [ ] Assess performance impact of repo timing logs
- [ ] Consider opt-in logging for high-volume read paths

View File

@@ -1,165 +1,555 @@
According to a document from **January 3, 2026** (your `repomix-output.md` snapshot), there are several concrete places where your **Python gRPC backend (`src/`)** and your **frontend (React `client/src` + Tauri `client/src-tauri`)** can be “correct but not actually wired together,” which would exactly match your symptom: *the Docker backend never shows logs that look like they were triggered by the Tauri client*.
# Technical Debt Triage
Below is a “where it breaks / why / what to fix” checklist, with the most likely root causes first.
This document tracks known issues, technical debt, and areas needing improvement.
---
## 1) Your Tauri recording pipeline can start without ever connecting to the backend
## Insufficient Logging - Comprehensive Audit
### What I see
**Discovered:** 2025-12-31
**Last Verified:** 2026-01-03
**Sprint Docs:**
- `docs/sprints/sprint_logging_gap_remediation_p1.md`
- `docs/sprints/sprint_logging_gap_remediation_p2.md`
* The Tauri command `start_recording` calls `stream_manager.start_streaming(meeting_id, app)` but does **not** ensure a gRPC connection exists before doing so. There is no `grpc_client.connect(...)` in that visible code path.
* `StreamManager::setup_streaming` immediately does `let mut grpc_client = self.client.get_client()?;` and then calls `.stream_transcription(...)` on it.
* `GrpcClient::get_client()` returns `Error::NotConnected` if there is no internal client/channel.
### Why that explains “no backend logs”
If the React side doesnt explicitly call the Tauri `connect` command *before* starting a recording, the streaming setup fails locally with `NotConnected` and **no TCP connection is ever attempted**, so your Docker backend sees nothing (therefore no logs).
### Fix
Pick one (Id do both #1 + #2):
1. **Frontend guarantee:** In the UI flow where you start recording, force an `await api.connect(serverUrl)` (or connect-on-startup), and block recording UI until connected.
2. **Backend guarantee in Rust:** Make `start_recording` auto-connect if needed:
* If `!state.grpc_client.is_connected()`, call `state.grpc_client.connect(None).await?` before `start_streaming`.
Also ensure you surface the Tauri connect error event in the UI (see #4). The connect command emits error info when it fails.
**Impact:** Caused ~1 hour of debugging when Ollama 120s timeout appeared as migration hang
**Total Issues Found:** 100+
---
## 2) You have mock/simulated paths that can completely bypass the backend
## 1. Network/External Service Connections
### What I see
### 1.1 CRITICAL: Ollama Availability Check - Silent 120s Timeout
* `initializeAPI` imports both `cachedAPI` and `mockAPI`.
* Your preferences model includes `simulate_transcription?: boolean`.
* There is a `MockTranscriptionStream` whose `send(_chunk)` is literally a no-op (so audio you “send” goes nowhere).
**File:** `src/noteflow/infrastructure/summarization/ollama_provider.py:101-115`
### Why that explains “no backend logs”
```python
@property
def is_available(self) -> bool:
try:
client = self._get_client()
client.list() # Silent 120-second timeout!
return True
except (ConnectionError, TimeoutError, ...):
return False
```
If your Tauri build ever selects `mockAPI` (or a simulated transcription mode) instead of the real Tauri→Rust→gRPC flow, the UI can appear to function while **never touching the Docker backend**.
**Status (2026-01-03):** Resolved — `log_timing` added around availability check.
### Fix
* Make the active API implementation extremely explicit at runtime (log it in the UI console + show a dev badge).
* In Tauri builds, ensure `initializeAPI` never picks `mockAPI` unless you intentionally enable it.
* Default `simulate_transcription` to `false` in production builds, and make it obvious when enabled.
**Fix:**
```python
@property
def is_available(self) -> bool:
try:
logger.info("Checking Ollama availability at %s (timeout: %.0fs)...", self._host, self._timeout)
client = self._get_client()
client.list()
logger.info("Ollama server is available")
return True
except TimeoutError:
logger.warning("Ollama server timeout at %s after %.0fs", self._host, self._timeout)
return False
except (ConnectionError, RuntimeError, OSError) as e:
logger.debug("Ollama server unreachable at %s: %s", self._host, e)
return False
```
---
## 3) Your “source of truth” for server host/port is scattered and easy to mis-point
### 1.2 Cloud Summarization API Calls - No Request Logging
### What I see (multiple defaults)
**File:** `src/noteflow/infrastructure/summarization/cloud_provider.py:238-282`
Backend/Python side:
```python
def _call_openai(self, user_prompt: str, system_prompt: str) -> tuple[str, int | None]:
try:
response = client.chat.completions.create(...) # No timing logged
except TimeoutError as e:
raise SummarizationTimeoutError(...) # No duration logged
```
* Default host/port are `"localhost"` and `50051`.
* The server binds to an IPv6-any address: `address = f"[::]:{self._port}"` (and `add_insecure_port(address)`).
**Status (2026-01-03):** Resolved — `log_timing` wraps OpenAI/Anthropic calls and response logging added.
Tauri/Rust side:
* Default saved preferences are `server_host: "localhost"`, `server_port: "50051"`.
* Theres also an env-var override `NOTEFLOW_SERVER_ADDRESS`.
* There is a test that expects the normalized default URL `http://localhost:50051`.
### Where this breaks in Docker + desktop apps
* **Docker port publish:** `localhost:50051` only works if your container publishes that port to the host (`-p 50051:50051`).
* **IPv6 “localhost” gotcha:** Many systems resolve `localhost` to `::1` first. Docker port publishing is not consistently reachable over IPv6 on all platforms/configs. Your backend binding is IPv6-friendly inside the container, but the *host-side published port* may still be IPv4-only depending on OS/Docker. (This is a common “it works in one place but not another” cause.)
### Fix
* For debugging, set the client host explicitly to **`127.0.0.1`** instead of `localhost` (or try both).
* Consider changing the Python server bind from `[::]:port` to `0.0.0.0:port` for simpler cross-platform Docker port publishing.
* Make the Tauri UI show the exact URL it is attempting and whether its coming from preferences vs env var.
**Fix:** Add `logger.info("Initiating OpenAI API call: model=%s", self._model)` before call, log duration after.
---
## 4) Connection and streaming events must match *exactly*, or the UI wont reflect reality
### 1.3 Google Calendar API - No Request Logging
### What I see
**File:** `src/noteflow/infrastructure/calendar/google_adapter.py:76-91`
* The connect command emits a connection change event and emits error info on failure.
* Streaming emits events using **string literals** `"transcript_update"` and `"stream_health"`.
* The JS side has a Tauri adapter that imports `TauriCommands` and `TauriEvents` from `tauri-constants`.
* The cached adapter references a `startTauriEventBridge` and checks `isTauriEnvironment`.
```python
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, headers=headers) # No logging
```
### Why that matters
**Status (2026-01-03):** Resolved — request timing logged via `log_timing`.
Even if gRPC is working, if your frontend isnt listening to the *actual* event names being emitted, youll see “nothing happens,” and youll likely never trigger downstream actions that depend on “connected/streaming” state.
### Fix
* Start the Tauri event bridge **immediately** on app boot in Tauri mode (not lazily after some UI action).
* Put all event names in one canonical place and generate them (avoid “string literal in Rust” vs “enum in TS”).
**Fix:** Log request start, duration, and response status.
---
## 5) Some “expected backend activity” wont exist because identity/workspace is local-only
### 1.4 OAuth Token Refresh - Missing Timing
### What I see
**File:** `src/noteflow/infrastructure/calendar/oauth_manager.py:211-222`
Your Tauri identity/workspace commands do not call the backend; they read/write local state:
```python
async def refresh_tokens(...) -> OAuthTokens:
response = await client.post(token_url, data=data) # No timing
```
* `get_current_user`, `list_workspaces`, `switch_workspace` are local to Tauri state.
### Why that matters
If youre using UI actions like “switch workspace” as your “did the backend get hit?” indicator: it wont. Those actions never touch Docker.
**Status (2026-01-03):** Resolved — refresh timing logged via `log_timing`.
---
## 6) Backend identity interceptor expects metadata your Rust client may not send
### 1.5 Webhook Delivery - Missing Initial Request Log
### What I see
**File:** `src/noteflow/infrastructure/webhooks/executor.py:107-237`
Backend interceptor looks for metadata keys like:
```python
async def deliver(...) -> WebhookDelivery:
for attempt in range(1, max_retries + 1):
_logger.debug("Webhook delivery attempt %d/%d", attempt, max_retries) # DEBUG only
```
* `"x-user-id"`, `"x-workspace-id"`, `"x-request-id"`
### Why it matters
If your backends logging or routing relies on those headers, and the Tauri client never attaches them, you could end up with:
* missing context in logs, or
* backend rejecting calls (depending on how the interceptor/handlers behave beyond the snippet).
### Fix
If you intend to use those headers:
* Add a tonic interceptor on the Rust client to inject them into every request (from the local identity/workspace state).
**Status (2026-01-03):** Resolved — info log at delivery start + completion.
---
## 7) You may be expecting “per-RPC logs” that the backend simply doesnt emit
### 1.6 Database Connection Creation - No Logging
From the Python server snippet we can see it binds and starts, but nothing shown indicates a “log each RPC method call” interceptor.
**File:** `src/noteflow/infrastructure/persistence/database.py:85-116`
So its possible your Tauri app *is* hitting the backend and you still see “no logs,” unless your service methods log.
```python
def create_engine_and_session_factory(...):
engine = sa_create_async_engine(database_url, pool_size=pool_size, ...)
# No logging of connection parameters
```
### Fix
Temporarily add a server interceptor that logs:
* method name
* peer address
* status code
* duration
Thats the fastest way to confirm traffic.
**Status (2026-01-03):** Resolved — engine creation logged with masked URL.
---
## The “most likely” single root cause given your symptom
### 1.7 Rust gRPC Client Connection - No Tracing
If I had to bet based strictly on the code: **youre starting recording/streaming without ever calling `connect` first**, and because `get_client()` hard-fails with `NotConnected`, the backend never sees any traffic.
**File:** `client/src-tauri/src/grpc/client/core.rs:174-197`
The second most likely is: **your Tauri app is using `mockAPI` / simulated transcription** (or something like `cachedAPI` in a mode that doesnt actually call into the Rust commands), so theres simply nothing to log.
```rust
async fn perform_connect(&self) -> Result<ServerInfo> {
let channel = endpoint.connect().await // No tracing before/after
.map_err(|e| Error::Connection(...))?;
```
**Status (2026-01-03):** Not implemented — see P1 sprint.
---
If you want, I can also produce a **concrete “call graph”** from “User clicks Start Recording” → TS API method → Tauri invoke command string → Rust command → gRPC RPC name, but your repomix snapshot elides large bodies of the TS adapters (so I cant see the actual `invoke(...)` command strings). The big wiring issues above are still valid and are enough to explain “no backend logs” by themselves.
## 2. Blocking/Long-Running Operations
### 2.1 NER Service - Silent Model Warmup
**File:** `src/noteflow/application/services/ner_service.py:185-197`
```python
await loop.run_in_executor(
None,
lambda: self._ner_engine.extract("warm up"), # No logging
)
```
**Status (2026-01-03):** Not implemented — see P1 sprint.
---
### 2.2 ASR Transcription - No Duration Logging
**File:** `src/noteflow/infrastructure/asr/engine.py:156-177`
```python
async def transcribe_async(...) -> list[AsrResult]:
return await loop.run_in_executor(None, ...) # No timing
```
**Status (2026-01-03):** Not implemented — see P1 sprint.
---
### 2.3 Diarization - Missing Blocking Operation Logging
**File:** `src/noteflow/infrastructure/diarization/engine.py:299-347`
```python
def diarize_full(...) -> Sequence[SpeakerTurn]:
logger.debug("Running offline diarization on %.2fs audio", ...) # DEBUG only
annotation = self._offline_pipeline(waveform, ...) # No end logging
```
**Status (2026-01-03):** Resolved — `log_timing` wraps diarization.
---
### 2.4 Diarization Job Timeout - No Pre-Timeout Context
**File:** `src/noteflow/grpc/_mixins/diarization/_jobs.py:173-186`
```python
async with asyncio.timeout(DIARIZATION_TIMEOUT_SECONDS):
updated_count = await self.refine_speaker_diarization(...)
# No logging of timeout value before entering block
```
**Status (2026-01-03):** Resolved — timeout value logged in job handler.
---
## 3. Error Handling - Silent Failures
### 3.1 Silent ValueError Returns
**Files:**
- `src/noteflow/grpc/_mixins/meeting.py:64-67` - workspace UUID parse
- `src/noteflow/grpc/_mixins/converters.py:76-79` - meeting ID parse
- `src/noteflow/grpc/_mixins/diarization/_jobs.py:84-87` - meeting ID validation
- `src/noteflow/infrastructure/triggers/calendar.py:141-144` - datetime parse
```python
try:
UUID(workspace_id)
except ValueError:
return None # Silent failure, no logging
```
**Status (2026-01-03):** Not implemented — add WARN + redaction (P1 sprint).
---
### 3.2 Silent Settings Fallbacks
**Files:**
- `src/noteflow/infrastructure/webhooks/executor.py:56-65`
- `src/noteflow/infrastructure/summarization/ollama_provider.py:44-48`
- `src/noteflow/infrastructure/summarization/cloud_provider.py:48-52`
- `src/noteflow/grpc/_mixins/diarization_job.py:63-66`
```python
except Exception:
return DEFAULT_VALUES # No logging that fallback occurred
```
**Status (2026-01-03):** Not implemented — add warning logs (P1 sprint).
---
### 3.3 gRPC Client Stub Unavailable - Silent Returns
**Files:** `src/noteflow/grpc/_client_mixins/*.py` (multiple locations)
```python
if not self._stub:
return None # No logging of connection issue
```
**Status (2026-01-03):** Not implemented — add rate-limited warn log (P1 sprint).
---
## 4. State Transitions and Lifecycle
### 4.1 Meeting State Changes Not Logged
**Status (2026-01-03):** Resolved — meeting service logs transitions.
---
### 4.2 Diarization Job State - Missing Previous State
**File:** `src/noteflow/grpc/_mixins/diarization/_jobs.py:147-171`
```python
await repo.diarization_jobs.update_status(job_id, JOB_STATUS_RUNNING, ...)
**Status (2026-01-03):** Resolved state transitions logged.
```
---
### 4.3 Segmenter State Machine - No Transition Logging
**File:** `src/noteflow/infrastructure/asr/segmenter.py:121-127`
```python
if is_speech:
self._state = SegmenterState.SPEECH # No logging of IDLE -> SPEECH
```
**Status (2026-01-03):** Not implemented — see P1 sprint.
---
### 4.4 Stream Cleanup - No Logging
**File:** `src/noteflow/grpc/_mixins/streaming/_cleanup.py:14-34`
```python
def cleanup_stream_resources(host, meeting_id):
# Multiple cleanup operations, no completion log
host._active_streams.discard(meeting_id)
```
---
### 4.5 Diarization Session Close - DEBUG Only
**File:** `src/noteflow/infrastructure/diarization/session.py:145-159`
**Status (2026-01-03):** Not implemented — see P2 sprint.
```python
def close(self) -> None:
logger.debug("Session %s closed", self.meeting_id) # Should be INFO
```
---
### 4.6 Background Task Spawning - No Task ID
**File:** `src/noteflow/grpc/_mixins/diarization/_jobs.py:130-132`
**Status (2026-01-03):** Not implemented — see P2 sprint.
```python
task = asyncio.create_task(self._run_diarization_job(job_id, num_speakers))
self._diarization_tasks[job_id] = task # No logging of task creation
```
---
### 4.7 Audio Flush Thread - No Start/End Logging
**File:** `src/noteflow/infrastructure/audio/writer.py:135-157`
**Status (2026-01-03):** Not implemented — see P2 sprint.
```python
self._flush_thread.start() # No logging
# ...
def _periodic_flush_loop(self):
while not self._stop_flush.wait(...):
# No entry/exit logging for loop
```
---
## 5. Database Operations
### 5.1 BaseRepository - No Query Timing
**File:** `src/noteflow/infrastructure/persistence/repositories/_base.py`
**Status (2026-01-03):** Not implemented — see P2 sprint.
All methods (`_execute_scalar`, `_execute_scalars`, `_add_and_flush`, `_delete_and_flush`, `_add_all_and_flush`, `_execute_update`, `_execute_delete`) have no timing or logging.
---
### 5.2 Unit of Work - No Transaction Logging
**File:** `src/noteflow/infrastructure/persistence/unit_of_work.py:220-296`
**Status (2026-01-03):** Not implemented — see P2 sprint.
---
### 5.3 Repository CRUD Operations - No Logging
**Files:**
- `meeting_repo.py` - create, update, delete, list_all
- `segment_repo.py` - add_batch, update_embedding, update_speaker
- `summary_repo.py` - save (upsert with cascades)
- `diarization_job_repo.py` - create, mark_running_as_failed, prune_completed
- `entity_repo.py` - save_batch, delete_by_meeting
- `webhook_repo.py` - create, add_delivery
- `integration_repo.py` - set_secrets
- `usage_event_repo.py` - add_batch, delete_before
- `preferences_repo.py` - set_bulk
**Status (2026-01-03):** Not implemented — see P2 sprint.
---
## 6. File System Operations
### 6.1 Meeting Directory Creation - Not Logged
**File:** `src/noteflow/infrastructure/audio/writer.py:109-111`
**Status (2026-01-03):** Resolved — audio writer open logs meeting and dir.
```python
self._meeting_dir.mkdir(parents=True, exist_ok=True) # No logging
```
---
### 6.2 Manifest Read/Write - Not Logged
**File:** `src/noteflow/infrastructure/audio/writer.py:122-123`
**Status (2026-01-03):** Partially implemented — open logged, manifest write still unlogged (P2 sprint).
```python
manifest_path.write_text(json.dumps(manifest, indent=2)) # No logging
```
---
### 6.3 Asset Deletion - Silent No-Op
**File:** `src/noteflow/infrastructure/persistence/repositories/asset_repo.py:49-51`
**Status (2026-01-03):** Not implemented — see P2 sprint.
```python
if meeting_dir.exists():
shutil.rmtree(meeting_dir)
logger.info("Deleted meeting assets at %s", meeting_dir)
# No log when directory doesn't exist
```
---
## 7. Export Operations
### 7.1 PDF Export - No Timing
**File:** `src/noteflow/infrastructure/export/pdf.py:161-186`
```python
def export(self, meeting, segments) -> bytes:
pdf_bytes = weasy_html(string=html_content).write_pdf() # No timing
return pdf_bytes
```
**Status (2026-01-03):** Not implemented — see P2 sprint.
---
### 7.2 Markdown/HTML Export - No Logging
**Files:** `markdown.py:37-89`, `html.py:158-187`
**Status (2026-01-03):** Not implemented — see P2 sprint.
No logging of export operations.
---
## 8. Initialization Sequences
### 8.1 Lazy Model Loading - Not Logged at Load Time
**Files:**
- `NerEngine._ensure_loaded()` - spaCy model load
- `DiarizationEngine` - pyannote model load
- `OllamaSummarizer._get_client()` - client creation
**Status (2026-01-03):** Partially implemented — some model loads logged, NER warmup not logged (P1 sprint).
---
### 8.2 Singleton Creation - Silent
**File:** `src/noteflow/infrastructure/metrics/collector.py:168-178`
**Status (2026-01-03):** Not implemented — out of P1/P2 scope unless needed.
```python
def get_metrics_collector() -> MetricsCollector:
global _metrics_collector
if _metrics_collector is None:
_metrics_collector = MetricsCollector() # No logging
return _metrics_collector
```
---
### 8.3 Provider Registration - DEBUG Level
**File:** `src/noteflow/application/services/summarization_service.py:119-127`
**Status (2026-01-03):** Partially implemented — still debug in factory registration; consider if INFO needed.
```python
def register_provider(self, mode, provider):
logger.debug("Registered %s provider", mode.value) # Should be INFO at startup
```
---
## Summary Statistics
| Category | Issue Count | Severity |
|----------|-------------|----------|
| Network/External Services | 7 | CRITICAL (mostly resolved) |
| Blocking/Long-Running | 4 | HIGH (partially unresolved) |
| Error Handling | 10+ | HIGH (partially unresolved) |
| State Transitions | 7 | MEDIUM (partially unresolved) |
| Database Operations | 30+ | MEDIUM (unresolved) |
| File System | 3 | LOW (partially unresolved) |
| Export | 3 | LOW (unresolved) |
| Initialization | 5 | MEDIUM (partially unresolved) |
| **Total** | **100+** | - |
---
## Recommended Logging Pattern
For all async/blocking operations:
```python
logger.info("Starting <operation>: context=%s", context)
start = time.perf_counter()
try:
result = await some_operation()
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info("<Operation> completed: result_count=%d, duration_ms=%.2f", len(result), elapsed_ms)
except TimeoutError:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error("<Operation> timeout after %.2fms", elapsed_ms)
raise
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error("<Operation> failed after %.2fms: %s", elapsed_ms, e)
raise
```
---
## Priority Fixes
### P0 - Fix Immediately
1. (Resolved) Ollama `is_available` timeout logging
2. (Resolved) Summarization factory timing
3. (Resolved) Database migration progress logging
### P1 - Fix This Sprint
4. (Resolved) All external HTTP calls (calendar, OAuth, webhooks)
5. All `run_in_executor` calls (ASR, NER, diarization)
6. Silent ValueError returns
### P2 - Fix Next Sprint
7. Repository CRUD logging
8. State transition logging (segmenter + diarization session)
9. Background task lifecycle logging
---
## Resolved Issues
- ~~Server-side state volatility~~ → Diarization jobs persisted to DB
- ~~Hardcoded directory paths~~ → `asset_path` column added to meetings
- ~~Synchronous blocking in async gRPC~~ → `run_in_executor` for diarization
- ~~Summarization consent not persisted~~ → Stored in `user_preferences` table
- ~~VU meter update throttling~~ → 20fps throttle implemented
- ~~Webhook infrastructure missing~~ → Full webhook subsystem implemented
- ~~Integration/OAuth token storage~~ → `IntegrationSecretModel` for secure storage

View File

@@ -12,7 +12,7 @@
"files": true,
"removeComments": true,
"removeEmptyLines": true,
"compress": false,
"compress": true,
"topFilesLength": 5,
"showLineNumbers": true,
"truncateBase64": false,

View File

@@ -0,0 +1,451 @@
#!/usr/bin/env python3
"""A/B harness for streaming configuration latency and WER comparisons."""
from __future__ import annotations
import argparse
import json
import re
import threading
import time
import wave
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Protocol, cast
import numpy as np
from numpy.typing import NDArray
from noteflow.config.settings import get_settings
from noteflow.grpc.client import NoteFlowClient
from noteflow.grpc.proto import noteflow_pb2
from noteflow.infrastructure.audio.reader import MeetingAudioReader
from noteflow.infrastructure.logging import LoggingConfig, configure_logging
from noteflow.infrastructure.security.crypto import AesGcmCryptoBox
from noteflow.infrastructure.security.keystore import KeyringKeyStore
if TYPE_CHECKING:
from noteflow.grpc._types import TranscriptSegment
PRESETS: dict[str, dict[str, float]] = {
"responsive": {
"partial_cadence_seconds": 0.8,
"min_partial_audio_seconds": 0.3,
"max_segment_duration_seconds": 15.0,
"min_speech_duration_seconds": 0.2,
"trailing_silence_seconds": 0.3,
"leading_buffer_seconds": 0.1,
},
"balanced": {
"partial_cadence_seconds": 1.5,
"min_partial_audio_seconds": 0.5,
"max_segment_duration_seconds": 30.0,
"min_speech_duration_seconds": 0.3,
"trailing_silence_seconds": 0.5,
"leading_buffer_seconds": 0.2,
},
"accurate": {
"partial_cadence_seconds": 2.5,
"min_partial_audio_seconds": 0.8,
"max_segment_duration_seconds": 45.0,
"min_speech_duration_seconds": 0.4,
"trailing_silence_seconds": 0.8,
"leading_buffer_seconds": 0.25,
},
}
@dataclass(frozen=True)
class AudioCase:
label: str
audio: NDArray[np.float32]
sample_rate: int
reference: str | None
@dataclass
class StreamingStats:
first_partial_at: float | None = None
first_final_at: float | None = None
partial_count: int = 0
final_count: int = 0
@dataclass
class RunResult:
label: str
meeting_id: str
audio_duration_s: float
wall_time_s: float
first_partial_latency_s: float | None
first_final_latency_s: float | None
transcript: str
wer: float | None
segments: int
class StreamingConfigStub(Protocol):
def GetStreamingConfiguration(
self,
request: noteflow_pb2.GetStreamingConfigurationRequest,
) -> noteflow_pb2.GetStreamingConfigurationResponse: ...
def UpdateStreamingConfiguration(
self,
request: noteflow_pb2.UpdateStreamingConfigurationRequest,
) -> noteflow_pb2.UpdateStreamingConfigurationResponse: ...
def _normalize_text(text: str) -> list[str]:
cleaned = re.sub(r"[^a-z0-9']+", " ", text.lower())
return [token for token in cleaned.split() if token]
def _word_error_rate(reference: str, hypothesis: str) -> float:
ref_tokens = _normalize_text(reference)
hyp_tokens = _normalize_text(hypothesis)
if not ref_tokens:
return 0.0 if not hyp_tokens else 1.0
rows = len(ref_tokens) + 1
cols = len(hyp_tokens) + 1
dp = [[0] * cols for _ in range(rows)]
for i in range(rows):
dp[i][0] = i
for j in range(cols):
dp[0][j] = j
for i in range(1, rows):
for j in range(1, cols):
cost = 0 if ref_tokens[i - 1] == hyp_tokens[j - 1] else 1
dp[i][j] = min(
dp[i - 1][j] + 1,
dp[i][j - 1] + 1,
dp[i - 1][j - 1] + cost,
)
return dp[-1][-1] / len(ref_tokens)
def _load_wav(path: Path) -> tuple[NDArray[np.float32], int]:
with wave.open(str(path), "rb") as wav_file:
channels = wav_file.getnchannels()
sample_rate = wav_file.getframerate()
sample_width = wav_file.getsampwidth()
frame_count = wav_file.getnframes()
raw = wav_file.readframes(frame_count)
if sample_width != 2:
raise ValueError("Only 16-bit PCM WAV files are supported")
pcm16 = np.frombuffer(raw, dtype=np.int16)
if channels > 1:
pcm16 = pcm16.reshape(-1, channels).mean(axis=1).astype(np.int16)
audio = pcm16.astype(np.float32) / 32767.0
return audio, sample_rate
def _load_meeting_audio(
meeting_id: str,
asset_path: str | None,
meetings_dir: Path,
) -> tuple[NDArray[np.float32], int]:
crypto = AesGcmCryptoBox(KeyringKeyStore())
reader = MeetingAudioReader(crypto, meetings_dir)
chunks = reader.load_meeting_audio(meeting_id, asset_path)
if not chunks:
return np.array([], dtype=np.float32), reader.sample_rate
audio = np.concatenate([chunk.frames for chunk in chunks]).astype(np.float32)
return audio, reader.sample_rate
def _chunk_audio(
audio: NDArray[np.float32],
sample_rate: int,
chunk_ms: int,
) -> Iterable[NDArray[np.float32]]:
chunk_size = max(1, int(sample_rate * (chunk_ms / 1000)))
for start in range(0, audio.shape[0], chunk_size):
yield audio[start : start + chunk_size]
def _get_streaming_config(stub: StreamingConfigStub) -> dict[str, float]:
response = stub.GetStreamingConfiguration(noteflow_pb2.GetStreamingConfigurationRequest())
config = response.configuration
return {
"partial_cadence_seconds": config.partial_cadence_seconds,
"min_partial_audio_seconds": config.min_partial_audio_seconds,
"max_segment_duration_seconds": config.max_segment_duration_seconds,
"min_speech_duration_seconds": config.min_speech_duration_seconds,
"trailing_silence_seconds": config.trailing_silence_seconds,
"leading_buffer_seconds": config.leading_buffer_seconds,
}
def _apply_streaming_config(
stub: StreamingConfigStub,
config: dict[str, float],
) -> None:
request = noteflow_pb2.UpdateStreamingConfigurationRequest(**config)
stub.UpdateStreamingConfiguration(request)
def _read_reference(reference_path: str | None) -> str | None:
if not reference_path:
return None
path = Path(reference_path)
if not path.exists():
raise FileNotFoundError(f"Reference file not found: {path}")
return path.read_text(encoding="utf-8")
def _make_case(
label: str,
meeting_id: str | None,
asset_path: str | None,
wav_path: str | None,
reference_path: str | None,
) -> AudioCase:
settings = get_settings()
reference = _read_reference(reference_path)
if meeting_id:
audio, sample_rate = _load_meeting_audio(
meeting_id,
asset_path,
Path(settings.meetings_dir),
)
elif wav_path:
audio, sample_rate = _load_wav(Path(wav_path))
else:
raise ValueError("Either meeting_id or wav_path must be provided.")
return AudioCase(
label=label,
audio=audio,
sample_rate=sample_rate,
reference=reference,
)
def _run_streaming_case(
client: NoteFlowClient,
case: AudioCase,
config_label: str,
config: dict[str, float],
chunk_ms: int,
realtime: bool,
final_wait_seconds: float,
) -> RunResult:
stub = client.require_connection()
_apply_streaming_config(stub, config)
meeting_title = f"AB {case.label} [{config_label}]"
meeting = client.create_meeting(meeting_title)
if meeting is None:
raise RuntimeError("Failed to create meeting")
stats = StreamingStats()
lock = threading.Lock()
def on_transcript(segment: TranscriptSegment) -> None:
now = time.time()
with lock:
if segment.is_final:
stats.final_count += 1
if stats.first_final_at is None:
stats.first_final_at = now
else:
stats.partial_count += 1
if stats.first_partial_at is None:
stats.first_partial_at = now
client.on_transcript = on_transcript
if not client.start_streaming(meeting.id):
raise RuntimeError("Failed to start streaming")
start_time = time.time()
sent_samples = 0
try:
for chunk in _chunk_audio(case.audio, case.sample_rate, chunk_ms):
if realtime:
target_time = start_time + (sent_samples / case.sample_rate)
sleep_for = target_time - time.time()
if sleep_for > 0:
time.sleep(sleep_for)
while not client.send_audio(chunk, timestamp=time.time()):
time.sleep(0.01)
sent_samples += chunk.shape[0]
finally:
time.sleep(final_wait_seconds)
client.stop_streaming()
client.stop_meeting(meeting.id)
segments = client.get_meeting_segments(meeting.id)
transcript = " ".join(seg.text.strip() for seg in segments if seg.text.strip())
end_time = time.time()
audio_duration = case.audio.shape[0] / case.sample_rate if case.sample_rate else 0.0
first_partial_latency = (
(stats.first_partial_at - start_time) if stats.first_partial_at else None
)
first_final_latency = (
(stats.first_final_at - start_time) if stats.first_final_at else None
)
wer = _word_error_rate(case.reference, transcript) if case.reference else None
return RunResult(
label=f"{case.label}:{config_label}",
meeting_id=meeting.id,
audio_duration_s=audio_duration,
wall_time_s=end_time - start_time,
first_partial_latency_s=first_partial_latency,
first_final_latency_s=first_final_latency,
transcript=transcript,
wer=wer,
segments=len(segments),
)
def _load_cases_from_json(path: str) -> list[AudioCase]:
raw = json.loads(Path(path).read_text(encoding="utf-8"))
if not isinstance(raw, list):
raise ValueError("Cases JSON must be a list of case objects.")
payload = cast(list[object], raw)
entries: list[dict[str, object]] = []
for item in payload:
if not isinstance(item, dict):
raise ValueError("Each case must be an object.")
entries.append(cast(dict[str, object], item))
cases: list[AudioCase] = []
for entry_dict in entries:
cases.append(
_make_case(
label=str(entry_dict.get("label", "case")),
meeting_id=cast(str | None, entry_dict.get("meeting_id")),
asset_path=cast(str | None, entry_dict.get("asset_path")),
wav_path=cast(str | None, entry_dict.get("wav_path")),
reference_path=cast(str | None, entry_dict.get("reference_path")),
)
)
return cases
def _format_latency(value: float | None) -> str:
if value is None:
return "n/a"
return f"{value:.2f}s"
def _print_results(results: list[RunResult]) -> None:
for result in results:
print("")
print(f"Case {result.label}")
print(f" meeting_id: {result.meeting_id}")
print(f" audio_duration_s: {result.audio_duration_s:.2f}")
print(f" wall_time_s: {result.wall_time_s:.2f}")
print(f" first_partial_latency: {_format_latency(result.first_partial_latency_s)}")
print(f" first_final_latency: {_format_latency(result.first_final_latency_s)}")
print(f" segments: {result.segments}")
if result.wer is not None:
print(f" WER: {result.wer:.3f}")
def _build_config(value: str | None, label: str) -> dict[str, float]:
if value is None:
raise ValueError(f"Missing config for {label}")
if value in PRESETS:
return PRESETS[value]
path = Path(value)
if path.exists():
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"Config file must be an object: {path}")
payload = cast(dict[str, object], data)
config: dict[str, float] = {}
for key, raw_value in payload.items():
if isinstance(raw_value, (int, float)):
config[str(key)] = float(raw_value)
if not config:
raise ValueError(f"Config file has no numeric values: {path}")
return config
raise ValueError(f"Unknown preset or config path: {value}")
def main() -> None:
parser = argparse.ArgumentParser(description="A/B harness for streaming config.")
parser.add_argument("--server", default="localhost:50051")
parser.add_argument("--meeting-id", help="Meeting ID to replay audio from.")
parser.add_argument("--asset-path", help="Override meeting asset path.")
parser.add_argument("--wav", help="WAV file to stream instead of a meeting.")
parser.add_argument("--reference", help="Reference transcript text file.")
parser.add_argument(
"--cases",
help="JSON file describing multiple cases (label, meeting_id/wav_path, reference_path).",
)
parser.add_argument("--preset-a", default="responsive")
parser.add_argument("--preset-b", default="balanced")
parser.add_argument("--chunk-ms", type=int, default=200)
parser.add_argument("--realtime", action="store_true")
parser.add_argument("--final-wait", type=float, default=2.0)
args = parser.parse_args()
configure_logging(LoggingConfig(level="INFO"))
if args.cases:
cases = _load_cases_from_json(args.cases)
else:
cases = [
_make_case(
label="sample",
meeting_id=args.meeting_id,
asset_path=args.asset_path,
wav_path=args.wav,
reference_path=args.reference,
)
]
config_a = _build_config(args.preset_a, "A")
config_b = _build_config(args.preset_b, "B")
client = NoteFlowClient(server_address=args.server)
if not client.connect():
raise RuntimeError(f"Unable to connect to server at {args.server}")
stub = cast(StreamingConfigStub, client.require_connection())
original_config = _get_streaming_config(stub)
results: list[RunResult] = []
try:
for case in cases:
results.append(
_run_streaming_case(
client,
case,
"A",
config_a,
args.chunk_ms,
args.realtime,
args.final_wait,
)
)
results.append(
_run_streaming_case(
client,
case,
"B",
config_b,
args.chunk_ms,
args.realtime,
args.final_wait,
)
)
finally:
_apply_streaming_config(stub, original_config)
client.disconnect()
_print_results(results)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,192 @@
"""Helpers for persisting streaming configuration selections."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Final, TypedDict, cast
from noteflow.infrastructure.logging import get_logger
if TYPE_CHECKING:
from noteflow.config.settings import Settings
logger = get_logger(__name__)
STREAMING_CONFIG_PARTIAL_CADENCE_KEY: Final[str] = "partial_cadence_seconds"
STREAMING_CONFIG_MIN_PARTIAL_AUDIO_KEY: Final[str] = "min_partial_audio_seconds"
STREAMING_CONFIG_MAX_SEGMENT_DURATION_KEY: Final[str] = "max_segment_duration_seconds"
STREAMING_CONFIG_MIN_SPEECH_DURATION_KEY: Final[str] = "min_speech_duration_seconds"
STREAMING_CONFIG_TRAILING_SILENCE_KEY: Final[str] = "trailing_silence_seconds"
STREAMING_CONFIG_LEADING_BUFFER_KEY: Final[str] = "leading_buffer_seconds"
STREAMING_CONFIG_KEYS: Final[tuple[str, ...]] = (
STREAMING_CONFIG_PARTIAL_CADENCE_KEY,
STREAMING_CONFIG_MIN_PARTIAL_AUDIO_KEY,
STREAMING_CONFIG_MAX_SEGMENT_DURATION_KEY,
STREAMING_CONFIG_MIN_SPEECH_DURATION_KEY,
STREAMING_CONFIG_TRAILING_SILENCE_KEY,
STREAMING_CONFIG_LEADING_BUFFER_KEY,
)
STREAMING_CONFIG_RANGES: Final[dict[str, tuple[float, float]]] = {
STREAMING_CONFIG_PARTIAL_CADENCE_KEY: (0.5, 10.0),
STREAMING_CONFIG_MIN_PARTIAL_AUDIO_KEY: (0.1, 5.0),
STREAMING_CONFIG_MAX_SEGMENT_DURATION_KEY: (5.0, 180.0),
STREAMING_CONFIG_MIN_SPEECH_DURATION_KEY: (0.1, 2.0),
STREAMING_CONFIG_TRAILING_SILENCE_KEY: (0.1, 2.0),
STREAMING_CONFIG_LEADING_BUFFER_KEY: (0.05, 1.0),
}
class StreamingConfigPreference(TypedDict, total=False):
partial_cadence_seconds: float
min_partial_audio_seconds: float
max_segment_duration_seconds: float
min_speech_duration_seconds: float
trailing_silence_seconds: float
leading_buffer_seconds: float
class StreamingConfigValues(TypedDict):
partial_cadence_seconds: float
min_partial_audio_seconds: float
max_segment_duration_seconds: float
min_speech_duration_seconds: float
trailing_silence_seconds: float
leading_buffer_seconds: float
@dataclass(frozen=True, slots=True)
class StreamingConfig:
"""Runtime streaming configuration values."""
partial_cadence_seconds: float
min_partial_audio_seconds: float
max_segment_duration_seconds: float
min_speech_duration_seconds: float
trailing_silence_seconds: float
leading_buffer_seconds: float
@dataclass(frozen=True, slots=True)
class StreamingConfigResolution:
config: StreamingConfig
used_preferences: bool
had_fallback: bool
def build_default_streaming_config(settings: Settings) -> StreamingConfig:
"""Build default streaming configuration from settings."""
raw = StreamingConfig(
partial_cadence_seconds=settings.grpc_partial_cadence_seconds,
min_partial_audio_seconds=settings.grpc_min_partial_audio_seconds,
max_segment_duration_seconds=settings.grpc_max_segment_duration_seconds,
min_speech_duration_seconds=settings.grpc_min_speech_duration_seconds,
trailing_silence_seconds=settings.grpc_trailing_silence_seconds,
leading_buffer_seconds=settings.grpc_leading_buffer_seconds,
)
resolved, had_fallback = _resolve_config(_build_values_dict(raw), raw)
if had_fallback:
logger.warning("streaming_config_defaults_clamped")
return resolved
def build_streaming_config_preference(
config: StreamingConfig,
) -> StreamingConfigPreference:
"""Build a preference payload for streaming config persistence."""
values = _build_values_dict(config)
preference: dict[str, float] = {}
for key, value in values.items():
min_val, max_val = STREAMING_CONFIG_RANGES[key]
preference[key] = _clamp(value, min_val, max_val)
return cast(StreamingConfigPreference, preference)
def resolve_streaming_config_preference(
raw_value: object,
fallback: StreamingConfig,
) -> StreamingConfigResolution | None:
"""Resolve a stored streaming config preference into safe runtime values."""
parsed = _parse_preference(raw_value)
if parsed is None:
return None
resolved, had_fallback = _resolve_config(parsed, fallback)
return StreamingConfigResolution(
config=resolved,
used_preferences=True,
had_fallback=had_fallback,
)
def _parse_preference(raw_value: object) -> StreamingConfigPreference | None:
if not isinstance(raw_value, dict):
return None
raw_dict = cast(dict[str, object], raw_value)
preference: StreamingConfigPreference = {}
for key in STREAMING_CONFIG_KEYS:
value = _read_float(raw_dict.get(key))
if value is not None:
preference[key] = value
return preference if preference else None
def _resolve_config(
preference: StreamingConfigPreference,
fallback: StreamingConfig,
) -> tuple[StreamingConfig, bool]:
values: dict[str, float] = {}
had_fallback = False
for key in STREAMING_CONFIG_KEYS:
fallback_value = _get_fallback_value(fallback, key)
resolved, used_fallback = _resolve_value(
key,
preference.get(key),
fallback_value,
)
values[key] = resolved
had_fallback = had_fallback or used_fallback
return StreamingConfig(**cast(StreamingConfigValues, values)), had_fallback
def _build_values_dict(config: StreamingConfig) -> StreamingConfigValues:
values = {key: _get_fallback_value(config, key) for key in STREAMING_CONFIG_KEYS}
return cast(StreamingConfigValues, values)
def _get_fallback_value(config: StreamingConfig, key: str) -> float:
return cast(float, getattr(config, key))
def _resolve_value(
key: str,
preferred: float | None,
fallback: float,
) -> tuple[float, bool]:
min_val, max_val = STREAMING_CONFIG_RANGES[key]
if preferred is not None and min_val <= preferred <= max_val:
return preferred, False
resolved = _clamp(fallback, min_val, max_val)
if preferred is not None:
return resolved, True
if resolved != fallback:
logger.warning("streaming_config_fallback_out_of_range", key=key, value=fallback)
return resolved, True
return resolved, False
def _clamp(value: float, min_val: float, max_val: float) -> float:
return min(max(value, min_val), max_val)
def _read_float(value: object) -> float | None:
if isinstance(value, (float, int)):
return float(value)
return None

View File

@@ -49,6 +49,7 @@ from noteflow.config.constants.domain import (
SCHEMA_TYPE_STRING,
SETTING_ASR_CONFIG,
SETTING_CLOUD_CONSENT_GRANTED,
SETTING_STREAMING_CONFIG,
SPACY_MODEL_LG,
SPACY_MODEL_MD,
SPACY_MODEL_SM,
@@ -196,6 +197,7 @@ __all__ = [
"SECONDS_PER_HOUR",
"SETTING_ASR_CONFIG",
"SETTING_CLOUD_CONSENT_GRANTED",
"SETTING_STREAMING_CONFIG",
"SPACY_MODEL_LG",
"SPACY_MODEL_MD",
"SPACY_MODEL_SM",

View File

@@ -78,6 +78,9 @@ SETTING_CLOUD_CONSENT_GRANTED: Final[str] = "cloud_consent_granted"
SETTING_ASR_CONFIG: Final[str] = "asr_config"
"""Settings field name for persisted ASR configuration."""
SETTING_STREAMING_CONFIG: Final[str] = "streaming_config"
"""Settings field name for persisted streaming configuration."""
# =============================================================================
# Trigger Actions
# =============================================================================

View File

@@ -39,6 +39,10 @@ class Settings(TriggerSettings):
NOTEFLOW_RETENTION_ENABLED: Enable automatic retention policy (default: False)
NOTEFLOW_RETENTION_DAYS: Days to retain completed meetings (default: 90)
NOTEFLOW_RETENTION_CHECK_INTERVAL_HOURS: Hours between retention checks (default: 24)
NOTEFLOW_GRPC_MAX_SEGMENT_DURATION_SECONDS: Max segment duration (default: 30.0)
NOTEFLOW_GRPC_MIN_SPEECH_DURATION_SECONDS: Min speech duration (default: 0.3)
NOTEFLOW_GRPC_TRAILING_SILENCE_SECONDS: Trailing silence duration (default: 0.5)
NOTEFLOW_GRPC_LEADING_BUFFER_SECONDS: Leading buffer duration (default: 0.2)
"""
model_config = SettingsConfigDict(
@@ -181,6 +185,22 @@ class Settings(TriggerSettings):
float,
Field(default=0.5, ge=0.1, le=5.0, description="Minimum audio for partial inference"),
]
grpc_max_segment_duration_seconds: Annotated[
float,
Field(default=30.0, ge=5.0, le=180.0, description="Maximum segment duration in seconds"),
]
grpc_min_speech_duration_seconds: Annotated[
float,
Field(default=0.3, ge=0.1, le=2.0, description="Minimum speech duration in seconds"),
]
grpc_trailing_silence_seconds: Annotated[
float,
Field(default=0.5, ge=0.1, le=2.0, description="Trailing silence duration in seconds"),
]
grpc_leading_buffer_seconds: Annotated[
float,
Field(default=0.2, ge=0.05, le=1.0, description="Leading buffer duration in seconds"),
]
# Webhook settings
webhook_timeout_seconds: Annotated[

View File

@@ -3,6 +3,7 @@
from ._types import GrpcContext, GrpcStatusContext
from .annotation import AnnotationMixin
from .asr_config import AsrConfigMixin
from .streaming_config import StreamingConfigMixin
from .calendar import CalendarMixin
from .diarization import DiarizationMixin
from .diarization_job import DiarizationJobMixin
@@ -27,6 +28,7 @@ from .webhooks import WebhooksMixin
__all__ = [
"AnnotationMixin",
"AsrConfigMixin",
"StreamingConfigMixin",
"CalendarMixin",
"DiarizationJobMixin",
"DiarizationMixin",

View File

@@ -22,6 +22,7 @@ if TYPE_CHECKING:
from noteflow.application.services.summarization import SummarizationService
from noteflow.application.services.webhook_service import WebhookService
from noteflow.domain.entities import SyncRun
from noteflow.application.services.streaming_config_persistence import StreamingConfig
from noteflow.infrastructure.asr import FasterWhisperEngine, Segmenter, StreamingVad
from noteflow.infrastructure.audio.writer import MeetingAudioWriter
from noteflow.infrastructure.auth.oidc_registry import OidcAuthService
@@ -67,6 +68,9 @@ class ServicerState(Protocol):
active_streams: set[str]
stop_requested: set[str] # Meeting IDs with pending stop requests
# Streaming tuning configuration
streaming_config: StreamingConfig
# Chunk sequence tracking for acknowledgments
chunk_sequences: dict[str, int] # Highest received sequence per meeting
chunk_counts: dict[str, int] # Chunks since last ack (emit ack every 5)

View File

@@ -0,0 +1,205 @@
"""Post-meeting processing: auto-summarization and completion."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING
from uuid import UUID
from noteflow.domain.ports.unit_of_work import UnitOfWork
from noteflow.domain.value_objects import MeetingId, MeetingState
from noteflow.infrastructure.logging import get_logger, log_state_transition
if TYPE_CHECKING:
from noteflow.application.services.summarization import SummarizationService
from noteflow.application.services.webhook_service import WebhookService
from noteflow.domain.entities import Meeting, Segment, Summary
from ..protocols import ServicerHost
logger = get_logger(__name__)
@dataclass(slots=True)
class _SummaryCompletionContext:
repo: UnitOfWork
meeting: Meeting
meeting_id: str
segments: list[Segment]
summary: Summary
async def _generate_summary_and_complete(
host: ServicerHost,
meeting_id: str,
summarization_service: SummarizationService,
) -> None:
"""Generate summary for meeting and transition to COMPLETED state."""
result = await _process_summary(
repo_provider=host, meeting_id=meeting_id, service=summarization_service
)
if result is None:
return
meeting, saved_summary = result
await _trigger_summary_webhook(host.webhook_service, meeting, saved_summary)
async def _process_summary(
repo_provider: ServicerHost,
meeting_id: str,
service: SummarizationService,
) -> tuple[Meeting, Summary] | None:
from ..summarization._summary_generation import summarize_or_placeholder
parsed_id = MeetingId(UUID(meeting_id))
repo: UnitOfWork
async with repo_provider.create_repository_provider() as repo:
meeting = await _load_meeting(repo, parsed_id, meeting_id)
if meeting is None or not _should_process_meeting(meeting, meeting_id):
return None
segments = await _load_segments(repo, parsed_id)
if not segments:
await _complete_without_summary(repo, meeting, meeting_id)
return None
summary = await summarize_or_placeholder(
service,
parsed_id,
segments,
style_prompt=None,
)
context = _SummaryCompletionContext(
repo=repo,
meeting=meeting,
meeting_id=meeting_id,
segments=segments,
summary=summary,
)
saved_summary = await _save_summary_and_complete(context)
return meeting, saved_summary
async def _load_meeting(
repo: UnitOfWork,
meeting_id: MeetingId,
meeting_id_str: str,
) -> Meeting | None:
meeting = await repo.meetings.get(meeting_id)
if meeting is None:
logger.warning("Post-processing: meeting not found", meeting_id=meeting_id_str)
return meeting
def _should_process_meeting(meeting: Meeting, meeting_id: str) -> bool:
if meeting.state != MeetingState.STOPPED:
logger.debug(
"Post-processing: skipping, meeting not in STOPPED state",
meeting_id=meeting_id,
state=meeting.state.name,
)
return False
return True
async def _load_segments(
repo: UnitOfWork,
meeting_id: MeetingId,
) -> list[Segment]:
return list(await repo.segments.get_by_meeting(meeting_id))
async def _complete_without_summary(
repo: UnitOfWork,
meeting: Meeting,
meeting_id: str,
) -> None:
logger.info(
"Post-processing: no segments, completing without summary",
meeting_id=meeting_id,
)
_complete_meeting(meeting, meeting_id)
await repo.meetings.update(meeting)
await repo.commit()
async def _save_summary_and_complete(context: _SummaryCompletionContext) -> Summary:
saved_summary = await context.repo.summaries.save(context.summary)
_complete_meeting(context.meeting, context.meeting_id)
await context.repo.meetings.update(context.meeting)
await context.repo.commit()
logger.info(
"Post-processing complete",
meeting_id=context.meeting_id,
summary_db_id=saved_summary.db_id,
segment_count=len(context.segments),
)
return saved_summary
def _complete_meeting(meeting: Meeting, meeting_id: str) -> None:
"""Transition meeting to COMPLETED state with logging."""
previous_state = meeting.state
meeting.complete()
log_state_transition("meeting", meeting_id, previous_state, meeting.state)
async def _trigger_summary_webhook(
webhook_service: WebhookService | None,
meeting: Meeting,
summary: Summary,
) -> None:
"""Trigger summary.generated webhook (fire-and-forget)."""
if webhook_service is None:
return
try:
meeting.summary = summary
await webhook_service.trigger_summary_generated(meeting)
except Exception:
logger.exception("Failed to trigger summary.generated webhook")
async def start_post_processing(
host: ServicerHost,
meeting_id: str,
) -> asyncio.Task[None] | None:
"""Spawn background task for post-meeting processing.
Starts auto-summarization and meeting completion as a fire-and-forget task.
Returns the task handle for testing/monitoring, or None if summarization
is not configured.
Args:
host: The servicer host.
meeting_id: The meeting ID to process.
Returns:
The spawned asyncio Task, or None if summarization service unavailable.
"""
service = host.summarization_service
if service is None:
logger.debug(
"Post-processing: summarization not configured, skipping",
meeting_id=meeting_id,
)
return None
# Capture narrowed type for closure
summarization_service: SummarizationService = service
async def _run_with_error_handling() -> None:
"""Wrapper to catch and log any errors."""
try:
await _generate_summary_and_complete(host, meeting_id, summarization_service)
except Exception:
logger.exception(
"Post-processing failed",
meeting_id=meeting_id,
)
task = asyncio.create_task(_run_with_error_handling())
logger.info("Post-processing task started", meeting_id=meeting_id)
return task

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast
from uuid import UUID
@@ -10,13 +11,14 @@ from noteflow.config.constants import DEFAULT_MEETING_TITLE
from noteflow.domain.entities import Meeting
from noteflow.domain.entities.meeting import MeetingCreateParams
from noteflow.domain.identity import OperationContext
from noteflow.domain.value_objects import MeetingState
from noteflow.domain.value_objects import MeetingId, MeetingState
from noteflow.infrastructure.logging import get_logger
from ...proto import noteflow_pb2
from ..converters import meeting_to_proto, parse_meeting_id_or_abort
from ..errors import ENTITY_MEETING, abort_not_found
from ..protocols import MeetingRepositoryProvider
from ._post_processing import start_post_processing
from ._project_scope import (
parse_project_id_or_abort,
parse_project_ids_or_abort,
@@ -39,10 +41,75 @@ if TYPE_CHECKING:
from noteflow.infrastructure.audio.writer import MeetingAudioWriter
from .._types import GrpcContext
from ..protocols import ServicerHost
logger = get_logger(__name__)
@dataclass(slots=True)
class _StopMeetingContext:
host: MeetingMixin
repo: MeetingRepositoryProvider
meeting: Meeting
meeting_id: str
context: GrpcContext
async def _load_meeting_for_stop(
repo: MeetingRepositoryProvider,
meeting_id: MeetingId,
meeting_id_str: str,
context: GrpcContext,
) -> Meeting:
meeting = await repo.meetings.get(meeting_id)
if meeting is None:
logger.warning("StopMeeting: meeting not found", meeting_id=meeting_id_str)
await abort_not_found(context, ENTITY_MEETING, meeting_id_str)
raise AssertionError("unreachable")
return meeting
async def _stop_meeting_and_persist(context: _StopMeetingContext) -> Meeting:
terminal_states = (
MeetingState.STOPPED,
MeetingState.STOPPING,
MeetingState.COMPLETED,
MeetingState.ERROR,
)
if context.meeting.state in terminal_states:
logger.debug(
"StopMeeting: already terminal",
meeting_id=context.meeting_id,
state=context.meeting.state.value,
)
return context.meeting
previous_state = context.meeting.state.value
await transition_to_stopped(
context.meeting,
context.meeting_id,
previous_state,
context.context,
)
await context.repo.meetings.update(context.meeting)
if context.repo.supports_diarization_jobs:
await context.repo.diarization_jobs.clear_streaming_turns(context.meeting_id)
await context.repo.commit()
logger.info(
"Meeting stopped",
meeting_id=context.meeting_id,
from_state=previous_state,
to_state=context.meeting.state.value,
)
await fire_stop_webhooks(context.host.webhook_service, context.meeting)
host_cast = cast("ServicerHost", context.host)
await start_post_processing(host_cast, context.meeting_id)
return context.meeting
class MeetingMixin:
"""Mixin providing meeting CRUD functionality.
@@ -113,33 +180,15 @@ class MeetingMixin:
parsed_meeting_id = await parse_meeting_id_or_abort(meeting_id, context)
async with cast(MeetingRepositoryProvider, self.create_repository_provider()) as repo:
meeting = await repo.meetings.get(parsed_meeting_id)
if meeting is None:
logger.warning("StopMeeting: meeting not found", meeting_id=meeting_id)
await abort_not_found(context, ENTITY_MEETING, meeting_id)
raise # Unreachable but helps type checker
# Idempotency: return success if already stopped/stopping/completed
terminal_states = (
MeetingState.STOPPED,
MeetingState.STOPPING,
MeetingState.COMPLETED,
MeetingState.ERROR,
meeting = await _load_meeting_for_stop(repo, parsed_meeting_id, meeting_id, context)
stop_context = _StopMeetingContext(
host=self,
repo=repo,
meeting=meeting,
meeting_id=meeting_id,
context=context,
)
if meeting.state in terminal_states:
logger.debug("StopMeeting: already terminal", meeting_id=meeting_id, state=meeting.state.value)
return meeting_to_proto(meeting)
previous_state = meeting.state.value
await transition_to_stopped(meeting, meeting_id, previous_state, context)
await repo.meetings.update(meeting)
if repo.supports_diarization_jobs:
await repo.diarization_jobs.clear_streaming_turns(meeting_id)
await repo.commit()
logger.info("Meeting stopped", meeting_id=meeting_id, from_state=previous_state, to_state=meeting.state.value)
await fire_stop_webhooks(self.webhook_service, meeting)
meeting = await _stop_meeting_and_persist(stop_context)
return meeting_to_proto(meeting)
async def ListMeetings(

View File

@@ -30,7 +30,7 @@ def _should_emit_partial(
True if a partial should be emitted.
"""
# Check if enough time has passed since last partial
if now - state.last_partial_time < host.PARTIAL_CADENCE_SECONDS:
if now - state.last_partial_time < host.streaming_config.partial_cadence_seconds:
return False
# Check if we have enough audio
@@ -38,7 +38,10 @@ def _should_emit_partial(
return False
# Check minimum audio duration before extracting
return state.partial_buffer.duration_seconds >= host.MIN_PARTIAL_AUDIO_SECONDS
return (
state.partial_buffer.duration_seconds
>= host.streaming_config.min_partial_audio_seconds
)
async def _transcribe_partial_audio(

View File

@@ -0,0 +1,144 @@
"""Streaming configuration mixin for gRPC service.
Provides runtime tuning for partial cadence and segmentation thresholds.
"""
from __future__ import annotations
from dataclasses import replace
from typing import TYPE_CHECKING
from noteflow.application.services.streaming_config_persistence import (
STREAMING_CONFIG_KEYS,
STREAMING_CONFIG_RANGES,
StreamingConfig,
build_streaming_config_preference,
)
from noteflow.config.constants.domain import SETTING_STREAMING_CONFIG
from noteflow.infrastructure.asr import Segmenter
from noteflow.infrastructure.logging import get_logger
from ..proto import noteflow_pb2
from ._types import GrpcContext
from .errors import abort_invalid_argument
if TYPE_CHECKING:
from collections.abc import Callable
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
logger = get_logger(__name__)
def _build_configuration_proto(config: StreamingConfig) -> noteflow_pb2.StreamingConfiguration:
return noteflow_pb2.StreamingConfiguration(
partial_cadence_seconds=config.partial_cadence_seconds,
min_partial_audio_seconds=config.min_partial_audio_seconds,
max_segment_duration_seconds=config.max_segment_duration_seconds,
min_speech_duration_seconds=config.min_speech_duration_seconds,
trailing_silence_seconds=config.trailing_silence_seconds,
leading_buffer_seconds=config.leading_buffer_seconds,
)
def _parse_update_request(
request: noteflow_pb2.UpdateStreamingConfigurationRequest,
) -> dict[str, float]:
updates: dict[str, float] = {}
for field in STREAMING_CONFIG_KEYS:
if request.HasField(field):
updates[field] = getattr(request, field)
return updates
def _validate_update(update: dict[str, float]) -> str | None:
for key, value in update.items():
min_val, max_val = STREAMING_CONFIG_RANGES.get(key, (None, None))
if min_val is None or max_val is None:
continue
if not (min_val <= value <= max_val):
return f"{key} must be between {min_val} and {max_val} seconds"
return None
class StreamingConfigMixin:
"""Mixin providing streaming configuration management functionality."""
streaming_config: StreamingConfig
segmenters: dict[str, Segmenter]
create_uow: Callable[..., SqlAlchemyUnitOfWork]
async def GetStreamingConfiguration(
self,
request: noteflow_pb2.GetStreamingConfigurationRequest,
context: GrpcContext,
) -> noteflow_pb2.GetStreamingConfigurationResponse:
config_proto = _build_configuration_proto(self.streaming_config)
return noteflow_pb2.GetStreamingConfigurationResponse(configuration=config_proto)
async def UpdateStreamingConfiguration(
self,
request: noteflow_pb2.UpdateStreamingConfigurationRequest,
context: GrpcContext,
) -> noteflow_pb2.UpdateStreamingConfigurationResponse:
updates = _parse_update_request(request)
if not updates:
config_proto = _build_configuration_proto(self.streaming_config)
return noteflow_pb2.UpdateStreamingConfigurationResponse(configuration=config_proto)
if error := _validate_update(updates):
await abort_invalid_argument(context, error)
raise AssertionError("unreachable") # abort is NoReturn
current = self.streaming_config
updated = StreamingConfig(
partial_cadence_seconds=updates.get("partial_cadence_seconds", current.partial_cadence_seconds),
min_partial_audio_seconds=updates.get("min_partial_audio_seconds", current.min_partial_audio_seconds),
max_segment_duration_seconds=updates.get(
"max_segment_duration_seconds", current.max_segment_duration_seconds
),
min_speech_duration_seconds=updates.get(
"min_speech_duration_seconds", current.min_speech_duration_seconds
),
trailing_silence_seconds=updates.get(
"trailing_silence_seconds", current.trailing_silence_seconds
),
leading_buffer_seconds=updates.get(
"leading_buffer_seconds", current.leading_buffer_seconds
),
)
self.apply_streaming_config(updated)
await self._persist_streaming_config(updated)
config_proto = _build_configuration_proto(updated)
return noteflow_pb2.UpdateStreamingConfigurationResponse(configuration=config_proto)
def apply_streaming_config(self, config: StreamingConfig) -> None:
self.streaming_config = config
for segmenter in self.segmenters.values():
current_config = segmenter.config
segmenter.config = replace(
current_config,
min_speech_duration=config.min_speech_duration_seconds,
max_segment_duration=config.max_segment_duration_seconds,
trailing_silence=config.trailing_silence_seconds,
leading_buffer=config.leading_buffer_seconds,
)
async def _persist_streaming_config(self, config: StreamingConfig) -> None:
try:
uow = self.create_uow()
except RuntimeError:
logger.debug("streaming_config_persist_skipped_no_db")
return
async with uow:
if not uow.supports_preferences:
logger.debug("streaming_config_persist_skipped_no_preferences")
return
preference = build_streaming_config_preference(config)
await uow.preferences.set(SETTING_STREAMING_CONFIG, preference)
await uow.commit()

View File

@@ -1,6 +1,4 @@
"""Mixin classes for NoteFlowServicer."""
from __future__ import annotations
import time
@@ -15,6 +13,7 @@ from noteflow.domain.identity.context import OperationContext, UserContext, Work
from noteflow.domain.identity.roles import WorkspaceRole
from noteflow.domain.value_objects import MeetingState
from noteflow.grpc.meeting_store import MeetingStore
from noteflow.application.services.streaming_config_persistence import StreamingConfig
from noteflow.infrastructure.asr import Segmenter, SegmenterConfig, StreamingVad
from noteflow.infrastructure.audio.partial_buffer import PartialAudioBuffer
from noteflow.infrastructure.audio.writer import MeetingAudioWriter
@@ -51,15 +50,12 @@ if TYPE_CHECKING:
from ._mixins._types import GrpcContext
logger = get_logger(__name__)
class ServicerUowMixin:
"""Mixin for Unit of Work operations."""
session_factory: async_sessionmaker[AsyncSession] | None
meetings_dir: Path
memory_store: MeetingStore | None
def use_database(self) -> bool:
"""Check if database persistence is configured."""
return self.session_factory is not None
@@ -95,11 +91,7 @@ class ServicerContextMixin:
"""Mixin for operation context handling."""
def get_operation_context(self, context: GrpcContext) -> OperationContext:
"""Get operation context from gRPC context variables.
Extract user and workspace IDs from context variables set by the
identity interceptor. Falls back to domain defaults for local-first mode.
"""
"""Get operation context from gRPC context variables."""
request_id = request_id_var.get()
user_id_str = user_id_var.get()
workspace_id_str = workspace_id_var.get()
@@ -125,17 +117,25 @@ class ServicerStreamingStateMixin:
vad_instances: dict[str, StreamingVad]
segmenters: dict[str, Segmenter]
segment_counters: dict[str, int]
streaming_config: StreamingConfig
stream_formats: dict[str, tuple[int, int]]
chunk_sequences: dict[str, int]
chunk_counts: dict[str, int]
chunk_receipt_times: dict[str, deque[float]]
pending_chunks: dict[str, int]
DEFAULT_SAMPLE_RATE: int
def init_streaming_state(self, meeting_id: str, next_segment_id: int) -> None:
"""Initialize VAD, Segmenter, speaking state, and partial buffers for a meeting."""
vad = StreamingVad()
segmenter = Segmenter(config=SegmenterConfig(sample_rate=self.DEFAULT_SAMPLE_RATE))
segmenter = Segmenter(
config=SegmenterConfig(
min_speech_duration=self.streaming_config.min_speech_duration_seconds,
max_segment_duration=self.streaming_config.max_segment_duration_seconds,
trailing_silence=self.streaming_config.trailing_silence_seconds,
leading_buffer=self.streaming_config.leading_buffer_seconds,
sample_rate=self.DEFAULT_SAMPLE_RATE,
)
)
partial_buffer = PartialAudioBuffer(sample_rate=self.DEFAULT_SAMPLE_RATE)
current_time = time.time()

View File

@@ -57,6 +57,10 @@ service NoteFlowService {
rpc UpdateAsrConfiguration(UpdateAsrConfigurationRequest) returns (UpdateAsrConfigurationResponse);
rpc GetAsrConfigurationJobStatus(GetAsrConfigurationJobStatusRequest) returns (AsrConfigurationJobStatus);
// Streaming configuration (Sprint 20)
rpc GetStreamingConfiguration(GetStreamingConfigurationRequest) returns (GetStreamingConfigurationResponse);
rpc UpdateStreamingConfiguration(UpdateStreamingConfigurationRequest) returns (UpdateStreamingConfigurationResponse);
// Named entity extraction (Sprint 4) + mutations (Sprint 8)
rpc ExtractEntities(ExtractEntitiesRequest) returns (ExtractEntitiesResponse);
rpc UpdateEntity(UpdateEntityRequest) returns (UpdateEntityResponse);
@@ -733,6 +737,61 @@ message AsrConfigurationJobStatus {
optional AsrConfiguration new_configuration = 6;
}
// =============================================================================
// Streaming Configuration Messages (Sprint 20)
// =============================================================================
// Streaming configuration for partials and segmentation
message StreamingConfiguration {
// Interval for emitting partial transcripts (seconds)
float partial_cadence_seconds = 1;
// Minimum audio duration required to emit a partial (seconds)
float min_partial_audio_seconds = 2;
// Maximum duration before forcing a segment split (seconds)
float max_segment_duration_seconds = 3;
// Minimum speech duration to keep a segment (seconds)
float min_speech_duration_seconds = 4;
// Trailing silence to include after speech ends (seconds)
float trailing_silence_seconds = 5;
// Leading buffer to include before speech starts (seconds)
float leading_buffer_seconds = 6;
}
message GetStreamingConfigurationRequest {}
message GetStreamingConfigurationResponse {
StreamingConfiguration configuration = 1;
}
message UpdateStreamingConfigurationRequest {
// Interval for emitting partial transcripts (seconds)
optional float partial_cadence_seconds = 1;
// Minimum audio duration required to emit a partial (seconds)
optional float min_partial_audio_seconds = 2;
// Maximum duration before forcing a segment split (seconds)
optional float max_segment_duration_seconds = 3;
// Minimum speech duration to keep a segment (seconds)
optional float min_speech_duration_seconds = 4;
// Trailing silence to include after speech ends (seconds)
optional float trailing_silence_seconds = 5;
// Leading buffer to include before speech starts (seconds)
optional float leading_buffer_seconds = 6;
}
message UpdateStreamingConfigurationResponse {
StreamingConfiguration configuration = 1;
}
// =============================================================================
// Annotation Messages
// =============================================================================

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@
import grpc
import warnings
import noteflow_pb2 as noteflow__pb2
from . import noteflow_pb2 as noteflow__pb2
GRPC_GENERATED_VERSION = '1.76.0'
GRPC_VERSION = grpc.__version__
@@ -183,6 +183,16 @@ class NoteFlowServiceStub(object):
request_serializer=noteflow__pb2.GetAsrConfigurationJobStatusRequest.SerializeToString,
response_deserializer=noteflow__pb2.AsrConfigurationJobStatus.FromString,
_registered_method=True)
self.GetStreamingConfiguration = channel.unary_unary(
'/noteflow.NoteFlowService/GetStreamingConfiguration',
request_serializer=noteflow__pb2.GetStreamingConfigurationRequest.SerializeToString,
response_deserializer=noteflow__pb2.GetStreamingConfigurationResponse.FromString,
_registered_method=True)
self.UpdateStreamingConfiguration = channel.unary_unary(
'/noteflow.NoteFlowService/UpdateStreamingConfiguration',
request_serializer=noteflow__pb2.UpdateStreamingConfigurationRequest.SerializeToString,
response_deserializer=noteflow__pb2.UpdateStreamingConfigurationResponse.FromString,
_registered_method=True)
self.ExtractEntities = channel.unary_unary(
'/noteflow.NoteFlowService/ExtractEntities',
request_serializer=noteflow__pb2.ExtractEntitiesRequest.SerializeToString,
@@ -650,6 +660,19 @@ class NoteFlowServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetStreamingConfiguration(self, request, context):
"""Streaming configuration (Sprint 20)
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def UpdateStreamingConfiguration(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def ExtractEntities(self, request, context):
"""Named entity extraction (Sprint 4) + mutations (Sprint 8)
"""
@@ -1143,6 +1166,16 @@ def add_NoteFlowServiceServicer_to_server(servicer, server):
request_deserializer=noteflow__pb2.GetAsrConfigurationJobStatusRequest.FromString,
response_serializer=noteflow__pb2.AsrConfigurationJobStatus.SerializeToString,
),
'GetStreamingConfiguration': grpc.unary_unary_rpc_method_handler(
servicer.GetStreamingConfiguration,
request_deserializer=noteflow__pb2.GetStreamingConfigurationRequest.FromString,
response_serializer=noteflow__pb2.GetStreamingConfigurationResponse.SerializeToString,
),
'UpdateStreamingConfiguration': grpc.unary_unary_rpc_method_handler(
servicer.UpdateStreamingConfiguration,
request_deserializer=noteflow__pb2.UpdateStreamingConfigurationRequest.FromString,
response_serializer=noteflow__pb2.UpdateStreamingConfigurationResponse.SerializeToString,
),
'ExtractEntities': grpc.unary_unary_rpc_method_handler(
servicer.ExtractEntities,
request_deserializer=noteflow__pb2.ExtractEntitiesRequest.FromString,
@@ -2216,6 +2249,60 @@ class NoteFlowService(object):
metadata,
_registered_method=True)
@staticmethod
def GetStreamingConfiguration(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/noteflow.NoteFlowService/GetStreamingConfiguration',
noteflow__pb2.GetStreamingConfigurationRequest.SerializeToString,
noteflow__pb2.GetStreamingConfigurationResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def UpdateStreamingConfiguration(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/noteflow.NoteFlowService/UpdateStreamingConfiguration',
noteflow__pb2.UpdateStreamingConfigurationRequest.SerializeToString,
noteflow__pb2.UpdateStreamingConfigurationResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def ExtractEntities(request,
target,

View File

@@ -13,7 +13,14 @@ from pydantic import ValidationError
from noteflow.application.services.asr_config_persistence import (
resolve_asr_config_preference,
)
from noteflow.config.constants import DEFAULT_GRPC_PORT, SETTING_ASR_CONFIG
from noteflow.application.services.streaming_config_persistence import (
StreamingConfig,
build_default_streaming_config,
)
from noteflow.config.constants import (
DEFAULT_GRPC_PORT,
SETTING_ASR_CONFIG,
)
from noteflow.config.constants.core import MAIN_MODULE_NAME
from noteflow.config.settings import get_settings
from noteflow.infrastructure.logging import LoggingConfig, configure_logging, get_logger
@@ -33,6 +40,7 @@ from ._bootstrap import (
from ._lifecycle import load_asr_engine, stop_server
from ._services import build_servicer, ensure_services
from ._setup import bind_server, create_server
from ._streaming_config import load_streaming_config_from_preferences
from ._types import ServerInitKwargs
if TYPE_CHECKING:
@@ -75,8 +83,6 @@ async def _read_asr_config_preference(
return await uow.preferences.get(SETTING_ASR_CONFIG)
class NoteFlowServer:
"""Async gRPC server for NoteFlow."""
@@ -93,6 +99,7 @@ class NoteFlowServer:
bind_address = kwargs.get("bind_address", DEFAULT_BIND_ADDRESS)
asr = kwargs.get("asr")
asr_config = asr if isinstance(asr, AsrConfig) else AsrConfig()
streaming_config = kwargs.get("streaming_config")
session_factory = kwargs.get("session_factory")
db_engine = kwargs.get("db_engine")
services = kwargs.get("services") or ServicesConfig()
@@ -102,6 +109,9 @@ class NoteFlowServer:
self._asr_model: str = asr_config.model
self._asr_device: str = asr_config.device
self._asr_compute_type: str = asr_config.compute_type
self._streaming_config: StreamingConfig = (
streaming_config or build_default_streaming_config(get_settings())
)
self._session_factory = session_factory
self._db_engine = db_engine
self._summarization_service = services.summarization_service
@@ -195,10 +205,15 @@ class NoteFlowServer:
logger.info("Starting NoteFlow gRPC server (async)...")
await self._apply_persisted_asr_config()
asr_engine = load_asr_engine(self._asr_model, self._asr_device, self._asr_compute_type)
settings = get_settings()
self._streaming_config = await load_streaming_config_from_preferences(
self._session_factory,
settings.meetings_dir,
self._streaming_config,
)
asr_engine = load_asr_engine(self._asr_model, self._asr_device, self._asr_compute_type)
await ensure_services(self, self._session_factory, settings)
self._servicer = build_servicer(self, asr_engine)
self._servicer = build_servicer(self, asr_engine, self._streaming_config)
await recover_orphaned_jobs(self._session_factory)
self._server = create_server()

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
from noteflow.infrastructure.logging import get_logger
from noteflow.application.services.streaming_config_persistence import StreamingConfig
from noteflow.infrastructure.summarization import create_summarization_service
from .._config import ServicesConfig
@@ -88,6 +89,7 @@ async def wire_consent_persistence(
def build_servicer(
state: _ServerState,
asr_engine: object,
streaming_config: StreamingConfig,
) -> NoteFlowServicer:
"""Construct the gRPC servicer instance."""
from noteflow.infrastructure.asr import FasterWhisperEngine
@@ -96,7 +98,7 @@ def build_servicer(
if asr_typed is None:
raise TypeError("Invalid ASR engine")
return NoteFlowServicer(
servicer = NoteFlowServicer(
asr_engine=asr_typed,
session_factory=state.session_factory,
services=ServicesConfig(
@@ -109,3 +111,5 @@ def build_servicer(
project_service=state.project_service,
),
)
servicer.apply_streaming_config(streaming_config)
return servicer

View File

@@ -0,0 +1,65 @@
"""Streaming configuration preference loading for gRPC server."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from noteflow.application.services.streaming_config_persistence import (
StreamingConfig,
resolve_streaming_config_preference,
)
from noteflow.config.constants import SETTING_STREAMING_CONFIG
from noteflow.infrastructure.logging import get_logger
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
logger = get_logger(__name__)
async def load_streaming_config_from_preferences(
session_factory: async_sessionmaker[AsyncSession] | None,
meetings_dir: Path,
fallback: StreamingConfig,
) -> StreamingConfig:
stored = await _load_streaming_config_preference(session_factory, meetings_dir)
if stored is None:
return fallback
resolution = resolve_streaming_config_preference(stored, fallback=fallback)
if resolution is None:
return fallback
if resolution.had_fallback:
logger.warning("streaming_config_preferences_fallback")
else:
logger.info("streaming_config_loaded_from_preferences")
return resolution.config
async def _load_streaming_config_preference(
session_factory: async_sessionmaker[AsyncSession] | None,
meetings_dir: Path,
) -> object | None:
if session_factory is None:
return None
try:
return await _read_streaming_config_preference(session_factory, meetings_dir)
except Exception as exc: # pragma: no cover - defensive logging
logger.warning("streaming_config_preferences_load_failed", error=str(exc))
return None
async def _read_streaming_config_preference(
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
) -> object | None:
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
if not uow.supports_preferences:
logger.debug("streaming_config_preferences_unavailable")
return None
return await uow.preferences.get(SETTING_STREAMING_CONFIG)

View File

@@ -8,6 +8,7 @@ if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from .._config import AsrConfig, ServicesConfig
from noteflow.application.services.streaming_config_persistence import StreamingConfig
class ServerInitKwargs(TypedDict, total=False):
@@ -16,6 +17,7 @@ class ServerInitKwargs(TypedDict, total=False):
port: int
bind_address: str
asr: AsrConfig | None
streaming_config: StreamingConfig | None
session_factory: async_sessionmaker[AsyncSession] | None
db_engine: AsyncEngine | None
services: ServicesConfig | None

View File

@@ -10,9 +10,13 @@ from typing import TYPE_CHECKING, ClassVar, Final
from noteflow import __version__
from noteflow.config.constants import APP_DIR_NAME, SETTING_ASR_CONFIG
from noteflow.config.settings import get_settings
from noteflow.application.services.asr_config_persistence import (
build_asr_config_preference,
)
from noteflow.application.services.streaming_config_persistence import (
build_default_streaming_config,
)
from noteflow.config.constants import DEFAULT_SAMPLE_RATE as _DEFAULT_SAMPLE_RATE
from noteflow.grpc.meeting_store import MeetingStore
from noteflow.infrastructure.asr import Segmenter, StreamingVad
@@ -40,6 +44,7 @@ from ._mixins import (
PreferencesMixin,
ProjectMembershipMixin,
ProjectMixin,
StreamingConfigMixin,
StreamingMixin,
SummarizationConsentMixin,
SummarizationGenerationMixin,
@@ -97,6 +102,7 @@ class NoteFlowServicer(
ProjectMixin,
ProjectMembershipMixin,
AsrConfigMixin,
StreamingConfigMixin,
HfTokenMixin,
NoteFlowServicerStubs,
GrpcBaseServicer,
@@ -139,6 +145,7 @@ class NoteFlowServicer(
"""
self._init_injected_services(asr_engine, session_factory, services)
self._init_audio_infrastructure(meetings_dir, session_factory)
self._init_streaming_config()
self._init_streaming_state()
self._init_diarization_state()
@@ -201,6 +208,10 @@ class NoteFlowServicer(
crypto=self.crypto,
)
def _init_streaming_config(self) -> None:
settings = get_settings()
self.streaming_config = build_default_streaming_config(settings)
def _set_asr_engine(self, engine: FasterWhisperEngine) -> None:
"""Update the active ASR engine reference for streaming."""
self.asr_engine = engine

View File

@@ -124,6 +124,7 @@ class MockMeetingMixinServicerHost(MeetingMixin):
# Webhook service (optional)
self.webhook_service = webhook_service
self.project_service = None
self.summarization_service = None # Post-processing disabled in tests
def create_repository_provider(self) -> MockMeetingRepositoryProvider:
"""Create mock repository provider context manager."""

View File

@@ -12,7 +12,6 @@ in Phase 4.5 Sprint 15.1.
from __future__ import annotations
import asyncio
import re
from datetime import timedelta
from pathlib import Path
from uuid import uuid4
@@ -113,7 +112,7 @@ class TestStopMeetingIdempotency:
def test_idempotency_guard_in_code(self) -> None:
"""Verify the idempotency guard code exists in StopMeeting."""
meeting_path = Path("src/noteflow/grpc/_mixins/meeting.py")
meeting_path = Path("src/noteflow/grpc/_mixins/meeting/meeting_mixin.py")
content = meeting_path.read_text()
# Verify the idempotency guard pattern exists
@@ -132,16 +131,24 @@ class TestStopMeetingIdempotency:
)
def test_idempotency_guard_returns_early(self) -> None:
"""Verify idempotency guard returns meeting proto without state transition."""
meeting_path = Path("src/noteflow/grpc/_mixins/meeting.py")
"""Verify idempotency guard returns early without state transition.
After refactoring, the helper function returns the domain entity directly,
and the caller (StopMeeting) converts to proto.
"""
meeting_path = Path("src/noteflow/grpc/_mixins/meeting/meeting_mixin.py")
content = meeting_path.read_text()
# Find the idempotency guard pattern and verify it returns
# Pattern: if meeting.state in terminal_states: ... return meeting_to_proto(meeting)
pattern = r"if meeting\.state in terminal_states:\s+.*?return meeting_to_proto\(meeting\)"
match = re.search(pattern, content, re.DOTALL)
assert match is not None, (
"Idempotency guard should return meeting_to_proto(meeting) for terminal states"
# Verify the guard pattern exists: checks terminal_states and returns context.meeting
# The pattern checks state membership then returns without transition
guard_check = "context.meeting.state in terminal_states"
early_return = "return context.meeting"
assert guard_check in content, (
"Idempotency guard check for terminal_states not found"
)
assert early_return in content, (
"Idempotency guard early return not found"
)
@pytest.mark.parametrize(