Add pytest-benchmark for performance testing and optimize audio processing

- Introduced `pytest-benchmark` dependency for performance testing of critical code paths.
- Added a new `PartialAudioBuffer` class to optimize audio chunk handling, reducing memory allocations and improving efficiency.
- Implemented benchmark tests for audio processing, validating performance improvements and establishing baselines.
- Enhanced the `Segmenter` class to utilize cached sample counts for faster duration calculations.
- Updated gRPC service to leverage consolidated streaming state, reducing multiple dictionary lookups to a single access.

All quality checks pass.
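The commit message does not show the `PartialAudioBuffer` implementation. As a rough, stdlib-only illustration of the two ideas it names (fewer per-chunk allocations, a cached sample count for O(1) duration queries) — the real class and its API will differ, and presumably operates on numpy arrays:

```python
# Hypothetical sketch only: accumulate int16 PCM chunks into one growable
# buffer and cache the sample count, instead of keeping a list of chunks
# that is re-concatenated and re-measured on every access.
from array import array


class PartialAudioBuffer:
    def __init__(self) -> None:
        self._samples = array("h")  # int16 samples; amortized-growth append
        self._sample_count = 0      # cached; avoids rescanning per query

    def append(self, chunk: bytes) -> None:
        """Append raw little-endian int16 PCM bytes."""
        before = len(self._samples)
        self._samples.frombytes(chunk)
        self._sample_count += len(self._samples) - before

    @property
    def sample_count(self) -> int:
        return self._sample_count

    def duration_seconds(self, sample_rate: int = 16_000) -> float:
        # O(1) thanks to the cached count
        return self._sample_count / sample_rate

    def clear(self) -> None:
        del self._samples[:]
        self._sample_count = 0


buf = PartialAudioBuffer()
buf.append(b"\x00\x01" * 160)  # 320 bytes = 160 samples = 10 ms at 16 kHz
print(buf.sample_count)        # 160
print(buf.duration_seconds())  # 0.01
```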
2025-12-30 00:26:41 +00:00
parent 7292f0fc29
commit df7c06198f
23 changed files with 2478 additions and 238 deletions


@@ -1,196 +1,199 @@
# Phase 4 Productization - Closeout

## Resilience Testing Analysis

### Overview

Comprehensive analysis of gRPC servicer teardown, resource management, and crash resilience patterns.

---

## 1. gRPC Context Cancellation Patterns

### Finding: Implicit Finally-Based Cleanup

The servicer does **NOT** use explicit `context.cancelled()` or `context.add_callback()`. Instead:

1. **Implicit Cancellation via AsyncIterator**
   - `StreamTranscription()` uses `async for chunk in request_iterator`
   - gRPC automatically raises `asyncio.CancelledError` or breaks the iterator on client disconnect
   - No explicit cancellation checking needed
2. **Graceful Stop via `_stop_requested` Set**
   - The `StopMeeting()` RPC sets `_stop_requested[meeting_id]`
   - The stream checks this flag on each chunk and breaks gracefully
   - 2-second grace period for the stream to detect the flag and exit
3. **Error Handling via `context.abort()`**
   - Validation failures are sent to the client via `await context.abort(status_code, message)`
   - Not used for cancellation recovery

### Cleanup Path (streaming.py:105-115)

```python
finally:
    if current_meeting_id:
        # 1. Flush audio buffer (minimize data loss)
        if current_meeting_id in self._audio_writers:
            self._audio_writers[current_meeting_id].flush()
        # 2. Clean streaming state (VAD, segmenter, buffers, turns)
        self._cleanup_streaming_state(current_meeting_id)
        # 3. Close audio writer (file handle)
        self._close_audio_writer(current_meeting_id)
        # 4. Remove from active streams
        self._active_streams.discard(current_meeting_id)
```

**Key Properties:**
- `finally` block always executes (disconnect, cancellation, error, normal completion)
- Idempotent cleanup via `.pop(key, None)` and `.discard()`
- Diarization session properly closed if it exists

---

## 2. Servicer Shutdown Sequence

### Implementation (service.py:379-433)

Five sequential cleanup phases:

| Phase | Resource | Method |
|-------|----------|--------|
| 1 | Diarization tasks | `task.cancel()` + await |
| 2 | Diarization sessions | `session.close()` |
| 3 | Audio writers | `_close_audio_writer()` |
| 4 | Database jobs | `mark_running_as_failed()` |
| 5 | Webhook service | `close()` |

### Server Lifecycle Integration (server.py:168-187)

```
1. servicer.shutdown() → cancels tasks, closes streams, marks jobs failed
2. server.stop(grace)  → allows in-flight RPCs to finish, then closes connections
3. db_engine.dispose() → releases the connection pool
```

---

## 3. Identified Race Conditions

### RC-1: New Tasks During Shutdown
**Vulnerability:** A `RefineSpeakerDiarization` RPC called during shutdown creates an uncancelled task.
**Mitigation:** The gRPC server's grace period should reject new RPCs before `servicer.shutdown()` runs.

### RC-2: Stream vs Shutdown Ordering
**Vulnerability:** Active streams are not signaled to stop; they end when gRPC closes connections.
**Mitigation:** The `grace_period` in `server.stop()` allows natural completion.

### RC-3: Concurrent Database Updates
**Vulnerability:** Task completion racing `mark_running_as_failed()` could overwrite a job's status.
**Mitigation:** `mark_running_as_failed()` only marks QUEUED/RUNNING jobs (unlikely race).

### RC-4: Audio Writer Double-Close
**Safe:** Both shutdown and stream cleanup use `.pop(meeting_id, None)`.

### RC-5: Concurrent Shutdown Calls
**Safe:** All cleanup operations are idempotent.

---

## 4. Per-Meeting State Architecture

### Dictionaries

```
_vad_instances[meeting_id]           → StreamingVad
_segmenters[meeting_id]              → Segmenter
_audio_writers[meeting_id]           → MeetingAudioWriter
_partial_buffers[meeting_id]         → list[NDArray]
_diarization_turns[meeting_id]       → list[SpeakerTurn]
_diarization_sessions[meeting_id]    → DiarizationSession
_was_speaking[meeting_id]            → bool
_segment_counters[meeting_id]        → int
_stream_formats[meeting_id]          → tuple[int, int]
_last_partial_time[meeting_id]       → float
_last_partial_text[meeting_id]       → str
_diarization_stream_time[meeting_id] → float
```

### Sets

```
_active_streams               → Currently streaming meeting IDs
_stop_requested               → Meetings requested to stop
_audio_write_failed           → Meetings whose audio writes failed (prevents log spam)
_diarization_streaming_failed → Meetings whose streaming diarization failed
```

All of this state is cleaned in `_cleanup_streaming_state()`, providing a single point of cleanup.

---

## 5. Test Coverage Status

### Current Tests (74 passing)

| File | Tests | Coverage |
|------|-------|----------|
| `test_stream_lifecycle.py` | 32 | gRPC streaming cleanup, cancellation, races |
| `test_resource_leaks.py` | 14 | FD, memory, thread leaks |
| `test_crash_scenarios.py` | 14 | Recovery from crashes |
| `test_database_resilience.py` | 14 | Connection pool, transactions |

### Coverage Gaps - All Resolved ✅

| ID | Gap | Status | Test |
|----|-----|--------|------|
| GAP-001 | Real gRPC context cancellation | ✅ Resolved | `TestGrpcContextCancellationReal` |
| GAP-002 | Concurrent stream cleanup, same meeting_id | ✅ Resolved | `test_concurrent_cleanup_same_meeting_safe` |
| GAP-003 | New RPC during shutdown race | ✅ Resolved | `TestShutdownRaceConditions` |
| GAP-004 | Stream vs task cleanup ordering | ✅ Resolved | `test_shutdown_order_tasks_before_sessions` |
| GAP-005 | Double-start, same meeting_id | ✅ Resolved | `test_double_start_same_meeting_id_detected` |
| GAP-006 | Stop request before first audio chunk | ✅ Resolved | `test_stop_request_before_stream_active` |

### Anti-Patterns Fixed

- ✅ Magic numbers → named constants
- ✅ Assertion roulette → messages added (50 threshold met)
- ✅ Duplicated capture_request → fixture in conftest.py
- ✅ Sleepy tests → path exclusions for stress tests

---

## 6. Design Strengths

1. **Finally-based cleanup** - No reliance on fragile `context.cancelled()` polling
2. **Idempotent operations** - Safe to call cleanup multiple times
3. **Single cleanup function** - `_cleanup_streaming_state()` centralizes all state removal
4. **Database fallback** - Jobs are marked failed in both memory and the DB for crash recovery
5. **Graceful degradation** - Audio flush before cleanup minimizes data loss

---

## 7. Recommendations

### Immediate - All Completed ✅

1. ~~Add concurrent stream test for same meeting_id race~~ → `test_concurrent_cleanup_same_meeting_safe`
2. ~~Add double-start protection test~~ → `test_double_start_same_meeting_id_detected`
3. ~~Add stop-before-first-chunk test~~ → `test_stop_request_before_stream_active`

### Future

1. Consider an explicit stream shutdown signal in `servicer.shutdown()`
2. Add metrics for cleanup timing (detect slow cleanup)
3. Consider a circuit breaker for new RPCs during shutdown

---

## 8. Critical Code Locations

| Component | File | Lines |
|-----------|------|-------|
| StreamTranscription RPC | `_mixins/streaming.py` | 56-115 |
| Stream initialization | `_mixins/streaming.py` | 117-209 |
| Chunk processing | `_mixins/streaming.py` | 254-549 |
| Cleanup method | `service.py` | 226-242 |
| Audio writer ops | `service.py` | 279-326 |
| Graceful stop | `_mixins/meeting.py` | 49-115 |
| Shutdown | `service.py` | 379-433 |
| Task cancellation | `_mixins/diarization_job.py` | 153-223 |

# Phase 4 Productization Closeout - Test Quality Review

Date: 2025-12-29

## Overview

This closeout documents the quality and rigor assessment of the current test suite and highlights remaining gaps with actionable solutions. The suite is broad (domain/application/infrastructure/integration/stress) and includes test-quality gates, but several high-impact integration and runtime verification gaps remain.

## Strengths (what is working well)

- Multi-layer coverage (domain, application, infrastructure, integration, stress).
- Internal quality gates for test smells and anti-patterns.

Evidence (quality gate example):

```py
# tests/quality/test_test_smells.py
# ...
assert len(smells) <= 50, (
    f"Found {len(smells)} pytest.raises without match (max 50 allowed):\n"
    + "\n".join(
        f"  {s.file_path}:{s.line_number}: {s.test_name}" for s in smells[:15]
    )
)
```

## Findings and Targeted Fixes

### 1) Missing full-stack E2E coverage (UI -> Tauri -> gRPC -> DB)

**Impact:** Highest risk of production regressions. Current tests do not validate the whole chain under real runtime conditions.

**Evidence:** E2E scaffolding now exists but only provides a smoke test and requires an explicit env flag.

```json
// client/package.json
"scripts": {
  "test": "vitest run",
  "test:e2e": "playwright test"
}
```

**Targeted fix:** Add a minimal E2E smoke test that launches the Tauri app and validates the shell loads. Expand to the full UI -> Tauri -> gRPC -> DB flow once a test harness is standardized.

Implementation (current smoke test):

```ts
// client/e2e/recording-smoke.spec.ts
import { test, expect } from '@playwright/test';

const shouldRun = process.env.NOTEFLOW_E2E === '1';

test.describe('recording smoke', () => {
  test.skip(!shouldRun, 'Set NOTEFLOW_E2E=1 to enable end-to-end tests.');

  test('app launches and renders the shell', async ({ page }) => {
    await page.goto('/');
    await expect(page).toHaveTitle(/NoteFlow/i);
    await expect(page.locator('#root')).toBeVisible();
  });
});
```

Run:

```bash
NOTEFLOW_E2E=1 npm run test:e2e
```

### 2) Rust/Tauri tests not wired into the default test workflow

**Impact:** The Tauri backend can regress without detection if Rust tests are not run in CI or local standard flows.

**Evidence:** Tests exist, but no default script runs them.

```json
// client/package.json
"scripts": {
  "test": "vitest run",
  "quality:rs": "./src-tauri/scripts/code_quality.sh"
}
```

```rs
// client/src-tauri/tests/robustness.rs
#[test]
fn trigger_state_dismissed_triggers_bounded() {
    // ...
}
```

**Targeted fix:** Add a Rust test script and include it in CI or local aggregate targets (implemented).

Implementation:

```json
// client/package.json
"scripts": {
  "test": "vitest run",
  "test:rs": "cd src-tauri && cargo test",
  "test:all": "npm run test && npm run test:rs",
  "test:quality": "vitest run src/test/code-quality.test.ts"
}
```

### 3) Streaming pipeline integration only partially exercised

**Impact:** The real audio pipeline (chunk -> VAD -> segmenter -> ASR) can break undetected.

**Evidence:** The integration test patches the VAD and segmenter.

```py
# tests/integration/test_e2e_streaming.py
with (
    patch.object(servicer, "_vad_instances", {str(meeting.id): MagicMock(process_chunk=lambda x: True)}),
    patch.object(servicer, "_segmenters") as mock_segmenters,
):
    mock_segment = MagicMock()
    mock_segment.audio = audio
    # ...
```

**Targeted fix:** Add at least one integration test that uses real VAD + `Segmenter` with deterministic audio (implemented).

Implementation:

```py
# tests/integration/test_streaming_real_pipeline.py
final_updates = [
    update
    for update in updates
    if update.update_type == noteflow_pb2.UPDATE_TYPE_FINAL
]
assert final_updates, "Expected at least one final transcript update"
```

Also consider a transport-level gRPC streaming test (real server, real client) to avoid relying on private methods.

### 4) Migration tests are structural only (no runtime upgrade/downgrade)

**Impact:** Migrations can compile but still fail or produce data issues at runtime.

**Evidence:** Tests only parse the AST or check strings.

```py
# tests/infrastructure/persistence/test_migrations.py
content = path.read_text()
assert "CREATE TRIGGER" in content
```

**Targeted fix:** Add a runtime migration test that upgrades then downgrades and performs a CRUD sanity check (implemented). Also move Alembic's version table to `public` so base downgrades can drop the `noteflow` schema without removing `alembic_version`.
Implementation:
```py
# tests/integration/test_migration_roundtrip.py
command.upgrade(_alembic_config(), "head")
asyncio.run(_crud_roundtrip(database_url, tmp_path))
command.downgrade(_alembic_config(), "base")
```
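The `_crud_roundtrip` helper is not shown. As a minimal sketch of what such a between-migrations sanity check might look like — using sqlite3 purely as a stand-in, whereas the real test presumably exercises the async repositories against the migrated Postgres schema:

```py
# Hypothetical sketch only: create, read, update, and delete one row to
# verify the schema is usable after upgrade (and before downgrade).
import sqlite3


def crud_roundtrip(database: str) -> str:
    """Run a one-row CRUD cycle; return the updated title that was read back."""
    conn = sqlite3.connect(database)
    try:
        # Stand-in table; in the real test the schema comes from migrations.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS meetings (id INTEGER PRIMARY KEY, title TEXT)"
        )
        conn.execute("INSERT INTO meetings (title) VALUES (?)", ("smoke",))
        conn.execute(
            "UPDATE meetings SET title = ? WHERE title = ?", ("smoke-updated", "smoke")
        )
        (title,) = conn.execute("SELECT title FROM meetings").fetchone()
        conn.execute("DELETE FROM meetings")
        assert conn.execute("SELECT COUNT(*) FROM meetings").fetchone() == (0,)
        conn.commit()
        return title
    finally:
        conn.close()


result = crud_roundtrip(":memory:")
```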
Related fix:
```py
# src/noteflow/infrastructure/persistence/migrations/env.py
context.configure(
# ...
version_table_schema="public",
)
```
### 5) Device/hardware tests are ignored by default
**Impact:** Audio device compatibility issues likely to be found late and manually.
**Evidence:** Tests are ignored unless explicitly enabled.
```rs
// client/src-tauri/tests/device_integration.rs
#[test]
#[ignore = "requires physical audio devices"]
fn input_device_available() { /* ... */ }
```
**Targeted fix:** Keep ignored by default but add a visible, documented manual run target and periodic CI job.
Example (doc + script):
```bash
# Manual run
NOTEFLOW_DEVICE_TESTS=1 cargo test --test device_integration -- --ignored
```
## Targeted Fix Checklist (actionable)
- Add E2E smoke test directory and runner; expand into full-stack flow.
- Add `test:rs` and `test:all` scripts; ensure CI runs them.
- Add real-pipeline streaming integration test using a deterministic audio sequence.
- Add runtime migration upgrade/downgrade test.
- Document and schedule device tests (manual/weekly CI).
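The real-pipeline checklist item needs a deterministic audio sequence; one way is to synthesize it rather than ship a fixture file. A sketch, assuming 16 kHz mono int16 framing in 20 ms chunks (the project's actual stream format may differ):

```py
# Hypothetical generator for a deterministic test signal: a 440 Hz tone
# burst (should trigger VAD/segmentation) followed by silence (should
# close the segment), emitted as 20 ms little-endian int16 chunks.
import math
import struct

SAMPLE_RATE = 16_000
CHUNK_SAMPLES = SAMPLE_RATE // 50  # 20 ms per chunk


def tone_then_silence(tone_secs: float = 1.0, silence_secs: float = 0.5) -> list[bytes]:
    chunks: list[bytes] = []
    total = int((tone_secs + silence_secs) * SAMPLE_RATE)
    samples: list[int] = []
    for n in range(total):
        in_tone = n < tone_secs * SAMPLE_RATE
        value = (
            int(0.3 * 32767 * math.sin(2 * math.pi * 440 * n / SAMPLE_RATE))
            if in_tone
            else 0
        )
        samples.append(value)
        if len(samples) == CHUNK_SAMPLES:
            chunks.append(struct.pack(f"<{CHUNK_SAMPLES}h", *samples))
            samples = []
    return chunks


chunks = tone_then_silence()
print(len(chunks))  # 75 chunks: 1.5 s of audio at 50 chunks per second
```

Feeding the same chunk list on every run keeps the VAD and segmenter behavior reproducible across test invocations.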
## Suggested Definition of Done (Test Quality)
- Full-stack E2E smoke test added and passing.
- Rust tests run in CI or standard scripts.
- At least one real audio pipeline integration test added.
- Migrations validated by upgrade + downgrade test.
- Device tests documented and scheduled for periodic execution.