Files
noteflow/tests/stress/test_segmenter_fuzz.py
Travis Vasceannie a1fc7edeea Enhance recovery and summarization services with asset path management
- Added `asset_path` to the `Meeting` entity for audio asset storage.
- Implemented `AudioValidationResult` for audio integrity checks during recovery.
- Updated `RecoveryService` to validate audio file integrity for crashed meetings.
- Enhanced `SummarizationService` to include consent persistence callbacks.
- Introduced new database migrations for `diarization_jobs` and `user_preferences` tables.
- Refactored various components to support the new asset path and audio validation features.
- Improved documentation in `CLAUDE.md` to reflect changes in recovery and summarization functionalities.
2025-12-19 10:40:21 +00:00

537 lines
18 KiB
Python

"""Fuzz tests for Segmenter state machine.
Tests edge cases with rapid VAD transitions and random input sequences.
Verifies invariants hold under stress conditions.
"""
from __future__ import annotations
import random
import numpy as np
import pytest
from numpy.typing import NDArray
from noteflow.infrastructure.asr.segmenter import (
AudioSegment,
Segmenter,
SegmenterConfig,
SegmenterState,
)
def make_audio(duration: float, sample_rate: int = 16000) -> NDArray[np.float32]:
"""Create test audio of specified duration with random values."""
samples = int(duration * sample_rate)
return np.random.uniform(-1.0, 1.0, samples).astype(np.float32)
def make_silence(duration: float, sample_rate: int = 16000) -> NDArray[np.float32]:
"""Create silent audio of specified duration."""
samples = int(duration * sample_rate)
return np.zeros(samples, dtype=np.float32)
class TestSegmenterInvariants:
"""Verify segmenter invariants hold under various inputs."""
@pytest.mark.stress
@pytest.mark.parametrize("sample_rate", [16000, 44100, 48000])
def test_segment_duration_positive(self, sample_rate: int) -> None:
"""All emitted segments have positive duration."""
config = SegmenterConfig(
sample_rate=sample_rate,
min_speech_duration=0.0,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
random.seed(42)
segments: list[AudioSegment] = []
for _ in range(100):
audio = make_audio(0.1, sample_rate)
is_speech = random.random() > 0.5
segments.extend(segmenter.process_audio(audio, is_speech))
if final := segmenter.flush():
segments.append(final)
for seg in segments:
assert seg.duration > 0, f"Segment duration must be positive: {seg.duration}"
assert seg.end_time > seg.start_time
@pytest.mark.stress
def test_segment_audio_length_matches_duration(self) -> None:
"""Segment audio length matches (end_time - start_time) * sample_rate."""
sample_rate = 16000
config = SegmenterConfig(
sample_rate=sample_rate,
min_speech_duration=0.0,
trailing_silence=0.1,
leading_buffer=0.0,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.5, sample_rate)
silence = make_silence(0.2, sample_rate)
list(segmenter.process_audio(speech, is_speech=True))
segments = list(segmenter.process_audio(silence, is_speech=False))
for seg in segments:
expected_samples = int(seg.duration * sample_rate)
actual_samples = len(seg.audio)
assert abs(actual_samples - expected_samples) <= 1, (
f"Audio length {actual_samples} != expected {expected_samples}"
)
@pytest.mark.stress
def test_segments_strictly_sequential(self) -> None:
"""Emitted segments have non-overlapping, sequential time ranges."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.05,
)
segmenter = Segmenter(config=config)
random.seed(123)
segments: list[AudioSegment] = []
for _ in range(50):
audio = make_audio(0.05)
is_speech = random.random() > 0.3
segments.extend(segmenter.process_audio(audio, is_speech))
if final := segmenter.flush():
segments.append(final)
for i in range(1, len(segments)):
prev_end = segments[i - 1].end_time
curr_start = segments[i].start_time
assert curr_start >= prev_end, (
f"Segment overlap: prev_end={prev_end}, curr_start={curr_start}"
)
@pytest.mark.stress
def test_all_segments_have_audio(self) -> None:
"""All emitted segments contain audio data."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
random.seed(456)
segments: list[AudioSegment] = []
for _ in range(100):
audio = make_audio(0.05)
is_speech = random.random() > 0.4
segments.extend(segmenter.process_audio(audio, is_speech))
if final := segmenter.flush():
segments.append(final)
for seg in segments:
assert len(seg.audio) > 0, "Segment must contain audio data"
class TestRapidVadTransitions:
"""Test rapid VAD state transitions (chattering)."""
@pytest.mark.stress
def test_rapid_speech_silence_alternation(self) -> None:
"""Rapid alternation between speech and silence doesn't crash."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.05,
)
segmenter = Segmenter(config=config)
for i in range(1000):
audio = make_silence(0.01) # 10ms at 16kHz
is_speech = i % 2 == 0
list(segmenter.process_audio(audio, is_speech))
assert segmenter.state in (
SegmenterState.IDLE,
SegmenterState.SPEECH,
SegmenterState.TRAILING,
)
@pytest.mark.stress
def test_single_sample_chunks(self) -> None:
"""Processing single-sample chunks doesn't crash or produce invalid segments."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.01,
)
segmenter = Segmenter(config=config)
random.seed(789)
segments: list[AudioSegment] = []
for i in range(1000):
audio = np.array([random.uniform(-1, 1)], dtype=np.float32)
is_speech = i % 10 < 5
segments.extend(segmenter.process_audio(audio, is_speech))
for seg in segments:
assert seg.duration >= 0
assert len(seg.audio) > 0
@pytest.mark.stress
def test_very_short_speech_bursts(self) -> None:
"""Very short speech bursts are handled correctly."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.02,
)
segmenter = Segmenter(config=config)
segments: list[AudioSegment] = []
for _ in range(100):
speech = make_audio(0.01)
silence = make_silence(0.05)
segments.extend(segmenter.process_audio(speech, is_speech=True))
segments.extend(segmenter.process_audio(silence, is_speech=False))
if final := segmenter.flush():
segments.append(final)
for seg in segments:
assert seg.duration > 0
assert seg.end_time > seg.start_time
class TestEdgeCaseConfigurations:
"""Test edge case segmenter configurations."""
@pytest.mark.stress
@pytest.mark.parametrize("min_speech", [0.0, 0.001, 0.01, 0.1, 1.0])
def test_various_min_speech_durations(self, min_speech: float) -> None:
"""Various min_speech_duration values work correctly."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=min_speech,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
speech = make_audio(1.0)
silence = make_silence(0.2)
segments_speech = list(segmenter.process_audio(speech, is_speech=True))
segments_silence = list(segmenter.process_audio(silence, is_speech=False))
all_segments = segments_speech + segments_silence
for seg in all_segments:
assert seg.duration > 0, f"Segment duration must be positive: {seg.duration}"
@pytest.mark.stress
def test_zero_trailing_silence(self) -> None:
"""Zero trailing_silence immediately emits on silence."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.0,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.1)
silence = make_silence(0.01)
list(segmenter.process_audio(speech, is_speech=True))
segments = list(segmenter.process_audio(silence, is_speech=False))
assert len(segments) == 1
@pytest.mark.stress
def test_max_duration_forced_split(self) -> None:
"""Segments are force-split at max_segment_duration."""
config = SegmenterConfig(
sample_rate=16000,
max_segment_duration=0.5,
min_speech_duration=0.0,
)
segmenter = Segmenter(config=config)
segments: list[AudioSegment] = []
for _ in range(20):
audio = make_audio(0.1)
segments.extend(segmenter.process_audio(audio, is_speech=True))
assert len(segments) >= 3, f"Expected at least 3 splits, got {len(segments)}"
for seg in segments:
assert seg.duration <= config.max_segment_duration + 0.2
@pytest.mark.stress
def test_zero_leading_buffer(self) -> None:
"""Zero leading_buffer doesn't include pre-speech audio."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
leading_buffer=0.0,
)
segmenter = Segmenter(config=config)
silence = make_silence(0.5)
speech = make_audio(0.3)
more_silence = make_silence(0.2)
list(segmenter.process_audio(silence, is_speech=False))
list(segmenter.process_audio(speech, is_speech=True))
segments = list(segmenter.process_audio(more_silence, is_speech=False))
assert len(segments) == 1
seg = segments[0]
expected_duration = 0.3 + 0.2
assert abs(seg.duration - expected_duration) < 0.05
@pytest.mark.stress
@pytest.mark.parametrize("leading_buffer", [0.0, 0.1, 0.2, 0.5, 1.0])
def test_various_leading_buffers(self, leading_buffer: float) -> None:
"""Various leading_buffer values work correctly."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
leading_buffer=leading_buffer,
)
segmenter = Segmenter(config=config)
silence = make_silence(0.5)
speech = make_audio(0.3)
more_silence = make_silence(0.2)
list(segmenter.process_audio(silence, is_speech=False))
list(segmenter.process_audio(speech, is_speech=True))
if segments := list(segmenter.process_audio(more_silence, is_speech=False)):
seg = segments[0]
assert seg.duration > 0
class TestStateTransitions:
"""Test specific state transition scenarios."""
@pytest.mark.stress
def test_idle_to_speech_to_idle(self) -> None:
"""IDLE -> SPEECH -> IDLE transition."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
assert segmenter.state == SegmenterState.IDLE
speech = make_audio(0.2)
list(segmenter.process_audio(speech, is_speech=True))
assert segmenter.state == SegmenterState.SPEECH
silence = make_silence(0.2)
list(segmenter.process_audio(silence, is_speech=False))
assert segmenter.state == SegmenterState.IDLE
@pytest.mark.stress
def test_trailing_back_to_speech(self) -> None:
"""TRAILING -> SPEECH transition when speech resumes."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.5,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.2)
list(segmenter.process_audio(speech, is_speech=True))
short_silence = make_silence(0.1)
list(segmenter.process_audio(short_silence, is_speech=False))
assert segmenter.state == SegmenterState.TRAILING
more_speech = make_audio(0.2)
list(segmenter.process_audio(more_speech, is_speech=True))
assert segmenter.state == SegmenterState.SPEECH
@pytest.mark.stress
def test_flush_from_speech_state(self) -> None:
"""Flush from SPEECH state emits segment."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.3)
list(segmenter.process_audio(speech, is_speech=True))
assert segmenter.state == SegmenterState.SPEECH
segment = segmenter.flush()
assert segment is not None
assert segment.duration > 0
assert segmenter.state == SegmenterState.IDLE
@pytest.mark.stress
def test_flush_from_trailing_state(self) -> None:
"""Flush from TRAILING state emits segment."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=1.0,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.3)
list(segmenter.process_audio(speech, is_speech=True))
silence = make_silence(0.1)
list(segmenter.process_audio(silence, is_speech=False))
assert segmenter.state == SegmenterState.TRAILING
segment = segmenter.flush()
assert segment is not None
assert segment.duration > 0
assert segmenter.state == SegmenterState.IDLE
@pytest.mark.stress
def test_flush_from_idle_returns_none(self) -> None:
"""Flush from IDLE state returns None."""
config = SegmenterConfig(sample_rate=16000)
segmenter = Segmenter(config=config)
assert segmenter.state == SegmenterState.IDLE
segment = segmenter.flush()
assert segment is None
class TestFuzzRandomPatterns:
"""Fuzz testing with random VAD patterns."""
@pytest.mark.stress
@pytest.mark.slow
def test_random_vad_patterns_1000_iterations(self) -> None:
"""Run 1000 random VAD pattern iterations."""
for seed in range(1000):
random.seed(seed)
np.random.seed(seed)
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=random.uniform(0, 0.5),
max_segment_duration=random.uniform(1, 10),
trailing_silence=random.uniform(0.05, 0.5),
leading_buffer=random.uniform(0, 0.3),
)
segmenter = Segmenter(config=config)
segments: list[AudioSegment] = []
for _ in range(random.randint(10, 100)):
duration = random.uniform(0.01, 0.5)
audio = make_audio(duration)
is_speech = random.random() > 0.4
segments.extend(segmenter.process_audio(audio, is_speech))
if final := segmenter.flush():
segments.append(final)
for seg in segments:
assert seg.duration > 0, f"Seed {seed}: duration must be positive"
assert seg.end_time > seg.start_time, f"Seed {seed}: end > start"
assert len(seg.audio) > 0, f"Seed {seed}: audio must exist"
@pytest.mark.stress
def test_deterministic_with_same_seed(self) -> None:
"""Same random seed produces same segments."""
def run_with_seed(seed: int) -> list[tuple[float, float]]:
random.seed(seed)
np.random.seed(seed)
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
segments: list[AudioSegment] = []
for _ in range(50):
duration = random.uniform(0.05, 0.2)
audio = make_audio(duration)
is_speech = random.random() > 0.5
segments.extend(segmenter.process_audio(audio, is_speech))
if final := segmenter.flush():
segments.append(final)
return [(s.start_time, s.end_time) for s in segments]
result1 = run_with_seed(999)
result2 = run_with_seed(999)
assert result1 == result2
class TestResetBehavior:
"""Test reset functionality."""
@pytest.mark.stress
def test_reset_clears_all_state(self) -> None:
"""Reset clears all internal state."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.5,
)
segmenter = Segmenter(config=config)
speech = make_audio(0.5)
list(segmenter.process_audio(speech, is_speech=True))
silence = make_silence(0.1)
list(segmenter.process_audio(silence, is_speech=False))
segmenter.reset()
assert segmenter.state == SegmenterState.IDLE
@pytest.mark.stress
def test_reset_allows_fresh_processing(self) -> None:
"""After reset, segmenter works from fresh state."""
config = SegmenterConfig(
sample_rate=16000,
min_speech_duration=0.0,
trailing_silence=0.1,
)
segmenter = Segmenter(config=config)
speech1 = make_audio(0.3)
list(segmenter.process_audio(speech1, is_speech=True))
silence1 = make_silence(0.2)
segments1 = list(segmenter.process_audio(silence1, is_speech=False))
segmenter.reset()
speech2 = make_audio(0.3)
list(segmenter.process_audio(speech2, is_speech=True))
silence2 = make_silence(0.2)
segments2 = list(segmenter.process_audio(silence2, is_speech=False))
assert len(segments1) == len(segments2) == 1
assert segments2[0].start_time == 0.0