"""Tests for StreamingErrorMetrics class.
|
|
|
|
Tests the streaming error metrics tracking functionality including
|
|
counter increments, stats retrieval, and thread safety.
|
|
|
|
Sprint GAP-003: Error Handling Mismatches
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
import pytest
|
|
|
|
from noteflow.grpc.mixins._metrics import (
|
|
StreamingErrorMetrics,
|
|
get_streaming_metrics,
|
|
reset_streaming_metrics,
|
|
)
|
|
|
|
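
# The tests below exercise a minimal counter-style API on StreamingErrorMetrics.
# The shape sketched here is inferred from how the tests use it, not taken from
# the implementation itself:
#
#     class StreamingErrorMetrics:
#         def record_error(self, error_type: str, meeting_id: str) -> None: ...
#         def get_counts(self) -> dict[str, int]: ...
#         def reset(self) -> None: ...
#
# get_streaming_metrics() is assumed to return a process-wide singleton, and
# reset_streaming_metrics() to replace it with a fresh instance.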

# Test constants
AUDIO_DECODE_ERROR = "audio_decode_failed"
"""Error type for audio decoding failures."""

VAD_ERROR = "vad_processing_failed"
"""Error type for VAD processing failures."""

DIARIZATION_ERROR = "diarization_failed"
"""Error type for diarization failures."""

MEETING_ID_1 = "meeting-001"
"""Test meeting ID for primary meeting."""

MEETING_ID_2 = "meeting-002"
"""Test meeting ID for secondary meeting."""

CONCURRENT_THREAD_COUNT = 10
"""Number of threads for thread-safety tests."""

ITERATIONS_PER_THREAD = 100
"""Iterations per thread for thread-safety tests."""
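

# The helper below coordinates one writer and one reader on the same metrics
# instance: the reader keeps taking get_counts() snapshots while the writer
# records errors, and the writer signals completion via a threading.Event so
# the reader can stop. The calling test then asserts on the final counts.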
def _run_concurrent_record_and_read(metrics: StreamingErrorMetrics) -> None:
    """Run concurrent record and read operations for thread safety testing."""
    errors_recorded = threading.Event()

    def record_loop() -> None:
        for _ in range(ITERATIONS_PER_THREAD):
            metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        errors_recorded.set()

    def read_loop() -> None:
        while not errors_recorded.is_set():
            counts = metrics.get_counts()
            _ = counts.get(AUDIO_DECODE_ERROR, 0)

    record_thread = threading.Thread(target=record_loop)
    read_thread = threading.Thread(target=read_loop)
    record_thread.start()
    read_thread.start()
    record_thread.join()
    read_thread.join()


@pytest.fixture
def metrics() -> StreamingErrorMetrics:
    """Create fresh StreamingErrorMetrics instance for testing.

    Returns:
        New StreamingErrorMetrics instance with no recorded errors.
    """
    return StreamingErrorMetrics()


@pytest.fixture(autouse=True)
def reset_global_metrics() -> None:
    """Reset global metrics singleton before each test.

    Ensures test isolation by clearing any state from previous tests.
    """
    reset_streaming_metrics()


class TestRecordError:
    """Tests for StreamingErrorMetrics.record_error method."""

    def test_increments_counter_for_single_error_type(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify recording an error increments the counter for that type."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        counts = metrics.get_counts()
        assert counts[AUDIO_DECODE_ERROR] == 1, (
            "Counter should be 1 after single error recording"
        )

    def test_increments_counter_multiple_times_same_type(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify repeated recordings increment the same counter."""
        increment_count = 3
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_2)

        counts = metrics.get_counts()
        assert counts[AUDIO_DECODE_ERROR] == increment_count, (
            f"Counter should be {increment_count} after three recordings"
        )

    def test_tracks_different_error_types_independently(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify different error types maintain separate counters."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.record_error(VAD_ERROR, MEETING_ID_1)
        metrics.record_error(VAD_ERROR, MEETING_ID_2)

        counts = metrics.get_counts()
        assert counts[AUDIO_DECODE_ERROR] == 1, "Audio decode counter should be 1"
        assert counts[VAD_ERROR] == 2, "VAD error counter should be 2"

    @pytest.mark.parametrize(
        ("error_type", "meeting_id"),
        [
            pytest.param(AUDIO_DECODE_ERROR, MEETING_ID_1, id="audio_decode_meeting1"),
            pytest.param(VAD_ERROR, MEETING_ID_2, id="vad_error_meeting2"),
            pytest.param(DIARIZATION_ERROR, MEETING_ID_1, id="diarization_meeting1"),
        ],
    )
    def test_records_various_error_types(
        self, metrics: StreamingErrorMetrics, error_type: str, meeting_id: str
    ) -> None:
        """Verify various error types can be recorded successfully."""
        metrics.record_error(error_type, meeting_id)

        counts = metrics.get_counts()
        assert error_type in counts, f"Error type '{error_type}' should be in counts"
        assert counts[error_type] == 1, f"Counter for '{error_type}' should be 1"


class TestGetCounts:
    """Tests for StreamingErrorMetrics.get_counts method."""

    def test_returns_empty_dict_when_no_errors(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify get_counts returns empty dict with no recorded errors."""
        counts = metrics.get_counts()

        assert counts == {}, "Counts should be empty dict when no errors recorded"

    def test_returns_copy_of_internal_state(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify get_counts returns a copy, not a reference."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        counts = metrics.get_counts()

        # Modify returned dict
        counts[AUDIO_DECODE_ERROR] = 999

        # Original should be unchanged
        assert metrics.get_counts()[AUDIO_DECODE_ERROR] == 1, (
            "Internal state should not be modified by external changes"
        )

    def test_returns_all_recorded_error_types(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify get_counts includes all recorded error types."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.record_error(VAD_ERROR, MEETING_ID_1)
        metrics.record_error(DIARIZATION_ERROR, MEETING_ID_1)

        counts = metrics.get_counts()
        expected_types = {AUDIO_DECODE_ERROR, VAD_ERROR, DIARIZATION_ERROR}

        assert set(counts.keys()) == expected_types, (
            f"Should return all error types: expected {expected_types}, got {set(counts.keys())}"
        )


class TestReset:
    """Tests for StreamingErrorMetrics.reset method."""

    def test_clears_all_error_counts(self, metrics: StreamingErrorMetrics) -> None:
        """Verify reset clears all recorded errors."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.record_error(VAD_ERROR, MEETING_ID_2)

        metrics.reset()

        counts = metrics.get_counts()
        assert counts == {}, "Counts should be empty after reset"

    def test_allows_new_recordings_after_reset(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify new errors can be recorded after reset."""
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)
        metrics.reset()
        metrics.record_error(VAD_ERROR, MEETING_ID_1)

        counts = metrics.get_counts()
        assert AUDIO_DECODE_ERROR not in counts, (
            "Old error type should not be present after reset"
        )
        assert counts[VAD_ERROR] == 1, "New error should be recorded after reset"
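

# The concurrency tests assume record_error() and get_counts() guard shared
# state with an internal lock (e.g. threading.Lock); this is an assumption
# about the implementation, not something asserted directly. CPython's GIL
# alone does not make a read-modify-write increment atomic, so a lost update
# between bytecode steps would show up here as a count below the expected total.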
class TestThreadSafety:
    """Tests for StreamingErrorMetrics thread safety."""

    def test_concurrent_record_error_preserves_count(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify concurrent record_error calls produce correct total count."""
        expected_total = CONCURRENT_THREAD_COUNT * ITERATIONS_PER_THREAD

        def record_errors() -> None:
            for _ in range(ITERATIONS_PER_THREAD):
                metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        with ThreadPoolExecutor(max_workers=CONCURRENT_THREAD_COUNT) as executor:
            futures = [
                executor.submit(record_errors) for _ in range(CONCURRENT_THREAD_COUNT)
            ]
            for future in futures:
                future.result()

        counts = metrics.get_counts()
        assert counts[AUDIO_DECODE_ERROR] == expected_total, (
            f"Expected {expected_total} errors after concurrent recording, "
            f"got {counts[AUDIO_DECODE_ERROR]}"
        )

    def test_concurrent_mixed_operations(
        self, metrics: StreamingErrorMetrics
    ) -> None:
        """Verify concurrent record and get_counts operations are safe."""
        _run_concurrent_record_and_read(metrics)
        counts = metrics.get_counts()

        assert counts[AUDIO_DECODE_ERROR] == ITERATIONS_PER_THREAD, (
            f"Expected {ITERATIONS_PER_THREAD} errors after concurrent operations"
        )


class TestGlobalSingleton:
    """Tests for global singleton functions."""

    def test_get_streaming_metrics_returns_singleton(self) -> None:
        """Verify get_streaming_metrics returns same instance."""
        metrics1 = get_streaming_metrics()
        metrics2 = get_streaming_metrics()

        assert metrics1 is metrics2, "Should return same singleton instance"

    def test_metrics_singleton_persists_state(self) -> None:
        """Verify state persists across get_streaming_metrics calls."""
        metrics = get_streaming_metrics()
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        # Get again and verify state
        metrics_again = get_streaming_metrics()
        counts = metrics_again.get_counts()

        assert counts[AUDIO_DECODE_ERROR] == 1, (
            "State should persist across singleton access"
        )

    def test_reset_streaming_metrics_clears_singleton(self) -> None:
        """Verify reset_streaming_metrics clears the singleton."""
        metrics = get_streaming_metrics()
        metrics.record_error(AUDIO_DECODE_ERROR, MEETING_ID_1)

        reset_streaming_metrics()

        # New singleton should be fresh
        new_metrics = get_streaming_metrics()
        counts = new_metrics.get_counts()

        assert counts == {}, "New singleton should have no errors after reset"

    def test_metrics_reset_creates_new_instance(self) -> None:
        """Verify reset_streaming_metrics creates a new instance."""
        metrics_before = get_streaming_metrics()
        reset_streaming_metrics()
        metrics_after = get_streaming_metrics()

        assert metrics_before is not metrics_after, (
            "Reset should create a new instance"
        )