- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization - Added four new blocking hooks to prevent common error handling anti-patterns: - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption - `block-default
222 lines
9.8 KiB
Python
222 lines
9.8 KiB
Python
"""Tests for congestion tracking and backpressure signaling (Phase 3).
|
|
|
|
Tests the congestion calculation and pending chunk management for
|
|
streaming transcription backpressure.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from noteflow.grpc.mixins.streaming._processing._congestion import (
|
|
calculate_congestion_info,
|
|
decrement_pending_chunks,
|
|
)
|
|
from noteflow.grpc.mixins.streaming._processing._constants import (
|
|
PROCESSING_DELAY_THRESHOLD_MS,
|
|
QUEUE_DEPTH_THRESHOLD,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def congestion_host() -> MagicMock:
|
|
"""Create mock host with congestion tracking initialized."""
|
|
host = MagicMock()
|
|
host.chunk_receipt_times = {}
|
|
host.pending_chunks = {}
|
|
return host
|
|
|
|
|
|
class TestCalculateCongestionInfo:
|
|
"""Tests for calculate_congestion_info function."""
|
|
|
|
def test_returns_zero_delay_when_no_receipts(self, congestion_host: MagicMock) -> None:
|
|
"""Verify zero processing delay when no receipt times tracked."""
|
|
meeting_id = "test-meeting"
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)
|
|
|
|
assert result.processing_delay_ms == 0, "delay should be 0 with no receipts"
|
|
assert result.queue_depth == 0, "queue depth should be 0"
|
|
assert result.throttle_recommended is False, "throttle not recommended"
|
|
|
|
def test_calculates_processing_delay_from_oldest_receipt(
|
|
self, congestion_host: MagicMock
|
|
) -> None:
|
|
"""Verify processing delay is calculated from oldest unprocessed chunk."""
|
|
meeting_id = "test-meeting"
|
|
oldest_time = 1000.0
|
|
current_time = 1001.5 # 1.5 seconds later
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([oldest_time, 1000.5, 1001.0])
|
|
congestion_host.pending_chunks[meeting_id] = 3
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, current_time)
|
|
|
|
expected_delay_ms = int(1.5 * 1000) # 1.5 seconds in milliseconds
|
|
assert result.processing_delay_ms == expected_delay_ms, f"delay should be {expected_delay_ms}ms"
|
|
|
|
def test_queue_depth_frompending_chunks(self, congestion_host: MagicMock) -> None:
|
|
"""Verify queue depth reflects pending chunk count."""
|
|
meeting_id = "test-meeting"
|
|
pending_count = 15
|
|
congestion_host.pending_chunks[meeting_id] = pending_count
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)
|
|
|
|
assert result.queue_depth == pending_count, f"queue depth should be {pending_count}"
|
|
|
|
def test_no_throttle_at_zero_load(self, congestion_host: MagicMock) -> None:
|
|
"""Verify no throttle at zero load."""
|
|
meeting_id = "test-meeting"
|
|
congestion_host.pending_chunks[meeting_id] = 0
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)
|
|
|
|
assert result.throttle_recommended is False, "throttle not recommended at zero load"
|
|
|
|
def test_no_throttle_at_moderate_load(self, congestion_host: MagicMock) -> None:
|
|
"""Verify no throttle at moderate load (below thresholds)."""
|
|
meeting_id = "test-meeting"
|
|
current_time = 1000.0
|
|
delay_seconds = 0.5 # 500ms delay
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
|
|
congestion_host.pending_chunks[meeting_id] = 5
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, current_time)
|
|
|
|
assert result.throttle_recommended is False, "throttle not recommended at moderate load"
|
|
|
|
def test_throttle_above_delay_threshold(self, congestion_host: MagicMock) -> None:
|
|
"""Verify throttle recommended when delay exceeds threshold."""
|
|
meeting_id = "test-meeting"
|
|
current_time = 1000.0
|
|
# Threshold is 1000ms, so use 1100ms to be clearly above
|
|
delay_seconds = 1.1
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
|
|
congestion_host.pending_chunks[meeting_id] = 0
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, current_time)
|
|
|
|
assert result.throttle_recommended is True, "throttle recommended above delay threshold"
|
|
|
|
def test_throttle_above_queue_threshold(self, congestion_host: MagicMock) -> None:
|
|
"""Verify throttle recommended when queue depth exceeds threshold."""
|
|
meeting_id = "test-meeting"
|
|
congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD + 1
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)
|
|
|
|
assert result.throttle_recommended is True, "throttle recommended above queue threshold"
|
|
|
|
def test_no_throttle_at_exact_delay_threshold(self, congestion_host: MagicMock) -> None:
|
|
"""Verify no throttle at exactly the delay threshold boundary."""
|
|
meeting_id = "test-meeting"
|
|
current_time = 1000.0
|
|
# Exactly at threshold (1000ms = 1.0 seconds)
|
|
delay_seconds = PROCESSING_DELAY_THRESHOLD_MS / 1000.0
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
|
|
congestion_host.pending_chunks[meeting_id] = 0
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, current_time)
|
|
|
|
assert result.throttle_recommended is False, "throttle not recommended at exact threshold"
|
|
|
|
def test_no_throttle_at_exact_queue_threshold(self, congestion_host: MagicMock) -> None:
|
|
"""Verify no throttle at exactly the queue depth threshold."""
|
|
meeting_id = "test-meeting"
|
|
congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, 1000.0)
|
|
|
|
assert result.throttle_recommended is False, "throttle not recommended at exact queue threshold"
|
|
|
|
def test_throttle_when_both_thresholds_exceeded(self, congestion_host: MagicMock) -> None:
|
|
"""Verify throttle when both delay and queue thresholds are exceeded."""
|
|
meeting_id = "test-meeting"
|
|
current_time = 1000.0
|
|
delay_seconds = 1.5 # 1500ms > 1000ms threshold
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([current_time - delay_seconds])
|
|
congestion_host.pending_chunks[meeting_id] = QUEUE_DEPTH_THRESHOLD + 5
|
|
|
|
result = calculate_congestion_info(congestion_host, meeting_id, current_time)
|
|
|
|
expected_delay_ms = int(delay_seconds * 1000) # Convert to milliseconds
|
|
assert result.throttle_recommended is True, "throttle recommended when both thresholds exceeded"
|
|
assert result.processing_delay_ms == expected_delay_ms, f"delay should be {expected_delay_ms}ms"
|
|
assert result.queue_depth == QUEUE_DEPTH_THRESHOLD + 5, "queue depth should match"
|
|
|
|
def test_multiple_meetings_independent_congestion(self, congestion_host: MagicMock) -> None:
|
|
"""Verify congestion tracking is independent per meeting."""
|
|
meeting_congested = "meeting-congested"
|
|
meeting_healthy = "meeting-healthy"
|
|
current_time = 1000.0
|
|
|
|
# Set up congested meeting
|
|
congestion_host.chunk_receipt_times[meeting_congested] = deque([current_time - 2.0])
|
|
congestion_host.pending_chunks[meeting_congested] = 30
|
|
|
|
# Set up healthy meeting
|
|
congestion_host.chunk_receipt_times[meeting_healthy] = deque([current_time - 0.1])
|
|
congestion_host.pending_chunks[meeting_healthy] = 2
|
|
|
|
result_congested = calculate_congestion_info(
|
|
congestion_host, meeting_congested, current_time
|
|
)
|
|
result_healthy = calculate_congestion_info(
|
|
congestion_host, meeting_healthy, current_time
|
|
)
|
|
|
|
assert result_congested.throttle_recommended is True, "congested meeting should throttle"
|
|
assert result_healthy.throttle_recommended is False, "healthy meeting should not throttle"
|
|
|
|
|
|
class TestDecrementPendingChunks:
|
|
"""Tests for decrement_pending_chunks function."""
|
|
|
|
def test_does_nothing_when_no_tracking(self, congestion_host: MagicMock) -> None:
|
|
"""Verify no error when pending chunks not initialized for meeting."""
|
|
meeting_id = "unknown-meeting"
|
|
# Should not raise
|
|
decrement_pending_chunks(congestion_host, meeting_id)
|
|
|
|
def test_decrements_pending_count(self, congestion_host: MagicMock) -> None:
|
|
"""Verify decrement reduces pending chunk count by 1."""
|
|
meeting_id = "test-meeting"
|
|
initial_pending = 15
|
|
congestion_host.pending_chunks[meeting_id] = initial_pending
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque(
|
|
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0]
|
|
)
|
|
|
|
decrement_pending_chunks(congestion_host, meeting_id)
|
|
|
|
expected_remaining = initial_pending - 1
|
|
assert congestion_host.pending_chunks[meeting_id] == expected_remaining, "should decrement by 1"
|
|
|
|
def test_clamps_to_zero(self, congestion_host: MagicMock) -> None:
|
|
"""Verify pending chunks don't go negative."""
|
|
meeting_id = "test-meeting"
|
|
congestion_host.pending_chunks[meeting_id] = 0 # Already at zero
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque([])
|
|
|
|
decrement_pending_chunks(congestion_host, meeting_id)
|
|
|
|
assert congestion_host.pending_chunks[meeting_id] == 0, "should clamp to 0"
|
|
|
|
def test_clears_receipt_times_on_decrement(self, congestion_host: MagicMock) -> None:
|
|
"""Verify oldest receipt time is cleared when processing completes."""
|
|
meeting_id = "test-meeting"
|
|
congestion_host.pending_chunks[meeting_id] = 8
|
|
congestion_host.chunk_receipt_times[meeting_id] = deque(
|
|
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
|
|
)
|
|
|
|
decrement_pending_chunks(congestion_host, meeting_id)
|
|
|
|
remaining = len(congestion_host.chunk_receipt_times[meeting_id])
|
|
# 8 - 1 = 7 remaining (one popleft per call)
|
|
assert remaining == 7, "should have 7 receipt times remaining after decrementing 1"
|