Files
noteflow/tests/grpc/test_annotation_mixin.py
Travis Vasceannie b11633192a
Some checks failed
CI / test-python (push) Failing after 22m14s
CI / test-rust (push) Has been cancelled
CI / test-typescript (push) Has been cancelled
deps
2026-01-24 21:31:58 +00:00

836 lines
30 KiB
Python

"""Tests for AnnotationMixin gRPC endpoints.

Tests cover:
- AddAnnotation: success with all fields, missing meeting
- GetAnnotation: found, not found, invalid ID
- ListAnnotations: empty list, with annotations, time range filter
- UpdateAnnotation: success, not found, partial updates
- DeleteAnnotation: success, not found
- All RPCs: abort on malformed IDs and when annotation storage is unsupported
"""
from __future__ import annotations
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import TYPE_CHECKING, cast
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
import pytest
from noteflow.domain.entities import Annotation
from noteflow.domain.value_objects import AnnotationId, AnnotationType, MeetingId
from noteflow.grpc.mixins._types import GrpcContext
from noteflow.grpc.mixins.annotation import AnnotationMixin
from noteflow.grpc.proto import noteflow_pb2
# Test constants for annotation timestamps and time ranges.
# Values are passed straight into the proto start_time/end_time fields
# (presumably seconds from the start of the meeting — TODO confirm unit).
# End time shared by the "all fields" AddAnnotation request and the GetAnnotation sample.
SAMPLE_ANNOTATION_END_TIME = 120.0
# Start time used by the AddAnnotation and UpdateAnnotation requests.
SAMPLE_ANNOTATION_START_TIME_SHORT = 15.0
# Start time of the action-item case; also reused as the updated end_time.
SAMPLE_ANNOTATION_START_TIME_ACTION = 25.0
# Lower bound of the ListAnnotations range filter; also the end_time in test_updates_text_only.
TIME_RANGE_FILTER_START = 20.0
class MockRepositoryProvider:
    """Async-context-manager stand-in for the repository provider.

    Exposes the attributes the tests configure or inspect: ``annotations``,
    ``meetings``, ``commit`` and the ``supports_annotations`` flag.
    """

    def __init__(
        self,
        annotations_repo: AsyncMock,
        meetings_repo: AsyncMock | None = None,
    ) -> None:
        """Store the supplied repositories, defaulting the meetings repo.

        Args:
            annotations_repo: Mock exposed as ``annotations``.
            meetings_repo: Mock exposed as ``meetings``; a fresh ``AsyncMock``
                is substituted when this argument is falsy.
        """
        self.supports_annotations = True
        self.annotations = annotations_repo
        self.meetings = meetings_repo if meetings_repo else AsyncMock()
        self.commit = AsyncMock()

    async def __aenter__(self) -> MockRepositoryProvider:
        """Return this provider as the value bound by ``async with``."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Leave the context; returning None never suppresses an exception."""
        return None
class MockServicerHost(AnnotationMixin):
    """Minimal host that lets AnnotationMixin run against mock repositories."""

    def __init__(
        self,
        annotations_repo: AsyncMock,
        meetings_repo: AsyncMock | None = None,
    ) -> None:
        """Remember the mocks handed to every provider this host creates.

        Args:
            annotations_repo: Mock passed to each provider as its annotations repo.
            meetings_repo: Optional mock passed to each provider as its meetings repo.
        """
        self._meetings_repo = meetings_repo
        self._annotations_repo = annotations_repo

    def create_repository_provider(self) -> MockRepositoryProvider:
        """Build a new provider wrapping the stored mock repositories."""
        provider = MockRepositoryProvider(
            self._annotations_repo,
            self._meetings_repo,
        )
        return provider

    if TYPE_CHECKING:
        # Typing-only declarations so static analysis sees concrete RPC
        # signatures; this branch is skipped at runtime.
        async def AddAnnotation(
            self,
            request: noteflow_pb2.AddAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def GetAnnotation(
            self,
            request: noteflow_pb2.GetAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def ListAnnotations(
            self,
            request: noteflow_pb2.ListAnnotationsRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.ListAnnotationsResponse: ...

        async def UpdateAnnotation(
            self,
            request: noteflow_pb2.UpdateAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def DeleteAnnotation(
            self,
            request: noteflow_pb2.DeleteAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.DeleteAnnotationResponse: ...
def create_sample_annotation(
    meeting_id: MeetingId | None = None,
    annotation_id: AnnotationId | None = None,
    annotation_type: AnnotationType = AnnotationType.NOTE,
    text: str = "Test annotation",
    start_time: float = 10.0,
    end_time: float = 20.0,
    segment_ids: list[int] | None = None,
) -> Annotation:
    """Build an Annotation entity with overridable test defaults.

    Args:
        meeting_id: Owning meeting; a random one is generated when falsy.
        annotation_id: Annotation identifier; a random one is generated when falsy.
        annotation_type: Domain annotation type.
        text: Annotation body text.
        start_time: Start of the annotated range.
        end_time: End of the annotated range.
        segment_ids: Linked segment ids; an empty list is used when falsy.

    Returns:
        An Annotation whose ``created_at`` is fixed at 2024-01-15 10:30:00 UTC.
    """
    resolved_id = annotation_id if annotation_id else AnnotationId(uuid4())
    resolved_meeting = meeting_id if meeting_id else MeetingId(uuid4())
    resolved_segments = segment_ids if segment_ids else []
    # A constant timestamp keeps the sample deterministic across runs.
    fixed_created_at = datetime(2024, 1, 15, 10, 30, 0, tzinfo=UTC)
    return Annotation(
        id=resolved_id,
        meeting_id=resolved_meeting,
        annotation_type=annotation_type,
        text=text,
        start_time=start_time,
        end_time=end_time,
        segment_ids=resolved_segments,
        created_at=fixed_created_at,
    )
@pytest.fixture
def mock_annotations_repo() -> AsyncMock:
    """Provide an annotations repository mock with "nothing stored" defaults.

    ``get`` resolves to None, both listing methods resolve to an empty list
    and ``delete`` resolves to False; ``add`` and ``update`` are plain
    AsyncMocks that individual tests configure further.
    """
    repo = AsyncMock()
    repo.configure_mock(
        add=AsyncMock(),
        get=AsyncMock(return_value=None),
        get_by_meeting=AsyncMock(return_value=[]),
        get_by_time_range=AsyncMock(return_value=[]),
        update=AsyncMock(),
        delete=AsyncMock(return_value=False),
    )
    return repo
def configure_mock_add_passthrough(repo: AsyncMock) -> None:
    """Make ``repo.add`` hand back whatever annotation it receives.

    Args:
        repo: Mock repository whose ``add`` attribute gets the side effect.
    """

    async def _return_unchanged(annotation: Annotation) -> Annotation:
        return annotation

    add_mock = repo.add
    add_mock.side_effect = _return_unchanged
def configure_mock_update_passthrough(repo: AsyncMock) -> None:
    """Make ``repo.update`` hand back whatever annotation it receives.

    Args:
        repo: Mock repository whose ``update`` attribute gets the side effect.
    """

    async def _return_unchanged(annotation: Annotation) -> Annotation:
        return annotation

    update_mock = repo.update
    update_mock.side_effect = _return_unchanged
def create_sample_annotations_list(meeting_id: MeetingId) -> list[Annotation]:
    """Build the three-annotation sample used by the ListAnnotations tests.

    Args:
        meeting_id: Meeting every generated annotation belongs to.

    Returns:
        A NOTE, a DECISION and an ACTION_ITEM annotation, in that order, with
        non-overlapping ten-unit time ranges.
    """
    # (annotation type, text, start_time, end_time) for each sample, in output order.
    specs = (
        (AnnotationType.NOTE, "First note", 10.0, 20.0),
        (AnnotationType.DECISION, "Important decision", 30.0, 40.0),
        (AnnotationType.ACTION_ITEM, "Follow up required", 50.0, 60.0),
    )
    return [
        create_sample_annotation(
            meeting_id=meeting_id,
            annotation_type=kind,
            text=label,
            start_time=begin,
            end_time=finish,
        )
        for kind, label, begin, finish in specs
    ]
class TestAddAnnotation:
    """Tests for AddAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_adds_annotation_with_all_fields(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation creates annotation with all fields populated."""
        meeting_id = MeetingId(uuid4())
        expected_text = "Important decision"
        expected_end_time = SAMPLE_ANNOTATION_END_TIME
        # Echo the entity handed to repo.add so the response reflects the request.
        configure_mock_add_passthrough(mock_annotations_repo)
        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_DECISION,
            text=expected_text,
            start_time=SAMPLE_ANNOTATION_START_TIME_SHORT,
            end_time=expected_end_time,
            segment_ids=[1, 2, 3],
        )

        response = await servicer.AddAnnotation(request, mock_grpc_context)

        expected_meeting_id = str(meeting_id)
        assert response.text == expected_text, "text should match request"
        assert response.start_time == SAMPLE_ANNOTATION_START_TIME_SHORT, "start_time should match"
        assert response.end_time == expected_end_time, "end_time should match"
        assert list(cast(Sequence[int], response.segment_ids)) == [1, 2, 3], (
            "segment_ids should match"
        )
        assert response.meeting_id == expected_meeting_id, "meeting_id should match"
        assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION, (
            "type should be DECISION"
        )
        mock_annotations_repo.add.assert_called_once()

    @pytest.mark.parametrize(
        ("annotation_type", "text", "start_time", "end_time"),
        [
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_NOTE,
                "A simple note",
                5.0,
                10.0,
                id="note",
            ),
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_ACTION_ITEM,
                "Follow up with client",
                SAMPLE_ANNOTATION_START_TIME_ACTION,
                35.0,
                id="action_item",
            ),
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_RISK,
                "Potential budget overrun",
                45.0,
                55.0,
                id="risk",
            ),
        ],
    )
    async def test_adds_annotation_with_types(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        annotation_type: noteflow_pb2.AnnotationType,
        text: str,
        start_time: float,
        end_time: float,
    ) -> None:
        """AddAnnotation echoes the requested annotation type and text."""
        meeting_id = MeetingId(uuid4())
        configure_mock_add_passthrough(mock_annotations_repo)
        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=annotation_type,
            text=text,
            start_time=start_time,
            end_time=end_time,
        )

        response = await servicer.AddAnnotation(request, mock_grpc_context)

        assert response.annotation_type == annotation_type, "annotation_type should match request"
        assert response.text == text, "text should match request"

    async def test_adds_annotation_commits_transaction(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation commits the transaction after adding."""
        meeting_id = MeetingId(uuid4())
        configure_mock_add_passthrough(mock_annotations_repo)
        # The host builds a fresh provider on every call, so pin a single
        # instance here; otherwise its commit mock cannot be inspected.
        provider = MockRepositoryProvider(mock_annotations_repo)
        servicer.create_repository_provider = lambda: provider
        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test note",
            start_time=0.0,
            end_time=5.0,
        )

        await servicer.AddAnnotation(request, mock_grpc_context)

        mock_annotations_repo.add.assert_called_once()
        # Verify the unit of work was actually committed, not just that add ran.
        provider.commit.assert_awaited_once()
class TestGetAnnotation:
    """Behavioural checks for the GetAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Host under test, wired to the shared mock annotations repository."""
        return MockServicerHost(mock_annotations_repo)

    @pytest.fixture
    def found_annotation(self) -> tuple[AnnotationId, MeetingId, Annotation]:
        """Return ``(annotation_id, meeting_id, annotation)`` for a stored DECISION."""
        known_id = AnnotationId(uuid4())
        known_meeting = MeetingId(uuid4())
        stored = create_sample_annotation(
            annotation_id=known_id,
            meeting_id=known_meeting,
            annotation_type=AnnotationType.DECISION,
            text="Key decision made",
            start_time=100.0,
            end_time=SAMPLE_ANNOTATION_END_TIME,
            segment_ids=[5, 6, 7],
        )
        return known_id, known_meeting, stored

    async def test_returns_annotation_when_found(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        found_annotation: tuple[AnnotationId, MeetingId, Annotation],
    ) -> None:
        """GetAnnotation maps the stored annotation onto the response message."""
        known_id, known_meeting, stored = found_annotation
        mock_annotations_repo.get.return_value = stored
        wire_id = str(known_id)
        wire_meeting_id = str(known_meeting)

        reply = await servicer.GetAnnotation(
            noteflow_pb2.GetAnnotationRequest(annotation_id=wire_id),
            mock_grpc_context,
        )

        assert reply.id == wire_id, "id should match request"
        assert reply.meeting_id == wire_meeting_id, "meeting_id should match"
        assert reply.text == stored.text, "text should match"
        assert reply.start_time == stored.start_time, "start_time should match"
        assert reply.end_time == stored.end_time, "end_time should match"
        returned_segments = list(cast(Sequence[int], reply.segment_ids))
        assert returned_segments == [5, 6, 7], (
            "segment_ids should match"
        )
        assert reply.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION, (
            "annotation_type should be DECISION"
        )

    async def test_aborts_when_annotation_not_found_get_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """GetAnnotation calls ``context.abort`` when the repository returns None."""
        missing_id = uuid4()
        mock_annotations_repo.get.return_value = None
        lookup = noteflow_pb2.GetAnnotationRequest(annotation_id=str(missing_id))

        # NOTE(review): the "Unreachable" AssertionError presumably comes from a
        # guard after abort, since the mocked abort returns normally — confirm.
        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.GetAnnotation(lookup, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
class TestListAnnotations:
    """Behavioural checks for the ListAnnotations RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Host under test, wired to the shared mock annotations repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_returns_empty_list_when_no_annotations(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """An empty repository result yields an empty response list."""
        target_meeting = MeetingId(uuid4())
        mock_annotations_repo.get_by_meeting.return_value = []

        reply = await servicer.ListAnnotations(
            noteflow_pb2.ListAnnotationsRequest(meeting_id=str(target_meeting)),
            mock_grpc_context,
        )

        assert len(cast(Sequence[object], reply.annotations)) == 0, "should return empty list"
        mock_annotations_repo.get_by_meeting.assert_called_once()

    async def test_returns_annotations_for_meeting(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """Every annotation returned by the repository appears, in order."""
        target_meeting = MeetingId(uuid4())
        mock_annotations_repo.get_by_meeting.return_value = create_sample_annotations_list(
            target_meeting
        )

        reply = await servicer.ListAnnotations(
            noteflow_pb2.ListAnnotationsRequest(meeting_id=str(target_meeting)),
            mock_grpc_context,
        )

        returned = cast(Sequence[noteflow_pb2.Annotation], reply.annotations)
        assert len(returned) == 3, "should return all 3 annotations"
        first, second, third = returned[0], returned[1], returned[2]
        assert first.text == "First note", "first annotation text should match"
        assert second.text == "Important decision", (
            "second annotation text should match"
        )
        assert third.text == "Follow up required", (
            "third annotation text should match"
        )

    async def test_filters_by_time_range(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A request with both bounds is served by ``get_by_time_range`` only."""
        target_meeting = MeetingId(uuid4())
        in_range = create_sample_annotation(
            meeting_id=target_meeting,
            text="Annotation in range",
            start_time=25.0,
            end_time=35.0,
        )
        mock_annotations_repo.get_by_time_range.return_value = [in_range]
        ranged_request = noteflow_pb2.ListAnnotationsRequest(
            meeting_id=str(target_meeting),
            start_time=TIME_RANGE_FILTER_START,
            end_time=40.0,
        )

        reply = await servicer.ListAnnotations(ranged_request, mock_grpc_context)

        returned = cast(Sequence[noteflow_pb2.Annotation], reply.annotations)
        assert len(returned) == 1, "should return filtered annotations"
        assert returned[0].text == "Annotation in range", (
            "filtered annotation text should match"
        )
        mock_annotations_repo.get_by_time_range.assert_called_once_with(
            target_meeting, TIME_RANGE_FILTER_START, 40.0
        )
        # The unfiltered lookup must not run when a range is supplied.
        mock_annotations_repo.get_by_meeting.assert_not_called()

    async def test_uses_time_range_filter_with_start_time_only(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A non-zero start_time alone still routes to ``get_by_time_range``."""
        target_meeting = MeetingId(uuid4())
        mock_annotations_repo.get_by_time_range.return_value = []
        start_only_request = noteflow_pb2.ListAnnotationsRequest(
            meeting_id=str(target_meeting),
            start_time=50.0,
            end_time=0.0,  # Not specified
        )

        reply = await servicer.ListAnnotations(start_only_request, mock_grpc_context)

        assert len(cast(Sequence[object], reply.annotations)) == 0, (
            "should return empty list for start_time filter"
        )
        mock_annotations_repo.get_by_time_range.assert_called_once_with(
            target_meeting, 50.0, 0.0
        )
class TestUpdateAnnotation:
    """Behavioural checks for the UpdateAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Host under test, wired to the shared mock annotations repository."""
        return MockServicerHost(mock_annotations_repo)

    @pytest.fixture
    def update_annotation_request(
        self,
    ) -> tuple[AnnotationId, Annotation, noteflow_pb2.UpdateAnnotationRequest]:
        """Return ``(annotation_id, stored annotation, request changing every field)``."""
        target_id = AnnotationId(uuid4())
        stored = create_sample_annotation(
            annotation_id=target_id,
            meeting_id=MeetingId(uuid4()),
            annotation_type=AnnotationType.NOTE,
            text="Original text",
            start_time=10.0,
            end_time=20.0,
            segment_ids=[1],
        )
        full_update = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(target_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_DECISION,
            text="Updated text",
            start_time=SAMPLE_ANNOTATION_START_TIME_SHORT,
            end_time=SAMPLE_ANNOTATION_START_TIME_ACTION,
            segment_ids=[2, 3],
        )
        return target_id, stored, full_update

    async def test_updates_annotation_successfully(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        update_annotation_request: tuple[
            AnnotationId,
            Annotation,
            noteflow_pb2.UpdateAnnotationRequest,
        ],
    ) -> None:
        """The response reflects the request's text, type, start_time and segment_ids."""
        target_id, stored, full_update = update_annotation_request
        mock_annotations_repo.get.return_value = stored
        configure_mock_update_passthrough(mock_annotations_repo)

        reply = await servicer.UpdateAnnotation(full_update, mock_grpc_context)

        assert reply.id == str(target_id), "id should remain unchanged"
        assert reply.text == full_update.text, "text should be updated"
        assert reply.annotation_type == full_update.annotation_type, "type should be DECISION"
        assert reply.start_time == full_update.start_time, "start_time should be updated"
        returned_segments = list(cast(Sequence[int], reply.segment_ids))
        requested_segments = list(cast(Sequence[int], full_update.segment_ids))
        assert returned_segments == requested_segments, "segment_ids should be updated"
        mock_annotations_repo.update.assert_called_once()

    async def test_updates_text_only(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A text-only request leaves start_time and end_time untouched."""
        target_id = AnnotationId(uuid4())
        owning_meeting = MeetingId(uuid4())
        mock_annotations_repo.get.return_value = create_sample_annotation(
            annotation_id=target_id,
            meeting_id=owning_meeting,
            annotation_type=AnnotationType.NOTE,
            text="Original text",
            start_time=10.0,
            end_time=TIME_RANGE_FILTER_START,
        )
        configure_mock_update_passthrough(mock_annotations_repo)
        text_only = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(target_id),
            text="Only text updated",
        )

        reply = await servicer.UpdateAnnotation(text_only, mock_grpc_context)

        assert reply.text == "Only text updated", "text should be updated"
        # Note: annotation_type stays as original since UNSPECIFIED is sent
        assert reply.start_time == 10.0, "start_time should remain unchanged"
        assert reply.end_time == TIME_RANGE_FILTER_START, "end_time should remain unchanged"

    async def test_aborts_when_annotation_not_found_update_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A missing annotation triggers ``context.abort`` and no repository update."""
        missing_id = uuid4()
        mock_annotations_repo.get.return_value = None
        doomed_update = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(missing_id),
            text="Updated text",
        )

        # NOTE(review): the "Unreachable" AssertionError presumably comes from a
        # guard after abort, since the mocked abort returns normally — confirm.
        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.UpdateAnnotation(doomed_update, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
        mock_annotations_repo.update.assert_not_called()

    async def test_updates_segment_ids_when_provided(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """Segment ids supplied in the request replace the stored ones."""
        target_id = AnnotationId(uuid4())
        mock_annotations_repo.get.return_value = create_sample_annotation(
            annotation_id=target_id,
            segment_ids=[1, 2],
        )
        configure_mock_update_passthrough(mock_annotations_repo)
        segments_only = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(target_id),
            segment_ids=[3, 4, 5],
        )

        reply = await servicer.UpdateAnnotation(segments_only, mock_grpc_context)

        assert list(cast(Sequence[int], reply.segment_ids)) == [3, 4, 5], (
            "segment_ids should be updated"
        )
class TestDeleteAnnotation:
    """Behavioural checks for the DeleteAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Host under test, wired to the shared mock annotations repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_deletes_annotation_successfully(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A repository delete returning True produces ``success=True``."""
        doomed_id = uuid4()
        mock_annotations_repo.delete.return_value = True

        reply = await servicer.DeleteAnnotation(
            noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(doomed_id)),
            mock_grpc_context,
        )

        assert reply.success is True, "should return success=True"
        mock_annotations_repo.delete.assert_called_once()

    async def test_aborts_when_annotation_not_found_delete_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """A repository delete returning False triggers ``context.abort``."""
        missing_id = uuid4()
        mock_annotations_repo.delete.return_value = False
        removal = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(missing_id))

        # NOTE(review): the "Unreachable" AssertionError presumably comes from a
        # guard after abort, since the mocked abort returns normally — confirm.
        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.DeleteAnnotation(removal, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
# Test data for invalid ID tests - defined at module level to avoid eager test detection.
# Each entry is (servicer method name, request whose ID field is not a valid UUID).
# Entry order matters: TestAnnotationInvalidIds pairs these positionally with
# its explicit ``ids=[...]`` list, so keep both sequences in sync.
_INVALID_ID_TEST_CASES: list[tuple[str, object]] = [
    (
        "AddAnnotation",
        noteflow_pb2.AddAnnotationRequest(
            meeting_id="not-a-valid-uuid",
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test",
            start_time=0.0,
            end_time=5.0,
        ),
    ),
    (
        "GetAnnotation",
        noteflow_pb2.GetAnnotationRequest(annotation_id="not-a-valid-uuid"),
    ),
    (
        "ListAnnotations",
        noteflow_pb2.ListAnnotationsRequest(meeting_id="invalid-uuid"),
    ),
    (
        "UpdateAnnotation",
        noteflow_pb2.UpdateAnnotationRequest(
            annotation_id="not-a-valid-uuid",
            text="Updated text",
        ),
    ),
    (
        "DeleteAnnotation",
        noteflow_pb2.DeleteAnnotationRequest(annotation_id="not-valid"),
    ),
]
class TestAnnotationInvalidIds:
    """Checks that every annotation RPC aborts when given a malformed ID."""

    @pytest.mark.parametrize(
        ("method_name", "proto_request"),
        _INVALID_ID_TEST_CASES,
        ids=[
            "add_annotation",
            "get_annotation",
            "list_annotations",
            "update_annotation",
            "delete_annotation",
        ],
    )
    async def test_aborts_on_invalid_ids(
        self,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        method_name: str,
        proto_request: object,
    ) -> None:
        """The RPC named by ``method_name`` calls ``context.abort`` exactly once."""
        host = MockServicerHost(mock_annotations_repo)
        # Resolve the RPC dynamically so one test body covers all five endpoints.
        rpc = getattr(host, method_name)

        with pytest.raises(AssertionError, match="Unreachable"):
            await rpc(proto_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
class TestDatabaseNotSupported:
    """Checks that each RPC aborts when the provider reports no annotation support."""

    @pytest.fixture
    def servicer_no_db(self) -> MockServicerHost:
        """Host whose provider has ``supports_annotations`` set to False."""
        backing_repo = AsyncMock()
        host = MockServicerHost(backing_repo)
        # Swap the provider factory for one that always yields the same
        # provider with annotation support switched off.
        unsupported = MockRepositoryProvider(backing_repo)
        unsupported.supports_annotations = False
        host.create_repository_provider = lambda: unsupported
        return host

    async def test_add_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation calls ``context.abort`` when annotations are unsupported."""
        add_request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(uuid4()),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test",
            start_time=0.0,
            end_time=5.0,
        )

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.AddAnnotation(add_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_get_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """GetAnnotation calls ``context.abort`` when annotations are unsupported."""
        get_request = noteflow_pb2.GetAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.GetAnnotation(get_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_list_annotations_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations calls ``context.abort`` when annotations are unsupported."""
        list_request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.ListAnnotations(list_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_update_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """UpdateAnnotation calls ``context.abort`` when annotations are unsupported."""
        update_request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(uuid4()),
            text="Updated",
        )

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.UpdateAnnotation(update_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_delete_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """DeleteAnnotation calls ``context.abort`` when annotations are unsupported."""
        delete_request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.DeleteAnnotation(delete_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()