Files
noteflow/tests/grpc/test_congestion_tracking.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

222 lines
9.8 KiB
Python

"""Tests for congestion tracking and backpressure signaling (Phase 3).
Tests the congestion calculation and pending chunk management for
streaming transcription backpressure.
"""
from __future__ import annotations
from collections import deque
from unittest.mock import MagicMock
import pytest
from noteflow.grpc.mixins.streaming._processing._congestion import (
calculate_congestion_info,
decrement_pending_chunks,
)
from noteflow.grpc.mixins.streaming._processing._constants import (
PROCESSING_DELAY_THRESHOLD_MS,
QUEUE_DEPTH_THRESHOLD,
)
@pytest.fixture
def congestion_host() -> MagicMock:
    """Build a mock streaming host whose congestion-tracking maps start empty.

    Returns:
        A ``MagicMock`` carrying two real (non-mock) dictionaries:
        ``chunk_receipt_times`` and ``pending_chunks``. Tests populate them
        per meeting ID before calling the functions under test.
    """
    # MagicMock applies constructor keyword arguments as attributes, so both
    # maps are plain dicts rather than auto-generated child mocks.
    return MagicMock(chunk_receipt_times={}, pending_chunks={})
class TestCalculateCongestionInfo:
    """Tests for calculate_congestion_info function.

    Delays and queue depths that are meant to sit above or below a throttle
    threshold are derived from ``PROCESSING_DELAY_THRESHOLD_MS`` and
    ``QUEUE_DEPTH_THRESHOLD`` so the tests keep their meaning if the
    thresholds are retuned.
    """

    def test_returns_zero_delay_when_no_receipts(self, congestion_host: MagicMock) -> None:
        """Verify zero processing delay when no receipt times tracked."""
        meeting_id = "test-meeting"

        result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)

        assert result.processing_delay_ms == 0, "delay should be 0 with no receipts"
        assert result.queue_depth == 0, "queue depth should be 0"
        assert result.throttle_recommended is False, "throttle not recommended"

    def test_calculates_processing_delay_from_oldest_receipt(
        self, congestion_host: MagicMock
    ) -> None:
        """Verify processing delay is calculated from oldest unprocessed chunk."""
        meeting_id = "test-meeting"
        oldest_time = 1000.0
        current_time = 1001.5  # 1.5 seconds later
        congestion_host.chunk_receipt_times[meeting_id] = deque([oldest_time, 1000.5, 1001.0])
        congestion_host.pending_chunks[meeting_id] = 3

        result = calculate_congestion_info(congestion_host, meeting_id, current_time)

        expected_delay_ms = int(1.5 * 1000)  # 1.5 seconds in milliseconds
        assert result.processing_delay_ms == expected_delay_ms, (
            f"delay should be {expected_delay_ms}ms"
        )

    def test_queue_depth_from_pending_chunks(self, congestion_host: MagicMock) -> None:
        """Verify queue depth reflects pending chunk count."""
        meeting_id = "test-meeting"
        pending_count = 15
        congestion_host.pending_chunks[meeting_id] = pending_count

        result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)

        assert result.queue_depth == pending_count, f"queue depth should be {pending_count}"

    def test_no_throttle_at_zero_load(self, congestion_host: MagicMock) -> None:
        """Verify no throttle at zero load."""
        meeting_id = "test-meeting"
        congestion_host.pending_chunks[meeting_id] = 0

        result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)

        assert result.throttle_recommended is False, "throttle not recommended at zero load"

    def test_no_throttle_at_moderate_load(self, congestion_host: MagicMock) -> None:
        """Verify no throttle at moderate load (below thresholds)."""
        meeting_id = "test-meeting"
        current_time = 1000.0
        # Half of each threshold: clearly loaded, but below both limits.
        delay_seconds = PROCESSING_DELAY_THRESHOLD_MS / 2000.0
        congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
        congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD // 2

        result = calculate_congestion_info(congestion_host, meeting_id, current_time)

        assert result.throttle_recommended is False, "throttle not recommended at moderate load"

    def test_throttle_above_delay_threshold(self, congestion_host: MagicMock) -> None:
        """Verify throttle recommended when delay exceeds threshold."""
        meeting_id = "test-meeting"
        current_time = 1000.0
        # 100ms past the configured threshold, so the delay is clearly above it.
        delay_seconds = (PROCESSING_DELAY_THRESHOLD_MS + 100) / 1000.0
        congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
        # Empty queue isolates the delay criterion.
        congestion_host.pending_chunks[meeting_id] = 0

        result = calculate_congestion_info(congestion_host, meeting_id, current_time)

        assert result.throttle_recommended is True, "throttle recommended above delay threshold"

    def test_throttle_above_queue_threshold(self, congestion_host: MagicMock) -> None:
        """Verify throttle recommended when queue depth exceeds threshold."""
        meeting_id = "test-meeting"
        congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD + 1

        result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)

        assert result.throttle_recommended is True, "throttle recommended above queue threshold"

    def test_no_throttle_at_exact_delay_threshold(self, congestion_host: MagicMock) -> None:
        """Verify no throttle at exactly the delay threshold boundary."""
        meeting_id = "test-meeting"
        current_time = 1000.0
        # Exactly at the threshold, converted from milliseconds to seconds.
        delay_seconds = PROCESSING_DELAY_THRESHOLD_MS / 1000.0
        congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
        congestion_host.pending_chunks[meeting_id] = 0

        result = calculate_congestion_info(congestion_host, meeting_id, current_time)

        assert result.throttle_recommended is False, "throttle not recommended at exact threshold"

    def test_no_throttle_at_exact_queue_threshold(self, congestion_host: MagicMock) -> None:
        """Verify no throttle at exactly the queue depth threshold."""
        meeting_id = "test-meeting"
        congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD

        result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)

        assert result.throttle_recommended is False, (
            "throttle not recommended at exact queue threshold"
        )

    def test_throttle_when_both_thresholds_exceeded(self, congestion_host: MagicMock) -> None:
        """Verify throttle when both delay and queue thresholds are exceeded."""
        meeting_id = "test-meeting"
        current_time = 1000.0
        # 500ms past the delay threshold and 5 chunks past the queue threshold.
        delay_seconds = (PROCESSING_DELAY_THRESHOLD_MS + 500) / 1000.0
        pending_count = QUEUE_DEPTH_THRESHOLD + 5
        congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
        congestion_host.pending_chunks[meeting_id] = pending_count

        result = calculate_congestion_info(congestion_host, meeting_id, current_time)

        expected_delay_ms = int(delay_seconds * 1000)  # Convert to milliseconds
        assert result.throttle_recommended is True, (
            "throttle recommended when both thresholds exceeded"
        )
        assert result.processing_delay_ms == expected_delay_ms, (
            f"delay should be {expected_delay_ms}ms"
        )
        assert result.queue_depth == pending_count, "queue depth should match"

    def test_multiple_meetings_independent_congestion(self, congestion_host: MagicMock) -> None:
        """Verify congestion tracking is independent per meeting."""
        meeting_congested = "meeting-congested"
        meeting_healthy = "meeting-healthy"
        current_time = 1000.0
        threshold_seconds = PROCESSING_DELAY_THRESHOLD_MS / 1000.0

        # Congested meeting: twice the delay threshold, queue past its threshold.
        congestion_host.chunk_receipt_times[meeting_congested] = deque(
            [current_time - 2 * threshold_seconds]
        )
        congestion_host.pending_chunks[meeting_congested] = QUEUE_DEPTH_THRESHOLD + 10

        # Healthy meeting: a tenth of each threshold.
        congestion_host.chunk_receipt_times[meeting_healthy] = deque(
            [current_time - threshold_seconds / 10]
        )
        congestion_host.pending_chunks[meeting_healthy] = QUEUE_DEPTH_THRESHOLD // 10

        result_congested = calculate_congestion_info(
            congestion_host, meeting_congested, current_time
        )
        result_healthy = calculate_congestion_info(
            congestion_host, meeting_healthy, current_time
        )

        assert result_congested.throttle_recommended is True, "congested meeting should throttle"
        assert result_healthy.throttle_recommended is False, "healthy meeting should not throttle"
class TestDecrementPendingChunks:
    """Tests for decrement_pending_chunks function."""

    def test_does_nothing_when_no_tracking(self, congestion_host: MagicMock) -> None:
        """Verify an untracked meeting is tolerated without raising."""
        # Neither tracking map has an entry for this meeting; the call
        # completing without an exception is the entire check.
        decrement_pending_chunks(congestion_host, "unknown-meeting")

    def test_decrements_pending_count(self, congestion_host: MagicMock) -> None:
        """Verify a single call lowers the pending chunk count by exactly one."""
        meeting_id = "test-meeting"
        starting_count = 15
        congestion_host.pending_chunks[meeting_id] = starting_count
        # One receipt timestamp (1.0 through 15.0) per pending chunk.
        congestion_host.chunk_receipt_times[meeting_id] = deque(
            float(second) for second in range(1, starting_count + 1)
        )

        decrement_pending_chunks(congestion_host, meeting_id)

        assert congestion_host.pending_chunks[meeting_id] == starting_count - 1, "should decrement by 1"

    def test_clamps_to_zero(self, congestion_host: MagicMock) -> None:
        """Verify the pending count stays at zero instead of going negative."""
        meeting_id = "test-meeting"
        # Start from an already-drained meeting: zero pending, no receipts.
        congestion_host.chunk_receipt_times[meeting_id] = deque()
        congestion_host.pending_chunks[meeting_id] = 0

        decrement_pending_chunks(congestion_host, meeting_id)

        assert congestion_host.pending_chunks[meeting_id] == 0, "should clamp to 0"

    def test_clears_receipt_times_on_decrement(self, congestion_host: MagicMock) -> None:
        """Verify one receipt time is dropped when a chunk finishes processing."""
        meeting_id = "test-meeting"
        receipt_times = deque(float(second) for second in range(8))  # 0.0 .. 7.0
        congestion_host.pending_chunks[meeting_id] = len(receipt_times)
        congestion_host.chunk_receipt_times[meeting_id] = receipt_times

        decrement_pending_chunks(congestion_host, meeting_id)

        # Eight receipts minus the one consumed by this call leaves seven.
        remaining = len(congestion_host.chunk_receipt_times[meeting_id])
        assert remaining == 7, "should have 7 receipt times remaining after decrementing 1"