feat: centralize analytics service test fixtures and correct cache invalidation assertion logic.
Some checks failed
CI / test-python (push) Failing after 22m25s
CI / test-typescript (push) Failing after 5m56s
CI / test-rust (push) Failing after 6m56s

This commit is contained in:
2026-01-25 03:40:19 +00:00
parent 42c8fba642
commit 94d92b814f
9 changed files with 362 additions and 170 deletions

View File

@@ -16,7 +16,7 @@
* - Export operations
*/
import { expect, test, type Page } from '@playwright/test';
import { expect, test } from '@playwright/test';
import { callAPI, navigateTo, TEST_DATA, waitForAPI, waitForLoadingComplete } from './fixtures';
const shouldRun = process.env.NOTEFLOW_E2E === '1';
@@ -923,7 +923,7 @@ test.describe('Full Page Load Round Trip', () => {
// =============================================================================
test.afterAll(() => {
console.log('\n' + '='.repeat(80));
console.log(`\n${'='.repeat(80)}`);
console.log('FULL ROUND-TRIP PERFORMANCE PROFILING REPORT');
console.log('='.repeat(80));
@@ -941,7 +941,7 @@ test.afterAll(() => {
if (!byFlow.has(key)) {
byFlow.set(key, []);
}
byFlow.get(key)!.push(m);
byFlow.get(key)?.push(m);
}
console.log('\n--- BY OPERATION TYPE ---');
@@ -975,5 +975,5 @@ test.afterAll(() => {
console.log(` ${status} ${m.operation}: ${m.duration.toFixed(0)}ms${sizeInfo}`);
}
console.log('\n' + '='.repeat(80));
console.log(`\n${'='.repeat(80)}`);
});

View File

@@ -12,7 +12,7 @@
* - UI access paths and navigation
*/
import { expect, test, type Page } from '@playwright/test';
import { expect, test } from '@playwright/test';
import { callAPI, navigateTo, TEST_DATA, waitForAPI, waitForLoadingComplete } from './fixtures';
const shouldRun = process.env.NOTEFLOW_E2E === '1';

View File

@@ -8,7 +8,7 @@
* NOTEFLOW_E2E=1 NOTEFLOW_E2E_BASE_URL=http://192.168.50.151:5173 npx playwright test streaming-profile.spec.ts
*/
import { test, expect, type Page } from '@playwright/test';
import { test, expect } from '@playwright/test';
import {
waitForAppReady,
navigateTo,
@@ -375,7 +375,7 @@ test.describe('Real-time Event Profiling', () => {
'CONNECTION_CHANGE',
];
events.forEach((event) => {
events.forEach((_event) => {
const handler = () => {};
listeners.push(handler);
});
@@ -396,7 +396,7 @@ test.describe('Real-time Event Profiling', () => {
100,
async () => {
return page.evaluate(() => {
const conn = (window as Record<string, unknown>).__NOTEFLOW_CONNECTION__ as {
const conn = (window as unknown as Record<string, unknown>).__NOTEFLOW_CONNECTION__ as {
getConnectionState?: () => unknown;
};
return conn?.getConnectionState?.() ?? null;

View File

@@ -27,7 +27,6 @@ export function createDedupKey(command: string, args: unknown): string {
*/
export class InFlightRequestMap<T> {
private map: Map<string, InFlightEntry<T>> = new Map();
private cleanupTimer: ReturnType<typeof setInterval> | null = null;
constructor() {
this.startCleanupSweep();
@@ -94,7 +93,7 @@ export class InFlightRequestMap<T> {
* Starts periodic cleanup sweep to remove expired entries.
*/
private startCleanupSweep(): void {
this.cleanupTimer = setInterval(() => {
setInterval(() => {
const now = Date.now();
const keysToDelete: string[] = [];

View File

@@ -0,0 +1,83 @@
"""Fixtures for analytics service tests."""
from __future__ import annotations
from typing import Final
from unittest.mock import MagicMock
import pytest
from noteflow.application.services.analytics import AnalyticsService
from noteflow.domain.entities.analytics import (
AnalyticsOverview,
SpeakerStat,
)
EXPECTED_TOTAL_MEETINGS: Final[int] = 1
EXPECTED_TOTAL_DURATION: Final[float] = 1800.0
EXPECTED_TOTAL_WORDS: Final[int] = 2500
EXPECTED_TOTAL_SEGMENTS: Final[int] = 50
EXPECTED_SPEAKER_COUNT: Final[int] = 2
SPEAKER_ALICE_TIME: Final[float] = 900.0
SPEAKER_ALICE_SEGMENTS: Final[int] = 25
SPEAKER_ALICE_MEETINGS: Final[int] = 1
SPEAKER_ALICE_CONFIDENCE: Final[float] = 0.95
SPEAKER_BOB_TIME: Final[float] = 900.0
SPEAKER_BOB_SEGMENTS: Final[int] = 25
SPEAKER_BOB_MEETINGS: Final[int] = 1
SPEAKER_BOB_CONFIDENCE: Final[float] = 0.93
CACHE_TTL_SECONDS: Final[int] = 60
@pytest.fixture
def analytics_uow_factory() -> MagicMock:
"""Create a mock UoW factory for analytics service tests."""
return MagicMock()
@pytest.fixture
def analytics_service(analytics_uow_factory: MagicMock) -> AnalyticsService:
"""Create an AnalyticsService instance with mocked dependencies."""
return AnalyticsService(
uow_factory=analytics_uow_factory,
cache_ttl_seconds=CACHE_TTL_SECONDS,
)
@pytest.fixture
def sample_overview() -> AnalyticsOverview:
"""Create a sample AnalyticsOverview for testing."""
return AnalyticsOverview(
daily=[],
total_meetings=EXPECTED_TOTAL_MEETINGS,
total_duration=EXPECTED_TOTAL_DURATION,
total_words=EXPECTED_TOTAL_WORDS,
total_segments=EXPECTED_TOTAL_SEGMENTS,
speaker_count=EXPECTED_SPEAKER_COUNT,
)
@pytest.fixture
def sample_speaker_stats() -> list[SpeakerStat]:
"""Create sample SpeakerStat list for testing."""
return [
SpeakerStat(
speaker_id="alice-001",
display_name="Alice",
total_time=SPEAKER_ALICE_TIME,
segment_count=SPEAKER_ALICE_SEGMENTS,
meeting_count=SPEAKER_ALICE_MEETINGS,
avg_confidence=SPEAKER_ALICE_CONFIDENCE,
),
SpeakerStat(
speaker_id="bob-002",
display_name="Bob",
total_time=SPEAKER_BOB_TIME,
segment_count=SPEAKER_BOB_SEGMENTS,
meeting_count=SPEAKER_BOB_MEETINGS,
avg_confidence=SPEAKER_BOB_CONFIDENCE,
),
]

View File

@@ -76,27 +76,19 @@ async def _setup_mock_uow_with_speaker_stats(
async def _verify_cache_hit_then_db_hit(
mock_uow: MagicMock,
) -> None:
"""Verify cache was hit first, then DB was hit after invalidation."""
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST
), "First query should hit DB"
"""Verify DB was hit twice: once initially, once after invalidation."""
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_AFTER_INVALIDATION
), "Second query should hit DB after invalidation"
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_AFTER_INVALIDATION
), "Second query should hit DB after invalidation"
), "Should hit DB twice: initial query + post-invalidation query"
async def _verify_independent_workspace_caches(
mock_uow: MagicMock,
) -> None:
"""Verify caches are independent - invalidating one doesn't affect other."""
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES
), "Should have cached two workspaces"
assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES, (
"Should have cached two workspaces"
)
class TestAnalyticsCacheInvalidation:
@@ -146,12 +138,12 @@ class TestAnalyticsCacheInvalidation:
await analytics_service.get_overview(workspace_id)
await analytics_service.get_speaker_stats(workspace_id)
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST
), "Overview should have been queried once"
assert (
mock_uow.analytics.get_speaker_stats_fast.call_count == EXPECTED_DB_CALLS_FIRST
), "Speaker stats should have been queried once"
assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST, (
"Overview should have been queried once"
)
assert mock_uow.analytics.get_speaker_stats_fast.call_count == EXPECTED_DB_CALLS_FIRST, (
"Speaker stats should have been queried once"
)
analytics_service.invalidate_cache(workspace_id)
await analytics_service.get_overview(workspace_id)
@@ -180,9 +172,9 @@ class TestAnalyticsCacheInvalidation:
await analytics_service.get_overview(workspace_id_1)
await analytics_service.get_overview(workspace_id_2)
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES
), "Should have queried DB twice"
assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES, (
"Should have queried DB twice"
)
analytics_service.invalidate_cache(None)
@@ -216,6 +208,7 @@ class TestAnalyticsCacheInvalidation:
await analytics_service.get_overview(workspace_id_1)
await analytics_service.get_overview(workspace_id_2)
assert (
mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES * 2
), "Invalidating one workspace should not affect other's cache"
expected_calls = EXPECTED_TWO_CACHE_ENTRIES + EXPECTED_SINGLE_CACHE_ENTRY
assert mock_uow.analytics.get_overview_fast.call_count == expected_calls, (
"Should hit DB 3 times: 2 initial + 1 for invalidated workspace (other stays cached)"
)

View File

@@ -45,6 +45,12 @@ class ListMeetingsRequestProto(Protocol):
@property
def project_id(self) -> str: ...
@property
def include_segments(self) -> bool: ...
@property
def include_summary(self) -> bool: ...
class GetMeetingRequestProto(Protocol):
@property
@@ -66,8 +72,14 @@ class FinalSegmentProto(Protocol):
text: str
class ActionItemProto(Protocol):
text: str
assignee: str
class SummaryProto(Protocol):
executive_summary: str
action_items: Sequence[ActionItemProto]
class MeetingProto(Protocol):

View File

@@ -662,6 +662,78 @@ class TestListMeetings:
assert MeetingState.RECORDING in call_kwargs["states"], "RECORDING should be in filter"
assert MeetingState.STOPPED in call_kwargs["states"], "STOPPED should be in filter"
async def test_list_meetings_includes_summary_when_requested(
self,
meeting_mixin_servicer: MeetingServicerProtocol,
meeting_mixin_meetings_repo: AsyncMock,
meeting_mixin_summaries_repo: AsyncMock,
mock_grpc_context: MagicMock,
) -> None:
"""ListMeetings loads summaries when include_summary is True."""
from noteflow.domain.entities import ActionItem, Summary
meeting_id = MeetingId(uuid4())
meeting = Meeting.create(title="Meeting with Summary")
meeting.id = meeting_id
meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1)
summary = Summary(
meeting_id=meeting_id,
executive_summary="Productive meeting.",
key_points=[],
action_items=[ActionItem(text="Follow up")],
)
meeting_mixin_summaries_repo.get_by_meeting.return_value = summary
request = noteflow_pb2.ListMeetingsRequest(include_summary=True)
response = await meeting_mixin_servicer.ListMeetings(request, mock_grpc_context)
meeting_response = response.meetings[0]
assert meeting_response.summary is not None, "Summary should be present"
assert meeting_response.summary.executive_summary == "Productive meeting.", (
"Executive summary should match"
)
assert len(meeting_response.summary.action_items) == 1, "Should have 1 action item"
meeting_mixin_summaries_repo.get_by_meeting.assert_called_once_with(meeting.id)
async def test_list_meetings_excludes_summary_when_not_requested(
self,
meeting_mixin_servicer: MeetingServicerProtocol,
meeting_mixin_meetings_repo: AsyncMock,
meeting_mixin_summaries_repo: AsyncMock,
mock_grpc_context: MagicMock,
) -> None:
"""ListMeetings does not load summaries when include_summary is False."""
meeting = Meeting.create(title="Meeting without Summary")
meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1)
request: ListMeetingsRequestProto = noteflow_pb2.ListMeetingsRequest(include_summary=False)
await meeting_mixin_servicer.ListMeetings(request, mock_grpc_context)
meeting_mixin_summaries_repo.get_by_meeting.assert_not_called()
async def test_list_meetings_excludes_summary_by_default(
self,
meeting_mixin_servicer: MeetingServicerProtocol,
meeting_mixin_meetings_repo: AsyncMock,
meeting_mixin_summaries_repo: AsyncMock,
mock_grpc_context: MagicMock,
) -> None:
"""ListMeetings excludes summaries by default when include_summary not specified."""
meeting = Meeting.create(title="Meeting")
meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1)
request: ListMeetingsRequestProto = noteflow_pb2.ListMeetingsRequest()
response: ListMeetingsResponseProto = await meeting_mixin_servicer.ListMeetings(
request, mock_grpc_context
)
meeting_mixin_summaries_repo.get_by_meeting.assert_not_called()
meeting_response = response.meetings[0]
assert meeting_response.summary is not None, "Summary field should exist"
assert meeting_response.summary.executive_summary == "", (
"Summary should be empty by default"
)
# ============================================================================
# TestGetMeeting

View File

@@ -9,11 +9,11 @@ Run with:
from __future__ import annotations
import asyncio
import os
import statistics
import time
from dataclasses import dataclass, field
from collections.abc import Callable
from typing import TYPE_CHECKING, Final
from uuid import uuid4
@@ -21,6 +21,9 @@ import grpc
import pytest
from noteflow.grpc.proto import noteflow_pb2, noteflow_pb2_grpc
from noteflow.infrastructure.logging import get_logger
logger = get_logger(__name__)
if TYPE_CHECKING:
from collections.abc import Generator
@@ -28,6 +31,12 @@ if TYPE_CHECKING:
# Configuration
DEFAULT_SERVER: Final[str] = "localhost:50051"
GRPC_TARGET: Final[str] = os.environ.get("NOTEFLOW_GRPC_TARGET", DEFAULT_SERVER)
GRPC_TIMEOUT_SECONDS: Final[int] = 10
CONCURRENT_WORKERS: Final[int] = 5
BATCH_SIZE: Final[int] = 10
LIST_LIMIT: Final[int] = 10
LIST_MEETINGS_LIMIT: Final[int] = 50
# Required gRPC metadata
def get_metadata() -> tuple[tuple[str, str], ...]:
@@ -37,6 +46,7 @@ def get_metadata() -> tuple[tuple[str, str], ...]:
("x-workspace-id", "00000000-0000-0000-0000-000000000001"),
)
# Profiling thresholds (milliseconds)
THRESHOLDS = {
"create_meeting": 500,
@@ -77,6 +87,26 @@ class ProfileResult:
return f"[{status}] {self.operation}: {self.duration_ms:.2f}ms (threshold: {self.threshold_ms}ms)"
def _record_profile_result(
session: "ProfileSession",
operation: str,
duration_ms: float,
success: bool,
details: str = "",
) -> ProfileResult:
"""Record a profile result and log it."""
result = ProfileResult(
operation=operation,
duration_ms=duration_ms,
success=success,
threshold_ms=THRESHOLDS.get(operation, 1000),
details=details,
)
session.add(result)
logger.info(str(result))
return result
@dataclass
class ProfileSession:
"""Collection of profiling results."""
@@ -88,7 +118,7 @@ class ProfileSession:
self.results.append(result)
def summary(self) -> str:
lines = [f"\n{'='*60}", f"PROFILING SUMMARY - Server: {self.server}", "=" * 60]
lines = [f"\n{'=' * 60}", f"PROFILING SUMMARY - Server: {self.server}", "=" * 60]
passed = sum(1 for r in self.results if r.passed_threshold)
total = len(self.results)
lines.append(f"Results: {passed}/{total} passed thresholds\n")
@@ -101,7 +131,9 @@ class ProfileSession:
lines.append(f"\nStatistics:")
lines.append(f" Mean: {statistics.mean(durations):.2f}ms")
lines.append(f" Median: {statistics.median(durations):.2f}ms")
lines.append(f" Stdev: {statistics.stdev(durations):.2f}ms" if len(durations) > 1 else "")
lines.append(
f" Stdev: {statistics.stdev(durations):.2f}ms" if len(durations) > 1 else ""
)
lines.append(f" Min: {min(durations):.2f}ms")
lines.append(f" Max: {max(durations):.2f}ms")
@@ -121,7 +153,7 @@ def grpc_channel() -> Generator[grpc.Channel, None, None]:
)
# Wait for channel to be ready
try:
grpc.channel_ready_future(channel).result(timeout=10)
grpc.channel_ready_future(channel).result(timeout=GRPC_TIMEOUT_SECONDS)
except grpc.FutureTimeoutError:
pytest.skip(f"Could not connect to gRPC server at {GRPC_TARGET}")
yield channel
@@ -142,16 +174,16 @@ def profile_session() -> ProfileSession:
def profile_operation(
operation: str, threshold_ms: float | None = None
) -> callable:
) -> Callable[[Callable[..., object]], Callable[..., object]]:
"""Decorator to profile an operation."""
def decorator(func: callable) -> callable:
def decorator(func: Callable[..., object]) -> Callable[..., object]:
def wrapper(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
profile_session: ProfileSession,
*args,
**kwargs,
):
*args: object,
**kwargs: object,
) -> object:
threshold = threshold_ms or THRESHOLDS.get(operation, 1000)
start = time.perf_counter()
try:
@@ -164,7 +196,7 @@ def profile_operation(
threshold_ms=threshold,
)
profile_session.add(profile_result)
print(profile_result)
logger.info(str(profile_result))
return result
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
@@ -176,7 +208,7 @@ def profile_operation(
details=str(e.code()),
)
profile_session.add(profile_result)
print(f"{operation}: FAILED - {e.code()}")
logger.info(f"{operation}: FAILED - {e.code()}")
raise
return wrapper
@@ -204,7 +236,9 @@ class TestMeetingCRUDProfiling:
title=f"Profile Test Meeting {uuid4()}",
)
try:
response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata())
response = self.stub.CreateMeeting(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
duration_ms = (time.perf_counter() - start) * 1000
self.meeting_id = response.id
result = ProfileResult(
@@ -214,9 +248,7 @@ class TestMeetingCRUDProfiling:
threshold_ms=THRESHOLDS["create_meeting"],
)
self.session.add(result)
print(result)
# Store for subsequent tests
pytest.meeting_id = response.id
logger.info(str(result))
assert response.id
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
@@ -239,7 +271,9 @@ class TestMeetingCRUDProfiling:
start = time.perf_counter()
request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id)
try:
response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata())
response = self.stub.GetMeeting(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="get_meeting",
@@ -248,7 +282,7 @@ class TestMeetingCRUDProfiling:
threshold_ms=THRESHOLDS["get_meeting"],
)
self.session.add(result)
print(result)
logger.info(str(result))
assert response.id == meeting_id
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
@@ -265,9 +299,11 @@ class TestMeetingCRUDProfiling:
def test_03_list_meetings(self) -> None:
"""Profile meeting listing."""
start = time.perf_counter()
request = noteflow_pb2.ListMeetingsRequest(limit=50)
request = noteflow_pb2.ListMeetingsRequest(limit=LIST_MEETINGS_LIMIT)
try:
response = self.stub.ListMeetings(request, timeout=10, metadata=get_metadata())
response = self.stub.ListMeetings(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="list_meetings",
@@ -277,7 +313,7 @@ class TestMeetingCRUDProfiling:
details=f"count={len(response.meetings)}",
)
self.session.add(result)
print(result)
logger.info(str(result))
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
@@ -295,32 +331,43 @@ class TestMeetingCRUDProfiling:
meeting_id = getattr(pytest, "meeting_id", None)
if not meeting_id:
pytest.skip("No meeting created")
_profile_stop_meeting(self.stub, self.session, meeting_id)
start = time.perf_counter()
request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id)
try:
response = self.stub.StopMeeting(request, timeout=10, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="stop_meeting",
duration_ms=duration_ms,
success=True,
threshold_ms=THRESHOLDS.get("stop_meeting", 500),
)
self.session.add(result)
print(result)
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
# Stop on non-recording meeting might fail, but we still capture timing
result = ProfileResult(
operation="stop_meeting",
duration_ms=duration_ms,
success=False,
threshold_ms=THRESHOLDS.get("stop_meeting", 500),
details=str(e.code()),
)
self.session.add(result)
print(result) # Don't raise - expected to fail on non-recording meeting
def _profile_list_segments(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
session: ProfileSession,
meeting_id: str,
) -> None:
start = time.perf_counter()
request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id, include_segments=True)
try:
response = stub.GetMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
_record_profile_result(
session, "list_segments", duration_ms, True, f"count={len(response.segments)}"
)
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
_record_profile_result(session, "list_segments", duration_ms, False, str(e.code()))
raise
def _profile_stop_meeting(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
session: ProfileSession,
meeting_id: str,
) -> None:
start = time.perf_counter()
request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id)
try:
stub.StopMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
_record_profile_result(session, "stop_meeting", duration_ms, True)
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
# Stop on non-recording meeting might fail, but we still capture timing
_record_profile_result(session, "stop_meeting", duration_ms, False, str(e.code()))
class TestSegmentProfiling:
@@ -340,35 +387,7 @@ class TestSegmentProfiling:
meeting_id = getattr(pytest, "meeting_id", None)
if not meeting_id:
pytest.skip("No meeting created")
start = time.perf_counter()
request = noteflow_pb2.GetMeetingRequest(
meeting_id=meeting_id,
include_segments=True,
)
try:
response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="list_segments",
duration_ms=duration_ms,
success=True,
threshold_ms=THRESHOLDS["list_segments"],
details=f"count={len(response.segments)}",
)
self.session.add(result)
print(result)
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="list_segments",
duration_ms=duration_ms,
success=False,
threshold_ms=THRESHOLDS["list_segments"],
details=str(e.code()),
)
self.session.add(result)
raise
_profile_list_segments(self.stub, self.session, meeting_id)
class TestAnnotationProfiling:
@@ -396,7 +415,7 @@ class TestAnnotationProfiling:
text=f"Profile test annotation {uuid4()}",
)
try:
response = self.stub.AddAnnotation(request, timeout=10, metadata=get_metadata())
self.stub.AddAnnotation(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="create_annotation",
@@ -405,8 +424,7 @@ class TestAnnotationProfiling:
threshold_ms=THRESHOLDS["create_annotation"],
)
self.session.add(result)
print(result)
pytest.annotation_id = response.id
logger.info(str(result))
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
@@ -428,7 +446,9 @@ class TestAnnotationProfiling:
start = time.perf_counter()
request = noteflow_pb2.ListAnnotationsRequest(meeting_id=meeting_id)
try:
response = self.stub.ListAnnotations(request, timeout=10, metadata=get_metadata())
response = self.stub.ListAnnotations(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="list_annotations",
@@ -438,7 +458,7 @@ class TestAnnotationProfiling:
details=f"count={len(response.annotations)}",
)
self.session.add(result)
print(result)
logger.info(str(result))
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
@@ -469,7 +489,7 @@ class TestPreferencesProfiling:
start = time.perf_counter()
request = noteflow_pb2.GetPreferencesRequest()
try:
response = self.stub.GetPreferences(request, timeout=10, metadata=get_metadata())
self.stub.GetPreferences(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="get_preferences",
@@ -478,7 +498,7 @@ class TestPreferencesProfiling:
threshold_ms=THRESHOLDS["get_preferences"],
)
self.session.add(result)
print(result)
logger.info(str(result))
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
@@ -492,6 +512,26 @@ class TestPreferencesProfiling:
raise
def _profile_concurrent_reads(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
session: ProfileSession,
) -> None:
import concurrent.futures
def read_meetings() -> object:
request = noteflow_pb2.ListMeetingsRequest(limit=LIST_LIMIT)
return stub.ListMeetings(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as executor:
futures = [executor.submit(read_meetings) for _ in range(CONCURRENT_WORKERS)]
_ = [f.result() for f in concurrent.futures.as_completed(futures)]
duration_ms = (time.perf_counter() - start) * 1000
_record_profile_result(
session, "concurrent_reads", duration_ms, True, f"{CONCURRENT_WORKERS} concurrent requests"
)
class TestConcurrentProfiling:
"""Profile concurrent operations."""
@@ -506,27 +546,35 @@ class TestConcurrentProfiling:
def test_concurrent_reads(self) -> None:
"""Profile concurrent read operations."""
import concurrent.futures
_profile_concurrent_reads(self.stub, self.session)
def read_meetings():
request = noteflow_pb2.ListMeetingsRequest(limit=10)
return self.stub.ListMeetings(request, timeout=10, metadata=get_metadata())
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(read_meetings) for _ in range(5)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
def _create_batch_meetings(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
) -> list[str]:
meeting_ids: list[str] = []
for i in range(BATCH_SIZE):
request = noteflow_pb2.CreateMeetingRequest(title=f"Batch Profile Meeting {i} - {uuid4()}")
try:
response = stub.CreateMeeting(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
meeting_ids.append(response.id)
except grpc.RpcError:
pass
return meeting_ids
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="concurrent_reads",
duration_ms=duration_ms,
success=True,
threshold_ms=THRESHOLDS["concurrent_reads"],
details=f"5 concurrent requests",
)
self.session.add(result)
print(result)
def _cleanup_batch_meetings(
stub: noteflow_pb2_grpc.NoteFlowServiceStub,
meeting_ids: list[str],
) -> None:
for mid in meeting_ids:
try:
request = noteflow_pb2.DeleteMeetingRequest(meeting_id=mid)
stub.DeleteMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata())
except grpc.RpcError:
pass
class TestBatchProfiling:
@@ -543,37 +591,18 @@ class TestBatchProfiling:
def test_batch_create_meetings(self) -> None:
"""Profile batch meeting creation."""
meeting_ids = []
start = time.perf_counter()
for i in range(10):
request = noteflow_pb2.CreateMeetingRequest(
title=f"Batch Profile Meeting {i} - {uuid4()}",
)
try:
response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata())
meeting_ids.append(response.id)
except grpc.RpcError:
pass
meeting_ids = _create_batch_meetings(self.stub)
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="batch_create",
duration_ms=duration_ms,
success=len(meeting_ids) == 10,
threshold_ms=THRESHOLDS["batch_create"],
details=f"created={len(meeting_ids)}/10",
success = len(meeting_ids) == BATCH_SIZE
_record_profile_result(
self.session,
"batch_create",
duration_ms,
success,
f"created={len(meeting_ids)}/{BATCH_SIZE}",
)
self.session.add(result)
print(result)
# Cleanup - delete created meetings
for mid in meeting_ids:
try:
request = noteflow_pb2.DeleteMeetingRequest(meeting_id=mid)
self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata())
except grpc.RpcError:
pass
_cleanup_batch_meetings(self.stub, meeting_ids)
class TestAnalyticsProfiling:
@@ -593,7 +622,9 @@ class TestAnalyticsProfiling:
start = time.perf_counter()
request = noteflow_pb2.GetAnalyticsOverviewRequest()
try:
response = self.stub.GetAnalyticsOverview(request, timeout=30, metadata=get_metadata())
self.stub.GetAnalyticsOverview(
request, timeout=GRPC_TIMEOUT_SECONDS * 3, metadata=get_metadata()
)
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
operation="get_analytics",
@@ -602,7 +633,7 @@ class TestAnalyticsProfiling:
threshold_ms=THRESHOLDS["get_analytics"],
)
self.session.add(result)
print(result)
logger.info(str(result))
except grpc.RpcError as e:
duration_ms = (time.perf_counter() - start) * 1000
result = ProfileResult(
@@ -614,7 +645,7 @@ class TestAnalyticsProfiling:
)
self.session.add(result)
# Don't raise - analytics might not be implemented
print(f"Analytics: {e.code()} (may not be implemented)")
logger.info(f"Analytics: {e.code()} (may not be implemented)")
class TestCleanupAndSummary:
@@ -635,15 +666,17 @@ class TestCleanupAndSummary:
if meeting_id:
try:
request = noteflow_pb2.DeleteMeetingRequest(meeting_id=meeting_id)
self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata())
print(f"Cleaned up meeting: {meeting_id}")
self.stub.DeleteMeeting(
request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()
)
logger.info(f"Cleaned up meeting: {meeting_id}")
except grpc.RpcError as e:
print(f"Cleanup failed: {e.code()}")
logger.info(f"Cleanup failed: {e.code()}")
# Print summary
print(self.session.summary())
logger.info(self.session.summary())
# Assert all thresholds passed (optional - comment out for pure profiling)
failed = [r for r in self.session.results if not r.passed_threshold and r.success]
if failed:
print(f"\n[WARNING] {len(failed)} operations exceeded thresholds")
logger.info(f"\n[WARNING] {len(failed)} operations exceeded thresholds")