Files
noteflow/tests/grpc/test_sync_orchestration.py
Travis Vasceannie d8090a98e8
Some checks failed
CI / test-typescript (push) Has been cancelled
CI / test-rust (push) Has been cancelled
CI / test-python (push) Has been cancelled
ci/cd fixes
2026-01-26 00:28:15 +00:00

974 lines
34 KiB
Python

"""Behavioral tests for integration sync orchestration (Sprint 9 + Sprint 18.1).
Covers the full sync lifecycle: create integration, start sync, poll status,
verify completion/error states. Uses in-memory storage with mocked calendar service.
Sprint 18.1 additions: GetUserIntegrations RPC, NOT_FOUND status code validation,
list_all repository method tests.
"""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass
from datetime import datetime
from typing import Protocol, cast
from uuid import UUID, uuid4
import grpc
import pytest
from noteflow.application.services.calendar import CalendarService
from noteflow.domain.entities.integration import (
Integration,
IntegrationType,
SyncRun,
)
from noteflow.grpc.config.config import ServicesConfig
from noteflow.grpc.meeting_store import MeetingStore
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
def _get_status_code_not_found() -> grpc.StatusCode:
    """Return ``grpc.StatusCode.NOT_FOUND`` using a dynamic attribute lookup.

    ``StatusCode`` is fetched with ``getattr`` instead of plain attribute
    access for type checker compatibility.
    """
    return getattr(grpc, "StatusCode").NOT_FOUND
class _DummyContext:
    """Stand-in for a gRPC servicer context that records abort calls.

    ``abort`` stores the code and details it receives and then raises
    ``AssertionError``, so a test can detect the abort with ``pytest.raises``
    and inspect the recorded values afterwards.
    """

    def __init__(self) -> None:
        # Nothing has been aborted yet; code/details stay None until abort().
        self.aborted = False
        self.abort_code: grpc.StatusCode | None = None
        self.abort_details: str | None = None

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Record the abort and raise ``AssertionError`` describing it."""
        self.aborted, self.abort_code, self.abort_details = True, code, details
        message = f"abort called: {code} - {details}"
        raise AssertionError(message)
class _StartIntegrationSyncRequest(Protocol):
    """Structural type of the request object passed to ``StartIntegrationSync``."""

    # Identifier of the integration to sync, carried as a string.
    integration_id: str
class _StartIntegrationSyncResponse(Protocol):
    """Structural type of the response returned by ``StartIntegrationSync``."""

    # Identifier of the sync run that was started, as a string.
    sync_run_id: str
    # Status string of the run (the tests in this module compare it to "running").
    status: str
class _GetSyncStatusRequest(Protocol):
    """Structural type of the request object passed to ``GetSyncStatus``."""

    # Identifier of the sync run whose status is requested, as a string.
    sync_run_id: str
class _GetSyncStatusResponse(Protocol):
    """Structural type of the response returned by ``GetSyncStatus``.

    Lists only the fields that the tests in this module read.
    """

    status: str
    items_synced: int
    items_total: int
    error_code: int  # SyncErrorCode enum value
    duration_ms: int
    expires_at: str
class _ListSyncHistoryRequest(Protocol):
    """Structural type of the request object passed to ``ListSyncHistory``."""

    # Identifier of the integration whose history is listed, as a string.
    integration_id: str
    # Page-size limit supplied by the caller.
    limit: int
class _ListSyncHistoryResponse(Protocol):
    """Structural type of the response returned by ``ListSyncHistory``."""

    # Count of sync runs reported by the server.
    total_count: int
    # The returned sync runs; ``_SyncRunInfo`` is declared later in this module.
    runs: Sequence[_SyncRunInfo]
class _SyncRunInfo(Protocol):
    """Structural type of one entry in ``_ListSyncHistoryResponse.runs``."""

    # Sync run identifier as a string (the tests compare it to ``sync_run_id``).
    id: str
class _IntegrationInfo(Protocol):
    """Structural type of one integration entry in a ``GetUserIntegrations`` response.

    All fields are strings on the wire, including the identifiers.
    """

    id: str
    name: str
    type: str
    status: str
    workspace_id: str
class _GetUserIntegrationsResponse(Protocol):
    """Structural type of the response returned by ``GetUserIntegrations``."""

    # The integrations reported by the server.
    integrations: Sequence[_IntegrationInfo]
class _StartIntegrationSyncCallable(Protocol):
    """Call signature that ``servicer.StartIntegrationSync`` is cast to in tests."""

    async def __call__(
        self,
        request: _StartIntegrationSyncRequest,
        context: _DummyContext,
    ) -> _StartIntegrationSyncResponse:
        """Run the RPC with a request and the test context."""
        ...
class _GetSyncStatusCallable(Protocol):
    """Call signature that ``servicer.GetSyncStatus`` is cast to in tests."""

    async def __call__(
        self,
        request: _GetSyncStatusRequest,
        context: _DummyContext,
    ) -> _GetSyncStatusResponse:
        """Run the RPC with a request and the test context."""
        ...
class _ListSyncHistoryCallable(Protocol):
    """Call signature that ``servicer.ListSyncHistory`` is cast to in tests."""

    async def __call__(
        self,
        request: _ListSyncHistoryRequest,
        context: _DummyContext,
    ) -> _ListSyncHistoryResponse:
        """Run the RPC with a request and the test context."""
        ...
class _GetUserIntegrationsCallable(Protocol):
    """Call signature that ``servicer.GetUserIntegrations`` is cast to in tests."""

    async def __call__(
        self,
        request: noteflow_pb2.GetUserIntegrationsRequest,
        context: _DummyContext,
    ) -> _GetUserIntegrationsResponse:
        """Run the RPC with a request and the test context."""
        ...
async def _call_start_sync(
    servicer: NoteFlowServicer,
    request: _StartIntegrationSyncRequest,
    context: _DummyContext,
) -> _StartIntegrationSyncResponse:
    """Invoke ``servicer.StartIntegrationSync`` through a typed view of the RPC.

    The bound method is cast to ``_StartIntegrationSyncCallable`` so the test
    context and the Protocol request/response types line up for the type checker.
    """
    rpc = cast(_StartIntegrationSyncCallable, servicer.StartIntegrationSync)
    response = await rpc(request, context)
    return response
async def _call_get_sync_status(
    servicer: NoteFlowServicer,
    request: _GetSyncStatusRequest,
    context: _DummyContext,
) -> _GetSyncStatusResponse:
    """Invoke ``servicer.GetSyncStatus`` through a typed view of the RPC.

    The bound method is cast to ``_GetSyncStatusCallable`` so the test context
    and the Protocol request/response types line up for the type checker.
    """
    rpc = cast(_GetSyncStatusCallable, servicer.GetSyncStatus)
    response = await rpc(request, context)
    return response
async def _call_list_sync_history(
    servicer: NoteFlowServicer,
    request: _ListSyncHistoryRequest,
    context: _DummyContext,
) -> _ListSyncHistoryResponse:
    """Invoke ``servicer.ListSyncHistory`` through a typed view of the RPC.

    The bound method is cast to ``_ListSyncHistoryCallable`` so the test context
    and the Protocol request/response types line up for the type checker.
    """
    rpc = cast(_ListSyncHistoryCallable, servicer.ListSyncHistory)
    response = await rpc(request, context)
    return response
async def _call_get_user_integrations(
    servicer: NoteFlowServicer,
    request: noteflow_pb2.GetUserIntegrationsRequest,
    context: _DummyContext,
) -> _GetUserIntegrationsResponse:
    """Invoke ``servicer.GetUserIntegrations`` through a typed view of the RPC.

    The bound method is cast to ``_GetUserIntegrationsCallable`` so the test
    context and the Protocol response type line up for the type checker.
    """
    rpc = cast(_GetUserIntegrationsCallable, servicer.GetUserIntegrations)
    response = await rpc(request, context)
    return response
@dataclass
class MockCalendarEvent:
    """Plain record standing in for a calendar event in these tests.

    All four fields are strings; the tests in this module fill the two time
    fields with ISO-8601 style timestamps such as ``"2025-01-01T10:00:00Z"``.
    """

    # Identifier of the event.
    event_id: str
    # Human-readable event title.
    title: str
    # Start and end of the event, kept as strings.
    start_time: str
    end_time: str
class SuccessfulCalendarService:
"""Calendar service that always succeeds with configured events."""
def __init__(self, events: list[MockCalendarEvent] | None = None) -> None:
self.events_to_return: list[MockCalendarEvent] = events or []
self.call_count: int = 0
self.sync_started = asyncio.Event()
self.sync_can_complete = asyncio.Event()
self.sync_can_complete.set() # Default: immediate completion
async def list_calendar_events(
self,
hours_ahead: int = 168,
limit: int = 100,
provider: str | None = None,
) -> list[MockCalendarEvent]:
"""Return mock calendar events (always succeeds)."""
del hours_ahead, limit, provider # Unused - interface compliance
self.call_count += 1
self.sync_started.set()
await self.sync_can_complete.wait()
return self.events_to_return
class FailingCalendarService:
    """Fake calendar service whose event listing always raises ``RuntimeError``.

    ``sync_started`` is set when ``list_calendar_events`` is entered and the
    call waits on ``sync_can_complete`` (set at construction) before raising
    with the configured ``failure_message``.
    """

    def __init__(self, failure_message: str = "Calendar sync failed") -> None:
        completion_gate = asyncio.Event()
        completion_gate.set()  # open by default so the failure is immediate
        self.failure_message = failure_message
        self.call_count: int = 0
        self.sync_started = asyncio.Event()
        self.sync_can_complete = completion_gate

    async def list_calendar_events(
        self,
        hours_ahead: int = 168,
        limit: int = 100,
        provider: str | None = None,
    ) -> list[MockCalendarEvent]:
        """Count the call, wait for the completion gate, then raise ``RuntimeError``.

        The three parameters are accepted only for interface compliance and
        are ignored.
        """
        _ = (hours_ahead, limit, provider)  # Unused - interface compliance
        self.call_count += 1
        self.sync_started.set()
        await self.sync_can_complete.wait()
        raise RuntimeError(self.failure_message)
@pytest.fixture
def meeting_store() -> MeetingStore:
    """Provide a newly constructed ``MeetingStore`` for the requesting test.

    Tests hand this store to a servicer and also write integrations into it
    directly, so both sides see the same integration repository.
    """
    store = MeetingStore()
    return store
@pytest.fixture
def successfulcalendar_service() -> SuccessfulCalendarService:
    """Provide a ``SuccessfulCalendarService`` built with its defaults (no events)."""
    service = SuccessfulCalendarService()
    return service
@pytest.fixture
def failingcalendar_service() -> FailingCalendarService:
    """Provide a ``FailingCalendarService`` built with its default failure message."""
    service = FailingCalendarService()
    return service
@pytest.fixture
def servicer_with_success(
    meeting_store: MeetingStore, successfulcalendar_service: SuccessfulCalendarService
) -> NoteFlowServicer:
    """Provide a servicer wired to the in-memory store and the succeeding calendar fake.

    The fake is cast to ``CalendarService`` purely for the type checker; the
    object handed to the servicer is the fixture instance itself.
    """
    calendar = cast(CalendarService, successfulcalendar_service)
    config = ServicesConfig(calendar_service=calendar)
    configured = NoteFlowServicer(services=config)
    configured.memory_store = meeting_store
    return configured
@pytest.fixture
def servicer_with_failure(
    meeting_store: MeetingStore, failingcalendar_service: FailingCalendarService
) -> NoteFlowServicer:
    """Provide a servicer wired to the in-memory store and the failing calendar fake.

    The fake is cast to ``CalendarService`` purely for the type checker; the
    object handed to the servicer is the fixture instance itself.
    """
    calendar = cast(CalendarService, failingcalendar_service)
    config = ServicesConfig(calendar_service=calendar)
    configured = NoteFlowServicer(services=config)
    configured.memory_store = meeting_store
    return configured
async def create_test_integration(store: MeetingStore, name: str = "Test Calendar") -> Integration:
    """Build a connected calendar integration and save it in ``store``.

    The integration gets a random workspace ID, a ``{"provider": "google"}``
    config, and is connected as ``test@example.com`` before being persisted
    through ``store.integrations.create``.

    Args:
        store: Store whose integration repository receives the new record.
        name: Display name of the integration.

    Returns:
        The integration that was created and persisted.
    """
    new_integration = Integration.create(
        workspace_id=uuid4(),
        name=name,
        integration_type=IntegrationType.CALENDAR,
        config={"provider": "google"},
    )
    new_integration.connect(provider_email="test@example.com")
    await store.integrations.create(new_integration)
    return new_integration
async def await_sync_completion(
    servicer: NoteFlowServicer,
    sync_run_id: str,
    context: _DummyContext,
    timeout: float = 2.0,
) -> _GetSyncStatusResponse:
    """Wait until a sync run is no longer ``"running"`` and return its status.

    The sync itself runs in a background task, so this helper polls
    ``GetSyncStatus`` at a short interval instead of sleeping for one fixed
    delay and hoping the sync has finished by then. The whole wait is bounded
    by ``asyncio.wait_for``.

    Args:
        servicer: Servicer that owns the sync run.
        sync_run_id: Identifier returned by ``StartIntegrationSync``.
        context: Test gRPC context passed to every status call.
        timeout: Maximum number of seconds to wait for the run to finish.

    Returns:
        The first status response whose ``status`` is not ``"running"``.

    Raises:
        asyncio.TimeoutError: If the run is still running when ``timeout`` elapses.
        AssertionError: If the status RPC aborts via ``_DummyContext.abort``.
    """
    poll_interval = 0.01
    request = noteflow_pb2.GetSyncStatusRequest(sync_run_id=sync_run_id)

    async def _poll_until_finished() -> _GetSyncStatusResponse:
        while True:
            # Sleep before each check so the background sync task gets a
            # chance to run on the event loop.
            await asyncio.sleep(poll_interval)
            current = await _call_get_sync_status(servicer, request, context)
            # NOTE(review): "running" is the only in-progress status visible in
            # this module; confirm there is no other non-terminal status.
            if current.status != "running":
                return current

    return await asyncio.wait_for(_poll_until_finished(), timeout=timeout)
class TestStartIntegrationSync:
    """Failure-path checks for starting a sync via ``StartIntegrationSync``."""

    @pytest.mark.asyncio
    async def test_start_sync_for_nonexistent_integration_fails(self) -> None:
        """An unknown integration ID makes the RPC abort.

        Only the abort itself is asserted here; the status code is not checked.
        """
        servicer = NoteFlowServicer()
        context = _DummyContext()
        unknown_id = str(uuid4())
        with pytest.raises(AssertionError, match="abort called"):
            request = noteflow_pb2.StartIntegrationSyncRequest(integration_id=unknown_id)
            await _call_start_sync(servicer, request, context)
        assert context.aborted, "Should have aborted with NOT_FOUND"

    @pytest.mark.asyncio
    async def test_start_sync_requires_integration_id(self) -> None:
        """An empty integration ID makes the RPC abort."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        with pytest.raises(AssertionError, match="abort called"):
            request = noteflow_pb2.StartIntegrationSyncRequest(integration_id="")
            await _call_start_sync(servicer, request, context)
        assert context.aborted, "Should have aborted for empty integration_id"
class TestSyncStatus:
    """Failure-path checks for reading sync status via ``GetSyncStatus``."""

    @pytest.mark.asyncio
    async def test_get_sync_status_for_nonexistent_run_fails(self) -> None:
        """An unknown sync run ID makes the RPC abort.

        Only the abort itself is asserted here; the status code is not checked.
        """
        servicer = NoteFlowServicer()
        context = _DummyContext()
        unknown_run_id = str(uuid4())
        with pytest.raises(AssertionError, match="abort called"):
            request = noteflow_pb2.GetSyncStatusRequest(sync_run_id=unknown_run_id)
            await _call_get_sync_status(servicer, request, context)
        assert context.aborted, "Should have aborted with NOT_FOUND"

    @pytest.mark.asyncio
    async def test_get_sync_status_requires_sync_run_id(self) -> None:
        """An empty sync run ID makes the RPC abort."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        with pytest.raises(AssertionError, match="abort called"):
            request = noteflow_pb2.GetSyncStatusRequest(sync_run_id="")
            await _call_get_sync_status(servicer, request, context)
        assert context.aborted, "Should have aborted for empty sync_run_id"
class TestSyncHistory:
    """Checks for listing sync history via ``ListSyncHistory``."""

    @pytest.mark.asyncio
    async def test_list_sync_history_for_nonexistent_integration(self) -> None:
        """Listing history for an unknown integration still returns a response.

        The assertions only require a ``runs`` value and a non-negative count.
        """
        servicer = NoteFlowServicer()
        context = _DummyContext()
        request = noteflow_pb2.ListSyncHistoryRequest(integration_id=str(uuid4()), limit=10)
        history = await _call_list_sync_history(servicer, request, context)
        assert history.runs is not None, "Should return runs list"
        assert history.total_count >= 0, "Should return non-negative count"

    @pytest.mark.asyncio
    async def test_list_sync_history_default_pagination(self) -> None:
        """Listing history without an explicit ``limit`` returns a response."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        request = noteflow_pb2.ListSyncHistoryRequest(integration_id=str(uuid4()))
        history = await _call_list_sync_history(servicer, request, context)
        assert history is not None, "Should return valid response"
class TestSyncHappyPath:
    """Test the complete sync happy path: start -> poll -> complete.

    Every test uses the ``servicer_with_success`` fixture rather than building
    its own servicer. Tests that need specific events set them on the
    ``successfulcalendar_service`` fixture, which is the same instance the
    servicer fixture was built with inside a single test.
    """

    @pytest.mark.asyncio
    async def test_start_sync_returns_running_status(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """Starting sync returns a running sync run ID."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        response = await _call_start_sync(
            servicer_with_success,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        assert response.sync_run_id, "Should return a sync run ID"
        assert response.status == "running", "Initial status should be running"
        assert not context.aborted, "Should not abort for valid integration"

    @pytest.mark.asyncio
    async def test_sync_completes_successfully(
        self,
        meeting_store: MeetingStore,
        servicer_with_success: NoteFlowServicer,
        successfulcalendar_service: SuccessfulCalendarService,
    ) -> None:
        """Sync completes with success status when events are fetched."""
        # The servicer holds a reference to this service, so events configured
        # here are the ones returned when the sync runs.
        successfulcalendar_service.events_to_return = [
            MockCalendarEvent("evt1", "Meeting 1", "2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"),
            MockCalendarEvent("evt2", "Meeting 2", "2025-01-01T14:00:00Z", "2025-01-01T15:00:00Z"),
        ]
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer_with_success,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        status = await await_sync_completion(servicer_with_success, start.sync_run_id, context)
        assert status.status == "success", "Sync should complete successfully"
        expected_synced = 2
        assert status.items_synced == expected_synced, "Should report 2 items synced"

    @pytest.mark.asyncio
    async def test_sync_history_shows_completed_run(
        self,
        meeting_store: MeetingStore,
        servicer_with_success: NoteFlowServicer,
        successfulcalendar_service: SuccessfulCalendarService,
    ) -> None:
        """Completed sync run appears in history."""
        successfulcalendar_service.events_to_return = [
            MockCalendarEvent("evt1", "Meeting", "2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"),
        ]
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer_with_success,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        await await_sync_completion(servicer_with_success, start.sync_run_id, context)
        history = await _call_list_sync_history(
            servicer_with_success,
            noteflow_pb2.ListSyncHistoryRequest(integration_id=str(integration.id), limit=10),
            context,
        )
        expected_count = 1
        assert history.total_count == expected_count, "Should have one sync run in history"
        assert history.runs[0].id == start.sync_run_id, "Run ID should match"
class TestSyncErrorHandling:
    """Test sync error scenarios and recovery."""

    @pytest.mark.asyncio
    async def test_sync_fails_whencalendar_service_errors(
        self, meeting_store: MeetingStore
    ) -> None:
        """Sync fails and records error when calendar service throws."""
        # A dedicated service is built here because the failure message matters:
        # the test expects it to be reported as an auth-required error code.
        failing_service = FailingCalendarService(failure_message="OAuth token expired")
        servicer = NoteFlowServicer(
            services=ServicesConfig(calendar_service=cast(CalendarService, failing_service))
        )
        servicer.memory_store = meeting_store
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        status = await await_sync_completion(servicer, start.sync_run_id, context)
        assert status.status == "error", "Sync should report error"
        # Auth errors should return SYNC_ERROR_CODE_AUTH_REQUIRED
        assert status.error_code == noteflow_pb2.SYNC_ERROR_CODE_AUTH_REQUIRED, (
            f"Expected auth_required error code, got {status.error_code}"
        )

    @pytest.mark.asyncio
    async def test_first_sync_fails(
        self, servicer_with_failure: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """First sync fails due to error."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer_with_failure,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        status = await await_sync_completion(servicer_with_failure, start.sync_run_id, context)
        assert status.status == "error", "First sync should fail"

    @pytest.mark.asyncio
    async def test_retry_after_failure_succeeds(
        self,
        meeting_store: MeetingStore,
        servicer_with_failure: NoteFlowServicer,
        servicer_with_success: NoteFlowServicer,
        successfulcalendar_service: SuccessfulCalendarService,
    ) -> None:
        """A sync can succeed after a previous sync failed."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        first = await _call_start_sync(
            servicer_with_failure,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        first_status = await await_sync_completion(
            servicer_with_failure, first.sync_run_id, context
        )
        # Verify the precondition: without a failed first run this test would
        # not exercise a retry at all.
        assert first_status.status == "error", "First sync should fail"
        successfulcalendar_service.events_to_return = [
            MockCalendarEvent("evt1", "Meeting", "2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"),
        ]
        # Both servicers share ``meeting_store``, so the retry targets the same
        # integration through the servicer backed by the succeeding service.
        second = await _call_start_sync(
            servicer_with_success,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        status = await await_sync_completion(servicer_with_success, second.sync_run_id, context)
        assert status.status == "success", "Second sync should succeed"
        expected_synced = 1
        assert status.items_synced == expected_synced, "Should sync 1 item"
class TestConcurrentSyncs:
    """Checks that two integrations can each be synced through one servicer."""

    @pytest.mark.asyncio
    async def test_multiple_integrations_can_sync(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """Two syncs started back to back both finish with ``success``."""
        first_integration = await create_test_integration(meeting_store, "Calendar 1")
        second_integration = await create_test_integration(meeting_store, "Calendar 2")
        context = _DummyContext()
        first_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(first_integration.id)
        )
        first_start = await _call_start_sync(servicer_with_success, first_request, context)
        second_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(second_integration.id)
        )
        second_start = await _call_start_sync(servicer_with_success, second_request, context)
        first_status = await await_sync_completion(
            servicer_with_success, first_start.sync_run_id, context
        )
        second_status = await await_sync_completion(
            servicer_with_success, second_start.sync_run_id, context
        )
        assert first_status.status == "success", "First integration sync should succeed"
        assert second_status.status == "success", "Second integration sync should succeed"

    @pytest.mark.asyncio
    async def test_each_integration_has_own_history(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """After one sync each, every integration lists exactly one run."""
        first_integration = await create_test_integration(meeting_store, "Calendar 1")
        second_integration = await create_test_integration(meeting_store, "Calendar 2")
        context = _DummyContext()
        first_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(first_integration.id)
        )
        first_start = await _call_start_sync(servicer_with_success, first_request, context)
        second_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(second_integration.id)
        )
        second_start = await _call_start_sync(servicer_with_success, second_request, context)
        await await_sync_completion(servicer_with_success, first_start.sync_run_id, context)
        await await_sync_completion(servicer_with_success, second_start.sync_run_id, context)
        first_history_request = noteflow_pb2.ListSyncHistoryRequest(
            integration_id=str(first_integration.id), limit=10
        )
        first_history = await _call_list_sync_history(
            servicer_with_success, first_history_request, context
        )
        second_history_request = noteflow_pb2.ListSyncHistoryRequest(
            integration_id=str(second_integration.id), limit=10
        )
        second_history = await _call_list_sync_history(
            servicer_with_success, second_history_request, context
        )
        assert first_history.total_count == 1, "Integration 1 should have 1 run"
        assert second_history.total_count == 1, "Integration 2 should have 1 run"
class TestSyncPolling:
    """Test polling behavior during sync execution."""

    @pytest.mark.asyncio
    async def test_status_shows_running_while_sync_in_progress(
        self, meeting_store: MeetingStore
    ) -> None:
        """Status shows running while sync is still executing."""
        blocking_service = SuccessfulCalendarService()
        blocking_service.sync_can_complete.clear()  # Block completion
        servicer = NoteFlowServicer(
            services=ServicesConfig(calendar_service=cast(CalendarService, blocking_service))
        )
        servicer.memory_store = meeting_store
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        try:
            # Wait for sync to start
            await asyncio.wait_for(blocking_service.sync_started.wait(), timeout=1.0)
            # Check status while blocked - should still be running
            immediate_status = await _call_get_sync_status(
                servicer,
                noteflow_pb2.GetSyncStatusRequest(sync_run_id=start.sync_run_id),
                context,
            )
            assert immediate_status.status == "running", "Should be running while blocked"
        finally:
            # Allow completion. Done in ``finally`` so a failed wait or
            # assertion above does not leave the background sync blocked forever.
            blocking_service.sync_can_complete.set()
        final = await await_sync_completion(servicer, start.sync_run_id, context)
        assert final.status == "success", "Should complete after unblocking"

    @pytest.mark.asyncio
    async def test_sync_run_includes_duration(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """Completed sync run includes duration information."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start = await _call_start_sync(
            servicer_with_success,
            noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(integration.id)),
            context,
        )
        status = await await_sync_completion(servicer_with_success, start.sync_run_id, context)
        assert status.status == "success", "Sync should complete"
        assert status.duration_ms >= 0, "Duration should be recorded"
class TestGetUserIntegrations:
    """Checks for the GetUserIntegrations RPC (Sprint 18.1)."""

    @pytest.mark.asyncio
    async def test_returns_empty_when_no_integrations(self) -> None:
        """A servicer with no integrations answers with an empty list and no abort."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        request = noteflow_pb2.GetUserIntegrationsRequest()
        response = await _call_get_user_integrations(servicer, request, context)
        assert response.integrations is not None, "Response should have integrations list"
        assert len(response.integrations) == 0, "Should return empty list"
        assert not context.aborted, "Should not abort"

    @pytest.mark.asyncio
    async def test_returns_created_integration(self, meeting_store: MeetingStore) -> None:
        """An integration saved in the store is returned with its ID."""
        servicer = NoteFlowServicer()
        servicer.memory_store = meeting_store
        context = _DummyContext()
        created = await create_test_integration(meeting_store, "Google Calendar")
        request = noteflow_pb2.GetUserIntegrationsRequest()
        response = await _call_get_user_integrations(servicer, request, context)
        assert len(response.integrations) == 1, "Should return one integration"
        assert response.integrations[0].id == str(created.id), "ID should match"

    @pytest.mark.asyncio
    async def test_includes_all_fields(self, meeting_store: MeetingStore) -> None:
        """The returned entry carries id, name, type, status and workspace_id."""
        servicer = NoteFlowServicer()
        servicer.memory_store = meeting_store
        context = _DummyContext()
        created = await create_test_integration(meeting_store, "Test Calendar")
        request = noteflow_pb2.GetUserIntegrationsRequest()
        response = await _call_get_user_integrations(servicer, request, context)
        assert len(response.integrations) == 1, "Should return one integration"
        entry = response.integrations[0]
        assert entry.id == str(created.id), "ID should match"
        assert entry.name == "Test Calendar", "Name should match"
        assert entry.type == "calendar", "Type should be calendar"
        assert entry.status == "connected", "Status should be connected"
        assert entry.workspace_id, "Workspace ID should be present"

    @pytest.mark.asyncio
    async def test_excludes_deleted_integration(self, meeting_store: MeetingStore) -> None:
        """After deleting one of two integrations only the other is returned."""
        servicer = NoteFlowServicer()
        servicer.memory_store = meeting_store
        context = _DummyContext()
        removed = await create_test_integration(meeting_store, "Calendar 1")
        kept = await create_test_integration(meeting_store, "Calendar 2")
        await meeting_store.integrations.delete(removed.id)
        request = noteflow_pb2.GetUserIntegrationsRequest()
        response = await _call_get_user_integrations(servicer, request, context)
        assert len(response.integrations) == 1, "Should return only remaining"
        assert response.integrations[0].id == str(kept.id), "Should be non-deleted"
class TestNotFoundStatusCode:
    """Checks that missing IDs abort with gRPC ``NOT_FOUND`` (Sprint 18.1)."""

    # Shape of the ``_call_*`` helpers that the parametrized test receives.
    CallMethod = Callable[[NoteFlowServicer, object, _DummyContext], Awaitable[object]]

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("call_method", "proto_request", "label"),
        [
            pytest.param(
                _call_start_sync,
                noteflow_pb2.StartIntegrationSyncRequest(integration_id=str(uuid4())),
                "start_sync",
                id="start_sync",
            ),
            pytest.param(
                _call_get_sync_status,
                noteflow_pb2.GetSyncStatusRequest(sync_run_id=str(uuid4())),
                "get_sync_status",
                id="get_sync_status",
            ),
        ],
    )
    async def test_nonexistent_aborts_with_not_found(
        self,
        servicer_with_success: NoteFlowServicer,
        call_method: CallMethod,
        proto_request: object,
        label: str,
    ) -> None:
        """Each RPC aborts and records ``NOT_FOUND`` for a random, unknown ID.

        The request objects (and their random UUIDs) are created once, when
        the parametrize decorator is evaluated.
        """
        context = _DummyContext()
        with pytest.raises(AssertionError, match="abort called"):
            await call_method(servicer_with_success, proto_request, context)
        assert context.aborted, f"Should abort for nonexistent {label}"
        recorded_code = context.abort_code
        assert recorded_code == _get_status_code_not_found(), "Code should be NOT_FOUND"
class TestSyncRunExpiryMetadata:
    """Checks for sync run expiry metadata (Sprint GAP-002)."""

    @pytest.mark.asyncio
    async def test_sync_status_includes_expires_at(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """A status read right after starting a sync has a non-empty ``expires_at``."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(integration.id)
        )
        started = await _call_start_sync(servicer_with_success, start_request, context)
        status_request = noteflow_pb2.GetSyncStatusRequest(sync_run_id=started.sync_run_id)
        status = await _call_get_sync_status(servicer_with_success, status_request, context)
        assert started.sync_run_id, "Should have sync run ID"
        assert status.expires_at, "Response should include expires_at"

    @pytest.mark.asyncio
    async def test_sync_run_cache_tracks_times(
        self, servicer_with_success: NoteFlowServicer, meeting_store: MeetingStore
    ) -> None:
        """A started sync run has an entry in ``sync_run_cache_times``."""
        integration = await create_test_integration(meeting_store)
        context = _DummyContext()
        start_request = noteflow_pb2.StartIntegrationSyncRequest(
            integration_id=str(integration.id)
        )
        started = await _call_start_sync(servicer_with_success, start_request, context)
        run_id = UUID(started.sync_run_id)
        # Type annotation needed: ensure_sync_runs_cache is a mixin method added via SyncMixin
        ensure_cache = cast(
            Callable[[], dict[UUID, SyncRun]],
            servicer_with_success.ensure_sync_runs_cache,
        )
        ensure_cache()  # return value is not needed; only the call matters here
        cache_times: dict[UUID, datetime] = servicer_with_success.sync_run_cache_times
        tracked_at = cache_times.get(run_id)
        assert tracked_at is not None, "Cache times should track the sync run"
class TestServerStateVersion:
    """Checks for the ``state_version`` field of GetServerInfo (Sprint GAP-002)."""

    @pytest.mark.asyncio
    async def test_server_info_includes_state_version(self) -> None:
        """``state_version`` in the GetServerInfo response is at least 1."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        request = noteflow_pb2.ServerInfoRequest()
        info = await servicer.GetServerInfo(request, context)
        assert info.state_version >= 1, "State version should be positive"

    @pytest.mark.asyncio
    async def test_state_version_is_consistent(self) -> None:
        """Two GetServerInfo calls on one servicer report the same ``state_version``."""
        servicer = NoteFlowServicer()
        context = _DummyContext()
        first_info = await servicer.GetServerInfo(noteflow_pb2.ServerInfoRequest(), context)
        second_info = await servicer.GetServerInfo(noteflow_pb2.ServerInfoRequest(), context)
        assert first_info.state_version == second_info.state_version, (
            "State version should be consistent across calls"
        )
class TestListAllRepository:
    """Checks for the integration repository's ``list_all`` method (Sprint 18.1)."""

    @pytest.mark.asyncio
    async def test_list_all_returns_empty_for_new_store(self) -> None:
        """A brand-new store lists no integrations."""
        store = MeetingStore()
        listed = await store.integrations.list_all()
        assert listed is not None, "Should return a list"
        assert len(listed) == 0, "Should be empty for new store"

    @pytest.mark.asyncio
    async def test_list_all_returns_created_integration(self) -> None:
        """An integration saved in the store is the single listed entry."""
        store = MeetingStore()
        created = await create_test_integration(store, "Test Calendar")
        listed = await store.integrations.list_all()
        assert len(listed) == 1, "Should return one integration"
        assert listed[0].id == created.id, "ID should match created integration"

    @pytest.mark.asyncio
    async def test_list_all_excludes_deleted_integrations(self) -> None:
        """After deleting one of two integrations only the other is listed."""
        store = MeetingStore()
        removed = await create_test_integration(store, "Calendar 1")
        kept = await create_test_integration(store, "Calendar 2")
        await store.integrations.delete(removed.id)
        listed = await store.integrations.list_all()
        assert len(listed) == 1, "Should return only non-deleted"
        assert listed[0].id == kept.id, "Should return the non-deleted integration"