# noteflow/tests/infrastructure/metrics/test_infrastructure_metrics.py
"""Tests for infrastructure metrics tracking."""
from __future__ import annotations
import time
import pytest
from noteflow.infrastructure.metrics import (
InfrastructureMetrics,
InfrastructureStats,
get_infrastructure_metrics,
reset_infrastructure_metrics,
)
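
# These tests exercise InfrastructureMetrics through its public surface only:
# per-component fallback counters, buffer-overflow/sample-drop totals,
# deduplicated unavailable-provider tracking, a rolling time window, and a
# process-wide singleton accessor. Anything that goes beyond that surface is
# flagged inline as an assumption.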


@pytest.fixture(autouse=True)
def reset_metrics_fixture() -> None:
    """Reset metrics singleton before each test."""
    reset_infrastructure_metrics()
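
# Note: the autouse reset above exists because get_infrastructure_metrics()
# hands out a process-wide singleton; without it, counts recorded by one test
# would leak into the next.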


def test_record_fallback_increments_count() -> None:
    """Test that recording a fallback increments the count."""
    metrics = InfrastructureMetrics()
    metrics.record_fallback("ollama_settings", "ImportError")
    stats = metrics.get_infrastructure_stats()
    assert stats.total_fallbacks == 1, "Should record one fallback"
    assert stats.fallbacks_by_component == {
        "ollama_settings": 1
    }, "Should track fallback by component"


def test_record_fallback_multiple_components() -> None:
    """Test that fallbacks are tracked by component."""
    metrics = InfrastructureMetrics()
    metrics.record_fallback("ollama_settings", "ImportError")
    metrics.record_fallback("calendar_adapter", "ValueError")
    metrics.record_fallback("ollama_settings", "KeyError")
    stats = metrics.get_infrastructure_stats()
    assert stats.total_fallbacks == 3, "Should count all fallbacks"
    assert stats.fallbacks_by_component == {
        "ollama_settings": 2,
        "calendar_adapter": 1,
    }, "Should aggregate by component"


def test_record_buffer_overflow_tracks_samples() -> None:
    """Test that buffer overflows track dropped samples."""
    metrics = InfrastructureMetrics()
    first_drop = 1024
    second_drop = 512
    expected_total = first_drop + second_drop
    metrics.record_buffer_overflow("partial_audio", first_drop)
    metrics.record_buffer_overflow("partial_audio", second_drop)
    stats = metrics.get_infrastructure_stats()
    assert stats.total_buffer_overflows == 2, "Should count both overflows"
    assert stats.samples_dropped == expected_total, "Should sum dropped samples"


def test_record_provider_unavailable_tracks_providers() -> None:
    """Test that provider unavailability is tracked."""
    metrics = InfrastructureMetrics()
    metrics.record_provider_unavailable("sounddevice", "no_loopback_device")
    metrics.record_provider_unavailable("pywinctl", "import_error")
    stats = metrics.get_infrastructure_stats()
    assert stats.unavailable_providers == {"sounddevice", "pywinctl"}


def test_record_provider_unavailable_deduplicates() -> None:
    """Test that repeated unavailability reports for one provider are deduplicated."""
    metrics = InfrastructureMetrics()
    metrics.record_provider_unavailable("sounddevice", "no_loopback_device")
    metrics.record_provider_unavailable("sounddevice", "stream_failed")
    stats = metrics.get_infrastructure_stats()
    assert stats.unavailable_providers == {"sounddevice"}


def test_get_stats_returns_empty_when_no_events() -> None:
    """Test that empty stats are returned when no events are recorded."""
    metrics = InfrastructureMetrics()
    stats = metrics.get_infrastructure_stats()
    expected_empty = InfrastructureStats.empty()
    # Compare field by field so a failure pinpoints the mismatched field
    assert stats.total_fallbacks == expected_empty.total_fallbacks
    assert stats.fallbacks_by_component == expected_empty.fallbacks_by_component
    assert stats.total_buffer_overflows == expected_empty.total_buffer_overflows
    assert stats.samples_dropped == expected_empty.samples_dropped
    assert stats.unavailable_providers == expected_empty.unavailable_providers


def test_singleton_returns_same_instance() -> None:
    """Test that get_infrastructure_metrics returns a singleton."""
    metrics1 = get_infrastructure_metrics()
    metrics2 = get_infrastructure_metrics()
    assert metrics1 is metrics2


def test_reset_clears_singleton() -> None:
    """Test that reset_infrastructure_metrics clears state."""
    metrics1 = get_infrastructure_metrics()
    metrics1.record_fallback("test", "Error")
    reset_infrastructure_metrics()
    metrics2 = get_infrastructure_metrics()
    assert metrics1 is not metrics2, "Reset should create new instance"
    assert metrics2.get_infrastructure_stats().total_fallbacks == 0, "New instance should be empty"
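
# For context (not exercised further here): production call sites are assumed
# to obtain the shared instance via the accessor rather than constructing
# InfrastructureMetrics directly, e.g.
#
#     get_infrastructure_metrics().record_fallback("ollama_settings", "ImportError")
#
# which is why reset_infrastructure_metrics() must swap in a fresh instance
# instead of merely zeroing counters.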


def test_rolling_window_pruning(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that old metrics are pruned from the rolling window."""
    from noteflow.infrastructure.metrics.infrastructure_metrics import (
        METRIC_WINDOW_SECONDS,
    )

    metrics = InfrastructureMetrics()
    metrics.record_fallback("test", "Error")
    assert metrics.get_infrastructure_stats().total_fallbacks == 1, "Should record initial metric"
    # Advance the clock past the window; monkeypatch restores time.time on teardown.
    real_time = time.time
    monkeypatch.setattr(time, "time", lambda: real_time() + METRIC_WINDOW_SECONDS + 1)
    assert metrics.get_infrastructure_stats().total_fallbacks == 0, "Old metrics should be pruned"
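
# The assertion above assumes pruning happens when get_infrastructure_stats()
# reads the window (i.e. on read, not via a background task); if pruning were
# deferred elsewhere, faking time.time alone would not be enough to age the
# recorded event out of the stats.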


def test_infrastructure_stats_empty_factory() -> None:
    """Test InfrastructureStats.empty() factory."""
    stats = InfrastructureStats.empty()
    expected_zero = 0
    expected_empty_dict: dict[str, int] = {}
    expected_empty_set: set[str] = set()
    assert stats.total_fallbacks == expected_zero, "Fallbacks should be zero"
    assert stats.fallbacks_by_component == expected_empty_dict, "Component dict should be empty"
    assert stats.total_buffer_overflows == expected_zero, "Overflows should be zero"
    assert stats.samples_dropped == expected_zero, "Dropped samples should be zero"
    assert stats.unavailable_providers == expected_empty_set, "Providers set should be empty"


def test_thread_safety_fallbacks() -> None:
    """Test that fallback recording is thread-safe."""
    metrics = InfrastructureMetrics()
    iterations = 100
    num_threads = 10

    def record_fallbacks() -> None:
        for _ in range(iterations):
            metrics.record_fallback("component", "Error")

    threads = [threading.Thread(target=record_fallbacks) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    stats = metrics.get_infrastructure_stats()
    assert stats.total_fallbacks == num_threads * iterations, "Should count every recorded fallback"
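
# If InfrastructureMetrics guarded its counters poorly, concurrent increments
# could interleave and lose updates, leaving the total below
# num_threads * iterations; asserting the exact product is what makes these
# concurrency tests meaningful rather than mere "does not crash" checks.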


def test_thread_safety_overflows() -> None:
    """Test that overflow recording is thread-safe."""
    metrics = InfrastructureMetrics()
    iterations = 100
    num_threads = 10
    samples_per_overflow = 10

    def record_overflows() -> None:
        for _ in range(iterations):
            metrics.record_buffer_overflow("buffer", samples_per_overflow)

    threads = [threading.Thread(target=record_overflows) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    expected_overflows = num_threads * iterations
    expected_dropped = expected_overflows * samples_per_overflow
    stats = metrics.get_infrastructure_stats()
    assert stats.total_buffer_overflows == expected_overflows, "Should count all overflows"
    assert stats.samples_dropped == expected_dropped, "Should sum all dropped samples"


def test_thread_safety_providers() -> None:
    """Test that provider recording is thread-safe."""
    metrics = InfrastructureMetrics()
    iterations = 100
    num_threads = 10

    def record_providers() -> None:
        for i in range(iterations):
            metrics.record_provider_unavailable(f"provider_{i % 5}", "reason")

    threads = [threading.Thread(target=record_providers) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    stats = metrics.get_infrastructure_stats()
    # Every thread reports the same five names (provider_0 through provider_4),
    # so deduplication should leave exactly five unique providers.
    assert len(stats.unavailable_providers) == 5