noteflow/scripts/profile_comprehensive.py
Travis Vasceannie adb0de4446 feat: add comprehensive performance profiling script for backend
- Introduced `profile_comprehensive.py` for detailed performance analysis of the NoteFlow backend, covering audio processing, ORM conversions, Protobuf operations, async overhead, and gRPC simulations.
- Implemented options for cProfile, verbose output, and memory profiling to enhance profiling capabilities.
- Updated `asr_config_service.py` to improve engine reference handling and added observability tracing during reconfiguration.
- Enhanced gRPC service shutdown procedures to include cancellation of sync tasks and improved lifecycle management.
- Refactored various components to ensure proper cleanup and resource management during shutdown.
- Updated client submodule to the latest commit for improved integration.
2026-01-14 01:18:43 -05:00

590 lines
18 KiB
Python

#!/usr/bin/env python
"""Comprehensive performance profiling for NoteFlow backend.
Run with: python scripts/profile_comprehensive.py [--profile] [--verbose] [--memory]
Profiles:
- Audio processing pipeline (VAD, segmentation, RMS)
- ORM/Domain conversions
- Protobuf operations
- Async context manager overhead
- gRPC request simulation
- Memory usage (RSS) and GC pressure
Options:
--profile Enable cProfile for detailed function-level analysis
--verbose Show extended profile output
--memory Enable detailed memory profiling (RSS, GC stats)
"""
from __future__ import annotations

import argparse
import asyncio
import cProfile
import gc
import io
import pstats
import sys
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, cast
from uuid import uuid4

import numpy as np
from numpy.typing import NDArray

if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Callable

# =============================================================================
# Constants
# =============================================================================
SAMPLE_RATE = 16000
CHUNK_SIZE = 1600 # 100ms at 16kHz
CHUNKS_PER_SECOND = SAMPLE_RATE // CHUNK_SIZE
BYTES_PER_KB = 1024
BYTES_PER_MB = 1024 * 1024
LINUX_RSS_KB_MULTIPLIER = 1024 # resource.ru_maxrss returns KB on Linux
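# Type alias: one mono float32 buffer of CHUNK_SIZE audio samples.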
AudioChunk = NDArray[np.float32]

# =============================================================================
# Memory monitoring utilities
# =============================================================================
@dataclass
class MemorySnapshot:
    """Memory state at a point in time."""

    rss_bytes: int
    gc_gen0: int
    gc_gen1: int
    gc_gen2: int
    timestamp: float = field(default_factory=time.perf_counter)

    @property
    def rss_mb(self) -> float:
        """RSS in megabytes."""
        return self.rss_bytes / BYTES_PER_MB if self.rss_bytes >= 0 else -1.0


@dataclass
class MemoryMetrics:
    """Memory metrics for a benchmark run."""

    rss_before_mb: float
    rss_after_mb: float
    rss_peak_mb: float
    rss_delta_mb: float
    gc_collections: tuple[int, int, int]  # gen0, gen1, gen2

    def __str__(self) -> str:
        gc_str = f"gc=({self.gc_collections[0]},{self.gc_collections[1]},{self.gc_collections[2]})"
        return (
            f"RSS: {self.rss_before_mb:.1f}→{self.rss_after_mb:.1f}MB "
            f"(peak={self.rss_peak_mb:.1f}MB, Δ={self.rss_delta_mb:+.1f}MB) | {gc_str}"
        )

def measure_rss_bytes() -> int:
    """Measure current process RSS in bytes.

    Returns:
        RSS in bytes, or -1 if not supported.
    """
    try:
        import psutil

        return psutil.Process().memory_info().rss
    except ImportError:
        pass
    if sys.platform in ("darwin", "linux"):
        try:
            import resource

            usage = resource.getrusage(resource.RUSAGE_SELF)
            if sys.platform == "linux":
                return usage.ru_maxrss * LINUX_RSS_KB_MULTIPLIER
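            # macOS reports ru_maxrss in bytes, so no scaling is needed.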
            return usage.ru_maxrss
        except ImportError:
            pass
    return -1

def take_memory_snapshot() -> MemorySnapshot:
    """Take a snapshot of current memory state."""
    # gc.get_stats() reports cumulative collection counts per generation;
    # gc.get_count() only reports currently tracked object counts, which
    # would make the "collections" deltas computed from snapshots meaningless.
    gc_stats = gc.get_stats()
    return MemorySnapshot(
        rss_bytes=measure_rss_bytes(),
        gc_gen0=gc_stats[0]["collections"],
        gc_gen1=gc_stats[1]["collections"],
        gc_gen2=gc_stats[2]["collections"],
    )

def calculate_memory_metrics(
    before: MemorySnapshot,
    after: MemorySnapshot,
    peak_rss_bytes: int,
) -> MemoryMetrics:
    """Calculate memory metrics between two snapshots."""
    return MemoryMetrics(
        rss_before_mb=before.rss_mb,
        rss_after_mb=after.rss_mb,
        rss_peak_mb=peak_rss_bytes / BYTES_PER_MB if peak_rss_bytes >= 0 else -1.0,
        rss_delta_mb=(after.rss_bytes - before.rss_bytes) / BYTES_PER_MB
        if before.rss_bytes >= 0 and after.rss_bytes >= 0
        else 0.0,
        gc_collections=(
            after.gc_gen0 - before.gc_gen0,
            after.gc_gen1 - before.gc_gen1,
            after.gc_gen2 - before.gc_gen2,
        ),
    )

@dataclass
class BenchmarkResult:
    """Result from a single benchmark."""

    name: str
    duration_ms: float
    items_processed: int
    per_item_ms: float
    extra: dict[str, float | int | str] | None = None
    memory: MemoryMetrics | None = None

    def __str__(self) -> str:
        extra_str = ""
        if self.extra:
            extra_str = " | " + ", ".join(f"{k}={v}" for k, v in self.extra.items())
        return (
            f"{self.name}: {self.duration_ms:.2f}ms total, "
            f"{self.per_item_ms:.4f}ms/item ({self.items_processed} items){extra_str}"
        )

    def format_with_memory(self) -> str:
        """Format result including memory metrics."""
        base = str(self)
        if self.memory:
            return f"{base}\n Memory: {self.memory}"
        return base

def generate_audio_chunks(seconds: int) -> list[AudioChunk]:
    """Generate simulated audio chunks with speech/silence pattern."""
    np.random.seed(42)  # Fixed seed keeps runs comparable
    chunks: list[AudioChunk] = []
    total_chunks = seconds * CHUNKS_PER_SECOND
    for i in range(total_chunks):
        # 5s speech, 2s silence pattern
        if (i // CHUNKS_PER_SECOND) % 7 < 5:
            chunk = np.random.randn(CHUNK_SIZE).astype(np.float32) * 0.3
        else:
            chunk = np.random.randn(CHUNK_SIZE).astype(np.float32) * 0.001
        chunks.append(chunk)
    return chunks

def benchmark_audio_pipeline(duration_seconds: int = 60) -> BenchmarkResult:
    """Benchmark the complete audio processing pipeline."""
    from noteflow.infrastructure.asr.segmenter import Segmenter, SegmenterConfig
    from noteflow.infrastructure.asr.streaming_vad import StreamingVad
    from noteflow.infrastructure.audio.levels import RmsLevelProvider

    chunks = generate_audio_chunks(duration_seconds)
    vad = StreamingVad()
    segmenter = Segmenter(config=SegmenterConfig(sample_rate=SAMPLE_RATE))
    rms_provider = RmsLevelProvider()
    segments_emitted = 0
    start = time.perf_counter()
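    # Each chunk goes through the three stages this benchmark targets:
    # VAD decision, RMS/dB level metering, then segmentation.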
    for chunk in chunks:
        is_speech = vad.process_chunk(chunk)
        _ = rms_provider.get_rms(chunk)
        _ = rms_provider.get_db(chunk)
        for _ in segmenter.process_audio(chunk, is_speech):
            segments_emitted += 1
    if segmenter.flush() is not None:
        segments_emitted += 1
    elapsed = time.perf_counter() - start
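    # A real-time factor below 1.0 means the pipeline keeps up with live audio.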
    real_time_factor = elapsed / duration_seconds
    return BenchmarkResult(
        name="Audio Pipeline",
        duration_ms=elapsed * 1000,
        items_processed=len(chunks),
        per_item_ms=(elapsed * 1000) / len(chunks),
        extra={
            "simulated_seconds": duration_seconds,
            "segments": segments_emitted,
            "realtime_factor": f"{real_time_factor:.6f}x",
        },
    )

def benchmark_orm_conversions(num_segments: int = 500) -> BenchmarkResult:
    """Benchmark ORM to domain model conversions."""
    from noteflow.infrastructure.converters.orm_converters import OrmConverter
    from noteflow.infrastructure.persistence.models.core import SegmentModel

    converter = OrmConverter()
    meeting_id = uuid4()
    # Create segment models
    models = [
        SegmentModel(
            meeting_id=meeting_id,
            segment_id=i,
            text=f"Segment {i} with realistic meeting transcript content here.",
            start_time=float(i * 5),
            end_time=float(i * 5 + 4.5),
            speaker_id=f"speaker_{i % 3}",
        )
        for i in range(num_segments)
    ]
    start = time.perf_counter()
    _ = [converter.segment_to_domain(m) for m in models]
    elapsed = time.perf_counter() - start
    return BenchmarkResult(
        name="ORM → Domain",
        duration_ms=elapsed * 1000,
        items_processed=num_segments,
        per_item_ms=(elapsed * 1000) / num_segments,
    )

def benchmark_proto_operations(num_meetings: int = 200) -> BenchmarkResult:
    """Benchmark protobuf message creation and serialization."""
    from noteflow.grpc.proto import noteflow_pb2

    # Create messages
    start = time.perf_counter()
    meetings = [
        noteflow_pb2.Meeting(
            id=str(uuid4()),
            title=f"Meeting {i}",
            state=noteflow_pb2.MEETING_STATE_COMPLETED,
        )
        for i in range(num_meetings)
    ]
    creation_time = time.perf_counter() - start
    # Create response
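    # Note: constructing the wrapper response is not included in any timed phase.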
    response = noteflow_pb2.ListMeetingsResponse(
        meetings=meetings, total_count=len(meetings)
    )
    # Serialize
    start = time.perf_counter()
    serialized = response.SerializeToString()
    serialize_time = time.perf_counter() - start
    # Deserialize
    start = time.perf_counter()
    parsed = noteflow_pb2.ListMeetingsResponse()
    parsed.ParseFromString(serialized)
    deserialize_time = time.perf_counter() - start
    total_time = creation_time + serialize_time + deserialize_time
    return BenchmarkResult(
        name="Proto Ops",
        duration_ms=total_time * 1000,
        items_processed=num_meetings,
        per_item_ms=(creation_time * 1000) / num_meetings,
        extra={
            "creation_ms": f"{creation_time * 1000:.2f}",
            "serialize_ms": f"{serialize_time * 1000:.2f}",
            "deserialize_ms": f"{deserialize_time * 1000:.2f}",
            "payload_kb": f"{len(serialized) / BYTES_PER_KB:.1f}",
        },
    )

async def benchmark_async_overhead(iterations: int = 1000) -> BenchmarkResult:
    """Benchmark async context manager overhead."""
    @asynccontextmanager
    async def mock_uow() -> AsyncIterator[str]:
        yield "mock_session"

    start = time.perf_counter()
    for _ in range(iterations):
        async with mock_uow():
            pass
    elapsed = time.perf_counter() - start
    return BenchmarkResult(
        name="Async Context",
        duration_ms=elapsed * 1000,
        items_processed=iterations,
        per_item_ms=(elapsed * 1000) / iterations,
    )

async def benchmark_grpc_simulation(num_requests: int = 100) -> BenchmarkResult:
    """Simulate gRPC request/response cycle overhead."""
    from noteflow.grpc.proto import noteflow_pb2

    async def simulate_request() -> noteflow_pb2.Meeting:
        # Simulate request parsing
        request = noteflow_pb2.GetMeetingRequest(meeting_id=str(uuid4()))
        _ = request.SerializeToString()
        # Simulate minimal processing delay
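        # asyncio.sleep(0) yields to the event loop without adding real latency,
        # so the measurement reflects scheduling overhead only.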
        await asyncio.sleep(0)
        # Simulate response creation
        return noteflow_pb2.Meeting(
            id=request.meeting_id,
            title="Test Meeting",
            state=noteflow_pb2.MEETING_STATE_COMPLETED,
        )

    start = time.perf_counter()
    tasks = [simulate_request() for _ in range(num_requests)]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    return BenchmarkResult(
        name="gRPC Sim",
        duration_ms=elapsed * 1000,
        items_processed=num_requests,
        per_item_ms=(elapsed * 1000) / num_requests,
        extra={"concurrent": num_requests},
    )

def benchmark_import_times() -> list[BenchmarkResult]:
    """Measure import times for key modules."""
    results: list[BenchmarkResult] = []
    modules = [
        ("noteflow.infrastructure.asr", "ASR Module"),
        ("noteflow.grpc.proto.noteflow_pb2", "Proto Module"),
        ("noteflow.infrastructure.persistence.models", "ORM Models"),
        ("noteflow.domain.entities", "Domain Entities"),
    ]
    for module_path, name in modules:
        # Force reimport by removing from cache
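        # Evicting everything under the top-level package means transitive
        # imports are re-measured too, not just the leaf module.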
        to_remove = [k for k in sys.modules if k.startswith(module_path.split(".")[0])]
        for k in to_remove:
            sys.modules.pop(k, None)
        start = time.perf_counter()
        __import__(module_path)
        elapsed = time.perf_counter() - start
        results.append(
            BenchmarkResult(
                name=f"Import {name}",
                duration_ms=elapsed * 1000,
                items_processed=1,
                per_item_ms=elapsed * 1000,
            )
        )
    return results

def run_profiled(
    func: object, *args: object, **kwargs: object
) -> tuple[BenchmarkResult, str]:
    """Run a function with cProfile and return result + stats."""
    profiler = cProfile.Profile()
    profiler.enable()
    # func is expected to be a callable returning BenchmarkResult
    callable_func = cast("Callable[..., BenchmarkResult]", func)
    result = callable_func(*args, **kwargs)
    profiler.disable()
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.strip_dirs()
    stats.sort_stats(pstats.SortKey.CUMULATIVE)
    stats.print_stats(20)
    return result, stream.getvalue()

def run_with_memory_tracking(
    func: object,
    *args: object,
    **kwargs: object,
) -> tuple[BenchmarkResult, MemoryMetrics]:
    """Run a benchmark function with memory tracking.

    Args:
        func: Benchmark function to run.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        Tuple of (benchmark result, memory metrics).
    """
    gc.collect()  # Clear pending garbage
    snapshot_before = take_memory_snapshot()
    peak_rss = snapshot_before.rss_bytes
    callable_func = cast("Callable[..., BenchmarkResult]", func)
    result = callable_func(*args, **kwargs)
    # Sample peak during execution (rough approximation)
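    # (Only the post-run RSS is sampled here; a true peak would require
    # periodic sampling from a background thread.)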
    current_rss = measure_rss_bytes()
    if current_rss > peak_rss:
        peak_rss = current_rss
    gc.collect()
    snapshot_after = take_memory_snapshot()
    metrics = calculate_memory_metrics(snapshot_before, snapshot_after, peak_rss)
    result.memory = metrics
    return result, metrics

async def main(
    enable_profile: bool = False,
    verbose: bool = False,
    enable_memory: bool = False,
) -> None:
    """Run all benchmarks."""
    print("=" * 70)
    print("NoteFlow Comprehensive Performance Profile")
    print("=" * 70)
    print()
    initial_snapshot: MemorySnapshot | None = None
    if enable_memory:
        initial_snapshot = take_memory_snapshot()
        print(f"Initial RSS: {initial_snapshot.rss_mb:.1f}MB")
        print()
    results: list[BenchmarkResult] = []

    # Import-time benchmarks are skipped: benchmark_import_times() evicts
    # entries from sys.modules, which would corrupt state for the remaining
    # benchmarks in this process.
    print("Skipping import-time benchmarks (destructive to module cache)...")
    # results.extend(benchmark_import_times())

    # Audio pipeline
    print("Benchmarking audio pipeline (60s simulated)...")
    if enable_profile:
        profiled_result, profile_output = run_profiled(benchmark_audio_pipeline, 60)
        results.append(profiled_result)
        if verbose:
            print(profile_output)
    elif enable_memory:
        mem_result, _ = run_with_memory_tracking(benchmark_audio_pipeline, 60)
        results.append(mem_result)
    else:
        results.append(benchmark_audio_pipeline(60))

    # ORM conversions
    print("Benchmarking ORM conversions (500 segments)...")
    if enable_memory:
        mem_result, _ = run_with_memory_tracking(benchmark_orm_conversions, 500)
        results.append(mem_result)
    else:
        results.append(benchmark_orm_conversions(500))

    # Proto operations
    print("Benchmarking proto operations (200 meetings)...")
    if enable_memory:
        mem_result, _ = run_with_memory_tracking(benchmark_proto_operations, 200)
        results.append(mem_result)
    else:
        results.append(benchmark_proto_operations(200))

    # Async overhead
    print("Benchmarking async context overhead (1000 iterations)...")
    results.append(await benchmark_async_overhead(1000))

    # gRPC simulation
    print("Benchmarking gRPC simulation (100 concurrent)...")
    results.append(await benchmark_grpc_simulation(100))

    # Summary
    print()
    print("=" * 70)
    print("BENCHMARK RESULTS")
    print("=" * 70)
    for result in results:
        if enable_memory and result.memory:
            print(f" {result.format_with_memory()}")
        else:
            print(f" {result}")

    # Performance summary
    print()
    print("=" * 70)
    print("PERFORMANCE SUMMARY")
    print("=" * 70)
    audio_result = next((r for r in results if r.name == "Audio Pipeline"), None)
    if audio_result and audio_result.extra:
        rtf = audio_result.extra.get("realtime_factor", "N/A")
        print(f" Real-time factor: {rtf} (< 1.0 = faster than real-time)")
    total_overhead = sum(
        r.duration_ms
        for r in results
        if r.name in ("ORM → Domain", "Proto Ops", "Async Context")
    )
    print(f" Data layer overhead (500 segs + 200 mtgs + 1k ctx): {total_overhead:.2f}ms")

    # Memory summary
    if enable_memory and initial_snapshot is not None:
        print()
        print("=" * 70)
        print("MEMORY SUMMARY")
        print("=" * 70)
        final_snapshot = take_memory_snapshot()
        print(f" Final RSS: {final_snapshot.rss_mb:.1f}MB")
        total_delta = final_snapshot.rss_bytes - initial_snapshot.rss_bytes
        print(f" Total RSS change: {total_delta / BYTES_PER_MB:+.1f}MB")
        total_gc = (
            final_snapshot.gc_gen0 - initial_snapshot.gc_gen0,
            final_snapshot.gc_gen1 - initial_snapshot.gc_gen1,
            final_snapshot.gc_gen2 - initial_snapshot.gc_gen2,
        )
        print(f" Total GC collections: gen0={total_gc[0]}, gen1={total_gc[1]}, gen2={total_gc[2]}")
    print()
    print("All benchmarks completed.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="NoteFlow performance profiler")
parser.add_argument(
"--profile", action="store_true", help="Enable cProfile for detailed analysis"
)
parser.add_argument(
"--verbose", action="store_true", help="Show extended profile output"
)
parser.add_argument(
"--memory", action="store_true", help="Enable RSS and GC memory profiling"
)
args = parser.parse_args()
asyncio.run(main(
enable_profile=args.profile,
verbose=args.verbose,
enable_memory=args.memory,
))