"""Integration tests for gRPC servicer with real database.
|
|
|
|
These tests verify the actual database code paths in the gRPC service layer,
|
|
which are completely untested when using the in-memory fallback.
|
|
|
|
Tests cover:
|
|
- Meeting CRUD via gRPC with database persistence
|
|
- Diarization job lifecycle through the servicer
|
|
- Preference loading during server initialization
|
|
- Transaction integrity across gRPC operations
|
|
- Error handling with database failures
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Sequence
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Protocol, cast
|
|
from unittest.mock import MagicMock
|
|
from uuid import UUID, uuid4
|
|
|
|
import grpc
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Meeting, Segment
|
|
from noteflow.domain.entities.named_entity import EntityCategory, NamedEntity
|
|
from noteflow.domain.value_objects import MeetingId, MeetingState
|
|
from noteflow.grpc.config.config import ServicesConfig
|
|
from noteflow.grpc.proto import noteflow_pb2
|
|
from noteflow.grpc.service import NoteFlowServicer
|
|
from noteflow.infrastructure.persistence.repositories import DiarizationJob
|
|
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
|
|
JOB_STATUS_COMPLETED,
|
|
JOB_STATUS_FAILED,
|
|
JOB_STATUS_QUEUED,
|
|
JOB_STATUS_RUNNING,
|
|
)
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
class _RefineSpeakerDiarizationRequest(Protocol):
    meeting_id: str
    num_speakers: int


class _RefineSpeakerDiarizationResponse(Protocol):
    job_id: str
    status: int
    error_message: str
    segments_updated: int


class _DiarizationJobStatusRequest(Protocol):
    job_id: str


class _DiarizationJobStatusResponse(Protocol):
    job_id: str
    status: int
    segments_updated: int
    speaker_ids: Sequence[str]
    error_message: str
    progress_percent: float


class _RenameSpeakerRequest(Protocol):
    meeting_id: str
    old_speaker_id: str
    new_speaker_name: str


class _RenameSpeakerResponse(Protocol):
    segments_updated: int
    success: bool


class _RefineSpeakerDiarizationCallable(Protocol):
    async def __call__(
        self,
        request: _RefineSpeakerDiarizationRequest,
        context: MockContext,
    ) -> _RefineSpeakerDiarizationResponse: ...


class _GetDiarizationJobStatusCallable(Protocol):
    async def __call__(
        self,
        request: _DiarizationJobStatusRequest,
        context: MockContext,
    ) -> _DiarizationJobStatusResponse: ...


class _RenameSpeakerCallable(Protocol):
    async def __call__(
        self,
        request: _RenameSpeakerRequest,
        context: MockContext,
    ) -> _RenameSpeakerResponse: ...


# ============================================================================
# Test Constants
# ============================================================================

# Diarization job counts
DIARIZATION_SEGMENTS_UPDATED = 25

# Meeting and segment counts for list/filter tests
SEGMENTS_PER_MEETING_COUNT = 3
MEETINGS_LIST_COUNT = 5
ACTIVE_MEETINGS_COUNT = 3
RECORDING_MEETINGS_FILTERED_COUNT = 1
LIST_MEETINGS_LIMIT = 10

# Speaker rename test counts
SPEAKER_RENAME_SEGMENTS_COUNT = 3
OTHER_SPEAKER_SEGMENTS_COUNT = 2

# Entity test constants
ENTITY_CONFIDENCE = 0.95
ENTITY_CONFIDENCE_LOW = 0.85
ENTITY_CONFIDENCE_MED = 0.9


class MockContext:
    """Mock gRPC context for testing."""

    def __init__(self) -> None:
        """Initialize mock context."""
        self.aborted = False
        self.abort_code: grpc.StatusCode | None = None
        self.abort_details: str | None = None

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Record abort and raise to simulate gRPC behavior."""
        self.aborted = True
        self.abort_code = code
        self.abort_details = details
        raise _MockRpcError(details)

    def set_code(self, code: grpc.StatusCode) -> None:
        """Record status code set by gRPC handlers."""
        self.abort_code = code

    def set_details(self, details: str) -> None:
        """Record details set by gRPC handlers."""
        self.abort_details = details


class _MockRpcError(grpc.RpcError):
    """RPC error with accessible details for tests."""

    def __init__(self, details: str) -> None:
        super().__init__()
        self._details = details

    def details(self) -> str | None:
        """Return error details string."""
        return self._details

    def __str__(self) -> str:
        return self._details


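# Usage note: servicer handlers signal errors via ``context.abort``, which
# MockContext turns into a raised _MockRpcError. Tests below therefore wrap the
# call in ``pytest.raises(grpc.RpcError)`` and assert on ``context.abort_code``
# afterwards, e.g.:
#
#     context = MockContext()
#     with pytest.raises(grpc.RpcError):
#         await servicer.GetMeeting(request, context)
#     assert context.abort_code == grpc.StatusCode.NOT_FOUND

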
async def _call_refine(
    servicer: NoteFlowServicer,
    request: _RefineSpeakerDiarizationRequest,
    context: MockContext,
) -> _RefineSpeakerDiarizationResponse:
    """Call RefineSpeakerDiarization with typed response."""
    refine = cast(
        _RefineSpeakerDiarizationCallable,
        servicer.RefineSpeakerDiarization,
    )
    return await refine(request, context)


async def _call_get_status(
    servicer: NoteFlowServicer,
    request: _DiarizationJobStatusRequest,
    context: MockContext,
) -> _DiarizationJobStatusResponse:
    """Call GetDiarizationJobStatus with typed response."""
    get_status = cast(_GetDiarizationJobStatusCallable, servicer.GetDiarizationJobStatus)
    return await get_status(request, context)


async def _call_rename(
    servicer: NoteFlowServicer,
    request: _RenameSpeakerRequest,
    context: MockContext,
) -> _RenameSpeakerResponse:
    """Call RenameSpeaker with typed response."""
    rename = cast(_RenameSpeakerCallable, servicer.RenameSpeaker)
    return await rename(request, context)


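# The _call_* wrappers cast the servicer methods to the Protocol callables
# defined at the top of this module. This keeps the test bodies precisely typed
# (request/response attributes such as ``job_id`` and ``status``) without
# depending on how the RPC method signatures are typed upstream.

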
async def _create_meeting_with_segments_for_rename(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    segment_count: int = 5,
) -> Meeting:
    """Create a meeting with segments for rename testing."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create()
        await uow.meetings.create(meeting)
        for i in range(segment_count):
            segment = Segment(
                segment_id=i,
                text=f"Segment {i}",
                start_time=float(i),
                end_time=float(i + 1),
                speaker_id="SPEAKER_00" if i < 3 else "SPEAKER_01",
            )
            await uow.segments.add(meeting.id, segment)
        await uow.commit()
        return meeting


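# Note: the helper above gives the first three segments speaker_id "SPEAKER_00"
# and the remaining two "SPEAKER_01" (the counts captured by
# SPEAKER_RENAME_SEGMENTS_COUNT and OTHER_SPEAKER_SEGMENTS_COUNT), which the
# rename tests below rely on.

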
async def _create_meeting_with_segments(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
    segment_count: int = SEGMENTS_PER_MEETING_COUNT,
) -> Meeting:
    """Create a meeting with a set of segments."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title=title)
        await uow.meetings.create(meeting)
        for i in range(segment_count):
            segment = Segment(
                segment_id=i,
                text=f"Segment {i}",
                start_time=float(i),
                end_time=float(i + 1),
            )
            await uow.segments.add(meeting.id, segment)
        await uow.commit()
        return meeting


async def _create_meetings_with_states(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Create meetings in created, recording, and stopped states."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        created = Meeting.create(title="Created")
        await uow.meetings.create(created)

        recording = Meeting.create(title="Recording")
        recording.start_recording()
        await uow.meetings.create(recording)

        stopped = Meeting.create(title="Stopped")
        stopped.start_recording()
        stopped.begin_stopping()
        stopped.stop_recording()
        await uow.meetings.create(stopped)
        await uow.commit()
        return recording


async def _create_stopped_meeting(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
) -> Meeting:
    """Create a stopped meeting for diarization tests."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title=title)
        meeting.start_recording()
        meeting.begin_stopping()
        meeting.stop_recording()
        await uow.meetings.create(meeting)
        await uow.commit()
        return meeting


async def _get_diarization_job(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    job_id: str,
) -> DiarizationJob:
    """Fetch a diarization job from the database."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        job = await uow.diarization_jobs.get(job_id)
        assert job is not None, f"diarization job {job_id} should exist in database"
        return job


def _assert_segment_texts(
    segments: Sequence[noteflow_pb2.FinalSegment],
    expected_texts: Sequence[str],
) -> None:
    """Assert segment texts match expectations."""
    assert len(segments) == len(expected_texts), (
        f"expected {len(expected_texts)} segments, got {len(segments)}"
    )
    for index, expected_text in enumerate(expected_texts):
        assert segments[index].text == expected_text, (
            f"expected segment {index} text '{expected_text}', got '{segments[index].text}'"
        )


@pytest.fixture
async def meeting_with_segments(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Meeting with segments for include_segments tests."""
    return await _create_meeting_with_segments(
        session_factory,
        meetings_dir,
        title="Meeting with Segments",
    )


@pytest.fixture
async def recording_meeting_with_states(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Recording meeting with other state variants present."""
    return await _create_meetings_with_states(session_factory, meetings_dir)


@pytest.fixture
async def stopped_meeting_for_diarization(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Stopped meeting for diarization job creation."""
    return await _create_stopped_meeting(
        session_factory,
        meetings_dir,
        title="For Diarization",
    )


async def _verify_speaker_rename_results(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    meeting_id: MeetingId,
    expected_alice_count: int,
    expected_other_count: int,
) -> None:
    """Verify speaker rename results in database."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        segments = await uow.segments.get_by_meeting(meeting_id)
        alice_segments = [s for s in segments if s.speaker_id == "Alice"]
        other_segments = [s for s in segments if s.speaker_id == "SPEAKER_01"]
        assert len(alice_segments) == expected_alice_count, (
            f"expected {expected_alice_count} segments with speaker_id 'Alice', got {len(alice_segments)}"
        )
        assert len(other_segments) == expected_other_count, (
            f"expected {expected_other_count} segments with speaker_id 'SPEAKER_01', got {len(other_segments)}"
        )


async def _create_meeting_with_jobs(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> tuple[Meeting, DiarizationJob, DiarizationJob, DiarizationJob]:
    """Create a meeting with multiple diarization jobs."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create()
        await uow.meetings.create(meeting)

        job1 = DiarizationJob(
            job_id=str(uuid4()),
            meeting_id=str(meeting.id),
            status=JOB_STATUS_QUEUED,
        )
        job2 = DiarizationJob(
            job_id=str(uuid4()),
            meeting_id=str(meeting.id),
            status=JOB_STATUS_RUNNING,
        )
        job3 = DiarizationJob(
            job_id=str(uuid4()),
            meeting_id=str(meeting.id),
            status=JOB_STATUS_COMPLETED,
        )
        await uow.diarization_jobs.create(job1)
        await uow.diarization_jobs.create(job2)
        await uow.diarization_jobs.create(job3)
        await uow.commit()
        return meeting, job1, job2, job3


async def _verify_job_statuses_after_shutdown(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    jobs: list[DiarizationJob],
    expected_statuses: list[int],
) -> None:
    """Verify job statuses after shutdown."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        for job, expected_status in zip(jobs, expected_statuses, strict=False):
            retrieved = await uow.diarization_jobs.get(job.job_id)
            assert retrieved is not None, f"job {job.job_id} should exist"
            assert retrieved.status == expected_status, (
                f"job {job.job_id} should have status {expected_status}, got {retrieved.status}"
            )


async def _create_meeting_with_two_entities(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> tuple[Meeting, UUID, UUID]:
    """Create a meeting with two entities and return their IDs."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title="Multi-Entity Meeting")
        await uow.meetings.create(meeting)

        entity1 = NamedEntity.create(
            text="Entity One",
            category=EntityCategory.COMPANY,
            segment_ids=[0],
            confidence=ENTITY_CONFIDENCE_MED,
            meeting_id=meeting.id,
        )
        entity2 = NamedEntity.create(
            text="Entity Two",
            category=EntityCategory.PERSON,
            segment_ids=[1],
            confidence=ENTITY_CONFIDENCE_LOW,
            meeting_id=meeting.id,
        )
        saved_entity1 = await uow.entities.save(entity1)
        saved_entity2 = await uow.entities.save(entity2)
        await uow.commit()

    assert saved_entity1.db_id is not None and saved_entity2.db_id is not None
    return meeting, saved_entity1.db_id, saved_entity2.db_id


async def _verify_entity_deletion_result(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    deleted_id: UUID,
    kept_id: UUID,
    expected_kept_text: str,
) -> None:
    """Verify entity deletion results."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        deleted, kept = await uow.entities.get(deleted_id), await uow.entities.get(kept_id)
        assert deleted is None and kept is not None and kept.text == expected_kept_text, (
            f"only entity1 should be deleted; entity2 should remain with text='{expected_kept_text}', "
            f"got deleted={deleted is None}, kept={kept is not None}, kept_text='{kept.text if kept else 'None'}'"
        )


@pytest.mark.integration
class TestServicerMeetingOperationsWithDatabase:
    """Integration tests for meeting operations using real database."""

    async def test_create_meeting_persists_to_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test CreateMeeting persists meeting to database."""
        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.CreateMeetingRequest(
            title="Database Test Meeting",
            metadata={"source": "integration_test"},
        )
        result = await servicer.CreateMeeting(request, MockContext())

        assert result.id, "CreateMeeting response should include a meeting ID"
        assert result.title == "Database Test Meeting", (
            f"expected title 'Database Test Meeting', got '{result.title}'"
        )
        assert result.state == noteflow_pb2.MEETING_STATE_CREATED, (
            f"expected state CREATED, got {result.state}"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            from noteflow.domain.value_objects import MeetingId

            meeting = await uow.meetings.get(MeetingId(UUID(result.id)))
            assert meeting is not None, f"meeting with ID {result.id} should exist in database"
            assert meeting.title == "Database Test Meeting", (
                f"expected title 'Database Test Meeting', got '{meeting.title}'"
            )
            assert meeting.metadata["source"] == "integration_test", (
                f"expected metadata source 'integration_test', got '{meeting.metadata.get('source')}'"
            )

    async def test_get_meeting_retrieves_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetMeeting retrieves meeting from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Persisted Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        meeting_id_str = str(meeting.id)

        request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id_str)
        result = await servicer.GetMeeting(request, MockContext())

        assert result.id == meeting_id_str, f"expected meeting ID {meeting.id}, got {result.id}"
        assert result.title == "Persisted Meeting", (
            f"expected title 'Persisted Meeting', got '{result.title}'"
        )

    async def test_get_meeting_with_segments(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meeting_with_segments: Meeting,
    ) -> None:
        """Test GetMeeting with include_segments loads segments from database."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetMeetingRequest(
            meeting_id=str(meeting_with_segments.id),
            include_segments=True,
        )
        result: noteflow_pb2.Meeting = await servicer.GetMeeting(request, MockContext())

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi but pyright doesn't resolve generic
        segments: Sequence[noteflow_pb2.FinalSegment] = cast(
            Sequence[noteflow_pb2.FinalSegment], result.segments
        )
        _assert_segment_texts(segments, ["Segment 0", "Segment 1", "Segment 2"])

    async def test_get_nonexistent_meeting_returns_not_found(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test GetMeeting returns NOT_FOUND for nonexistent meeting."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()

        request = noteflow_pb2.GetMeetingRequest(meeting_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await servicer.GetMeeting(request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND status for nonexistent meeting, got {context.abort_code}"
        )

    async def test_list_meetings_queries_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test ListMeetings queries meetings from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            for i in range(MEETINGS_LIST_COUNT):
                meeting = Meeting.create(title=f"Meeting {i}")
                await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.ListMeetingsRequest(limit=LIST_MEETINGS_LIMIT)
        response: noteflow_pb2.ListMeetingsResponse = await servicer.ListMeetings(
            request, MockContext()
        )

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi but pyright doesn't resolve generic
        meetings_list: Sequence[noteflow_pb2.Meeting] = cast(
            Sequence[noteflow_pb2.Meeting], response.meetings
        )
        assert response.total_count == MEETINGS_LIST_COUNT, (
            f"expected total_count {MEETINGS_LIST_COUNT}, got {response.total_count}"
        )
        assert len(meetings_list) == MEETINGS_LIST_COUNT, (
            f"expected {MEETINGS_LIST_COUNT} meetings in response, got {len(meetings_list)}"
        )

    async def test_list_meetings_with_state_filter(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        recording_meeting_with_states: Meeting,
    ) -> None:
        """Test ListMeetings filters by state correctly."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.ListMeetingsRequest(
            states=[noteflow_pb2.MEETING_STATE_RECORDING],
        )
        response: noteflow_pb2.ListMeetingsResponse = await servicer.ListMeetings(
            request, MockContext()
        )

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi but pyright doesn't resolve generic
        meetings_list: Sequence[noteflow_pb2.Meeting] = cast(
            Sequence[noteflow_pb2.Meeting], response.meetings
        )
        assert response.total_count == RECORDING_MEETINGS_FILTERED_COUNT, (
            f"expected {RECORDING_MEETINGS_FILTERED_COUNT} meeting with RECORDING state, got {response.total_count}"
        )
        assert meetings_list[0].title == recording_meeting_with_states.title, (
            f"expected meeting title 'Recording', got '{meetings_list[0].title}'"
        )

    async def test_delete_meeting_removes_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test DeleteMeeting removes meeting from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="To Delete")
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.DeleteMeetingRequest(meeting_id=str(meeting.id))
        result = await servicer.DeleteMeeting(request, MockContext())

        assert result.success is True, "DeleteMeeting should return success=True"

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            deleted = await uow.meetings.get(meeting.id)
            assert deleted is None, f"meeting {meeting.id} should have been deleted from database"

    async def test_stop_meeting_updates_database_state(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test StopMeeting transitions state in database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="To Stop")
            meeting.start_recording()
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id))
        result = await servicer.StopMeeting(request, MockContext())

        assert result.state == noteflow_pb2.MEETING_STATE_STOPPED, (
            f"expected STOPPED state in response, got {result.state}"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            stopped = await uow.meetings.get(meeting.id)
            assert stopped is not None, (
                f"meeting {meeting.id} should exist in database after stopping"
            )
            assert stopped.state == MeetingState.STOPPED, (
                f"expected STOPPED state in database, got {stopped.state}"
            )


@pytest.mark.integration
class TestServicerDiarizationWithDatabase:
    """Integration tests for diarization operations with database."""

    async def test_refine_speaker_diarization_creates_job_in_database(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        stopped_meeting_for_diarization: Meeting,
    ) -> None:
        """Test RefineSpeakerDiarization creates job record in database."""
        mock_engine = MagicMock()
        servicer = NoteFlowServicer(
            session_factory=session_factory,
            services=ServicesConfig(
                diarization_engine=mock_engine,
                diarization_refinement_enabled=True,
            ),
        )
        meeting_id_str = str(stopped_meeting_for_diarization.id)

        request = noteflow_pb2.RefineSpeakerDiarizationRequest(meeting_id=meeting_id_str)
        result = await _call_refine(servicer, request, MockContext())

        assert result.job_id, "RefineSpeakerDiarization response should include a job ID"
        assert result.status == JOB_STATUS_QUEUED, f"expected QUEUED status, got {result.status}"

        job = await _get_diarization_job(session_factory, meetings_dir, result.job_id)
        assert job.meeting_id == meeting_id_str, (
            f"expected job meeting_id {stopped_meeting_for_diarization.id}, got {job.meeting_id}"
        )
        assert job.status == JOB_STATUS_QUEUED, f"expected job status QUEUED, got {job.status}"

    async def test_get_diarization_job_status_retrieves_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetDiarizationJobStatus retrieves job from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)

            job = DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(meeting.id),
                status=JOB_STATUS_COMPLETED,
                segments_updated=DIARIZATION_SEGMENTS_UPDATED,
                speaker_ids=["SPEAKER_00", "SPEAKER_01"],
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=job.job_id)
        response = await _call_get_status(servicer, request, MockContext())

        # no cast needed here: the response protocol already types speaker_ids as Sequence[str]
        speaker_ids_list = response.speaker_ids
        assert response.job_id == job.job_id, f"expected job_id {job.job_id}, got {response.job_id}"
        assert response.status == JOB_STATUS_COMPLETED, (
            f"expected COMPLETED status, got {response.status}"
        )
        assert response.segments_updated == DIARIZATION_SEGMENTS_UPDATED, (
            f"expected {DIARIZATION_SEGMENTS_UPDATED} segments_updated, got {response.segments_updated}"
        )
        assert list(speaker_ids_list) == ["SPEAKER_00", "SPEAKER_01"], (
            f"expected speaker_ids ['SPEAKER_00', 'SPEAKER_01'], got {list(speaker_ids_list)}"
        )

    async def test_get_nonexistent_job_returns_not_found(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test GetDiarizationJobStatus returns NOT_FOUND for nonexistent job."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()

        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id="nonexistent")

        with pytest.raises(grpc.RpcError, match=r".*"):
            await _call_get_status(servicer, request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND status for nonexistent job, got {context.abort_code}"
        )

    async def test_refine_rejects_recording_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test RefineSpeakerDiarization rejects meetings still recording."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Still Recording")
            meeting.start_recording()
            await uow.meetings.create(meeting)
            await uow.commit()

        mock_engine = MagicMock()
        servicer = NoteFlowServicer(
            session_factory=session_factory,
            services=ServicesConfig(
                diarization_engine=mock_engine,
                diarization_refinement_enabled=True,
            ),
        )

        request = noteflow_pb2.RefineSpeakerDiarizationRequest(
            meeting_id=str(meeting.id),
        )
        result = await _call_refine(servicer, request, MockContext())

        assert result.status == JOB_STATUS_FAILED, (
            f"expected FAILED status for recording meeting, got {result.status}"
        )
        assert "stopped" in result.error_message.lower(), (
            f"expected 'stopped' in error message, got '{result.error_message}'"
        )


@pytest.mark.integration
class TestServicerServerInfoWithDatabase:
    """Integration tests for GetServerInfo with database."""

    async def test_server_info_counts_active_meetings_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetServerInfo counts active meetings from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            for _ in range(2):
                recording = Meeting.create()
                recording.start_recording()
                await uow.meetings.create(recording)

            stopping = Meeting.create()
            stopping.start_recording()
            stopping.begin_stopping()
            await uow.meetings.create(stopping)

            created = Meeting.create()
            await uow.meetings.create(created)

            stopped = Meeting.create()
            stopped.start_recording()
            stopped.begin_stopping()
            stopped.stop_recording()
            await uow.meetings.create(stopped)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.ServerInfoRequest()
        result = await servicer.GetServerInfo(request, MockContext())

        assert result.active_meetings == 3, (
            f"expected 3 active meetings (2 recording + 1 stopping), got {result.active_meetings}"
        )


@pytest.mark.integration
class TestServicerShutdownWithDatabase:
    """Integration tests for servicer shutdown with database."""

    async def test_shutdown_marks_running_jobs_as_failed(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test shutdown marks all running diarization jobs as failed."""
        _, job1, job2, job3 = await _create_meeting_with_jobs(session_factory, meetings_dir)

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        await _verify_job_statuses_after_shutdown(
            session_factory,
            meetings_dir,
            [job1, job2, job3],
            [JOB_STATUS_FAILED, JOB_STATUS_FAILED, JOB_STATUS_COMPLETED],
        )


@pytest.mark.integration
class TestServicerRenameSpeakerWithDatabase:
    """Integration tests for RenameSpeaker with database."""

    async def test_rename_speaker_updates_segments_in_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test RenameSpeaker updates speaker IDs in database."""
        meeting = await _create_meeting_with_segments_for_rename(session_factory, meetings_dir)

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.RenameSpeakerRequest(
            meeting_id=str(meeting.id),
            old_speaker_id="SPEAKER_00",
            new_speaker_name="Alice",
        )
        result = await _call_rename(servicer, request, MockContext())

        assert result.segments_updated == 3, (
            f"expected 3 segments updated, got {result.segments_updated}"
        )
        assert result.success is True, "RenameSpeaker should return success=True"
        await _verify_speaker_rename_results(session_factory, meetings_dir, meeting.id, 3, 2)


@pytest.mark.integration
class TestServicerTransactionIntegrity:
    """Integration tests for transaction integrity in servicer operations."""

    async def test_create_meeting_atomic(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test CreateMeeting is atomic - either fully commits or rolls back."""
        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.CreateMeetingRequest(title="Atomic Test")
        result = await servicer.CreateMeeting(request, MockContext())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            from noteflow.domain.value_objects import MeetingId

            meeting = await uow.meetings.get(MeetingId(UUID(result.id)))
            assert meeting is not None, (
                f"meeting {result.id} should exist in database after atomic create"
            )

    async def test_stop_meeting_clears_streaming_turns(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test StopMeeting clears streaming diarization turns from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            meeting.start_recording()
            await uow.meetings.create(meeting)

            from noteflow.infrastructure.persistence.repositories import StreamingTurn

            turns = [
                StreamingTurn(speaker="S1", start_time=0.0, end_time=1.0),
                StreamingTurn(speaker="S2", start_time=1.0, end_time=2.0),
            ]
            await uow.diarization_jobs.add_streaming_turns(str(meeting.id), turns)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)

        request = noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id))
        await servicer.StopMeeting(request, MockContext())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            remaining = await uow.diarization_jobs.get_streaming_turns(str(meeting.id))
            assert remaining == [], (
                f"expected no streaming turns after StopMeeting, got {len(remaining)} turns"
            )


@pytest.mark.integration
class TestServicerEntityMutationsWithDatabase:
    """Integration tests for entity update/delete gRPC operations with database."""

    EntityRequestFactory = Callable[[str], object]

    @staticmethod
    def _build_update_request(meeting_id: str) -> noteflow_pb2.UpdateEntityRequest:
        return noteflow_pb2.UpdateEntityRequest(
            meeting_id=meeting_id, entity_id=str(uuid4()), text="Won't Matter"
        )

    @staticmethod
    def _build_delete_request(meeting_id: str) -> noteflow_pb2.DeleteEntityRequest:
        return noteflow_pb2.DeleteEntityRequest(meeting_id=meeting_id, entity_id=str(uuid4()))

    @staticmethod
    def _build_update_invalid_request(meeting_id: str) -> noteflow_pb2.UpdateEntityRequest:
        return noteflow_pb2.UpdateEntityRequest(
            meeting_id=meeting_id, entity_id="not-a-valid-uuid", text="Won't Matter"
        )

    @staticmethod
    def _build_delete_invalid_request(meeting_id: str) -> noteflow_pb2.DeleteEntityRequest:
        return noteflow_pb2.DeleteEntityRequest(meeting_id=meeting_id, entity_id="garbage-id")

    async def _create_meeting_with_entity(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        text: str = "Acme Corp",
        category: str = "company",
    ) -> tuple[str, str]:
        """Create a meeting with a named entity for testing.

        Returns:
            Tuple of (meeting_id, entity_id).
        """
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Entity Test Meeting")
            await uow.meetings.create(meeting)

            # Create entity via domain entity and repository
            entity = NamedEntity.create(
                text=text,
                category=EntityCategory.from_string(category),
                segment_ids=[0, 1],
                confidence=ENTITY_CONFIDENCE,
                meeting_id=meeting.id,
            )
            saved_entity = await uow.entities.save(entity)
            await uow.commit()

        entity_id = saved_entity.db_id
        assert entity_id is not None, "saved entity should have db_id"
        return str(meeting.id), str(entity_id)

    async def test_update_entity_text_via_grpc(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test UpdateEntity RPC updates entity text in database."""
        from uuid import UUID as PyUUID

        meeting_id, entity_id = await self._create_meeting_with_entity(
            session_factory, meetings_dir, text="Original Name"
        )
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.UpdateEntityRequest(
            meeting_id=meeting_id, entity_id=entity_id, text="Updated Name"
        )
        result = await servicer.UpdateEntity(request, MockContext())

        assert (result.entity.id, result.entity.text) == (entity_id, "Updated Name"), (
            f"expected entity ({entity_id}, 'Updated Name'), got ({result.entity.id}, '{result.entity.text}')"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.get(PyUUID(entity_id))
            assert updated is not None and updated.text == "Updated Name", (
                f"entity text in database should be 'Updated Name', got '{updated.text if updated else 'None'}'"
            )

    async def test_update_entity_category_via_grpc(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test UpdateEntity RPC updates entity category in database."""
        from uuid import UUID as PyUUID

        meeting_id, entity_id = await self._create_meeting_with_entity(
            session_factory, meetings_dir, category="person"
        )
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.UpdateEntityRequest(
            meeting_id=meeting_id, entity_id=entity_id, category="company"
        )
        result = await servicer.UpdateEntity(request, MockContext())

        assert result.entity.category == "company", (
            f"expected category 'company' in response, got '{result.entity.category}'"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.get(PyUUID(entity_id))
            assert updated is not None and updated.category.value == "company", (
                f"entity category in database should be 'company', got '{updated.category.value if updated else 'None'}'"
            )

    async def test_update_entity_both_fields_via_grpc(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test UpdateEntity RPC updates both text and category atomically."""
        from uuid import UUID as PyUUID

        meeting_id, entity_id = await self._create_meeting_with_entity(
            session_factory, meetings_dir, text="John Doe", category="person"
        )
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.UpdateEntityRequest(
            meeting_id=meeting_id, entity_id=entity_id, text="Acme Industries", category="company"
        )
        result = await servicer.UpdateEntity(request, MockContext())

        assert (result.entity.text, result.entity.category) == ("Acme Industries", "company"), (
            f"expected ('Acme Industries', 'company'), got ('{result.entity.text}', '{result.entity.category}')"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.get(PyUUID(entity_id))
            assert updated is not None and (updated.text, updated.category.value) == (
                "Acme Industries",
                "company",
            ), (
                f"entity in database should have text='Acme Industries' and category='company', got text='{updated.text if updated else 'None'}', category='{updated.category.value if updated else 'None'}'"
            )

    @pytest.mark.parametrize(
        ("method_name", "request_factory", "label"),
        [
            pytest.param("UpdateEntity", _build_update_request, "update", id="update"),
            pytest.param("DeleteEntity", _build_delete_request, "delete", id="delete"),
        ],
    )
    async def test_entity_not_found_grpc_not_found(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        method_name: str,
        request_factory: EntityRequestFactory,
        label: str,
    ) -> None:
        """Entity operations return NOT_FOUND for nonexistent entities."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Empty Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()
            meeting_id = str(meeting.id)

        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        request = request_factory(meeting_id)

        method = getattr(servicer, method_name)
        with pytest.raises(grpc.RpcError, match="not found"):
            await method(request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND for {label} nonexistent entity, got {context.abort_code}"
        )

    @pytest.mark.parametrize(
        ("method_name", "request_factory", "label"),
        [
            pytest.param("UpdateEntity", _build_update_invalid_request, "update", id="update"),
            pytest.param("DeleteEntity", _build_delete_invalid_request, "delete", id="delete"),
        ],
    )
    async def test_entity_invalid_id_returns_invalid_argument(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        method_name: str,
        request_factory: EntityRequestFactory,
        label: str,
    ) -> None:
        """Entity operations return INVALID_ARGUMENT for malformed entity_id."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Test Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()
            meeting_id = str(meeting.id)

        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        request = request_factory(meeting_id)

        method = getattr(servicer, method_name)
        with pytest.raises(grpc.RpcError, match="Invalid"):
            await method(request, context)

        assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, (
            f"expected INVALID_ARGUMENT for {label} malformed entity_id, got {context.abort_code}"
        )

    async def test_delete_entity_grpc_removes_from_db(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test DeleteEntity RPC removes entity from database."""
        from uuid import UUID as PyUUID

        meeting_id, entity_id = await self._create_meeting_with_entity(
            session_factory, meetings_dir
        )
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.DeleteEntityRequest(meeting_id=meeting_id, entity_id=entity_id)
        result = await servicer.DeleteEntity(request, MockContext())

        assert result.success is True, "DeleteEntity should return success=True"

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            assert await uow.entities.get(PyUUID(entity_id)) is None, (
                f"entity {entity_id} should have been deleted from database"
            )

    async def test_grpc_delete_preserves_other_entities(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test DeleteEntity only removes the targeted entity."""
        meeting, entity1_id, entity2_id = await _create_meeting_with_two_entities(
            session_factory, meetings_dir
        )

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.DeleteEntityRequest(
            meeting_id=str(meeting.id), entity_id=str(entity1_id)
        )
        await servicer.DeleteEntity(request, MockContext())

        await _verify_entity_deletion_result(
            session_factory, meetings_dir, entity1_id, entity2_id, "Entity Two"
        )


# ============================================================================
# Sprint GAP-004: GetActiveDiarizationJobs Tests
# ============================================================================


class TestGetActiveDiarizationJobs:
    """Tests for GetActiveDiarizationJobs RPC."""

    @pytest.mark.asyncio
    async def test_returns_running_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs includes jobs with RUNNING status."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 1, f"expected 1 active job, got {len(response.jobs)}"
        assert response.jobs[0].job_id == job_id, (
            f"expected job_id={job_id}, got {response.jobs[0].job_id}"
        )

    @pytest.mark.asyncio
    async def test_returns_queued_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs includes jobs with QUEUED status."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_QUEUED,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 1, f"expected 1 queued job, got {len(response.jobs)}"
        assert response.jobs[0].job_id == job_id, (
            f"expected job_id={job_id}, got {response.jobs[0].job_id}"
        )

    @pytest.mark.asyncio
    async def test_excludes_completed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs excludes jobs with COMPLETED status."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_COMPLETED,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, (
            f"expected 0 active jobs for COMPLETED job, got {len(response.jobs)}"
        )

    @pytest.mark.asyncio
    async def test_excludes_failed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs excludes jobs with FAILED status."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_FAILED,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, (
            f"expected 0 active jobs for FAILED job, got {len(response.jobs)}"
        )


# ============================================================================
# Sprint GAP-004: Server Restart Job Recovery Tests
# ============================================================================


class TestServerRestartJobRecovery:
    """Tests for server restart marking orphaned jobs as FAILED."""

    @pytest.mark.asyncio
    async def test_shutdown_marks_running_job_failed(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown marks RUNNING jobs as FAILED."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, f"job {job_id} should exist after shutdown"
            assert job_after.status == JOB_STATUS_FAILED, (
                f"expected FAILED status, got {job_after.status}"
            )

    @pytest.mark.asyncio
    async def test_shutdown_marks_queued_job_failed(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown marks QUEUED jobs as FAILED."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_QUEUED,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "queued job should exist after shutdown"
            assert job_after.status == JOB_STATUS_FAILED, "queued job should be FAILED"

    @pytest.mark.asyncio
    async def test_shutdown_sets_error_message(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown sets ERR_SERVER_RESTART error message."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "job should exist after shutdown"
            assert job_after.error_message == "Server restarted", "should have restart error"

    @pytest.mark.asyncio
    async def test_shutdown_preserves_completed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown does not modify COMPLETED jobs."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_COMPLETED,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "completed job should exist after shutdown"
            assert job_after.status == JOB_STATUS_COMPLETED, "completed job should remain COMPLETED"

    @pytest.mark.asyncio
    async def test_get_active_jobs_empty_after_shutdown(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs returns empty after server shutdown."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        servicer_new = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer_new.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, "no active jobs after shutdown"

    @pytest.mark.asyncio
    async def test_client_can_query_failed_job_status(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Client can query job status to see it failed with restart error."""
        job_id = str(uuid4())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        servicer_new = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=job_id)
        response = await _call_get_status(servicer_new, request, MockContext())

        assert response.status == JOB_STATUS_FAILED, "job should be FAILED"
        assert response.error_message == "Server restarted", "should have restart error"