Files
noteflow/tests/integration/test_error_handling.py
Travis Vasceannie e8ea0b24d6
Some checks failed
CI / test-typescript (push) Has been cancelled
CI / test-rust (push) Has been cancelled
CI / test-python (push) Has been cancelled
refactor: rename request parameter to proto_request in gRPC test methods for improved clarity.
2026-01-24 17:41:32 +00:00

677 lines
27 KiB
Python

"""Integration tests for error handling and edge cases.
Tests comprehensive error handling across the application:
- Invalid input handling
- Transaction failures and rollbacks
- Resource cleanup on errors
- Boundary conditions
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import uuid4
import grpc
import pytest
from noteflow.domain.entities import Meeting, Segment, Summary
from noteflow.domain.value_objects import MeetingId
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.repositories import DiarizationJob
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
JOB_STATUS_COMPLETED,
JOB_STATUS_QUEUED,
JOB_STATUS_RUNNING,
)
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class MockContext:
    """Minimal stand-in for a gRPC servicer context.

    Remembers the status code and details handed to ``abort`` so a test can
    inspect them after the servicer call has failed.
    """

    def __init__(self) -> None:
        """Start with no abort recorded."""
        self.abort_code: grpc.StatusCode | None = None
        self.abort_details: str | None = None
        self.aborted = False

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Store the abort arguments, then raise to simulate gRPC behavior.

        Args:
            code: Status code the servicer aborted with.
            details: Human-readable abort message.

        Raises:
            grpc.RpcError: Always, after the arguments have been recorded.
        """
        self.abort_code, self.abort_details = code, details
        self.aborted = True
        raise grpc.RpcError()
# Test constants
# Listing offset used to page past every meeting a test creates (the listing
# test below inserts three meetings and expects an empty page at this offset).
LARGE_OFFSET = 100
@pytest.mark.integration
class TestInvalidInputHandling:
    """Integration tests for invalid input handling."""

    @pytest.mark.parametrize(
        "meeting_id",
        [
            pytest.param("not-a-uuid", id="invalid_uuid"),
            pytest.param("", id="empty"),
        ],
    )
    async def test_invalid_meeting_id_formats(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meeting_id: str,
    ) -> None:
        """Test GetMeeting aborts with INVALID_ARGUMENT for a malformed meeting ID."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id)

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.GetMeeting(proto_request, context)

        assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, (
            f"expected INVALID_ARGUMENT for malformed UUID, got {context.abort_code}"
        )

    @pytest.mark.parametrize(
        ("method_name", "proto_request"),
        [
            pytest.param(
                "GetMeeting",
                noteflow_pb2.GetMeetingRequest(meeting_id=str(uuid4())),
                id="get_meeting",
            ),
            pytest.param(
                "DeleteMeeting",
                noteflow_pb2.DeleteMeetingRequest(meeting_id=str(uuid4())),
                id="delete_meeting",
            ),
        ],
    )
    async def test_nonexistent_meeting_returns_not_found(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        method_name: str,
        proto_request: object,
    ) -> None:
        """Test the named RPC aborts with NOT_FOUND for an unknown meeting ID."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        # Resolve the RPC by name up front so only the call itself sits inside
        # the pytest.raises block.
        rpc = getattr(service, method_name)

        with pytest.raises(grpc.RpcError, match=r".*"):
            await rpc(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND for nonexistent meeting, got {context.abort_code}"
        )
@pytest.mark.integration
class TestSegmentEdgeCases:
    """Integration tests for segment handling edge cases."""

    async def test_get_segments_from_nonexistent_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test getting segments from nonexistent meeting returns empty list."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            segments = await uow.segments.get_by_meeting(MeetingId(uuid4()))
            assert segments == [], (
                f"expected empty list for nonexistent meeting, got {len(segments)} segments"
            )

    async def test_segment_with_zero_duration(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test segment with zero duration is allowed."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Zero Duration Segment")
            await uow.meetings.create(meeting)
            segment = Segment(
                segment_id=0,
                text="Instant",
                start_time=5.0,
                end_time=5.0,
            )
            await uow.segments.add(meeting.id, segment)
            await uow.commit()

        # Read back through a fresh unit of work so the check sees committed data.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            segments = await uow.segments.get_by_meeting(meeting.id)
            assert len(segments) == 1, (
                f"expected 1 segment for zero-duration test, got {len(segments)}"
            )
            assert segments[0].start_time == segments[0].end_time, (
                f"expected start_time == end_time for zero-duration segment, "
                f"got start={segments[0].start_time}, end={segments[0].end_time}"
            )

    async def test_segment_with_large_text(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test segment with very large text is stored correctly."""
        large_text = "Word " * 10000
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Large Text Segment")
            await uow.meetings.create(meeting)
            segment = Segment(
                segment_id=0,
                text=large_text,
                start_time=0.0,
                end_time=60.0,
            )
            await uow.segments.add(meeting.id, segment)
            await uow.commit()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            segments = await uow.segments.get_by_meeting(meeting.id)
            assert len(segments) == 1, (
                f"expected 1 segment for large-text test, got {len(segments)}"
            )
            assert len(segments[0].text) == len(large_text), (
                f"expected text length {len(large_text)}, got {len(segments[0].text)}"
            )
            # A length match alone would not notice altered content of the
            # same size, so compare the stored text itself as well.
            assert segments[0].text == large_text, (
                "expected large segment text to round-trip unchanged"
            )

    async def test_segment_ordering_preserved(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test segments are returned in order by segment_id."""
        # Deliberately shuffled so insertion order differs from segment_id order.
        insertion_order = [5, 0, 3, 1, 4, 2]
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Ordering Test")
            await uow.meetings.create(meeting)
            for i in insertion_order:
                segment = Segment(
                    segment_id=i,
                    text=f"Segment {i}",
                    start_time=float(i),
                    end_time=float(i + 1),
                )
                await uow.segments.add(meeting.id, segment)
            await uow.commit()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            segments = await uow.segments.get_by_meeting(meeting.id)
            segment_ids = [s.segment_id for s in segments]
            # Compare against the inserted ids rather than sorted(segment_ids):
            # the latter passes vacuously when the result is empty or partial.
            assert segment_ids == sorted(insertion_order), (
                f"expected segments ordered by segment_id, got {segment_ids}"
            )
@pytest.mark.integration
class TestDiarizationJobEdgeCases:
    """Integration tests for diarization job edge cases."""

    async def test_duplicate_job_id_rejected(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test creating job with duplicate ID fails."""
        job_id = str(uuid4())

        # First job: persisted normally under a fresh meeting.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Duplicate Job Test")
            await uow.meetings.create(meeting)
            await uow.diarization_jobs.create(
                DiarizationJob(
                    job_id=job_id,
                    meeting_id=str(meeting.id),
                    status=JOB_STATUS_QUEUED,
                )
            )
            await uow.commit()

        # Second job reuses the same job_id under a different meeting.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            other_meeting = Meeting.create(title="Another Meeting")
            await uow.meetings.create(other_meeting)
            duplicate_job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(other_meeting.id),
                status=JOB_STATUS_QUEUED,
            )
            with pytest.raises(ValueError, match="already exists") as excinfo:
                await uow.diarization_jobs.create(duplicate_job)
                await uow.commit()
            assert isinstance(excinfo.value, ValueError)

    async def test_update_nonexistent_job(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test updating nonexistent job returns False."""
        unknown_job_id = str(uuid4())
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.diarization_jobs.update_status(
                unknown_job_id,
                JOB_STATUS_COMPLETED,
            )
            assert result is False, f"expected False when updating nonexistent job, got {result}"

    async def test_get_nonexistent_job(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test getting nonexistent job returns None."""
        unknown_job_id = str(uuid4())
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.diarization_jobs.get(unknown_job_id)
            assert result is None, f"expected None for nonexistent job, got {result}"

    async def test_job_meeting_cascade_delete(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test job is deleted when parent meeting is deleted."""
        job_id = str(uuid4())

        # Step 1: persist a meeting together with a running job.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Cascade Delete Test")
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        # Step 2: delete only the meeting.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.meetings.delete(meeting.id)
            await uow.commit()

        # Step 3: the job must no longer be retrievable.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job = await uow.diarization_jobs.get(job_id)
            assert job is None, f"expected job to be cascade-deleted with meeting, got {job}"
@pytest.mark.integration
class TestSummaryEdgeCases:
    """Integration tests for summary handling edge cases."""

    async def test_overwrite_existing_summary(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test saving new summary overwrites existing one."""
        # Persist the meeting with its first summary.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Summary Overwrite Test")
            await uow.meetings.create(meeting)
            await uow.summaries.save(
                Summary(
                    meeting_id=meeting.id,
                    executive_summary="First summary",
                )
            )
            await uow.commit()

        # Save a second summary for the same meeting.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.summaries.save(
                Summary(
                    meeting_id=meeting.id,
                    executive_summary="Second summary",
                )
            )
            await uow.commit()

        # Only the second summary should be returned.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            saved = await uow.summaries.get_by_meeting(meeting.id)
            assert saved is not None, "expected summary to exist after overwrite"
            assert saved.executive_summary == "Second summary", (
                f"expected executive_summary to be 'Second summary', "
                f"got '{saved.executive_summary}'"
            )

    async def test_get_summary_for_nonexistent_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test getting summary for nonexistent meeting returns None."""
        unknown_meeting_id = MeetingId(uuid4())
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.summaries.get_by_meeting(unknown_meeting_id)
            assert result is None, f"expected None for summary of nonexistent meeting, got {result}"

    async def test_delete_summary_for_meeting_without_summary(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test deleting summary when none exists returns False."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="No Summary Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.summaries.delete_by_meeting(meeting.id)
            assert result is False, (
                f"expected False when deleting nonexistent summary, got {result}"
            )

    async def test_summary_deleted_with_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test summary is deleted when meeting is deleted."""
        # Persist a meeting that owns a summary.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Summary Cascade Test")
            await uow.meetings.create(meeting)
            await uow.summaries.save(
                Summary(
                    meeting_id=meeting.id,
                    executive_summary="Will be deleted",
                )
            )
            await uow.commit()

        # Delete only the meeting.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.meetings.delete(meeting.id)
            await uow.commit()

        # The summary must be gone as well.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.summaries.get_by_meeting(meeting.id)
            assert result is None, (
                f"expected summary to be cascade-deleted with meeting, got {result}"
            )
@pytest.mark.integration
class TestPreferencesEdgeCases:
    """Integration tests for preferences edge cases."""

    async def test_preference_overwrite(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test setting preference overwrites existing value."""
        # Write the key twice, each time in its own committed unit of work.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.preferences.set("test_key", "value1")
            await uow.commit()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.preferences.set("test_key", "value2")
            await uow.commit()

        # The later write must win.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            value = await uow.preferences.get("test_key")
            assert value == "value2", (
                f"expected preference to be overwritten to 'value2', got '{value}'"
            )

    async def test_get_nonexistent_preference(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test getting nonexistent preference returns None."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            value = await uow.preferences.get("nonexistent_key")
            assert value is None, f"expected None for nonexistent preference, got {value}"

    async def test_delete_nonexistent_preference(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test deleting nonexistent preference returns False."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.preferences.delete("nonexistent_key")
            assert result is False, (
                f"expected False when deleting nonexistent preference, got {result}"
            )

    async def test_preference_with_special_characters_in_key(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test preference with special characters in key."""
        # Key mixes dot, underscore, hyphen, colon, slash and a backslash.
        key = "test.key_with-special:chars/and\\more"

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            await uow.preferences.set(key, "value")
            await uow.commit()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            value = await uow.preferences.get(key)
            assert value == "value", (
                f"expected preference with special chars in key to store 'value', got '{value}'"
            )
@pytest.mark.integration
class TestTransactionRollback:
    """Integration tests for transaction rollback behavior."""

    async def test_rollback_on_exception(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test changes are rolled back on exception.

        A meeting is created inside a unit of work that then fails with a
        simulated error. The error must propagate out of the unit of work, and
        the meeting must not be readable from a fresh unit of work afterwards.
        """
        meeting = Meeting.create(title="Rollback Test")

        # pytest.raises (rather than try/except/pass) also fails the test if
        # the unit of work swallows the error instead of letting it propagate.
        with pytest.raises(RuntimeError, match="Simulated error"):
            async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
                await uow.meetings.create(meeting)
                raise RuntimeError("Simulated error")

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.meetings.get(meeting.id)
            assert result is None, f"expected meeting to be rolled back on exception, got {result}"

    async def test_partial_commit_not_allowed(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test partial changes don't persist without commit."""
        # Create the meeting but leave the unit of work without committing.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="No Commit Test")
            await uow.meetings.create(meeting)

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.meetings.get(meeting.id)
            assert result is None, f"expected meeting not to persist without commit, got {result}"
@pytest.mark.integration
class TestListingEdgeCases:
    """Integration tests for meeting listing edge cases."""

    async def test_list_with_large_offset(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test listing with offset larger than total count."""
        # Seed three meetings, far fewer than LARGE_OFFSET.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            for i in range(3):
                await uow.meetings.create(Meeting.create(title=f"Meeting {i}"))
            await uow.commit()

        service = NoteFlowServicer(session_factory=session_factory)
        proto_request = noteflow_pb2.ListMeetingsRequest(offset=LARGE_OFFSET)
        result = await service.ListMeetings(proto_request, MockContext())

        assert len(result.meetings) == 0, (
            f"expected empty meetings list with large offset, got {len(result.meetings)}"
        )

    async def test_list_empty_database(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test listing from empty database returns empty list."""
        service = NoteFlowServicer(session_factory=session_factory)
        proto_request = noteflow_pb2.ListMeetingsRequest()
        result = await service.ListMeetings(proto_request, MockContext())

        assert result.total_count == 0, (
            f"expected total_count=0 for empty database, got {result.total_count}"
        )
        assert len(result.meetings) == 0, (
            f"expected empty meetings list for empty database, got {len(result.meetings)}"
        )
@pytest.mark.integration
class TestExportErrorHandling:
    """Integration tests for export error handling."""

    async def test_export_nonexistent_meeting(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test exporting nonexistent meeting returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.ExportTranscriptRequest(
            meeting_id=str(uuid4()),
            format=noteflow_pb2.EXPORT_FORMAT_MARKDOWN,
        )

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.ExportTranscript(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND when exporting nonexistent meeting, got {context.abort_code}"
        )

    async def test_export_invalid_format(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test exporting with unspecified format uses default."""
        # Seed a meeting holding a single one-second segment.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Export Format Test")
            await uow.meetings.create(meeting)
            await uow.segments.add(
                meeting.id,
                Segment(
                    segment_id=0,
                    text="Test content",
                    start_time=0.0,
                    end_time=1.0,
                ),
            )
            await uow.commit()

        service = NoteFlowServicer(session_factory=session_factory)
        proto_request = noteflow_pb2.ExportTranscriptRequest(
            meeting_id=str(meeting.id),
            format=noteflow_pb2.EXPORT_FORMAT_UNSPECIFIED,
        )
        result = await service.ExportTranscript(proto_request, MockContext())

        assert result.content, "expected non-empty content for export with unspecified format"
        assert result.file_extension, (
            "expected non-empty file_extension for export with unspecified format"
        )
@pytest.mark.integration
class TestSummarizationErrorHandling:
    """Integration tests for summarization error handling."""

    async def test_summarize_nonexistent_meeting(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test summarizing nonexistent meeting returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.GenerateSummary(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND when summarizing nonexistent meeting, got {context.abort_code}"
        )

    async def test_summarize_empty_meeting_returns_placeholder(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test summarizing meeting with no segments returns placeholder."""
        # Persist a meeting without adding any segments to it.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Empty Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()

        service = NoteFlowServicer(session_factory=session_factory)
        proto_request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
        result = await service.GenerateSummary(proto_request, MockContext())

        assert "No transcript" in result.executive_summary, (
            f"expected placeholder summary for empty meeting, got '{result.executive_summary}'"
        )
@pytest.mark.integration
class TestAnnotationErrorHandling:
    """Integration tests for annotation error handling."""

    async def test_get_nonexistent_annotation(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test getting nonexistent annotation returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.GetAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.GetAnnotation(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND when getting nonexistent annotation, got {context.abort_code}"
        )

    async def test_update_nonexistent_annotation(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test updating nonexistent annotation returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.UpdateAnnotationRequest(
            annotation_id=str(uuid4()),
            text="Updated text",
        )

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.UpdateAnnotation(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND when updating nonexistent annotation, got {context.abort_code}"
        )

    async def test_delete_nonexistent_annotation(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test deleting nonexistent annotation returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.DeleteAnnotationRequest(annotation_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.DeleteAnnotation(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND when deleting nonexistent annotation, got {context.abort_code}"
        )
@pytest.mark.integration
class TestDiarizationJobErrorHandling:
    """Integration tests for diarization job error handling."""

    async def test_get_status_nonexistent_job(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test getting status of nonexistent job returns NOT_FOUND."""
        service = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        proto_request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await service.GetDiarizationJobStatus(proto_request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND for nonexistent diarization job, got {context.abort_code}"
        )