Files
noteflow/tests/grpc/test_streaming_metrics.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

296 lines
10 KiB
Python

"""Tests for StreamingErrorMetrics class.
Tests the streaming error metrics tracking functionality including
counter increments, stats retrieval, and thread safety.
Sprint GAP-003: Error Handling Mismatches
"""
from __future__ import annotations

import threading
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor

import pytest

from noteflow.grpc.mixins._metrics import (
    StreamingErrorMetrics,
    get_streaming_metrics,
    reset_streaming_metrics,
)

# Test constants shared by the helpers and test classes in this module:
# error-type labels, meeting identifiers, and sizing for the thread-safety tests.
AUDIO_DECODE_ERROR = "audio_decode_failed"
"""Error type for audio decoding failures."""
VAD_ERROR = "vad_processing_failed"
"""Error type for VAD processing failures."""
DIARIZATION_ERROR = "diarization_failed"
"""Error type for diarization failures."""
MEETING_ID_1 = "meeting-001"
"""Test meeting ID for primary meeting."""
MEETING_ID_2 = "meeting-002"
"""Test meeting ID for secondary meeting."""
CONCURRENT_THREAD_COUNT = 10
"""Number of threads for thread-safety tests."""
ITERATIONS_PER_THREAD = 100
"""Iterations per thread for thread-safety tests."""
def _run_concurrent_record_and_read(metrics: StreamingErrorMetrics) -> None:
    """Run one writer and one reader against ``metrics`` concurrently.

    The writer calls ``record_error`` ``ITERATIONS_PER_THREAD`` times while the
    reader keeps calling ``get_counts`` until the writer signals completion.

    Args:
        metrics: Metrics instance exercised by both workers.

    Raises:
        Exception: Whatever ``record_error`` or ``get_counts`` raised inside a
            worker, re-raised here so the calling test fails instead of
            passing silently.
    """
    errors_recorded = threading.Event()

    def record_loop() -> None:
        try:
            for _ in range(ITERATIONS_PER_THREAD):
                metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        finally:
            # Release the reader even when recording fails; otherwise it
            # would spin forever and hang the test run.
            errors_recorded.set()

    def read_loop() -> None:
        while not errors_recorded.is_set():
            counts = metrics.get_counts()
            _ = counts.get(AUDIO_DECODE_ERROR, 0)

    # Futures, unlike bare threading.Thread objects, hand worker exceptions
    # back to the caller, so a failing read or write surfaces as a test error.
    with ThreadPoolExecutor(max_workers=2) as executor:
        workers = [executor.submit(record_loop), executor.submit(read_loop)]
        for worker in workers:
            worker.result()
@pytest.fixture
def metrics() -> StreamingErrorMetrics:
    """Provide an isolated StreamingErrorMetrics instance to a test.

    Returns:
        A StreamingErrorMetrics constructed with no arguments; nothing is
        recorded on it before it is handed to the test.
    """
    fresh_metrics = StreamingErrorMetrics()
    return fresh_metrics
@pytest.fixture(autouse=True)
def reset_global_metrics() -> Iterator[None]:
    """Reset the global metrics singleton around each test.

    The reset before the test discards state left behind by earlier tests.
    The reset after the test keeps anything recorded on the singleton here
    from leaking into whichever test (or test module) runs next.

    Yields:
        None. Control passes to the test between the two resets.
    """
    reset_streaming_metrics()
    yield
    # Teardown: pytest resumes the fixture here once the test has finished.
    reset_streaming_metrics()
class TestRecordError:
    """Exercise the counting behaviour of StreamingErrorMetrics.record_error."""

    def test_increments_counter_for_single_error_type(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """One recording should leave a count of one for that error type."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        snapshot = metrics.get_counts()

        assert snapshot[AUDIO_DECODE_ERROR] == 1, (
            "Counter should be 1 after single error recording"
        )

    def test_increments_counter_multiple_times_same_type(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Repeated recordings of one type accumulate, whatever the meeting."""
        increment_count = 3

        for meeting_id in (MEETING_ID_1, MEETING_ID_1, MEETING_ID_2):
            metrics.record_error(AUDIO_DECODE_ERROR, meeting_id)

        snapshot = metrics.get_counts()

        assert snapshot[AUDIO_DECODE_ERROR] == increment_count, (
            f"Counter should be {increment_count} after three recordings"
        )

    def test_tracks_different_error_types_independently(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Each error type should keep its own counter."""
        recordings = (
            (AUDIO_DECODE_ERROR, MEETING_ID_1),
            (VAD_ERROR, MEETING_ID_1),
            (VAD_ERROR, MEETING_ID_2),
        )
        for kind, meeting in recordings:
            metrics.record_error(kind, meeting)

        snapshot = metrics.get_counts()

        assert snapshot[AUDIO_DECODE_ERROR] == 1, "Audio decode counter should be 1"
        assert snapshot[VAD_ERROR] == 2, "VAD error counter should be 2"

    @pytest.mark.parametrize(
        ("error_type", "meeting_id"),
        [
            pytest.param(AUDIO_DECODE_ERROR, MEETING_ID_1, id="audio_decode_meeting1"),
            pytest.param(VAD_ERROR, MEETING_ID_2, id="vad_error_meeting2"),
            pytest.param(DIARIZATION_ERROR, MEETING_ID_1, id="diarization_meeting1"),
        ],
    )
    def test_records_various_error_types(
        self, metrics: StreamingErrorMetrics, error_type: str, meeting_id: str
    ) -> None:
        """Every parametrized error type should be recordable exactly once."""
        metrics.record_error(error_type, meeting_id)

        snapshot = metrics.get_counts()

        assert error_type in snapshot, f"Error type '{error_type}' should be in counts"
        assert snapshot[error_type] == 1, f"Counter for '{error_type}' should be 1"
class TestGetCounts:
    """Exercise what StreamingErrorMetrics.get_counts hands back."""

    def test_returns_empty_dict_when_no_errors(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """With nothing recorded, get_counts should equal an empty dict."""
        snapshot = metrics.get_counts()

        assert snapshot == {}, "Counts should be empty dict when no errors recorded"

    def test_returns_copy_of_internal_state(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Mutating a returned mapping must not affect later get_counts calls."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        # Tamper with the mapping we were given...
        tampered = metrics.get_counts()
        tampered[AUDIO_DECODE_ERROR] = 999

        # ...then ask again: the real count must be untouched.
        fresh = metrics.get_counts()
        assert fresh[AUDIO_DECODE_ERROR] == 1, (
            "Internal state should not be modified by external changes"
        )

    def test_returns_all_recorded_error_types(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Every error type that was recorded should appear as a key."""
        expected_types = {AUDIO_DECODE_ERROR, VAD_ERROR, DIARIZATION_ERROR}

        for kind in (AUDIO_DECODE_ERROR, VAD_ERROR, DIARIZATION_ERROR):
            metrics.record_error(kind, MEETING_ID_1)

        counts = metrics.get_counts()

        assert set(counts.keys()) == expected_types, (
            f"Should return all error types: expected {expected_types}, got {set(counts.keys())}"
        )
class TestReset:
    """Exercise StreamingErrorMetrics.reset."""

    def test_clears_all_error_counts(self, metrics: StreamingErrorMetrics) -> None:
        """After reset, get_counts should equal an empty dict."""
        for kind, meeting in (
            (AUDIO_DECODE_ERROR, MEETING_ID_1),
            (VAD_ERROR, MEETING_ID_2),
        ):
            metrics.record_error(kind, meeting)

        metrics.reset()

        snapshot = metrics.get_counts()
        assert snapshot == {}, "Counts should be empty after reset"

    def test_allows_new_recordings_after_reset(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Recording after a reset should count only the new errors."""
        # Record, wipe, then record a different error type.
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.reset()
        metrics.record_error(VAD_ERROR, MEETING_ID_1)

        snapshot = metrics.get_counts()

        assert AUDIO_DECODE_ERROR not in snapshot, (
            "Old error type should not be present after reset"
        )
        assert snapshot[VAD_ERROR] == 1, "New error should be recorded after reset"
class TestThreadSafety:
    """Exercise StreamingErrorMetrics under concurrent access."""

    def test_concurrent_record_error_preserves_count(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify no increments are lost when many workers record at once."""

        def hammer_counter() -> None:
            # Each worker records the same error ITERATIONS_PER_THREAD times.
            for _ in range(ITERATIONS_PER_THREAD):
                metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        expected_total = CONCURRENT_THREAD_COUNT * ITERATIONS_PER_THREAD

        with ThreadPoolExecutor(max_workers=CONCURRENT_THREAD_COUNT) as pool:
            pending = []
            for _ in range(CONCURRENT_THREAD_COUNT):
                pending.append(pool.submit(hammer_counter))
            # result() re-raises anything a worker raised.
            for task in pending:
                task.result()

        counts = metrics.get_counts()
        assert counts[AUDIO_DECODE_ERROR] == expected_total, (
            f"Expected {expected_total} errors after concurrent recording, "
            f"got {counts[AUDIO_DECODE_ERROR]}"
        )

    def test_concurrent_mixed_operations(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify the recorded total is intact after interleaved reads and writes."""
        _run_concurrent_record_and_read(metrics)

        final_counts = metrics.get_counts()

        assert final_counts[AUDIO_DECODE_ERROR] == ITERATIONS_PER_THREAD, (
            f"Expected {ITERATIONS_PER_THREAD} errors after concurrent operations"
        )
class TestGlobalSingleton:
    """Exercise get_streaming_metrics and reset_streaming_metrics."""

    def test_get_streaming_metrics_returns_singleton(self) -> None:
        """Two consecutive lookups should yield the very same object."""
        first = get_streaming_metrics()
        second = get_streaming_metrics()

        assert first is second, "Should return same singleton instance"

    def test_metrics_singleton_persists_state(self) -> None:
        """An error recorded via one lookup should be visible via the next."""
        get_streaming_metrics().record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        # A second lookup must observe the error recorded above.
        observed = get_streaming_metrics().get_counts()

        assert observed[AUDIO_DECODE_ERROR] == 1, (
            "State should persist across singleton access"
        )

    def test_reset_streaming_metrics_clears_singleton(self) -> None:
        """After reset_streaming_metrics, the singleton should report no errors."""
        stale = get_streaming_metrics()
        stale.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        reset_streaming_metrics()

        # Whatever the accessor returns now must carry no counts.
        observed = get_streaming_metrics().get_counts()
        assert observed == {}, "New singleton should have no errors after reset"

    def test_metrics_reset_creates_new_instance(self) -> None:
        """The object returned after a reset should differ from the one before."""
        metrics_before = get_streaming_metrics()

        reset_streaming_metrics()

        metrics_after = get_streaming_metrics()
        assert metrics_before is not metrics_after, (
            "Reset should create a new instance"
        )