"""Tests for AnnotationMixin gRPC endpoints.

Tests cover:
- AddAnnotation: success with all fields, missing meeting
- GetAnnotation: found, not found, invalid ID
- ListAnnotations: empty list, with annotations, time range filter
- UpdateAnnotation: success, not found, partial updates
- DeleteAnnotation: success, not found
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
from datetime import UTC, datetime
|
|
from typing import TYPE_CHECKING, cast
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Annotation
|
|
from noteflow.domain.value_objects import AnnotationId, AnnotationType, MeetingId
|
|
from noteflow.grpc.mixins._types import GrpcContext
|
|
from noteflow.grpc.mixins.annotation import AnnotationMixin
|
|
from noteflow.grpc.proto import noteflow_pb2
|
|
|
|
# Test constants for annotation timestamps and time ranges.
# Named once here so request values and assertions reference the same number.
SAMPLE_ANNOTATION_END_TIME = 120.0
SAMPLE_ANNOTATION_START_TIME_SHORT = 15.0
SAMPLE_ANNOTATION_START_TIME_ACTION = 25.0
TIME_RANGE_FILTER_START = 20.0
|
|
|
|
|
|
class MockRepositoryProvider:
    """Mock repository provider usable as an async context manager.

    Exposes the repositories and the ``commit`` coroutine as mocks so tests
    can configure return values and inspect calls afterwards.
    """

    def __init__(
        self,
        annotations_repo: AsyncMock,
        meetings_repo: AsyncMock | None = None,
    ) -> None:
        """Initialize with mock repositories.

        Args:
            annotations_repo: Mock exposed as ``annotations``.
            meetings_repo: Mock exposed as ``meetings``; a fresh ``AsyncMock``
                is created when omitted.
        """
        # Capability flag; tests flip it to False to simulate a backend
        # without annotation support.
        self.supports_annotations = True
        self.annotations = annotations_repo
        # Explicit None check: only a missing repo triggers the default.
        self.meetings = meetings_repo if meetings_repo is not None else AsyncMock()
        self.commit = AsyncMock()

    async def __aenter__(self) -> MockRepositoryProvider:
        """Enter async context and hand back the provider itself."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Exit async context without suppressing any in-flight exception."""
        return None
|
|
|
|
|
|
class MockServicerHost(AnnotationMixin):
    """Mock servicer host implementing required protocol.

    Supplies the ``create_repository_provider`` hook on top of
    ``AnnotationMixin`` so the RPC methods can be exercised against mocks.
    """

    def __init__(
        self,
        annotations_repo: AsyncMock,
        meetings_repo: AsyncMock | None = None,
    ) -> None:
        """Initialize with mock repositories.

        Args:
            annotations_repo: Mock handed to every provider as ``annotations``.
            meetings_repo: Optional mock handed to every provider as
                ``meetings``.
        """
        self._annotations_repo = annotations_repo
        self._meetings_repo = meetings_repo

    def create_repository_provider(self) -> MockRepositoryProvider:
        """Create mock repository provider context manager.

        A new provider is built on every call; the underlying repository
        mocks are shared between them.
        """
        return MockRepositoryProvider(
            self._annotations_repo,
            self._meetings_repo,
        )

    if TYPE_CHECKING:
        # Type stubs for proper type inference (not executed at runtime)
        async def AddAnnotation(
            self,
            request: noteflow_pb2.AddAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def GetAnnotation(
            self,
            request: noteflow_pb2.GetAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def ListAnnotations(
            self,
            request: noteflow_pb2.ListAnnotationsRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.ListAnnotationsResponse: ...

        async def UpdateAnnotation(
            self,
            request: noteflow_pb2.UpdateAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.Annotation: ...

        async def DeleteAnnotation(
            self,
            request: noteflow_pb2.DeleteAnnotationRequest,
            context: GrpcContext,
        ) -> noteflow_pb2.DeleteAnnotationResponse: ...
|
|
|
|
|
|
def create_sample_annotation(
    meeting_id: MeetingId | None = None,
    annotation_id: AnnotationId | None = None,
    annotation_type: AnnotationType = AnnotationType.NOTE,
    text: str = "Test annotation",
    start_time: float = 10.0,
    end_time: float = 20.0,
    segment_ids: list[int] | None = None,
) -> Annotation:
    """Create a sample Annotation for testing.

    Args:
        meeting_id: Owning meeting; a random ID is generated when omitted.
        annotation_id: Annotation identity; a random ID is generated when
            omitted.
        annotation_type: Domain annotation type.
        text: Annotation body.
        start_time: Start value passed through to the entity.
        end_time: End value passed through to the entity.
        segment_ids: Linked segment IDs; defaults to an empty list.

    Returns:
        An ``Annotation`` with a fixed ``created_at`` so results are
        deterministic across runs.
    """
    resolved_annotation_id = (
        annotation_id if annotation_id is not None else AnnotationId(uuid4())
    )
    resolved_meeting_id = meeting_id if meeting_id is not None else MeetingId(uuid4())
    return Annotation(
        id=resolved_annotation_id,
        meeting_id=resolved_meeting_id,
        annotation_type=annotation_type,
        text=text,
        start_time=start_time,
        end_time=end_time,
        segment_ids=segment_ids or [],
        created_at=datetime(2024, 1, 15, 10, 30, 0, tzinfo=UTC),
    )
|
|
|
|
|
|
@pytest.fixture
def mock_annotations_repo() -> AsyncMock:
    """Create mock annotations repository with common methods.

    Defaults model an empty store: lookups return ``None``, listings return
    empty lists, and ``delete`` reports that nothing was removed. Tests
    override individual return values as needed.
    """
    repo = AsyncMock()
    repo.add = AsyncMock()
    repo.get = AsyncMock(return_value=None)
    repo.get_by_meeting = AsyncMock(return_value=[])
    repo.get_by_time_range = AsyncMock(return_value=[])
    repo.update = AsyncMock()
    repo.delete = AsyncMock(return_value=False)
    return repo
|
|
|
|
|
|
def configure_mock_add_passthrough(repo: AsyncMock) -> None:
    """Configure mock repo.add to return the annotation passed to it.

    Args:
        repo: Repository mock whose ``add`` attribute receives the
            passthrough ``side_effect``.
    """

    async def _return_same(annotation: Annotation) -> Annotation:
        # Hand back the exact object that was given to add().
        return annotation

    repo.add.side_effect = _return_same
|
|
|
|
|
|
def configure_mock_update_passthrough(repo: AsyncMock) -> None:
    """Configure mock repo.update to return the annotation passed to it.

    Args:
        repo: Repository mock whose ``update`` attribute receives the
            passthrough ``side_effect``.
    """

    async def _return_same(annotation: Annotation) -> Annotation:
        # Hand back the exact object that was given to update().
        return annotation

    repo.update.side_effect = _return_same
|
|
|
|
|
|
def create_sample_annotations_list(meeting_id: MeetingId) -> list[Annotation]:
    """Create a list of sample annotations for testing ListAnnotations.

    Args:
        meeting_id: Meeting that every generated annotation belongs to.

    Returns:
        Three annotations (note, decision, action item) with distinct texts
        and non-overlapping time ranges, in that order.
    """
    # (annotation_type, text, start_time, end_time) for each sample.
    specs: list[tuple[AnnotationType, str, float, float]] = [
        (AnnotationType.NOTE, "First note", 10.0, 20.0),
        (AnnotationType.DECISION, "Important decision", 30.0, 40.0),
        (AnnotationType.ACTION_ITEM, "Follow up required", 50.0, 60.0),
    ]
    return [
        create_sample_annotation(
            meeting_id=meeting_id,
            annotation_type=annotation_type,
            text=text,
            start_time=start_time,
            end_time=end_time,
        )
        for annotation_type, text, start_time, end_time in specs
    ]
|
|
|
|
|
|
class TestAddAnnotation:
    """Tests for AddAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_adds_annotation_with_all_fields(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation creates annotation with all fields populated."""
        meeting_id = MeetingId(uuid4())
        expected_text = "Important decision"
        expected_end_time = SAMPLE_ANNOTATION_END_TIME
        configure_mock_add_passthrough(mock_annotations_repo)

        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_DECISION,
            text=expected_text,
            start_time=SAMPLE_ANNOTATION_START_TIME_SHORT,
            end_time=expected_end_time,
            segment_ids=[1, 2, 3],
        )
        response = await servicer.AddAnnotation(request, mock_grpc_context)

        expected_meeting_id = str(meeting_id)
        assert response.text == expected_text, "text should match request"
        assert response.start_time == SAMPLE_ANNOTATION_START_TIME_SHORT, "start_time should match"
        assert response.end_time == expected_end_time, "end_time should match"
        assert list(cast(Sequence[int], response.segment_ids)) == [1, 2, 3], (
            "segment_ids should match"
        )
        assert response.meeting_id == expected_meeting_id, "meeting_id should match"
        assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION, (
            "type should be DECISION"
        )
        mock_annotations_repo.add.assert_called_once()

    @pytest.mark.parametrize(
        ("annotation_type", "text", "start_time", "end_time"),
        [
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_NOTE,
                "A simple note",
                5.0,
                10.0,
                id="note",
            ),
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_ACTION_ITEM,
                "Follow up with client",
                SAMPLE_ANNOTATION_START_TIME_ACTION,
                35.0,
                id="action_item",
            ),
            pytest.param(
                noteflow_pb2.AnnotationType.ANNOTATION_TYPE_RISK,
                "Potential budget overrun",
                45.0,
                55.0,
                id="risk",
            ),
        ],
    )
    async def test_adds_annotation_with_types(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        annotation_type: noteflow_pb2.AnnotationType,
        text: str,
        start_time: float,
        end_time: float,
    ) -> None:
        """AddAnnotation echoes the requested annotation type and text."""
        meeting_id = MeetingId(uuid4())
        configure_mock_add_passthrough(mock_annotations_repo)

        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=annotation_type,
            text=text,
            start_time=start_time,
            end_time=end_time,
        )

        response = await servicer.AddAnnotation(request, mock_grpc_context)

        assert response.annotation_type == annotation_type, "annotation_type should match request"
        assert response.text == text, "text should match request"

    async def test_adds_annotation_commits_transaction(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation commits the transaction after adding."""
        meeting_id = MeetingId(uuid4())
        configure_mock_add_passthrough(mock_annotations_repo)

        # create_repository_provider() builds a new provider on every call,
        # so pin a single instance whose commit mock can be inspected.
        provider = MockRepositoryProvider(mock_annotations_repo)
        servicer.create_repository_provider = lambda: provider

        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(meeting_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test note",
            start_time=0.0,
            end_time=5.0,
        )

        await servicer.AddAnnotation(request, mock_grpc_context)

        mock_annotations_repo.add.assert_called_once()
        # Verify commit was awaited on the repository provider used by the RPC.
        provider.commit.assert_awaited_once()
|
|
|
|
|
|
class TestGetAnnotation:
    """Tests for GetAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    @pytest.fixture
    def found_annotation(self) -> tuple[AnnotationId, MeetingId, Annotation]:
        """Create a sample annotation with known identifiers.

        Returns:
            ``(annotation_id, meeting_id, annotation)`` so tests can compare
            the response against the identifiers used to build the entity.
        """
        annotation_id = AnnotationId(uuid4())
        meeting_id = MeetingId(uuid4())
        annotation = create_sample_annotation(
            annotation_id=annotation_id,
            meeting_id=meeting_id,
            annotation_type=AnnotationType.DECISION,
            text="Key decision made",
            start_time=100.0,
            end_time=SAMPLE_ANNOTATION_END_TIME,
            segment_ids=[5, 6, 7],
        )
        return annotation_id, meeting_id, annotation

    async def test_returns_annotation_when_found(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        found_annotation: tuple[AnnotationId, MeetingId, Annotation],
    ) -> None:
        """GetAnnotation returns annotation when it exists."""
        annotation_id, meeting_id, expected_annotation = found_annotation
        mock_annotations_repo.get.return_value = expected_annotation
        expected_annotation_id = str(annotation_id)
        expected_meeting_id = str(meeting_id)

        request = noteflow_pb2.GetAnnotationRequest(annotation_id=expected_annotation_id)
        response = await servicer.GetAnnotation(request, mock_grpc_context)

        assert response.id == expected_annotation_id, "id should match request"
        assert response.meeting_id == expected_meeting_id, "meeting_id should match"
        assert response.text == expected_annotation.text, "text should match"
        assert response.start_time == expected_annotation.start_time, "start_time should match"
        assert response.end_time == expected_annotation.end_time, "end_time should match"
        assert list(cast(Sequence[int], response.segment_ids)) == [5, 6, 7], (
            "segment_ids should match"
        )
        assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION, (
            "annotation_type should be DECISION"
        )

    async def test_aborts_when_annotation_not_found_get_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """GetAnnotation aborts with NOT_FOUND when annotation does not exist."""
        annotation_id = uuid4()
        mock_annotations_repo.get.return_value = None

        request = noteflow_pb2.GetAnnotationRequest(annotation_id=str(annotation_id))

        # The test expects an "Unreachable" AssertionError once the mocked
        # abort() has returned instead of terminating the RPC.
        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.GetAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
|
|
|
|
|
|
class TestListAnnotations:
    """Tests for ListAnnotations RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_returns_empty_list_when_no_annotations(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations returns empty list when meeting has no annotations."""
        meeting_id = MeetingId(uuid4())
        mock_annotations_repo.get_by_meeting.return_value = []

        request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(meeting_id))
        response = await servicer.ListAnnotations(request, mock_grpc_context)

        assert len(cast(Sequence[object], response.annotations)) == 0, "should return empty list"
        mock_annotations_repo.get_by_meeting.assert_called_once()

    async def test_returns_annotations_for_meeting(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations returns all annotations for meeting."""
        meeting_id = MeetingId(uuid4())
        annotations = create_sample_annotations_list(meeting_id)
        mock_annotations_repo.get_by_meeting.return_value = annotations

        request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(meeting_id))
        response = await servicer.ListAnnotations(request, mock_grpc_context)

        annotations_list = cast(Sequence[noteflow_pb2.Annotation], response.annotations)
        assert len(annotations_list) == 3, "should return all 3 annotations"
        assert annotations_list[0].text == "First note", "first annotation text should match"
        assert annotations_list[1].text == "Important decision", (
            "second annotation text should match"
        )
        assert annotations_list[2].text == "Follow up required", (
            "third annotation text should match"
        )

    async def test_filters_by_time_range(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations filters by time range when specified."""
        meeting_id = MeetingId(uuid4())
        filtered_annotations = [
            create_sample_annotation(
                meeting_id=meeting_id,
                text="Annotation in range",
                start_time=25.0,
                end_time=35.0,
            ),
        ]
        mock_annotations_repo.get_by_time_range.return_value = filtered_annotations

        request = noteflow_pb2.ListAnnotationsRequest(
            meeting_id=str(meeting_id),
            start_time=TIME_RANGE_FILTER_START,
            end_time=40.0,
        )
        response = await servicer.ListAnnotations(request, mock_grpc_context)

        annotations_list = cast(Sequence[noteflow_pb2.Annotation], response.annotations)
        assert len(annotations_list) == 1, "should return filtered annotations"
        assert annotations_list[0].text == "Annotation in range", (
            "filtered annotation text should match"
        )
        mock_annotations_repo.get_by_time_range.assert_called_once_with(
            meeting_id, TIME_RANGE_FILTER_START, 40.0
        )
        # The unfiltered query must not run when a time range is given.
        mock_annotations_repo.get_by_meeting.assert_not_called()

    async def test_uses_time_range_filter_with_start_time_only(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations uses time range filter when only start_time is specified."""
        meeting_id = MeetingId(uuid4())
        mock_annotations_repo.get_by_time_range.return_value = []

        request = noteflow_pb2.ListAnnotationsRequest(
            meeting_id=str(meeting_id),
            start_time=50.0,
            end_time=0.0,  # Not specified
        )
        response = await servicer.ListAnnotations(request, mock_grpc_context)

        assert len(cast(Sequence[object], response.annotations)) == 0, (
            "should return empty list for start_time filter"
        )
        mock_annotations_repo.get_by_time_range.assert_called_once_with(meeting_id, 50.0, 0.0)
|
|
|
|
|
|
class TestUpdateAnnotation:
    """Tests for UpdateAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    @pytest.fixture
    def update_annotation_request(
        self,
    ) -> tuple[AnnotationId, Annotation, noteflow_pb2.UpdateAnnotationRequest]:
        """Create original annotation and update request.

        Returns:
            ``(annotation_id, original, request)`` where ``request`` changes
            every updatable field of ``original``.
        """
        annotation_id = AnnotationId(uuid4())
        original = create_sample_annotation(
            annotation_id=annotation_id,
            meeting_id=MeetingId(uuid4()),
            annotation_type=AnnotationType.NOTE,
            text="Original text",
            start_time=10.0,
            end_time=20.0,
            segment_ids=[1],
        )
        request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(annotation_id),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_DECISION,
            text="Updated text",
            start_time=SAMPLE_ANNOTATION_START_TIME_SHORT,
            end_time=SAMPLE_ANNOTATION_START_TIME_ACTION,
            segment_ids=[2, 3],
        )
        return annotation_id, original, request

    async def test_updates_annotation_successfully(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        update_annotation_request: tuple[
            AnnotationId,
            Annotation,
            noteflow_pb2.UpdateAnnotationRequest,
        ],
    ) -> None:
        """UpdateAnnotation updates all fields when provided."""
        annotation_id, original, request = update_annotation_request
        mock_annotations_repo.get.return_value = original
        configure_mock_update_passthrough(mock_annotations_repo)

        response = await servicer.UpdateAnnotation(request, mock_grpc_context)

        expected_annotation_id = str(annotation_id)
        assert response.id == expected_annotation_id, "id should remain unchanged"
        assert response.text == request.text, "text should be updated"
        assert response.annotation_type == request.annotation_type, "type should be DECISION"
        assert response.start_time == request.start_time, "start_time should be updated"
        # The request also changes end_time, so verify it like the other fields.
        assert response.end_time == request.end_time, "end_time should be updated"
        assert list(cast(Sequence[int], response.segment_ids)) == list(
            cast(Sequence[int], request.segment_ids)
        ), "segment_ids should be updated"
        mock_annotations_repo.update.assert_called_once()

    async def test_updates_text_only(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """UpdateAnnotation updates only text when other fields not provided."""
        annotation_id = AnnotationId(uuid4())
        meeting_id = MeetingId(uuid4())
        original_annotation = create_sample_annotation(
            annotation_id=annotation_id,
            meeting_id=meeting_id,
            annotation_type=AnnotationType.NOTE,
            text="Original text",
            start_time=10.0,
            end_time=TIME_RANGE_FILTER_START,
        )
        mock_annotations_repo.get.return_value = original_annotation
        configure_mock_update_passthrough(mock_annotations_repo)

        request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(annotation_id),
            text="Only text updated",
        )

        response = await servicer.UpdateAnnotation(request, mock_grpc_context)

        assert response.text == "Only text updated", "text should be updated"
        # Note: annotation_type stays as original since UNSPECIFIED is sent
        assert response.start_time == 10.0, "start_time should remain unchanged"
        assert response.end_time == TIME_RANGE_FILTER_START, "end_time should remain unchanged"

    async def test_aborts_when_annotation_not_found_update_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """UpdateAnnotation aborts with NOT_FOUND when annotation does not exist."""
        annotation_id = uuid4()
        mock_annotations_repo.get.return_value = None

        request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(annotation_id),
            text="Updated text",
        )

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.UpdateAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
        # Nothing may be persisted when the lookup fails.
        mock_annotations_repo.update.assert_not_called()

    async def test_updates_segment_ids_when_provided(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """UpdateAnnotation updates segment_ids when provided."""
        annotation_id = AnnotationId(uuid4())
        original_annotation = create_sample_annotation(
            annotation_id=annotation_id,
            segment_ids=[1, 2],
        )
        mock_annotations_repo.get.return_value = original_annotation
        configure_mock_update_passthrough(mock_annotations_repo)

        request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(annotation_id),
            segment_ids=[3, 4, 5],
        )

        response = await servicer.UpdateAnnotation(request, mock_grpc_context)

        assert list(cast(Sequence[int], response.segment_ids)) == [3, 4, 5], (
            "segment_ids should be updated"
        )
|
|
|
|
|
|
class TestDeleteAnnotation:
    """Tests for DeleteAnnotation RPC."""

    @pytest.fixture
    def servicer(self, mock_annotations_repo: AsyncMock) -> MockServicerHost:
        """Create servicer with mock repository."""
        return MockServicerHost(mock_annotations_repo)

    async def test_deletes_annotation_successfully(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """DeleteAnnotation returns success when annotation is deleted."""
        annotation_id = uuid4()
        mock_annotations_repo.delete.return_value = True

        request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(annotation_id))
        response = await servicer.DeleteAnnotation(request, mock_grpc_context)

        assert response.success is True, "should return success=True"
        mock_annotations_repo.delete.assert_called_once()

    async def test_aborts_when_annotation_not_found_delete_annotation(
        self,
        servicer: MockServicerHost,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
    ) -> None:
        """DeleteAnnotation aborts with NOT_FOUND when annotation does not exist."""
        annotation_id = uuid4()
        # The repository reports that nothing was deleted.
        mock_annotations_repo.delete.return_value = False

        request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(annotation_id))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer.DeleteAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
|
|
|
|
|
|
# Test data for invalid ID tests - defined at module level to avoid eager test detection
# Each entry is (RPC method name on the servicer, request carrying a malformed ID).
# Order matters: the parametrize ``ids`` list in TestAnnotationInvalidIds is positional.
_INVALID_ID_TEST_CASES: list[tuple[str, object]] = [
    (
        "AddAnnotation",
        noteflow_pb2.AddAnnotationRequest(
            meeting_id="not-a-valid-uuid",
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test",
            start_time=0.0,
            end_time=5.0,
        ),
    ),
    (
        "GetAnnotation",
        noteflow_pb2.GetAnnotationRequest(annotation_id="not-a-valid-uuid"),
    ),
    (
        "ListAnnotations",
        noteflow_pb2.ListAnnotationsRequest(meeting_id="invalid-uuid"),
    ),
    (
        "UpdateAnnotation",
        noteflow_pb2.UpdateAnnotationRequest(
            annotation_id="not-a-valid-uuid",
            text="Updated text",
        ),
    ),
    (
        "DeleteAnnotation",
        noteflow_pb2.DeleteAnnotationRequest(annotation_id="not-valid"),
    ),
]
|
|
|
|
|
|
class TestAnnotationInvalidIds:
    """Tests for invalid ID handling across annotation RPCs."""

    @pytest.mark.parametrize(
        ("method_name", "proto_request"),
        _INVALID_ID_TEST_CASES,
        # Positional: one id per entry of _INVALID_ID_TEST_CASES, same order.
        ids=[
            "add_annotation",
            "get_annotation",
            "list_annotations",
            "update_annotation",
            "delete_annotation",
        ],
    )
    async def test_aborts_on_invalid_ids(
        self,
        mock_annotations_repo: AsyncMock,
        mock_grpc_context: MagicMock,
        method_name: str,
        proto_request: object,
    ) -> None:
        """Annotation RPCs should abort on invalid IDs."""
        servicer = MockServicerHost(mock_annotations_repo)
        # Resolve the RPC by name so one test body covers every endpoint.
        method = getattr(servicer, method_name)

        with pytest.raises(AssertionError, match="Unreachable"):
            await method(proto_request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
|
|
|
|
|
|
class TestDatabaseNotSupported:
    """Tests for when database/annotations are not available."""

    @pytest.fixture
    def servicer_no_db(self) -> MockServicerHost:
        """Create servicer with database not supported.

        The servicer always hands out one provider whose
        ``supports_annotations`` flag is ``False``.
        """
        repo = AsyncMock()
        servicer = MockServicerHost(repo)
        # Override repository provider to not support annotations
        provider = MockRepositoryProvider(repo)
        provider.supports_annotations = False
        servicer.create_repository_provider = lambda: provider
        return servicer

    async def test_add_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """AddAnnotation aborts when database not available."""
        request = noteflow_pb2.AddAnnotationRequest(
            meeting_id=str(uuid4()),
            annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
            text="Test",
            start_time=0.0,
            end_time=5.0,
        )

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.AddAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_get_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """GetAnnotation aborts when database not available."""
        request = noteflow_pb2.GetAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.GetAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_list_annotations_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """ListAnnotations aborts when database not available."""
        request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.ListAnnotations(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_update_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """UpdateAnnotation aborts when database not available."""
        request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(uuid4()),
            text="Updated",
        )

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.UpdateAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()

    async def test_delete_annotation_aborts_without_database(
        self,
        servicer_no_db: MockServicerHost,
        mock_grpc_context: MagicMock,
    ) -> None:
        """DeleteAnnotation aborts when database not available."""
        request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(AssertionError, match="Unreachable"):
            await servicer_no_db.DeleteAnnotation(request, mock_grpc_context)

        mock_grpc_context.abort.assert_called_once()
|