From 94d92b814ffe467bd3fb3072ca86262c32d31031 Mon Sep 17 00:00:00 2001 From: Travis Vasceannie Date: Sun, 25 Jan 2026 03:40:19 +0000 Subject: [PATCH] feat: centralize analytics service test fixtures and correct cache invalidation assertion logic. --- client/e2e/full-roundtrip-profile.spec.ts | 8 +- client/e2e/performance-profile.spec.ts | 2 +- client/e2e/streaming-profile.spec.ts | 6 +- client/src/lib/request/dedup.ts | 3 +- .../services/analytics/conftest.py | 83 +++++ .../analytics/test_cache_invalidation.py | 43 ++- tests/grpc/proto_types.py | 12 + tests/grpc/test_meeting_mixin.py | 72 +++++ .../test_backend_roundtrip_profile.py | 303 ++++++++++-------- 9 files changed, 362 insertions(+), 170 deletions(-) create mode 100644 tests/application/services/analytics/conftest.py diff --git a/client/e2e/full-roundtrip-profile.spec.ts b/client/e2e/full-roundtrip-profile.spec.ts index 433ff2f..cd444d6 100644 --- a/client/e2e/full-roundtrip-profile.spec.ts +++ b/client/e2e/full-roundtrip-profile.spec.ts @@ -16,7 +16,7 @@ * - Export operations */ -import { expect, test, type Page } from '@playwright/test'; +import { expect, test } from '@playwright/test'; import { callAPI, navigateTo, TEST_DATA, waitForAPI, waitForLoadingComplete } from './fixtures'; const shouldRun = process.env.NOTEFLOW_E2E === '1'; @@ -923,7 +923,7 @@ test.describe('Full Page Load Round Trip', () => { // ============================================================================= test.afterAll(() => { - console.log('\n' + '='.repeat(80)); + console.log(`\n${'='.repeat(80)}`); console.log('FULL ROUND-TRIP PERFORMANCE PROFILING REPORT'); console.log('='.repeat(80)); @@ -941,7 +941,7 @@ test.afterAll(() => { if (!byFlow.has(key)) { byFlow.set(key, []); } - byFlow.get(key)!.push(m); + byFlow.get(key)?.push(m); } console.log('\n--- BY OPERATION TYPE ---'); @@ -975,5 +975,5 @@ test.afterAll(() => { console.log(` ${status} ${m.operation}: ${m.duration.toFixed(0)}ms${sizeInfo}`); } - console.log('\n' + '='.repeat(80)); + console.log(`\n${'='.repeat(80)}`); }); diff --git a/client/e2e/performance-profile.spec.ts b/client/e2e/performance-profile.spec.ts index d59fb4d..bb137b1 100644 --- a/client/e2e/performance-profile.spec.ts +++ b/client/e2e/performance-profile.spec.ts @@ -12,7 +12,7 @@ * - UI access paths and navigation */ -import { expect, test, type Page } from '@playwright/test'; +import { expect, test } from '@playwright/test'; import { callAPI, navigateTo, TEST_DATA, waitForAPI, waitForLoadingComplete } from './fixtures'; const shouldRun = process.env.NOTEFLOW_E2E === '1'; diff --git a/client/e2e/streaming-profile.spec.ts b/client/e2e/streaming-profile.spec.ts index 4ba378b..8683eae 100644 --- a/client/e2e/streaming-profile.spec.ts +++ b/client/e2e/streaming-profile.spec.ts @@ -8,7 +8,7 @@ * NOTEFLOW_E2E=1 NOTEFLOW_E2E_BASE_URL=http://192.168.50.151:5173 npx playwright test streaming-profile.spec.ts */ -import { test, expect, type Page } from '@playwright/test'; +import { test, expect } from '@playwright/test'; import { waitForAppReady, navigateTo, @@ -375,7 +375,7 @@ test.describe('Real-time Event Profiling', () => { 'CONNECTION_CHANGE', ]; - events.forEach((event) => { + events.forEach((_event) => { const handler = () => {}; listeners.push(handler); }); @@ -396,7 +396,7 @@ test.describe('Real-time Event Profiling', () => { 100, async () => { return page.evaluate(() => { - const conn = (window as Record).__NOTEFLOW_CONNECTION__ as { + const conn = (window as unknown as Record).__NOTEFLOW_CONNECTION__ as { getConnectionState?: () => unknown; }; return conn?.getConnectionState?.() ?? null; diff --git a/client/src/lib/request/dedup.ts b/client/src/lib/request/dedup.ts index e1d96f9..7883932 100644 --- a/client/src/lib/request/dedup.ts +++ b/client/src/lib/request/dedup.ts @@ -27,7 +27,6 @@ export function createDedupKey(command: string, args: unknown): string { */ export class InFlightRequestMap { private map: Map> = new Map(); - private cleanupTimer: ReturnType | null = null; constructor() { this.startCleanupSweep(); @@ -94,7 +93,7 @@ export class InFlightRequestMap { * Starts periodic cleanup sweep to remove expired entries. */ private startCleanupSweep(): void { - this.cleanupTimer = setInterval(() => { + setInterval(() => { const now = Date.now(); const keysToDelete: string[] = []; diff --git a/tests/application/services/analytics/conftest.py b/tests/application/services/analytics/conftest.py new file mode 100644 index 0000000..59db704 --- /dev/null +++ b/tests/application/services/analytics/conftest.py @@ -0,0 +1,83 @@ +"""Fixtures for analytics service tests.""" + +from __future__ import annotations + +from typing import Final +from unittest.mock import MagicMock + +import pytest + +from noteflow.application.services.analytics import AnalyticsService +from noteflow.domain.entities.analytics import ( + AnalyticsOverview, + SpeakerStat, +) + +EXPECTED_TOTAL_MEETINGS: Final[int] = 1 +EXPECTED_TOTAL_DURATION: Final[float] = 1800.0 +EXPECTED_TOTAL_WORDS: Final[int] = 2500 +EXPECTED_TOTAL_SEGMENTS: Final[int] = 50 +EXPECTED_SPEAKER_COUNT: Final[int] = 2 + +SPEAKER_ALICE_TIME: Final[float] = 900.0 +SPEAKER_ALICE_SEGMENTS: Final[int] = 25 +SPEAKER_ALICE_MEETINGS: Final[int] = 1 +SPEAKER_ALICE_CONFIDENCE: Final[float] = 0.95 + +SPEAKER_BOB_TIME: Final[float] = 900.0 +SPEAKER_BOB_SEGMENTS: Final[int] = 25 +SPEAKER_BOB_MEETINGS: Final[int] = 1 +SPEAKER_BOB_CONFIDENCE: Final[float] = 0.93 + +CACHE_TTL_SECONDS: Final[int] = 60 + + +@pytest.fixture +def analytics_uow_factory() -> MagicMock: + """Create a mock UoW factory for analytics service tests.""" + return MagicMock() + + +@pytest.fixture +def analytics_service(analytics_uow_factory: MagicMock) -> AnalyticsService: + """Create an AnalyticsService instance with mocked dependencies.""" + return AnalyticsService( + uow_factory=analytics_uow_factory, + cache_ttl_seconds=CACHE_TTL_SECONDS, + ) + + +@pytest.fixture +def sample_overview() -> AnalyticsOverview: + """Create a sample AnalyticsOverview for testing.""" + return AnalyticsOverview( + daily=[], + total_meetings=EXPECTED_TOTAL_MEETINGS, + total_duration=EXPECTED_TOTAL_DURATION, + total_words=EXPECTED_TOTAL_WORDS, + total_segments=EXPECTED_TOTAL_SEGMENTS, + speaker_count=EXPECTED_SPEAKER_COUNT, + ) + + +@pytest.fixture +def sample_speaker_stats() -> list[SpeakerStat]: + """Create sample SpeakerStat list for testing.""" + return [ + SpeakerStat( + speaker_id="alice-001", + display_name="Alice", + total_time=SPEAKER_ALICE_TIME, + segment_count=SPEAKER_ALICE_SEGMENTS, + meeting_count=SPEAKER_ALICE_MEETINGS, + avg_confidence=SPEAKER_ALICE_CONFIDENCE, + ), + SpeakerStat( + speaker_id="bob-002", + display_name="Bob", + total_time=SPEAKER_BOB_TIME, + segment_count=SPEAKER_BOB_SEGMENTS, + meeting_count=SPEAKER_BOB_MEETINGS, + avg_confidence=SPEAKER_BOB_CONFIDENCE, + ), + ] diff --git a/tests/application/services/analytics/test_cache_invalidation.py b/tests/application/services/analytics/test_cache_invalidation.py index fbbb133..464b1fb 100644 --- a/tests/application/services/analytics/test_cache_invalidation.py +++ b/tests/application/services/analytics/test_cache_invalidation.py @@ -76,27 +76,19 @@ async def _setup_mock_uow_with_speaker_stats( async def _verify_cache_hit_then_db_hit( mock_uow: MagicMock, ) -> None: - """Verify cache was hit first, then DB was hit after invalidation.""" - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST - ), "First query should hit DB" - + """Verify DB was hit twice: once initially, once after invalidation.""" assert ( mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_AFTER_INVALIDATION - ), "Second query should hit DB after invalidation" - - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_AFTER_INVALIDATION - ), "Second query should hit DB after invalidation" + ), "Should hit DB twice: initial query + post-invalidation query" async def _verify_independent_workspace_caches( mock_uow: MagicMock, ) -> None: """Verify caches are independent - invalidating one doesn't affect other.""" - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES - ), "Should have cached two workspaces" + assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES, ( + "Should have cached two workspaces" + ) class TestAnalyticsCacheInvalidation: @@ -146,12 +138,12 @@ class TestAnalyticsCacheInvalidation: await analytics_service.get_overview(workspace_id) await analytics_service.get_speaker_stats(workspace_id) - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST - ), "Overview should have been queried once" - assert ( - mock_uow.analytics.get_speaker_stats_fast.call_count == EXPECTED_DB_CALLS_FIRST - ), "Speaker stats should have been queried once" + assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_DB_CALLS_FIRST, ( + "Overview should have been queried once" + ) + assert mock_uow.analytics.get_speaker_stats_fast.call_count == EXPECTED_DB_CALLS_FIRST, ( + "Speaker stats should have been queried once" + ) analytics_service.invalidate_cache(workspace_id) await analytics_service.get_overview(workspace_id) @@ -180,9 +172,9 @@ class TestAnalyticsCacheInvalidation: await analytics_service.get_overview(workspace_id_1) await analytics_service.get_overview(workspace_id_2) - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES - ), "Should have queried DB twice" + assert mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES, ( + "Should have queried DB twice" + ) analytics_service.invalidate_cache(None) @@ -216,6 +208,7 @@ class TestAnalyticsCacheInvalidation: await analytics_service.get_overview(workspace_id_1) await analytics_service.get_overview(workspace_id_2) - assert ( - mock_uow.analytics.get_overview_fast.call_count == EXPECTED_TWO_CACHE_ENTRIES * 2 - ), "Invalidating one workspace should not affect other's cache" + expected_calls = EXPECTED_TWO_CACHE_ENTRIES + EXPECTED_SINGLE_CACHE_ENTRY + assert mock_uow.analytics.get_overview_fast.call_count == expected_calls, ( + "Should hit DB 3 times: 2 initial + 1 for invalidated workspace (other stays cached)" + ) diff --git a/tests/grpc/proto_types.py b/tests/grpc/proto_types.py index 8b16cdd..1b10010 100644 --- a/tests/grpc/proto_types.py +++ b/tests/grpc/proto_types.py @@ -45,6 +45,12 @@ class ListMeetingsRequestProto(Protocol): @property def project_id(self) -> str: ... + @property + def include_segments(self) -> bool: ... + + @property + def include_summary(self) -> bool: ... + class GetMeetingRequestProto(Protocol): @property @@ -66,8 +72,14 @@ class FinalSegmentProto(Protocol): text: str +class ActionItemProto(Protocol): + text: str + assignee: str + + class SummaryProto(Protocol): executive_summary: str + action_items: Sequence[ActionItemProto] class MeetingProto(Protocol): diff --git a/tests/grpc/test_meeting_mixin.py b/tests/grpc/test_meeting_mixin.py index a362c5b..59735ef 100644 --- a/tests/grpc/test_meeting_mixin.py +++ b/tests/grpc/test_meeting_mixin.py @@ -662,6 +662,78 @@ class TestListMeetings: assert MeetingState.RECORDING in call_kwargs["states"], "RECORDING should be in filter" assert MeetingState.STOPPED in call_kwargs["states"], "STOPPED should be in filter" + async def test_list_meetings_includes_summary_when_requested( + self, + meeting_mixin_servicer: MeetingServicerProtocol, + meeting_mixin_meetings_repo: AsyncMock, + meeting_mixin_summaries_repo: AsyncMock, + mock_grpc_context: MagicMock, + ) -> None: + """ListMeetings loads summaries when include_summary is True.""" + from noteflow.domain.entities import ActionItem, Summary + + meeting_id = MeetingId(uuid4()) + meeting = Meeting.create(title="Meeting with Summary") + meeting.id = meeting_id + meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1) + summary = Summary( + meeting_id=meeting_id, + executive_summary="Productive meeting.", + key_points=[], + action_items=[ActionItem(text="Follow up")], + ) + meeting_mixin_summaries_repo.get_by_meeting.return_value = summary + + request = noteflow_pb2.ListMeetingsRequest(include_summary=True) + response = await meeting_mixin_servicer.ListMeetings(request, mock_grpc_context) + + meeting_response = response.meetings[0] + assert meeting_response.summary is not None, "Summary should be present" + assert meeting_response.summary.executive_summary == "Productive meeting.", ( + "Executive summary should match" + ) + assert len(meeting_response.summary.action_items) == 1, "Should have 1 action item" + meeting_mixin_summaries_repo.get_by_meeting.assert_called_once_with(meeting.id) + + async def test_list_meetings_excludes_summary_when_not_requested( + self, + meeting_mixin_servicer: MeetingServicerProtocol, + meeting_mixin_meetings_repo: AsyncMock, + meeting_mixin_summaries_repo: AsyncMock, + mock_grpc_context: MagicMock, + ) -> None: + """ListMeetings does not load summaries when include_summary is False.""" + meeting = Meeting.create(title="Meeting without Summary") + meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1) + + request: ListMeetingsRequestProto = noteflow_pb2.ListMeetingsRequest(include_summary=False) + await meeting_mixin_servicer.ListMeetings(request, mock_grpc_context) + + meeting_mixin_summaries_repo.get_by_meeting.assert_not_called() + + async def test_list_meetings_excludes_summary_by_default( + self, + meeting_mixin_servicer: MeetingServicerProtocol, + meeting_mixin_meetings_repo: AsyncMock, + meeting_mixin_summaries_repo: AsyncMock, + mock_grpc_context: MagicMock, + ) -> None: + """ListMeetings excludes summaries by default when include_summary not specified.""" + meeting = Meeting.create(title="Meeting") + meeting_mixin_meetings_repo.list_all.return_value = ([meeting], 1) + + request: ListMeetingsRequestProto = noteflow_pb2.ListMeetingsRequest() + response: ListMeetingsResponseProto = await meeting_mixin_servicer.ListMeetings( + request, mock_grpc_context + ) + + meeting_mixin_summaries_repo.get_by_meeting.assert_not_called() + meeting_response = response.meetings[0] + assert meeting_response.summary is not None, "Summary field should exist" + assert meeting_response.summary.executive_summary == "", ( + "Summary should be empty by default" + ) + # ============================================================================ # TestGetMeeting diff --git a/tests/profiling/test_backend_roundtrip_profile.py b/tests/profiling/test_backend_roundtrip_profile.py index da8cee8..e3102e8 100644 --- a/tests/profiling/test_backend_roundtrip_profile.py +++ b/tests/profiling/test_backend_roundtrip_profile.py @@ -9,11 +9,11 @@ Run with: from __future__ import annotations -import asyncio import os import statistics import time from dataclasses import dataclass, field +from collections.abc import Callable from typing import TYPE_CHECKING, Final from uuid import uuid4 @@ -21,6 +21,9 @@ import grpc import pytest from noteflow.grpc.proto import noteflow_pb2, noteflow_pb2_grpc +from noteflow.infrastructure.logging import get_logger + +logger = get_logger(__name__) if TYPE_CHECKING: from collections.abc import Generator @@ -28,6 +31,12 @@ if TYPE_CHECKING: # Configuration DEFAULT_SERVER: Final[str] = "localhost:50051" GRPC_TARGET: Final[str] = os.environ.get("NOTEFLOW_GRPC_TARGET", DEFAULT_SERVER) +GRPC_TIMEOUT_SECONDS: Final[int] = 10 +CONCURRENT_WORKERS: Final[int] = 5 +BATCH_SIZE: Final[int] = 10 +LIST_LIMIT: Final[int] = 10 +LIST_MEETINGS_LIMIT: Final[int] = 50 + # Required gRPC metadata def get_metadata() -> tuple[tuple[str, str], ...]: @@ -37,6 +46,7 @@ def get_metadata() -> tuple[tuple[str, str], ...]: ("x-workspace-id", "00000000-0000-0000-0000-000000000001"), ) + # Profiling thresholds (milliseconds) THRESHOLDS = { "create_meeting": 500, @@ -77,6 +87,26 @@ class ProfileResult: return f"[{status}] {self.operation}: {self.duration_ms:.2f}ms (threshold: {self.threshold_ms}ms)" +def _record_profile_result( + session: "ProfileSession", + operation: str, + duration_ms: float, + success: bool, + details: str = "", +) -> ProfileResult: + """Record a profile result and log it.""" + result = ProfileResult( + operation=operation, + duration_ms=duration_ms, + success=success, + threshold_ms=THRESHOLDS.get(operation, 1000), + details=details, + ) + session.add(result) + logger.info(str(result)) + return result + + @dataclass class ProfileSession: """Collection of profiling results.""" @@ -88,7 +118,7 @@ class ProfileSession: self.results.append(result) def summary(self) -> str: - lines = [f"\n{'='*60}", f"PROFILING SUMMARY - Server: {self.server}", "=" * 60] + lines = [f"\n{'=' * 60}", f"PROFILING SUMMARY - Server: {self.server}", "=" * 60] passed = sum(1 for r in self.results if r.passed_threshold) total = len(self.results) lines.append(f"Results: {passed}/{total} passed thresholds\n") @@ -101,7 +131,9 @@ class ProfileSession: lines.append(f"\nStatistics:") lines.append(f" Mean: {statistics.mean(durations):.2f}ms") lines.append(f" Median: {statistics.median(durations):.2f}ms") - lines.append(f" Stdev: {statistics.stdev(durations):.2f}ms" if len(durations) > 1 else "") + lines.append( + f" Stdev: {statistics.stdev(durations):.2f}ms" if len(durations) > 1 else "" + ) lines.append(f" Min: {min(durations):.2f}ms") lines.append(f" Max: {max(durations):.2f}ms") @@ -121,7 +153,7 @@ def grpc_channel() -> Generator[grpc.Channel, None, None]: ) # Wait for channel to be ready try: - grpc.channel_ready_future(channel).result(timeout=10) + grpc.channel_ready_future(channel).result(timeout=GRPC_TIMEOUT_SECONDS) except grpc.FutureTimeoutError: pytest.skip(f"Could not connect to gRPC server at {GRPC_TARGET}") yield channel @@ -142,16 +174,16 @@ def profile_session() -> ProfileSession: def profile_operation( operation: str, threshold_ms: float | None = None -) -> callable: +) -> Callable[[Callable[..., object]], Callable[..., object]]: """Decorator to profile an operation.""" - def decorator(func: callable) -> callable: + def decorator(func: Callable[..., object]) -> Callable[..., object]: def wrapper( stub: noteflow_pb2_grpc.NoteFlowServiceStub, profile_session: ProfileSession, - *args, - **kwargs, - ): + *args: object, + **kwargs: object, + ) -> object: threshold = threshold_ms or THRESHOLDS.get(operation, 1000) start = time.perf_counter() try: @@ -164,7 +196,7 @@ def profile_operation( threshold_ms=threshold, ) profile_session.add(profile_result) - print(profile_result) + logger.info(str(profile_result)) return result except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 @@ -176,7 +208,7 @@ def profile_operation( details=str(e.code()), ) profile_session.add(profile_result) - print(f"✗ {operation}: FAILED - {e.code()}") + logger.info(f"✗ {operation}: FAILED - {e.code()}") raise return wrapper @@ -204,7 +236,9 @@ class TestMeetingCRUDProfiling: title=f"Profile Test Meeting {uuid4()}", ) try: - response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata()) + response = self.stub.CreateMeeting( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) duration_ms = (time.perf_counter() - start) * 1000 self.meeting_id = response.id result = ProfileResult( @@ -214,9 +248,7 @@ class TestMeetingCRUDProfiling: threshold_ms=THRESHOLDS["create_meeting"], ) self.session.add(result) - print(result) - # Store for subsequent tests - pytest.meeting_id = response.id + logger.info(str(result)) assert response.id except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 @@ -239,7 +271,9 @@ class TestMeetingCRUDProfiling: start = time.perf_counter() request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id) try: - response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata()) + response = self.stub.GetMeeting( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="get_meeting", @@ -248,7 +282,7 @@ class TestMeetingCRUDProfiling: threshold_ms=THRESHOLDS["get_meeting"], ) self.session.add(result) - print(result) + logger.info(str(result)) assert response.id == meeting_id except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 @@ -265,9 +299,11 @@ class TestMeetingCRUDProfiling: def test_03_list_meetings(self) -> None: """Profile meeting listing.""" start = time.perf_counter() - request = noteflow_pb2.ListMeetingsRequest(limit=50) + request = noteflow_pb2.ListMeetingsRequest(limit=LIST_MEETINGS_LIMIT) try: - response = self.stub.ListMeetings(request, timeout=10, metadata=get_metadata()) + response = self.stub.ListMeetings( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="list_meetings", @@ -277,7 +313,7 @@ class TestMeetingCRUDProfiling: details=f"count={len(response.meetings)}", ) self.session.add(result) - print(result) + logger.info(str(result)) except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( @@ -295,32 +331,43 @@ class TestMeetingCRUDProfiling: meeting_id = getattr(pytest, "meeting_id", None) if not meeting_id: pytest.skip("No meeting created") + _profile_stop_meeting(self.stub, self.session, meeting_id) - start = time.perf_counter() - request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id) - try: - response = self.stub.StopMeeting(request, timeout=10, metadata=get_metadata()) - duration_ms = (time.perf_counter() - start) * 1000 - result = ProfileResult( - operation="stop_meeting", - duration_ms=duration_ms, - success=True, - threshold_ms=THRESHOLDS.get("stop_meeting", 500), - ) - self.session.add(result) - print(result) - except grpc.RpcError as e: - duration_ms = (time.perf_counter() - start) * 1000 - # Stop on non-recording meeting might fail, but we still capture timing - result = ProfileResult( - operation="stop_meeting", - duration_ms=duration_ms, - success=False, - threshold_ms=THRESHOLDS.get("stop_meeting", 500), - details=str(e.code()), - ) - self.session.add(result) - print(result) # Don't raise - expected to fail on non-recording meeting + +def _profile_list_segments( + stub: noteflow_pb2_grpc.NoteFlowServiceStub, + session: ProfileSession, + meeting_id: str, +) -> None: + start = time.perf_counter() + request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id, include_segments=True) + try: + response = stub.GetMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) + duration_ms = (time.perf_counter() - start) * 1000 + _record_profile_result( + session, "list_segments", duration_ms, True, f"count={len(response.segments)}" + ) + except grpc.RpcError as e: + duration_ms = (time.perf_counter() - start) * 1000 + _record_profile_result(session, "list_segments", duration_ms, False, str(e.code())) + raise + + +def _profile_stop_meeting( + stub: noteflow_pb2_grpc.NoteFlowServiceStub, + session: ProfileSession, + meeting_id: str, +) -> None: + start = time.perf_counter() + request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id) + try: + stub.StopMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) + duration_ms = (time.perf_counter() - start) * 1000 + _record_profile_result(session, "stop_meeting", duration_ms, True) + except grpc.RpcError as e: + duration_ms = (time.perf_counter() - start) * 1000 + # Stop on non-recording meeting might fail, but we still capture timing + _record_profile_result(session, "stop_meeting", duration_ms, False, str(e.code())) class TestSegmentProfiling: @@ -340,35 +387,7 @@ class TestSegmentProfiling: meeting_id = getattr(pytest, "meeting_id", None) if not meeting_id: pytest.skip("No meeting created") - - start = time.perf_counter() - request = noteflow_pb2.GetMeetingRequest( - meeting_id=meeting_id, - include_segments=True, - ) - try: - response = self.stub.GetMeeting(request, timeout=10, metadata=get_metadata()) - duration_ms = (time.perf_counter() - start) * 1000 - result = ProfileResult( - operation="list_segments", - duration_ms=duration_ms, - success=True, - threshold_ms=THRESHOLDS["list_segments"], - details=f"count={len(response.segments)}", - ) - self.session.add(result) - print(result) - except grpc.RpcError as e: - duration_ms = (time.perf_counter() - start) * 1000 - result = ProfileResult( - operation="list_segments", - duration_ms=duration_ms, - success=False, - threshold_ms=THRESHOLDS["list_segments"], - details=str(e.code()), - ) - self.session.add(result) - raise + _profile_list_segments(self.stub, self.session, meeting_id) class TestAnnotationProfiling: @@ -396,7 +415,7 @@ class TestAnnotationProfiling: text=f"Profile test annotation {uuid4()}", ) try: - response = self.stub.AddAnnotation(request, timeout=10, metadata=get_metadata()) + self.stub.AddAnnotation(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="create_annotation", @@ -405,8 +424,7 @@ class TestAnnotationProfiling: threshold_ms=THRESHOLDS["create_annotation"], ) self.session.add(result) - print(result) - pytest.annotation_id = response.id + logger.info(str(result)) except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( @@ -428,7 +446,9 @@ class TestAnnotationProfiling: start = time.perf_counter() request = noteflow_pb2.ListAnnotationsRequest(meeting_id=meeting_id) try: - response = self.stub.ListAnnotations(request, timeout=10, metadata=get_metadata()) + response = self.stub.ListAnnotations( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="list_annotations", @@ -438,7 +458,7 @@ class TestAnnotationProfiling: details=f"count={len(response.annotations)}", ) self.session.add(result) - print(result) + logger.info(str(result)) except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( @@ -469,7 +489,7 @@ class TestPreferencesProfiling: start = time.perf_counter() request = noteflow_pb2.GetPreferencesRequest() try: - response = self.stub.GetPreferences(request, timeout=10, metadata=get_metadata()) + self.stub.GetPreferences(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="get_preferences", @@ -478,7 +498,7 @@ class TestPreferencesProfiling: threshold_ms=THRESHOLDS["get_preferences"], ) self.session.add(result) - print(result) + logger.info(str(result)) except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( @@ -492,6 +512,26 @@ class TestPreferencesProfiling: raise +def _profile_concurrent_reads( + stub: noteflow_pb2_grpc.NoteFlowServiceStub, + session: ProfileSession, +) -> None: + import concurrent.futures + + def read_meetings() -> object: + request = noteflow_pb2.ListMeetingsRequest(limit=LIST_LIMIT) + return stub.ListMeetings(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) + + start = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as executor: + futures = [executor.submit(read_meetings) for _ in range(CONCURRENT_WORKERS)] + _ = [f.result() for f in concurrent.futures.as_completed(futures)] + duration_ms = (time.perf_counter() - start) * 1000 + _record_profile_result( + session, "concurrent_reads", duration_ms, True, f"{CONCURRENT_WORKERS} concurrent requests" + ) + + class TestConcurrentProfiling: """Profile concurrent operations.""" @@ -506,27 +546,35 @@ class TestConcurrentProfiling: def test_concurrent_reads(self) -> None: """Profile concurrent read operations.""" - import concurrent.futures + _profile_concurrent_reads(self.stub, self.session) - def read_meetings(): - request = noteflow_pb2.ListMeetingsRequest(limit=10) - return self.stub.ListMeetings(request, timeout=10, metadata=get_metadata()) - start = time.perf_counter() - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - futures = [executor.submit(read_meetings) for _ in range(5)] - results = [f.result() for f in concurrent.futures.as_completed(futures)] +def _create_batch_meetings( + stub: noteflow_pb2_grpc.NoteFlowServiceStub, +) -> list[str]: + meeting_ids: list[str] = [] + for i in range(BATCH_SIZE): + request = noteflow_pb2.CreateMeetingRequest(title=f"Batch Profile Meeting {i} - {uuid4()}") + try: + response = stub.CreateMeeting( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) + meeting_ids.append(response.id) + except grpc.RpcError: + pass + return meeting_ids - duration_ms = (time.perf_counter() - start) * 1000 - result = ProfileResult( - operation="concurrent_reads", - duration_ms=duration_ms, - success=True, - threshold_ms=THRESHOLDS["concurrent_reads"], - details=f"5 concurrent requests", - ) - self.session.add(result) - print(result) + +def _cleanup_batch_meetings( + stub: noteflow_pb2_grpc.NoteFlowServiceStub, + meeting_ids: list[str], +) -> None: + for mid in meeting_ids: + try: + request = noteflow_pb2.DeleteMeetingRequest(meeting_id=mid) + stub.DeleteMeeting(request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata()) + except grpc.RpcError: + pass class TestBatchProfiling: @@ -543,37 +591,18 @@ class TestBatchProfiling: def test_batch_create_meetings(self) -> None: """Profile batch meeting creation.""" - meeting_ids = [] start = time.perf_counter() - - for i in range(10): - request = noteflow_pb2.CreateMeetingRequest( - title=f"Batch Profile Meeting {i} - {uuid4()}", - ) - try: - response = self.stub.CreateMeeting(request, timeout=10, metadata=get_metadata()) - meeting_ids.append(response.id) - except grpc.RpcError: - pass - + meeting_ids = _create_batch_meetings(self.stub) duration_ms = (time.perf_counter() - start) * 1000 - result = ProfileResult( - operation="batch_create", - duration_ms=duration_ms, - success=len(meeting_ids) == 10, - threshold_ms=THRESHOLDS["batch_create"], - details=f"created={len(meeting_ids)}/10", + success = len(meeting_ids) == BATCH_SIZE + _record_profile_result( + self.session, + "batch_create", + duration_ms, + success, + f"created={len(meeting_ids)}/{BATCH_SIZE}", ) - self.session.add(result) - print(result) - - # Cleanup - delete created meetings - for mid in meeting_ids: - try: - request = noteflow_pb2.DeleteMeetingRequest(meeting_id=mid) - self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata()) - except grpc.RpcError: - pass + _cleanup_batch_meetings(self.stub, meeting_ids) class TestAnalyticsProfiling: @@ -593,7 +622,9 @@ class TestAnalyticsProfiling: start = time.perf_counter() request = noteflow_pb2.GetAnalyticsOverviewRequest() try: - response = self.stub.GetAnalyticsOverview(request, timeout=30, metadata=get_metadata()) + self.stub.GetAnalyticsOverview( + request, timeout=GRPC_TIMEOUT_SECONDS * 3, metadata=get_metadata() + ) duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( operation="get_analytics", @@ -602,7 +633,7 @@ class TestAnalyticsProfiling: threshold_ms=THRESHOLDS["get_analytics"], ) self.session.add(result) - print(result) + logger.info(str(result)) except grpc.RpcError as e: duration_ms = (time.perf_counter() - start) * 1000 result = ProfileResult( @@ -614,7 +645,7 @@ class TestAnalyticsProfiling: ) self.session.add(result) # Don't raise - analytics might not be implemented - print(f"Analytics: {e.code()} (may not be implemented)") + logger.info(f"Analytics: {e.code()} (may not be implemented)") class TestCleanupAndSummary: @@ -635,15 +666,17 @@ class TestCleanupAndSummary: if meeting_id: try: request = noteflow_pb2.DeleteMeetingRequest(meeting_id=meeting_id) - self.stub.DeleteMeeting(request, timeout=10, metadata=get_metadata()) - print(f"Cleaned up meeting: {meeting_id}") + self.stub.DeleteMeeting( + request, timeout=GRPC_TIMEOUT_SECONDS, metadata=get_metadata() + ) + logger.info(f"Cleaned up meeting: {meeting_id}") except grpc.RpcError as e: - print(f"Cleanup failed: {e.code()}") + logger.info(f"Cleanup failed: {e.code()}") # Print summary - print(self.session.summary()) + logger.info(self.session.summary()) # Assert all thresholds passed (optional - comment out for pure profiling) failed = [r for r in self.session.results if not r.passed_threshold and r.success] if failed: - print(f"\n[WARNING] {len(failed)} operations exceeded thresholds") + logger.info(f"\n[WARNING] {len(failed)} operations exceeded thresholds")