Files
noteflow/tests/integration/test_e2e_annotations.py
Travis Vasceannie e8ea0b24d6
Some checks failed
CI / test-typescript (push) Has been cancelled
CI / test-rust (push) Has been cancelled
CI / test-python (push) Has been cancelled
refactor: rename request parameter to proto_request in gRPC test methods for improved clarity.
2026-01-24 17:41:32 +00:00

532 lines
21 KiB
Python

"""End-to-end integration tests for annotation operations.
Tests the complete annotation CRUD workflow via gRPC with database:
- Adding annotations
- Retrieving annotations
- Listing annotations with filters
- Updating annotations
- Deleting annotations
- Error handling for database-required operations
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from uuid import UUID, uuid4
import grpc
import pytest
from noteflow.domain.entities import Meeting
from noteflow.domain.value_objects import MeetingId
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
from tests.conftest import approx_float
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
# ============================================================================
# Test Constants
# ============================================================================
# Annotation timestamps
ANNOTATION_START_TIME = 10.5
ANNOTATION_END_TIME_SECONDS = 15.0
async def _create_test_meeting(
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
title: str,
) -> Meeting:
"""Create a test meeting in the database."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title=title)
await uow.meetings.create(meeting)
await uow.commit()
return meeting
async def _add_annotation_via_servicer(
servicer: NoteFlowServicer,
meeting_id: MeetingId,
text: str,
start_time: float = ANNOTATION_START_TIME,
end_time: float = ANNOTATION_END_TIME_SECONDS,
annotation_type: noteflow_pb2.AnnotationType = noteflow_pb2.ANNOTATION_TYPE_NOTE,
) -> noteflow_pb2.Annotation:
"""Add an annotation via servicer and return the result."""
request = _create_add_annotation_request(
meeting_id, text, start_time, end_time, annotation_type
)
return await servicer.AddAnnotation(request, MockContext())
async def _list_annotations_for_meeting(
servicer: NoteFlowServicer,
meeting_id: MeetingId,
start_time: float | None = None,
end_time: float | None = None,
) -> list[noteflow_pb2.Annotation]:
"""List annotations for a meeting, optionally filtered by time range."""
list_request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(meeting_id))
if start_time is not None:
list_request.start_time = start_time
if end_time is not None:
list_request.end_time = end_time
result: noteflow_pb2.ListAnnotationsResponse = await servicer.ListAnnotations(
list_request, MockContext()
)
return cast(list[noteflow_pb2.Annotation], result.annotations)
async def _create_two_meetings(
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
) -> tuple[Meeting, Meeting]:
"""Create two test meetings in the database."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting1 = Meeting.create(title="Meeting 1")
meeting2 = Meeting.create(title="Meeting 2")
await uow.meetings.create(meeting1)
await uow.meetings.create(meeting2)
await uow.commit()
return meeting1, meeting2
def _create_add_annotation_request(
meeting_id: MeetingId,
text: str,
start_time: float = ANNOTATION_START_TIME,
end_time: float = ANNOTATION_END_TIME_SECONDS,
annotation_type: noteflow_pb2.AnnotationType = noteflow_pb2.ANNOTATION_TYPE_NOTE,
segment_ids: list[int] | None = None,
) -> noteflow_pb2.AddAnnotationRequest:
"""Create an AddAnnotationRequest."""
request = noteflow_pb2.AddAnnotationRequest(
meeting_id=str(meeting_id),
annotation_type=annotation_type,
text=text,
start_time=start_time,
end_time=end_time,
)
if segment_ids is not None:
request.segment_ids[:] = segment_ids
return request
@pytest.fixture
def annotation_servicer(
session_factory: async_sessionmaker[AsyncSession],
) -> NoteFlowServicer:
"""Create a NoteFlow servicer for annotation tests."""
return NoteFlowServicer(session_factory=session_factory)
def _assert_annotation_response(
result: noteflow_pb2.Annotation,
text: str,
segment_ids: list[int],
) -> None:
"""Assert that an annotation response matches expected values."""
assert result.id, "annotation ID should be assigned"
assert result.text == text, f"expected annotation text {text!r}, got {result.text!r}"
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_NOTE, (
f"expected annotation type ANNOTATION_TYPE_NOTE, got {result.annotation_type}"
)
assert result.start_time == approx_float(ANNOTATION_START_TIME), (
f"expected start_time {ANNOTATION_START_TIME}, got {result.start_time}"
)
assert result.end_time == approx_float(ANNOTATION_END_TIME_SECONDS), (
f"expected end_time {ANNOTATION_END_TIME_SECONDS}, got {result.end_time}"
)
segment_ids_list: list[int] = list(cast(list[int], result.segment_ids))
assert segment_ids_list == segment_ids, (
f"expected segment_ids {segment_ids}, got {segment_ids_list}"
)
class MockContext:
"""Mock gRPC context for testing."""
def __init__(self) -> None:
"""Initialize mock context."""
self.aborted = False
self.abort_code: grpc.StatusCode | None = None
self.abort_details: str | None = None
async def abort(self, code: grpc.StatusCode, details: str) -> None:
"""Record abort and raise to simulate gRPC behavior."""
self.aborted = True
self.abort_code = code
self.abort_details = details
raise grpc.RpcError()
@pytest.mark.integration
class TestAnnotationCRUD:
"""Integration tests for annotation CRUD operations."""
async def test_add_annotation_persists_to_database(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
annotation_servicer: NoteFlowServicer,
) -> None:
"""Test adding an annotation persists it to database."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Annotation Test")
request = _create_add_annotation_request(
meeting.id, "Important point discussed", segment_ids=[0, 1, 2]
)
result: noteflow_pb2.Annotation = await annotation_servicer.AddAnnotation(
request, MockContext()
)
_assert_annotation_response(result, "Important point discussed", [0, 1, 2])
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
from noteflow.domain.value_objects import AnnotationId
saved = await uow.annotations.get(AnnotationId(UUID(result.id)))
assert saved is not None, "annotation should be persisted in database"
assert saved.text == "Important point discussed", (
f"expected persisted text 'Important point discussed', got {saved.text!r}"
)
async def test_get_annotation_retrieves_from_database(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test getting an annotation by ID retrieves from database."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Get Test")
servicer = NoteFlowServicer(session_factory=session_factory)
added = await _add_annotation_via_servicer(
servicer,
meeting.id,
"Follow up on this",
5.0,
10.0,
noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM,
)
get_request = noteflow_pb2.GetAnnotationRequest(annotation_id=added.id)
result: noteflow_pb2.Annotation = await servicer.GetAnnotation(get_request, MockContext())
assert result.id == added.id, f"expected annotation ID {added.id}, got {result.id}"
assert result.text == "Follow up on this", (
f"expected text 'Follow up on this', got {result.text!r}"
)
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM, (
f"expected annotation type ANNOTATION_TYPE_ACTION_ITEM, got {result.annotation_type}"
)
async def test_list_annotations_for_meeting(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test listing all annotations for a meeting."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "List Test")
servicer = NoteFlowServicer(session_factory=session_factory)
for i in range(3):
await _add_annotation_via_servicer(
servicer, meeting.id, f"Annotation {i}", float(i * 10), float((i + 1) * 10)
)
list_request = noteflow_pb2.ListAnnotationsRequest(meeting_id=str(meeting.id))
result: noteflow_pb2.ListAnnotationsResponse = await servicer.ListAnnotations(
list_request, MockContext()
)
annotations_count: int = len(cast(list[noteflow_pb2.Annotation], result.annotations))
assert annotations_count == 3, f"expected 3 annotations, got {annotations_count}"
async def test_list_annotations_with_time_range_filter(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test listing annotations filtered by time range.
Time range filter uses overlap logic - annotations are included if
they overlap with the query range in any way.
"""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Time Range Test")
servicer = NoteFlowServicer(session_factory=session_factory)
# Create annotations with clear separation from query boundaries
for start, end in [(0, 8), (15, 25), (30, 40), (55, 60)]:
await _add_annotation_via_servicer(
servicer, meeting.id, f"Annotation at {start}-{end}", float(start), float(end)
)
# Query range 10-50 should include (15-25) and (30-40)
annotations = await _list_annotations_for_meeting(servicer, meeting.id, 10.0, 50.0)
assert len(annotations) == 2, (
f"expected 2 annotations in time range 10-50, got {len(annotations)}"
)
async def test_update_annotation_modifies_database(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test updating an annotation modifies the database record."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Update Test")
servicer = NoteFlowServicer(session_factory=session_factory)
added = await _add_annotation_via_servicer(servicer, meeting.id, "Original text", 5.0, 10.0)
update_request = noteflow_pb2.UpdateAnnotationRequest(
annotation_id=added.id,
text="Updated text",
annotation_type=noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM,
)
result: noteflow_pb2.Annotation = await servicer.UpdateAnnotation(
update_request, MockContext()
)
assert result.text == "Updated text", (
f"expected updated text 'Updated text', got {result.text!r}"
)
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM, (
f"expected updated type ANNOTATION_TYPE_ACTION_ITEM, got {result.annotation_type}"
)
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
from noteflow.domain.value_objects import AnnotationId
saved = await uow.annotations.get(AnnotationId(UUID(added.id)))
assert saved is not None, "updated annotation should exist in database"
assert saved.text == "Updated text", (
f"expected database text 'Updated text', got {saved.text!r}"
)
async def test_delete_annotation_removes_from_database(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test deleting an annotation removes it from database."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Delete Test")
await uow.meetings.create(meeting)
await uow.commit()
servicer = NoteFlowServicer(session_factory=session_factory)
add_request = noteflow_pb2.AddAnnotationRequest(
meeting_id=str(meeting.id),
annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
text="To be deleted",
start_time=0.0,
end_time=5.0,
)
added: noteflow_pb2.Annotation = await servicer.AddAnnotation(add_request, MockContext())
delete_request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=added.id)
result: noteflow_pb2.DeleteAnnotationResponse = await servicer.DeleteAnnotation(
delete_request, MockContext()
)
assert result.success is True, "delete operation should return success=True"
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
from noteflow.domain.value_objects import AnnotationId
deleted = await uow.annotations.get(AnnotationId(UUID(added.id)))
assert deleted is None, "annotation should be removed from database after deletion"
@pytest.mark.integration
class TestAnnotationTypes:
"""Integration tests for different annotation types."""
async def test_note_annotation_type(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test creating a NOTE type annotation."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create()
await uow.meetings.create(meeting)
await uow.commit()
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.AddAnnotationRequest(
meeting_id=str(meeting.id),
annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
text="A note",
start_time=0.0,
end_time=1.0,
)
result: noteflow_pb2.Annotation = await servicer.AddAnnotation(request, MockContext())
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_NOTE, (
f"expected ANNOTATION_TYPE_NOTE, got {result.annotation_type}"
)
async def test_action_item_annotation_type(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test creating an ACTION_ITEM type annotation."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create()
await uow.meetings.create(meeting)
await uow.commit()
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.AddAnnotationRequest(
meeting_id=str(meeting.id),
annotation_type=noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM,
text="An action item",
start_time=0.0,
end_time=1.0,
)
result: noteflow_pb2.Annotation = await servicer.AddAnnotation(request, MockContext())
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM, (
f"expected ANNOTATION_TYPE_ACTION_ITEM, got {result.annotation_type}"
)
async def test_decision_annotation_type(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test creating a DECISION type annotation."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create()
await uow.meetings.create(meeting)
await uow.commit()
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.AddAnnotationRequest(
meeting_id=str(meeting.id),
annotation_type=noteflow_pb2.ANNOTATION_TYPE_DECISION,
text="A decision",
start_time=0.0,
end_time=1.0,
)
result: noteflow_pb2.Annotation = await servicer.AddAnnotation(request, MockContext())
assert result.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION, (
f"expected ANNOTATION_TYPE_DECISION, got {result.annotation_type}"
)
@pytest.mark.integration
class TestAnnotationErrors:
"""Integration tests for annotation error handling."""
async def test_add_annotation_invalid_meeting_id(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Test adding annotation with invalid meeting ID fails."""
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
request = noteflow_pb2.AddAnnotationRequest(
meeting_id="invalid",
annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
text="Test",
start_time=0.0,
end_time=1.0,
)
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.AddAnnotation(request, context)
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, (
f"expected INVALID_ARGUMENT status code, got {context.abort_code}"
)
@pytest.mark.parametrize(
("method_name", "proto_request", "label"),
[
pytest.param(
"GetAnnotation",
noteflow_pb2.GetAnnotationRequest(annotation_id=str(uuid4())),
"get",
id="get",
),
pytest.param(
"UpdateAnnotation",
noteflow_pb2.UpdateAnnotationRequest(
annotation_id=str(uuid4()),
text="Updated",
),
"update",
id="update",
),
pytest.param(
"DeleteAnnotation",
noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(uuid4())),
"delete",
id="delete",
),
],
)
async def test_annotation_not_found(
self,
session_factory: async_sessionmaker[AsyncSession],
method_name: str,
proto_request: object,
label: str,
) -> None:
"""Annotation operations should return NOT_FOUND when missing."""
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
method = getattr(servicer, method_name)
with pytest.raises(grpc.RpcError, match=r".*"):
await method(proto_request, context)
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
f"expected NOT_FOUND status code for {label} nonexistent annotation, got {context.abort_code}"
)
@pytest.mark.integration
class TestAnnotationIsolation:
"""Integration tests for annotation meeting isolation."""
async def test_annotations_isolated_between_meetings(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test annotations from one meeting don't appear in another."""
meeting1, meeting2 = await _create_two_meetings(session_factory, meetings_dir)
servicer = NoteFlowServicer(session_factory=session_factory)
await _add_annotation_via_servicer(servicer, meeting1.id, "Meeting 1 annotation", 0.0, 1.0)
await _add_annotation_via_servicer(servicer, meeting2.id, "Meeting 2 annotation", 0.0, 1.0)
annotations = await _list_annotations_for_meeting(servicer, meeting1.id)
assert len(annotations) == 1, f"expected 1 annotation for meeting 1, got {len(annotations)}"
assert annotations[0].text == "Meeting 1 annotation", (
f"expected 'Meeting 1 annotation', got {annotations[0].text!r}"
)
async def test_annotations_deleted_with_meeting(
self,
session_factory: async_sessionmaker[AsyncSession],
persisted_meeting: MeetingId,
) -> None:
"""Test annotations are cascade deleted when meeting is deleted."""
meeting_id = persisted_meeting
servicer = NoteFlowServicer(session_factory=session_factory)
added = await _add_annotation_via_servicer(
servicer, meeting_id, "Will be deleted", 0.0, 1.0
)
annotation_id = added.id
await servicer.DeleteMeeting(
noteflow_pb2.DeleteMeetingRequest(meeting_id=str(meeting_id)), MockContext()
)
context = MockContext()
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.GetAnnotation(
noteflow_pb2.GetAnnotationRequest(annotation_id=annotation_id), context
)
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
f"expected NOT_FOUND for annotation after meeting deletion, got {context.abort_code}"
)