650 lines
22 KiB
Python
650 lines
22 KiB
Python
"""Backend gRPC Round-Trip Performance Profiling.
|
|
|
|
Profiles real backend operations including database round-trips.
|
|
Configure NOTEFLOW_GRPC_TARGET to connect to remote server.
|
|
|
|
Run with:
|
|
NOTEFLOW_GRPC_TARGET=192.168.50.151:50051 pytest tests/profiling/test_backend_roundtrip_profile.py -v -s
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import statistics
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import TYPE_CHECKING, Final
|
|
from uuid import uuid4
|
|
|
|
import grpc
|
|
import pytest
|
|
|
|
from noteflow.grpc.proto import noteflow_pb2, noteflow_pb2_grpc
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Generator
|
|
|
|
# Configuration
|
|
DEFAULT_SERVER: Final[str] = "localhost:50051"
|
|
GRPC_TARGET: Final[str] = os.environ.get("NOTEFLOW_GRPC_TARGET", DEFAULT_SERVER)
|
|
|
|
# Required gRPC metadata
|
|
def get_metadata() -> tuple[tuple[str, str], ...]:
|
|
"""Get required gRPC metadata headers."""
|
|
return (
|
|
("x-request-id", str(uuid4())),
|
|
("x-workspace-id", "00000000-0000-0000-0000-000000000001"),
|
|
)
|
|
|
|
# Profiling thresholds (milliseconds)
|
|
THRESHOLDS = {
|
|
"create_meeting": 500,
|
|
"get_meeting": 200,
|
|
"list_meetings": 300,
|
|
"update_meeting": 300,
|
|
"delete_meeting": 300,
|
|
"create_segment": 200,
|
|
"list_segments": 300,
|
|
"diarization_refine": 5000,
|
|
"generate_summary": 10000,
|
|
"get_analytics": 1000,
|
|
"create_annotation": 200,
|
|
"list_annotations": 300,
|
|
"get_preferences": 100,
|
|
"save_preferences": 200,
|
|
"concurrent_reads": 500,
|
|
"batch_create": 2000,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class ProfileResult:
|
|
"""Result of a profiled operation."""
|
|
|
|
operation: str
|
|
duration_ms: float
|
|
success: bool
|
|
threshold_ms: float
|
|
details: str = ""
|
|
|
|
@property
|
|
def passed_threshold(self) -> bool:
|
|
return self.duration_ms <= self.threshold_ms
|
|
|
|
def __str__(self) -> str:
|
|
status = "PASS" if self.passed_threshold else "FAIL"
|
|
return f"[{status}] {self.operation}: {self.duration_ms:.2f}ms (threshold: {self.threshold_ms}ms)"
|
|
|
|
|
|
@dataclass
|
|
class ProfileSession:
|
|
"""Collection of profiling results."""
|
|
|
|
results: list[ProfileResult] = field(default_factory=list)
|
|
server: str = GRPC_TARGET
|
|
|
|
def add(self, result: ProfileResult) -> None:
|
|
self.results.append(result)
|
|
|
|
def summary(self) -> str:
|
|
lines = [f"\n{'='*60}", f"PROFILING SUMMARY - Server: {self.server}", "=" * 60]
|
|
passed = sum(1 for r in self.results if r.passed_threshold)
|
|
total = len(self.results)
|
|
lines.append(f"Results: {passed}/{total} passed thresholds\n")
|
|
|
|
for result in self.results:
|
|
lines.append(str(result))
|
|
|
|
if self.results:
|
|
durations = [r.duration_ms for r in self.results]
|
|
lines.append(f"\nStatistics:")
|
|
lines.append(f" Mean: {statistics.mean(durations):.2f}ms")
|
|
lines.append(f" Median: {statistics.median(durations):.2f}ms")
|
|
lines.append(f" Stdev: {statistics.stdev(durations):.2f}ms" if len(durations) > 1 else "")
|
|
lines.append(f" Min: {min(durations):.2f}ms")
|
|
lines.append(f" Max: {max(durations):.2f}ms")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def grpc_channel() -> Generator[grpc.Channel, None, None]:
|
|
"""Create gRPC channel to the target server."""
|
|
channel = grpc.insecure_channel(
|
|
GRPC_TARGET,
|
|
options=[
|
|
("grpc.keepalive_time_ms", 10000),
|
|
("grpc.keepalive_timeout_ms", 5000),
|
|
("grpc.keepalive_permit_without_calls", True),
|
|
],
|
|
)
|
|
# Wait for channel to be ready
|
|
try:
|
|
grpc.channel_ready_future(channel).result(timeout=10)
|
|
except grpc.FutureTimeoutError:
|
|
pytest.skip(f"Could not connect to gRPC server at {GRPC_TARGET}")
|
|
yield channel
|
|
channel.close()
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def stub(grpc_channel: grpc.Channel) -> noteflow_pb2_grpc.NoteFlowServiceStub:
|
|
"""Create gRPC stub."""
|
|
return noteflow_pb2_grpc.NoteFlowServiceStub(grpc_channel)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def profile_session() -> ProfileSession:
|
|
"""Create a profiling session."""
|
|
return ProfileSession()
|
|
|
|
|
|
def profile_operation(
|
|
operation: str, threshold_ms: float | None = None
|
|
) -> callable:
|
|
"""Decorator to profile an operation."""
|
|
|
|
def decorator(func: callable) -> callable:
|
|
def wrapper(
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
*args,
|
|
**kwargs,
|
|
):
|
|
threshold = threshold_ms or THRESHOLDS.get(operation, 1000)
|
|
start = time.perf_counter()
|
|
try:
|
|
result = func(stub, profile_session, *args, **kwargs)
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
profile_result = ProfileResult(
|
|
operation=operation,
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=threshold,
|
|
)
|
|
profile_session.add(profile_result)
|
|
print(profile_result)
|
|
return result
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
profile_result = ProfileResult(
|
|
operation=operation,
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=threshold,
|
|
details=str(e.code()),
|
|
)
|
|
profile_session.add(profile_result)
|
|
print(f"✗ {operation}: FAILED - {e.code()}")
|
|
raise
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
class TestMeetingCRUDProfiling:
|
|
"""Profile meeting CRUD operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
self.meeting_id: str | None = None
|
|
|
|
def test_01_create_meeting(self) -> None:
|
|
"""Profile meeting creation."""
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.CreateMeetingRequest(
|
|
title=f"Profile Test Meeting {uuid4()}",
|
|
)
|
|
try:
|
|
response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
self.meeting_id = response.id
|
|
result = ProfileResult(
|
|
operation="create_meeting",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["create_meeting"],
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
# Store for subsequent tests
|
|
pytest.meeting_id = response.id
|
|
assert response.id
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="create_meeting",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["create_meeting"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
pytest.skip(f"Create meeting failed: {e.code()}")
|
|
|
|
def test_02_get_meeting(self) -> None:
|
|
"""Profile meeting retrieval."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if not meeting_id:
|
|
pytest.skip("No meeting created")
|
|
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id)
|
|
try:
|
|
response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_meeting",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["get_meeting"],
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
assert response.id == meeting_id
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_meeting",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["get_meeting"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
def test_03_list_meetings(self) -> None:
|
|
"""Profile meeting listing."""
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.ListMeetingsRequest(limit=50)
|
|
try:
|
|
response = self.stub.ListMeetings(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_meetings",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["list_meetings"],
|
|
details=f"count={len(response.meetings)}",
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_meetings",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["list_meetings"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
def test_04_stop_meeting(self) -> None:
|
|
"""Profile meeting stop operation."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if not meeting_id:
|
|
pytest.skip("No meeting created")
|
|
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id)
|
|
try:
|
|
response = self.stub.StopMeeting(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="stop_meeting",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS.get("stop_meeting", 500),
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
# Stop on non-recording meeting might fail, but we still capture timing
|
|
result = ProfileResult(
|
|
operation="stop_meeting",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS.get("stop_meeting", 500),
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
print(result) # Don't raise - expected to fail on non-recording meeting
|
|
|
|
|
|
class TestSegmentProfiling:
|
|
"""Profile segment operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_list_segments(self) -> None:
|
|
"""Profile segment listing (via GetMeeting with include_segments)."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if not meeting_id:
|
|
pytest.skip("No meeting created")
|
|
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.GetMeetingRequest(
|
|
meeting_id=meeting_id,
|
|
include_segments=True,
|
|
)
|
|
try:
|
|
response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_segments",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["list_segments"],
|
|
details=f"count={len(response.segments)}",
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_segments",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["list_segments"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
|
|
class TestAnnotationProfiling:
|
|
"""Profile annotation operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_01_create_annotation(self) -> None:
|
|
"""Profile annotation creation."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if not meeting_id:
|
|
pytest.skip("No meeting created")
|
|
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.AddAnnotationRequest(
|
|
meeting_id=meeting_id,
|
|
annotation_type=noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM,
|
|
text=f"Profile test annotation {uuid4()}",
|
|
)
|
|
try:
|
|
response = self.stub.AddAnnotation(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="create_annotation",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["create_annotation"],
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
pytest.annotation_id = response.id
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="create_annotation",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["create_annotation"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
def test_02_list_annotations(self) -> None:
|
|
"""Profile annotation listing."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if not meeting_id:
|
|
pytest.skip("No meeting created")
|
|
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.ListAnnotationsRequest(meeting_id=meeting_id)
|
|
try:
|
|
response = self.stub.ListAnnotations(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_annotations",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["list_annotations"],
|
|
details=f"count={len(response.annotations)}",
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="list_annotations",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["list_annotations"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
|
|
class TestPreferencesProfiling:
|
|
"""Profile preferences operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_get_preferences(self) -> None:
|
|
"""Profile preferences retrieval."""
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.GetPreferencesRequest()
|
|
try:
|
|
response = self.stub.GetPreferences(request, timeout=10, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_preferences",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["get_preferences"],
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_preferences",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["get_preferences"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
raise
|
|
|
|
|
|
class TestConcurrentProfiling:
|
|
"""Profile concurrent operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_concurrent_reads(self) -> None:
|
|
"""Profile concurrent read operations."""
|
|
import concurrent.futures
|
|
|
|
def read_meetings():
|
|
request = noteflow_pb2.ListMeetingsRequest(limit=10)
|
|
return self.stub.ListMeetings(request, timeout=10, metadata=get_metadata())
|
|
|
|
start = time.perf_counter()
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
|
|
futures = [executor.submit(read_meetings) for _ in range(5)]
|
|
results = [f.result() for f in concurrent.futures.as_completed(futures)]
|
|
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="concurrent_reads",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["concurrent_reads"],
|
|
details=f"5 concurrent requests",
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
|
|
|
|
class TestBatchProfiling:
|
|
"""Profile batch operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_batch_create_meetings(self) -> None:
|
|
"""Profile batch meeting creation."""
|
|
meeting_ids = []
|
|
start = time.perf_counter()
|
|
|
|
for i in range(10):
|
|
request = noteflow_pb2.CreateMeetingRequest(
|
|
title=f"Batch Profile Meeting {i} - {uuid4()}",
|
|
)
|
|
try:
|
|
response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata())
|
|
meeting_ids.append(response.id)
|
|
except grpc.RpcError:
|
|
pass
|
|
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="batch_create",
|
|
duration_ms=duration_ms,
|
|
success=len(meeting_ids) == 10,
|
|
threshold_ms=THRESHOLDS["batch_create"],
|
|
details=f"created={len(meeting_ids)}/10",
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
|
|
# Cleanup - delete created meetings
|
|
for mid in meeting_ids:
|
|
try:
|
|
request = noteflow_pb2.DeleteMeetingRequest(meeting_id=mid)
|
|
self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata())
|
|
except grpc.RpcError:
|
|
pass
|
|
|
|
|
|
class TestAnalyticsProfiling:
|
|
"""Profile analytics operations."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_get_analytics(self) -> None:
|
|
"""Profile analytics retrieval."""
|
|
start = time.perf_counter()
|
|
request = noteflow_pb2.GetAnalyticsOverviewRequest()
|
|
try:
|
|
response = self.stub.GetAnalyticsOverview(request, timeout=30, metadata=get_metadata())
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_analytics",
|
|
duration_ms=duration_ms,
|
|
success=True,
|
|
threshold_ms=THRESHOLDS["get_analytics"],
|
|
)
|
|
self.session.add(result)
|
|
print(result)
|
|
except grpc.RpcError as e:
|
|
duration_ms = (time.perf_counter() - start) * 1000
|
|
result = ProfileResult(
|
|
operation="get_analytics",
|
|
duration_ms=duration_ms,
|
|
success=False,
|
|
threshold_ms=THRESHOLDS["get_analytics"],
|
|
details=str(e.code()),
|
|
)
|
|
self.session.add(result)
|
|
# Don't raise - analytics might not be implemented
|
|
print(f"Analytics: {e.code()} (may not be implemented)")
|
|
|
|
|
|
class TestCleanupAndSummary:
|
|
"""Cleanup test data and print summary."""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _setup(
|
|
self,
|
|
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
|
|
profile_session: ProfileSession,
|
|
) -> None:
|
|
self.stub = stub
|
|
self.session = profile_session
|
|
|
|
def test_cleanup_and_summary(self) -> None:
|
|
"""Cleanup created meeting and print summary."""
|
|
meeting_id = getattr(pytest, "meeting_id", None)
|
|
if meeting_id:
|
|
try:
|
|
request = noteflow_pb2.DeleteMeetingRequest(meeting_id=meeting_id)
|
|
self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata())
|
|
print(f"Cleaned up meeting: {meeting_id}")
|
|
except grpc.RpcError as e:
|
|
print(f"Cleanup failed: {e.code()}")
|
|
|
|
# Print summary
|
|
print(self.session.summary())
|
|
|
|
# Assert all thresholds passed (optional - comment out for pure profiling)
|
|
failed = [r for r in self.session.results if not r.passed_threshold and r.success]
|
|
if failed:
|
|
print(f"\n[WARNING] {len(failed)} operations exceeded thresholds")
|