- Deleted .env.example file as it is no longer needed.
- Added .gitignore to manage ignored files and directories.
- Introduced CLAUDE.md for AI provider integration documentation.
- Created dev.sh for development setup and scripts.
- Updated Dockerfile and Dockerfile.production for improved build processes.
- Added multiple test files and directories for comprehensive testing.
- Introduced new utility and service files for enhanced functionality.
- Organized codebase with new directories and files for better maintainability.
"""
|
|
Audio sample generation and management for NeMo speaker diarization testing.
|
|
|
|
Provides realistic audio samples, test scenarios, and data fixtures
|
|
for comprehensive testing of the NVIDIA NeMo speaker diarization system.
|
|
"""
|
|
|
|
import json
|
|
import tempfile
|
|
import wave
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import numpy as np
|
|
import torch
|
|
|
|
|
|
@dataclass
|
|
class AudioScenario:
|
|
"""Represents a specific audio testing scenario."""
|
|
|
|
name: str
|
|
description: str
|
|
duration: float
|
|
num_speakers: int
|
|
characteristics: Dict[str, Any]
|
|
expected_segments: List[Dict[str, Any]]
|
|
|
|
|
|
class AudioSampleGenerator:
    """Generates various types of audio samples for testing."""

    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self.scenarios = self._create_test_scenarios()

    def _create_test_scenarios(self) -> Dict[str, AudioScenario]:
        """Create predefined test scenarios."""
        scenarios = {}

        # Basic scenarios
        scenarios["single_speaker"] = AudioScenario(
            name="single_speaker",
            description="Single speaker talking continuously",
            duration=10.0,
            num_speakers=1,
            characteristics={"noise_level": 0.05, "speech_activity": 0.8},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 10.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.95,
                }
            ],
        )

        scenarios["two_speakers_alternating"] = AudioScenario(
            name="two_speakers_alternating",
            description="Two speakers taking turns",
            duration=20.0,
            num_speakers=2,
            characteristics={"noise_level": 0.05, "turn_taking": True},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 5.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.92,
                },
                {
                    "start_time": 5.5,
                    "end_time": 10.5,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.90,
                },
                {
                    "start_time": 11.0,
                    "end_time": 15.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.88,
                },
                {
                    "start_time": 15.5,
                    "end_time": 20.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.85,
                },
            ],
        )

        scenarios["overlapping_speakers"] = AudioScenario(
            name="overlapping_speakers",
            description="Speakers with overlapping speech",
            duration=15.0,
            num_speakers=2,
            characteristics={"noise_level": 0.1, "overlap_ratio": 0.3},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 8.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.85,
                },
                {
                    "start_time": 6.0,
                    "end_time": 15.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.80,
                },
            ],
        )

        scenarios["multi_speaker_meeting"] = AudioScenario(
            name="multi_speaker_meeting",
            description="4-speaker meeting with natural conversation flow",
            duration=60.0,
            num_speakers=4,
            characteristics={"noise_level": 0.08, "meeting_style": True},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 15.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.88,
                },
                {
                    "start_time": 15.5,
                    "end_time": 30.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.85,
                },
                {
                    "start_time": 30.5,
                    "end_time": 45.0,
                    "speaker_label": "SPEAKER_03",
                    "confidence": 0.90,
                },
                {
                    "start_time": 45.5,
                    "end_time": 60.0,
                    "speaker_label": "SPEAKER_04",
                    "confidence": 0.87,
                },
            ],
        )

        # Challenging scenarios
        scenarios["noisy_environment"] = AudioScenario(
            name="noisy_environment",
            description="Speech with significant background noise",
            duration=30.0,
            num_speakers=2,
            characteristics={"noise_level": 0.3, "background_type": "crowd"},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 15.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.70,
                },
                {
                    "start_time": 15.5,
                    "end_time": 30.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.65,
                },
            ],
        )

        scenarios["whispered_speech"] = AudioScenario(
            name="whispered_speech",
            description="Low-amplitude whispered speech",
            duration=20.0,
            num_speakers=1,
            characteristics={"amplitude": 0.3, "spectral_tilt": -6},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 20.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.75,
                }
            ],
        )

        scenarios["far_field_recording"] = AudioScenario(
            name="far_field_recording",
            description="Speakers recorded from distance with reverb",
            duration=25.0,
            num_speakers=3,
            characteristics={"reverb_level": 0.4, "snr": 10},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 8.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.78,
                },
                {
                    "start_time": 8.5,
                    "end_time": 16.5,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.75,
                },
                {
                    "start_time": 17.0,
                    "end_time": 25.0,
                    "speaker_label": "SPEAKER_03",
                    "confidence": 0.80,
                },
            ],
        )

        # Edge cases
        scenarios["very_short_utterances"] = AudioScenario(
            name="very_short_utterances",
            description="Many very short speaker segments",
            duration=10.0,
            num_speakers=2,
            characteristics={"min_segment_length": 0.5, "max_segment_length": 1.5},
            expected_segments=[
                {
                    "start_time": 0.0,
                    "end_time": 1.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.80,
                },
                {
                    "start_time": 1.2,
                    "end_time": 2.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.82,
                },
                {
                    "start_time": 2.2,
                    "end_time": 3.5,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.78,
                },
                {
                    "start_time": 3.7,
                    "end_time": 4.5,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.85,
                },
            ],
        )

        scenarios["silence_heavy"] = AudioScenario(
            name="silence_heavy",
            description="Audio with long periods of silence",
            duration=30.0,
            num_speakers=2,
            characteristics={"silence_ratio": 0.6, "speech_activity": 0.4},
            expected_segments=[
                {
                    "start_time": 2.0,
                    "end_time": 8.0,
                    "speaker_label": "SPEAKER_01",
                    "confidence": 0.90,
                },
                {
                    "start_time": 22.0,
                    "end_time": 28.0,
                    "speaker_label": "SPEAKER_02",
                    "confidence": 0.88,
                },
            ],
        )

        return scenarios

    def generate_scenario_audio(
        self, scenario_name: str
    ) -> Tuple[torch.Tensor, AudioScenario]:
        """Generate audio for a specific scenario."""
        scenario = self.scenarios[scenario_name]
        audio_tensor = self._synthesize_audio_for_scenario(scenario)
        return audio_tensor, scenario

    def _synthesize_audio_for_scenario(self, scenario: AudioScenario) -> torch.Tensor:
        """Synthesize audio based on scenario specifications."""
        if scenario.name == "single_speaker":
            audio = self._generate_single_speaker_audio(scenario)
        elif scenario.name == "two_speakers_alternating":
            audio = self._generate_alternating_speakers_audio(scenario)
        elif scenario.name == "overlapping_speakers":
            audio = self._generate_overlapping_speakers_audio(scenario)
        elif scenario.name == "multi_speaker_meeting":
            audio = self._generate_meeting_audio(scenario)
        elif scenario.name == "noisy_environment":
            audio = self._generate_noisy_audio(scenario)
        elif scenario.name == "whispered_speech":
            audio = self._generate_whispered_audio(scenario)
        elif scenario.name == "far_field_recording":
            audio = self._generate_far_field_audio(scenario)
        elif scenario.name == "very_short_utterances":
            audio = self._generate_short_utterances_audio(scenario)
        elif scenario.name == "silence_heavy":
            audio = self._generate_silence_heavy_audio(scenario)
        else:
            # Default generation
            audio = self._generate_basic_multi_speaker_audio(scenario)

        return audio

    def _generate_single_speaker_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate single speaker audio."""
        samples = int(scenario.duration * self.sample_rate)
        t = torch.linspace(0, scenario.duration, samples)

        # Generate speech-like signal
        fundamental = 150  # Fundamental frequency
        speech = torch.sin(2 * torch.pi * fundamental * t)
        speech += 0.5 * torch.sin(2 * torch.pi * fundamental * 2.1 * t)  # Harmonics
        speech += 0.3 * torch.sin(2 * torch.pi * fundamental * 3.3 * t)

        # Apply speech activity pattern
        speech_activity = scenario.characteristics.get("speech_activity", 0.8)
        activity_pattern = torch.rand(samples) < speech_activity
        speech = speech * activity_pattern.float()

        # Add noise
        noise_level = scenario.characteristics.get("noise_level", 0.05)
        noise = torch.randn(samples) * noise_level

        return torch.unsqueeze(speech + noise, 0)

    def _generate_alternating_speakers_audio(
        self, scenario: AudioScenario
    ) -> torch.Tensor:
        """Generate alternating speakers audio."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        for segment in scenario.expected_segments:
            start_sample = int(segment["start_time"] * self.sample_rate)
            end_sample = int(segment["end_time"] * self.sample_rate)
            segment_samples = end_sample - start_sample

            # Different voice characteristics for each speaker
            speaker_id = int(segment["speaker_label"].split("_")[1]) - 1
            fundamental = 150 + speaker_id * 50  # Different pitch

            t = torch.linspace(
                0, segment["end_time"] - segment["start_time"], segment_samples
            )
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.4 * torch.sin(2 * torch.pi * fundamental * 2.2 * t)

            audio[start_sample:end_sample] = speech

        # Add noise
        noise_level = scenario.characteristics.get("noise_level", 0.05)
        noise = torch.randn(samples) * noise_level

        return torch.unsqueeze(audio + noise, 0)

    def _generate_overlapping_speakers_audio(
        self, scenario: AudioScenario
    ) -> torch.Tensor:
        """Generate overlapping speakers audio."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        for segment in scenario.expected_segments:
            start_sample = int(segment["start_time"] * self.sample_rate)
            end_sample = int(segment["end_time"] * self.sample_rate)
            segment_samples = end_sample - start_sample

            speaker_id = int(segment["speaker_label"].split("_")[1]) - 1
            fundamental = 180 + speaker_id * 80  # More separated frequencies

            t = torch.linspace(
                0, segment["end_time"] - segment["start_time"], segment_samples
            )
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.3 * torch.sin(2 * torch.pi * fundamental * 2.5 * t)

            # Reduce amplitude when overlapping
            amplitude = 0.7 if len(scenario.expected_segments) > 1 else 1.0
            audio[start_sample:end_sample] += speech * amplitude

        # Add noise
        noise_level = scenario.characteristics.get("noise_level", 0.1)
        noise = torch.randn(samples) * noise_level

        return torch.unsqueeze(audio + noise, 0)

    def _generate_meeting_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate meeting-style audio with multiple speakers."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        # Generate more natural meeting flow
        current_time = 0.0
        speaker_rotation = 0

        while current_time < scenario.duration:
            # Random utterance length (2-8 seconds)
            utterance_length = min(
                np.random.uniform(2.0, 8.0), scenario.duration - current_time
            )

            start_sample = int(current_time * self.sample_rate)
            end_sample = int((current_time + utterance_length) * self.sample_rate)
            segment_samples = end_sample - start_sample

            # Speaker characteristics
            fundamental = 140 + speaker_rotation * 40

            t = torch.linspace(0, utterance_length, segment_samples)
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.4 * torch.sin(2 * torch.pi * fundamental * 2.3 * t)

            # Add some variation (pauses, emphasis)
            variation = torch.sin(2 * torch.pi * 0.5 * t) * 0.3 + 1.0
            speech = speech * variation

            audio[start_sample:end_sample] = speech

            current_time += utterance_length
            # Add pause between speakers
            current_time += np.random.uniform(0.5, 2.0)

            # Rotate speakers
            speaker_rotation = (speaker_rotation + 1) % scenario.num_speakers

        # Add meeting room ambiance
        noise_level = scenario.characteristics.get("noise_level", 0.08)
        noise = torch.randn(samples) * noise_level

        return torch.unsqueeze(audio + noise, 0)

    def _generate_noisy_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate audio with significant background noise."""
        # Start with basic two-speaker audio
        audio = self._generate_alternating_speakers_audio(scenario)

        # Add various types of noise
        samples = audio.shape[1]

        # Crowd noise simulation
        crowd_noise = torch.randn(samples) * 0.2
        # Add some periodic components (ventilation, etc.)
        t = torch.linspace(0, scenario.duration, samples)
        periodic_noise = 0.1 * torch.sin(2 * torch.pi * 60 * t)  # 60 Hz hum
        periodic_noise += 0.05 * torch.sin(
            2 * torch.pi * 17 * t
        )  # Random periodic component

        total_noise = crowd_noise + periodic_noise

        # Scale according to noise level
        noise_level = scenario.characteristics.get("noise_level", 0.3)
        total_noise = total_noise * noise_level

        return audio + total_noise

    def _generate_whispered_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate whispered speech audio."""
        samples = int(scenario.duration * self.sample_rate)
        t = torch.linspace(0, scenario.duration, samples)

        # Whispered speech has more noise-like characteristics
        fundamental = 120  # Lower fundamental
        speech = torch.randn(samples) * 0.5  # More noise component
        speech += 0.3 * torch.sin(2 * torch.pi * fundamental * t)
        speech += 0.2 * torch.sin(2 * torch.pi * fundamental * 2.1 * t)

        # Lower amplitude
        amplitude = scenario.characteristics.get("amplitude", 0.3)
        speech = speech * amplitude

        # Add background noise
        noise = torch.randn(samples) * 0.1

        return torch.unsqueeze(speech + noise, 0)

    def _generate_far_field_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate far-field recording with reverb."""
        # Generate base audio
        audio = self._generate_basic_multi_speaker_audio(scenario)

        # Simple reverb simulation using delays
        reverb_level = scenario.characteristics.get("reverb_level", 0.4)
        samples = audio.shape[1]

        # Create delayed versions
        delay_samples_1 = int(0.05 * self.sample_rate)  # 50ms delay
        delay_samples_2 = int(0.12 * self.sample_rate)  # 120ms delay

        reverb_audio = audio.clone()

        # Add delayed components
        if samples > delay_samples_1:
            reverb_audio[0, delay_samples_1:] += (
                audio[0, :-delay_samples_1] * reverb_level * 0.4
            )

        if samples > delay_samples_2:
            reverb_audio[0, delay_samples_2:] += (
                audio[0, :-delay_samples_2] * reverb_level * 0.2
            )

        return reverb_audio

    def _generate_short_utterances_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate audio with very short utterances."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        current_time = 0.0
        speaker_id = 0

        while current_time < scenario.duration:
            # Short utterance (0.5 - 1.5 seconds)
            utterance_length = np.random.uniform(0.5, 1.5)
            utterance_length = min(utterance_length, scenario.duration - current_time)

            if utterance_length < 0.3:
                break

            start_sample = int(current_time * self.sample_rate)
            end_sample = int((current_time + utterance_length) * self.sample_rate)
            segment_samples = end_sample - start_sample

            # Generate speech for this segment
            fundamental = 160 + speaker_id * 60
            t = torch.linspace(0, utterance_length, segment_samples)
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.3 * torch.sin(2 * torch.pi * fundamental * 2.4 * t)

            audio[start_sample:end_sample] = speech

            # Switch speakers frequently
            speaker_id = (speaker_id + 1) % scenario.num_speakers

            # Short pause
            current_time += utterance_length + np.random.uniform(0.2, 0.8)

        # Add noise
        noise = torch.randn(samples) * 0.05

        return torch.unsqueeze(audio + noise, 0)

    def _generate_silence_heavy_audio(self, scenario: AudioScenario) -> torch.Tensor:
        """Generate audio with long periods of silence."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        # Generate only the specified segments
        for segment in scenario.expected_segments:
            start_sample = int(segment["start_time"] * self.sample_rate)
            end_sample = int(segment["end_time"] * self.sample_rate)
            segment_samples = end_sample - start_sample

            speaker_id = int(segment["speaker_label"].split("_")[1]) - 1
            fundamental = 170 + speaker_id * 50

            t = torch.linspace(
                0, segment["end_time"] - segment["start_time"], segment_samples
            )
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.4 * torch.sin(2 * torch.pi * fundamental * 2.1 * t)

            audio[start_sample:end_sample] = speech

        # Very light background noise
        noise = torch.randn(samples) * 0.02

        return torch.unsqueeze(audio + noise, 0)

    def _generate_basic_multi_speaker_audio(
        self, scenario: AudioScenario
    ) -> torch.Tensor:
        """Generate basic multi-speaker audio."""
        samples = int(scenario.duration * self.sample_rate)
        audio = torch.zeros(samples)

        segment_duration = scenario.duration / scenario.num_speakers

        for i in range(scenario.num_speakers):
            start_time = i * segment_duration
            end_time = min((i + 1) * segment_duration, scenario.duration)

            start_sample = int(start_time * self.sample_rate)
            end_sample = int(end_time * self.sample_rate)
            segment_samples = end_sample - start_sample

            fundamental = 150 + i * 50
            t = torch.linspace(0, end_time - start_time, segment_samples)
            speech = torch.sin(2 * torch.pi * fundamental * t)
            speech += 0.4 * torch.sin(2 * torch.pi * fundamental * 2.2 * t)

            audio[start_sample:end_sample] = speech

        # Add noise
        noise_level = scenario.characteristics.get("noise_level", 0.05)
        noise = torch.randn(samples) * noise_level

        return torch.unsqueeze(audio + noise, 0)

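
# Example usage of AudioSampleGenerator (an illustrative sketch added editorially;
# nothing in the test suite references it): synthesize one predefined scenario and
# check the expected mono tensor shape.
#
#     generator = AudioSampleGenerator(sample_rate=16000)
#     audio, scenario = generator.generate_scenario_audio("two_speakers_alternating")
#     assert audio.shape == (1, int(scenario.duration * generator.sample_rate))

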
class AudioFileManager:
    """Manages creation and cleanup of temporary audio files."""

    def __init__(self):
        self.created_files = []

    def create_wav_file(
        self,
        audio_tensor: torch.Tensor,
        sample_rate: int = 16000,
        file_prefix: str = "test_audio",
    ) -> str:
        """Create a WAV file from an audio tensor."""
        # Convert to numpy, clip to [-1, 1] to avoid int16 wrap-around, then scale
        audio_numpy = audio_tensor.squeeze().numpy()
        audio_numpy = np.clip(audio_numpy, -1.0, 1.0)
        audio_int16 = (audio_numpy * 32767).astype(np.int16)

        # Create temporary file
        with tempfile.NamedTemporaryFile(
            suffix=".wav", prefix=file_prefix, delete=False
        ) as f:
            with wave.open(f.name, "wb") as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_int16.tobytes())

        self.created_files.append(f.name)
        return f.name

    def create_scenario_file(
        self, scenario_name: str, sample_rate: int = 16000
    ) -> Tuple[str, AudioScenario]:
        """Create audio file for a specific scenario."""
        generator = AudioSampleGenerator(sample_rate)
        audio_tensor, scenario = generator.generate_scenario_audio(scenario_name)

        file_path = self.create_wav_file(
            audio_tensor, sample_rate, f"scenario_{scenario_name}"
        )

        return file_path, scenario

    def cleanup_all(self):
        """Clean up all created files."""
        for file_path in self.created_files:
            try:
                Path(file_path).unlink(missing_ok=True)
            except Exception:
                pass  # Ignore cleanup errors
        self.created_files.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup_all()


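# Example usage of AudioFileManager (an illustrative sketch added editorially; not
# referenced by the test suite): render a scenario to a temporary WAV file and rely
# on the context manager to remove it afterwards.
#
#     with AudioFileManager() as manager:
#         wav_path, scenario = manager.create_scenario_file("noisy_environment")
#         ...  # feed wav_path to the diarization pipeline under test
#     # all files created by the manager are deleted when the block exits

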
class TestDataGenerator:
    """Generates test data in various formats for NeMo testing."""

    @staticmethod
    def generate_manifest_json(
        scenarios: List[str], audio_dir: str = "/test/audio"
    ) -> str:
        """Generate NeMo manifest content, one JSON object per line."""
        manifest_lines = []

        for i, scenario_name in enumerate(scenarios):
            generator = AudioSampleGenerator()
            scenario = generator.scenarios[scenario_name]

            manifest_entry = {
                "audio_filepath": f"{audio_dir}/{scenario_name}_{i:03d}.wav",
                "offset": 0,
                "duration": scenario.duration,
                "label": "infer",
                "text": "-",
                "num_speakers": scenario.num_speakers,
                "rttm_filepath": None,
                "uem_filepath": None,
            }

            manifest_lines.append(json.dumps(manifest_entry))

        return "\n".join(manifest_lines)

    @staticmethod
    def generate_rttm_content(
        scenario: AudioScenario, file_id: str = "test_file"
    ) -> str:
        """Generate RTTM format content for a scenario."""
        rttm_lines = []

        for segment in scenario.expected_segments:
            duration = segment["end_time"] - segment["start_time"]
            # Standard RTTM SPEAKER record:
            # SPEAKER <file> <chnl> <tbeg> <tdur> <ortho> <stype> <name> <conf> <slat>
            # with <NA> as the placeholder for unused fields.
            line = (
                f"SPEAKER {file_id} 1 {segment['start_time']:.3f} {duration:.3f} "
                f"<NA> <NA> {segment['speaker_label']} <NA> <NA>"
            )
            rttm_lines.append(line)

        return "\n".join(rttm_lines)

    @staticmethod
    def generate_uem_content(
        scenario: AudioScenario, file_id: str = "test_file"
    ) -> str:
        """Generate UEM (Un-partitioned Evaluation Map) content."""
        # UEM format: <file-id> <channel> <start-time> <end-time>
        return f"{file_id} 1 0.000 {scenario.duration:.3f}"

    @staticmethod
    def create_test_dataset(scenarios: List[str], output_dir: Path) -> Dict[str, Any]:
        """Create a complete test dataset with audio files and annotations."""
        output_dir.mkdir(parents=True, exist_ok=True)

        audio_dir = output_dir / "audio"
        rttm_dir = output_dir / "rttm"
        uem_dir = output_dir / "uem"

        audio_dir.mkdir(exist_ok=True)
        rttm_dir.mkdir(exist_ok=True)
        uem_dir.mkdir(exist_ok=True)

        generator = AudioSampleGenerator()
        created_files = {
            "audio_files": [],
            "rttm_files": [],
            "uem_files": [],
            "manifest_file": None,
        }

        manifest_entries = []

        for i, scenario_name in enumerate(scenarios):
            # Generate audio
            audio_tensor, scenario = generator.generate_scenario_audio(scenario_name)

            # Create files
            audio_filename = f"{scenario_name}_{i:03d}.wav"
            rttm_filename = f"{scenario_name}_{i:03d}.rttm"
            uem_filename = f"{scenario_name}_{i:03d}.uem"

            # Save audio file
            audio_path = audio_dir / audio_filename
            with AudioFileManager() as manager:
                temp_file = manager.create_wav_file(audio_tensor)
                Path(temp_file).rename(audio_path)

            # Save RTTM file
            rttm_path = rttm_dir / rttm_filename
            rttm_content = TestDataGenerator.generate_rttm_content(
                scenario, scenario_name
            )
            rttm_path.write_text(rttm_content)

            # Save UEM file
            uem_path = uem_dir / uem_filename
            uem_content = TestDataGenerator.generate_uem_content(
                scenario, scenario_name
            )
            uem_path.write_text(uem_content)

            # Add to manifest
            manifest_entry = {
                "audio_filepath": str(audio_path),
                "offset": 0,
                "duration": scenario.duration,
                "label": "infer",
                "text": "-",
                "num_speakers": scenario.num_speakers,
                "rttm_filepath": str(rttm_path),
                "uem_filepath": str(uem_path),
            }
            manifest_entries.append(manifest_entry)

            created_files["audio_files"].append(str(audio_path))
            created_files["rttm_files"].append(str(rttm_path))
            created_files["uem_files"].append(str(uem_path))

        # Save manifest file
        manifest_path = output_dir / "manifest.jsonl"
        with open(manifest_path, "w") as f:
            for entry in manifest_entries:
                f.write(json.dumps(entry) + "\n")

        created_files["manifest_file"] = str(manifest_path)

        return created_files


# Predefined test scenarios for easy access
TEST_SCENARIOS = [
    "single_speaker",
    "two_speakers_alternating",
    "overlapping_speakers",
    "multi_speaker_meeting",
    "noisy_environment",
    "whispered_speech",
    "far_field_recording",
    "very_short_utterances",
    "silence_heavy",
]

CHALLENGING_SCENARIOS = [
    "noisy_environment",
    "overlapping_speakers",
    "whispered_speech",
    "far_field_recording",
    "very_short_utterances",
]

BASIC_SCENARIOS = [
    "single_speaker",
    "two_speakers_alternating",
    "multi_speaker_meeting",
]


def get_scenario_by_difficulty(difficulty: str) -> List[str]:
    """Get scenarios by difficulty level."""
    if difficulty == "basic":
        return BASIC_SCENARIOS
    elif difficulty == "challenging":
        return CHALLENGING_SCENARIOS
    elif difficulty == "all":
        return TEST_SCENARIOS
    else:
        raise ValueError(f"Unknown difficulty level: {difficulty}")


def create_quick_test_files(num_files: int = 3) -> List[Tuple[str, AudioScenario]]:
    """Create a small set of test files for quick testing.

    The caller is responsible for deleting the returned files (for example via
    Path.unlink) once they are no longer needed.
    """
    scenarios = ["single_speaker", "two_speakers_alternating", "noisy_environment"][
        :num_files
    ]

    files_and_scenarios = []

    # Do not use AudioFileManager as a context manager here: its __exit__ would
    # delete the files before their paths could be returned to the caller.
    manager = AudioFileManager()
    for scenario_name in scenarios:
        file_path, scenario = manager.create_scenario_file(scenario_name)
        files_and_scenarios.append((file_path, scenario))

    return files_and_scenarios
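

# Optional smoke test (an editorial sketch, assuming the module is executed directly
# rather than imported by pytest): render the basic scenarios into a throwaway
# dataset and report what was written. Nothing in the test suite depends on this.
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        dataset_files = TestDataGenerator.create_test_dataset(
            get_scenario_by_difficulty("basic"), Path(tmp_dir)
        )
        print(f"Generated {len(dataset_files['audio_files'])} WAV files")
        print(f"Manifest written to {dataset_files['manifest_file']}")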