677 lines
27 KiB
Python
677 lines
27 KiB
Python
"""Integration tests for error handling and edge cases.
|
|
|
|
Tests comprehensive error handling across the application:
|
|
- Invalid input handling
|
|
- Transaction failures and rollbacks
|
|
- Resource cleanup on errors
|
|
- Boundary conditions
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from uuid import uuid4
|
|
|
|
import grpc
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Meeting, Segment, Summary
|
|
from noteflow.domain.value_objects import MeetingId
|
|
from noteflow.grpc.proto import noteflow_pb2
|
|
from noteflow.grpc.service import NoteFlowServicer
|
|
from noteflow.infrastructure.persistence.repositories import DiarizationJob
|
|
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
|
|
JOB_STATUS_COMPLETED,
|
|
JOB_STATUS_QUEUED,
|
|
JOB_STATUS_RUNNING,
|
|
)
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
class MockContext:
|
|
"""Mock gRPC context for testing."""
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize mock context."""
|
|
self.aborted = False
|
|
self.abort_code: grpc.StatusCode | None = None
|
|
self.abort_details: str | None = None
|
|
|
|
async def abort(self, code: grpc.StatusCode, details: str) -> None:
|
|
"""Record abort and raise to simulate gRPC behavior."""
|
|
self.aborted = True
|
|
self.abort_code = code
|
|
self.abort_details = details
|
|
raise grpc.RpcError()
|
|
|
|
|
|
# Test constants
|
|
LARGE_OFFSET = 100
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestInvalidInputHandling:
|
|
"""Integration tests for invalid input handling."""
|
|
|
|
@pytest.mark.parametrize(
|
|
"meeting_id",
|
|
[
|
|
pytest.param("not-a-uuid", id="invalid_uuid"),
|
|
pytest.param("", id="empty"),
|
|
],
|
|
)
|
|
async def test_invalid_meeting_id_formats(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meeting_id: str,
|
|
) -> None:
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id)
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GetMeeting(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, (
|
|
f"expected INVALID_ARGUMENT for malformed UUID, got {context.abort_code}"
|
|
)
|
|
|
|
@pytest.mark.parametrize(
|
|
("method_name", "proto_request"),
|
|
[
|
|
pytest.param(
|
|
"GetMeeting",
|
|
noteflow_pb2.GetMeetingRequest(meeting_id=str(uuid4())),
|
|
id="get_meeting",
|
|
),
|
|
pytest.param(
|
|
"DeleteMeeting",
|
|
noteflow_pb2.DeleteMeetingRequest(meeting_id=str(uuid4())),
|
|
id="delete_meeting",
|
|
),
|
|
],
|
|
)
|
|
async def test_nonexistent_meeting_returns_not_found(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
method_name: str,
|
|
proto_request: object,
|
|
) -> None:
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
method = getattr(servicer, method_name)
|
|
await method(proto_request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND for nonexistent meeting, got {context.abort_code}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSegmentEdgeCases:
|
|
"""Integration tests for segment handling edge cases."""
|
|
|
|
async def test_get_segments_from_nonexistent_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test getting segments from nonexistent meeting returns empty list."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
segments = await uow.segments.get_by_meeting(MeetingId(uuid4()))
|
|
assert segments == [], (
|
|
f"expected empty list for nonexistent meeting, got {len(segments)} segments"
|
|
)
|
|
|
|
async def test_segment_with_zero_duration(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test segment with zero duration is allowed."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Zero Duration Segment")
|
|
await uow.meetings.create(meeting)
|
|
|
|
segment = Segment(
|
|
segment_id=0,
|
|
text="Instant",
|
|
start_time=5.0,
|
|
end_time=5.0,
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
segments = await uow.segments.get_by_meeting(meeting.id)
|
|
assert len(segments) == 1, (
|
|
f"expected 1 segment for zero-duration test, got {len(segments)}"
|
|
)
|
|
assert segments[0].start_time == segments[0].end_time, (
|
|
f"expected start_time == end_time for zero-duration segment, "
|
|
f"got start={segments[0].start_time}, end={segments[0].end_time}"
|
|
)
|
|
|
|
async def test_segment_with_large_text(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test segment with very large text is stored correctly."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Large Text Segment")
|
|
await uow.meetings.create(meeting)
|
|
|
|
large_text = "Word " * 10000
|
|
segment = Segment(
|
|
segment_id=0,
|
|
text=large_text,
|
|
start_time=0.0,
|
|
end_time=60.0,
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
segments = await uow.segments.get_by_meeting(meeting.id)
|
|
assert len(segments) == 1, (
|
|
f"expected 1 segment for large-text test, got {len(segments)}"
|
|
)
|
|
assert len(segments[0].text) == len(large_text), (
|
|
f"expected text length {len(large_text)}, got {len(segments[0].text)}"
|
|
)
|
|
|
|
async def test_segment_ordering_preserved(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test segments are returned in order by segment_id."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Ordering Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
for i in [5, 0, 3, 1, 4, 2]:
|
|
segment = Segment(
|
|
segment_id=i,
|
|
text=f"Segment {i}",
|
|
start_time=float(i),
|
|
end_time=float(i + 1),
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
segments = await uow.segments.get_by_meeting(meeting.id)
|
|
segment_ids = [s.segment_id for s in segments]
|
|
assert segment_ids == sorted(segment_ids), (
|
|
f"expected segments ordered by segment_id, got {segment_ids}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestDiarizationJobEdgeCases:
|
|
"""Integration tests for diarization job edge cases."""
|
|
|
|
async def test_duplicate_job_id_rejected(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test creating job with duplicate ID fails."""
|
|
job_id = str(uuid4())
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Duplicate Job Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
job1 = DiarizationJob(
|
|
job_id=job_id,
|
|
meeting_id=str(meeting.id),
|
|
status=JOB_STATUS_QUEUED,
|
|
)
|
|
await uow.diarization_jobs.create(job1)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting2 = Meeting.create(title="Another Meeting")
|
|
await uow.meetings.create(meeting2)
|
|
|
|
job2 = DiarizationJob(
|
|
job_id=job_id,
|
|
meeting_id=str(meeting2.id),
|
|
status=JOB_STATUS_QUEUED,
|
|
)
|
|
with pytest.raises(ValueError, match="already exists") as excinfo:
|
|
await uow.diarization_jobs.create(job2)
|
|
await uow.commit()
|
|
assert isinstance(excinfo.value, ValueError)
|
|
|
|
async def test_update_nonexistent_job(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test updating nonexistent job returns False."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.diarization_jobs.update_status(
|
|
str(uuid4()),
|
|
JOB_STATUS_COMPLETED,
|
|
)
|
|
assert result is False, f"expected False when updating nonexistent job, got {result}"
|
|
|
|
async def test_get_nonexistent_job(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test getting nonexistent job returns None."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.diarization_jobs.get(str(uuid4()))
|
|
assert result is None, f"expected None for nonexistent job, got {result}"
|
|
|
|
async def test_job_meeting_cascade_delete(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test job is deleted when parent meeting is deleted."""
|
|
job_id = str(uuid4())
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Cascade Delete Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
job = DiarizationJob(
|
|
job_id=job_id,
|
|
meeting_id=str(meeting.id),
|
|
status=JOB_STATUS_RUNNING,
|
|
)
|
|
await uow.diarization_jobs.create(job)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.meetings.delete(meeting.id)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
job = await uow.diarization_jobs.get(job_id)
|
|
assert job is None, f"expected job to be cascade-deleted with meeting, got {job}"
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSummaryEdgeCases:
|
|
"""Integration tests for summary handling edge cases."""
|
|
|
|
async def test_overwrite_existing_summary(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test saving new summary overwrites existing one."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Summary Overwrite Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
summary1 = Summary(
|
|
meeting_id=meeting.id,
|
|
executive_summary="First summary",
|
|
)
|
|
await uow.summaries.save(summary1)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
summary2 = Summary(
|
|
meeting_id=meeting.id,
|
|
executive_summary="Second summary",
|
|
)
|
|
await uow.summaries.save(summary2)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
saved = await uow.summaries.get_by_meeting(meeting.id)
|
|
assert saved is not None, "expected summary to exist after overwrite"
|
|
assert saved.executive_summary == "Second summary", (
|
|
f"expected executive_summary to be 'Second summary', "
|
|
f"got '{saved.executive_summary}'"
|
|
)
|
|
|
|
async def test_get_summary_for_nonexistent_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test getting summary for nonexistent meeting returns None."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.summaries.get_by_meeting(MeetingId(uuid4()))
|
|
assert result is None, f"expected None for summary of nonexistent meeting, got {result}"
|
|
|
|
async def test_delete_summary_for_meeting_without_summary(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test deleting summary when none exists returns False."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="No Summary Meeting")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.summaries.delete_by_meeting(meeting.id)
|
|
assert result is False, (
|
|
f"expected False when deleting nonexistent summary, got {result}"
|
|
)
|
|
|
|
async def test_summary_deleted_with_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary is deleted when meeting is deleted."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Summary Cascade Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
summary = Summary(
|
|
meeting_id=meeting.id,
|
|
executive_summary="Will be deleted",
|
|
)
|
|
await uow.summaries.save(summary)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.meetings.delete(meeting.id)
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.summaries.get_by_meeting(meeting.id)
|
|
assert result is None, (
|
|
f"expected summary to be cascade-deleted with meeting, got {result}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestPreferencesEdgeCases:
|
|
"""Integration tests for preferences edge cases."""
|
|
|
|
async def test_preference_overwrite(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test setting preference overwrites existing value."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.preferences.set("test_key", "value1")
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.preferences.set("test_key", "value2")
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
value = await uow.preferences.get("test_key")
|
|
assert value == "value2", (
|
|
f"expected preference to be overwritten to 'value2', got '{value}'"
|
|
)
|
|
|
|
async def test_get_nonexistent_preference(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test getting nonexistent preference returns None."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
value = await uow.preferences.get("nonexistent_key")
|
|
assert value is None, f"expected None for nonexistent preference, got {value}"
|
|
|
|
async def test_delete_nonexistent_preference(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test deleting nonexistent preference returns False."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.preferences.delete("nonexistent_key")
|
|
assert result is False, (
|
|
f"expected False when deleting nonexistent preference, got {result}"
|
|
)
|
|
|
|
async def test_preference_with_special_characters_in_key(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test preference with special characters in key."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
key = "test.key_with-special:chars/and\\more"
|
|
await uow.preferences.set(key, "value")
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
value = await uow.preferences.get(key)
|
|
assert value == "value", (
|
|
f"expected preference with special chars in key to store 'value', got '{value}'"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestTransactionRollback:
|
|
"""Integration tests for transaction rollback behavior."""
|
|
|
|
async def test_rollback_on_exception(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test changes are rolled back on exception."""
|
|
meeting_id = None
|
|
try:
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Rollback Test")
|
|
await uow.meetings.create(meeting)
|
|
meeting_id = meeting.id
|
|
raise RuntimeError("Simulated error")
|
|
except RuntimeError:
|
|
pass
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
assert meeting_id is not None, "meeting_id should be set before rollback check"
|
|
result = await uow.meetings.get(meeting_id)
|
|
assert result is None, f"expected meeting to be rolled back on exception, got {result}"
|
|
|
|
async def test_partial_commit_not_allowed(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test partial changes don't persist without commit."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="No Commit Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
result = await uow.meetings.get(meeting.id)
|
|
assert result is None, f"expected meeting not to persist without commit, got {result}"
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestListingEdgeCases:
|
|
"""Integration tests for meeting listing edge cases."""
|
|
|
|
async def test_list_with_large_offset(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test listing with offset larger than total count."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
for i in range(3):
|
|
meeting = Meeting.create(title=f"Meeting {i}")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
request = noteflow_pb2.ListMeetingsRequest(offset=LARGE_OFFSET)
|
|
result = await servicer.ListMeetings(request, MockContext())
|
|
|
|
assert len(result.meetings) == 0, (
|
|
f"expected empty meetings list with large offset, got {len(result.meetings)}"
|
|
)
|
|
|
|
async def test_list_empty_database(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test listing from empty database returns empty list."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
request = noteflow_pb2.ListMeetingsRequest()
|
|
result = await servicer.ListMeetings(request, MockContext())
|
|
|
|
assert result.total_count == 0, (
|
|
f"expected total_count=0 for empty database, got {result.total_count}"
|
|
)
|
|
assert len(result.meetings) == 0, (
|
|
f"expected empty meetings list for empty database, got {len(result.meetings)}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestExportErrorHandling:
|
|
"""Integration tests for export error handling."""
|
|
|
|
async def test_export_nonexistent_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test exporting nonexistent meeting returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.ExportTranscriptRequest(
|
|
meeting_id=str(uuid4()),
|
|
format=noteflow_pb2.EXPORT_FORMAT_MARKDOWN,
|
|
)
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.ExportTranscript(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND when exporting nonexistent meeting, got {context.abort_code}"
|
|
)
|
|
|
|
async def test_export_invalid_format(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test exporting with unspecified format uses default."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Export Format Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
segment = Segment(
|
|
segment_id=0,
|
|
text="Test content",
|
|
start_time=0.0,
|
|
end_time=1.0,
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
request = noteflow_pb2.ExportTranscriptRequest(
|
|
meeting_id=str(meeting.id),
|
|
format=noteflow_pb2.EXPORT_FORMAT_UNSPECIFIED,
|
|
)
|
|
result = await servicer.ExportTranscript(request, MockContext())
|
|
|
|
assert result.content, "expected non-empty content for export with unspecified format"
|
|
assert result.file_extension, (
|
|
"expected non-empty file_extension for export with unspecified format"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSummarizationErrorHandling:
|
|
"""Integration tests for summarization error handling."""
|
|
|
|
async def test_summarize_nonexistent_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test summarizing nonexistent meeting returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(uuid4()))
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GenerateSummary(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND when summarizing nonexistent meeting, got {context.abort_code}"
|
|
)
|
|
|
|
async def test_summarize_empty_meeting_returns_placeholder(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summarizing meeting with no segments returns placeholder."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Empty Meeting")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert "No transcript" in result.executive_summary, (
|
|
f"expected placeholder summary for empty meeting, got '{result.executive_summary}'"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestAnnotationErrorHandling:
|
|
"""Integration tests for annotation error handling."""
|
|
|
|
async def test_get_nonexistent_annotation(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test getting nonexistent annotation returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.GetAnnotationRequest(annotation_id=str(uuid4()))
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GetAnnotation(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND when getting nonexistent annotation, got {context.abort_code}"
|
|
)
|
|
|
|
async def test_update_nonexistent_annotation(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test updating nonexistent annotation returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.UpdateAnnotationRequest(
|
|
annotation_id=str(uuid4()),
|
|
text="Updated text",
|
|
)
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.UpdateAnnotation(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND when updating nonexistent annotation, got {context.abort_code}"
|
|
)
|
|
|
|
async def test_delete_nonexistent_annotation(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test deleting nonexistent annotation returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(uuid4()))
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.DeleteAnnotation(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND when deleting nonexistent annotation, got {context.abort_code}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestDiarizationJobErrorHandling:
|
|
"""Integration tests for diarization job error handling."""
|
|
|
|
async def test_get_status_nonexistent_job(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test getting status of nonexistent job returns NOT_FOUND."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=str(uuid4()))
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GetDiarizationJobStatus(request, context)
|
|
|
|
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
|
|
f"expected NOT_FOUND for nonexistent diarization job, got {context.abort_code}"
|
|
)
|