Files
noteflow/tests/grpc/test_diarization_refine.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

75 lines
2.3 KiB
Python

"""Tests for RefineSpeakerDiarization RPC guards."""
from __future__ import annotations
from typing import Protocol, cast
import grpc
import pytest
from noteflow.grpc.config.config import ServicesConfig
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.diarization import DiarizationEngine
class _RefineSpeakerDiarizationRequest(Protocol):
meeting_id: str
num_speakers: int
class _RefineSpeakerDiarizationResponse(Protocol):
segments_updated: int
error_message: str
class _RefineSpeakerDiarizationCallable(Protocol):
async def __call__(
self,
request: _RefineSpeakerDiarizationRequest,
context: _DummyContext,
) -> _RefineSpeakerDiarizationResponse: ...
class _DummyContext:
"""Minimal gRPC context that raises if abort is invoked."""
async def abort(self, code: grpc.StatusCode, details: str) -> None:
raise AssertionError(f"abort called: {code} - {details}")
def set_code(self, code: grpc.StatusCode) -> None:
"""Record response status code (unused in these tests)."""
return None
def set_details(self, details: str) -> None:
"""Record response status details (unused in these tests)."""
return None
class _FakeDiarizationEngine(DiarizationEngine):
"""Lightweight diarization engine stub for service config."""
def __init__(self) -> None:
super().__init__()
@pytest.mark.asyncio
async def test_refine_speaker_diarization_rejects_active_meeting() -> None:
"""Refinement should be blocked while a meeting is still recording."""
servicer = NoteFlowServicer(services=ServicesConfig(diarization_engine=_FakeDiarizationEngine()))
store = servicer.get_memory_store()
meeting = store.create("Active meeting")
meeting.start_recording()
store.update(meeting)
refine = cast(_RefineSpeakerDiarizationCallable, servicer.RefineSpeakerDiarization)
response = await refine(
noteflow_pb2.RefineSpeakerDiarizationRequest(meeting_id=str(meeting.id)),
_DummyContext(),
)
assert response.segments_updated == 0, "no segments should be updated for active meeting"
assert response.error_message, "error message should be present"
assert "stopped" in response.error_message.lower(), "error should mention 'stopped'"