Files
noteflow/tests/integration/test_grpc_servicer_database.py
Travis Vasceannie d8090a98e8
Some checks failed
CI / test-typescript (push) Has been cancelled
CI / test-rust (push) Has been cancelled
CI / test-python (push) Has been cancelled
ci/cd fixes
2026-01-26 00:28:15 +00:00

1449 lines
56 KiB
Python

"""Integration tests for gRPC servicer with real database.

These tests verify the actual database code paths in the gRPC service layer,
which are completely untested when using the in-memory fallback.

Tests cover:
- Meeting CRUD via gRPC with database persistence
- Diarization job lifecycle through the servicer
- Preference loading during server initialization
- Transaction integrity across gRPC operations
- Error handling with database failures
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast
from unittest.mock import MagicMock
from uuid import UUID, uuid4
import grpc
import pytest
from noteflow.domain.entities import Meeting, Segment
from noteflow.domain.entities.named_entity import EntityCategory, NamedEntity
from noteflow.domain.value_objects import MeetingId, MeetingState
from noteflow.grpc.config.config import ServicesConfig
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.repositories import DiarizationJob
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
JOB_STATUS_COMPLETED,
JOB_STATUS_FAILED,
JOB_STATUS_QUEUED,
JOB_STATUS_RUNNING,
)
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _RefineSpeakerDiarizationRequest(Protocol):
meeting_id: str
num_speakers: int
class _RefineSpeakerDiarizationResponse(Protocol):
job_id: str
status: int
error_message: str
segments_updated: int
class _DiarizationJobStatusRequest(Protocol):
job_id: str
class _DiarizationJobStatusResponse(Protocol):
job_id: str
status: int
segments_updated: int
speaker_ids: Sequence[str]
error_message: str
progress_percent: float
class _RenameSpeakerRequest(Protocol):
meeting_id: str
old_speaker_id: str
new_speaker_name: str
class _RenameSpeakerResponse(Protocol):
segments_updated: int
success: bool
class _RefineSpeakerDiarizationCallable(Protocol):
async def __call__(
self,
request: _RefineSpeakerDiarizationRequest,
context: MockContext,
) -> _RefineSpeakerDiarizationResponse: ...
class _GetDiarizationJobStatusCallable(Protocol):
async def __call__(
self,
request: _DiarizationJobStatusRequest,
context: MockContext,
) -> _DiarizationJobStatusResponse: ...
class _RenameSpeakerCallable(Protocol):
async def __call__(
self,
request: _RenameSpeakerRequest,
context: MockContext,
) -> _RenameSpeakerResponse: ...
# ============================================================================
# Test Constants
# ============================================================================
# Diarization job counts.
# segments_updated value stored on the completed job and expected back from
# GetDiarizationJobStatus.
DIARIZATION_SEGMENTS_UPDATED = 25
# Meeting and segment counts for list/filter tests.
# Default number of segments created by _create_meeting_with_segments.
SEGMENTS_PER_MEETING_COUNT = 3
# Number of meetings seeded (and expected back) in the unfiltered ListMeetings test.
MEETINGS_LIST_COUNT = 5
# NOTE(review): matches the "2 recording + 1 stopping" total asserted by the
# GetServerInfo test.
ACTIVE_MEETINGS_COUNT = 3
# Expected number of meetings returned when filtering by RECORDING state.
RECORDING_MEETINGS_FILTERED_COUNT = 1
# Page size passed as `limit` in the ListMeetings request.
LIST_MEETINGS_LIMIT = 10
# Speaker rename test counts.
# NOTE(review): the rename helper's default data has 3 SPEAKER_00 segments and
# 2 SPEAKER_01 segments (5 total); these two values mirror that split.
SPEAKER_RENAME_SEGMENTS_COUNT = 3
OTHER_SPEAKER_SEGMENTS_COUNT = 2
# Entity test constants: confidence values assigned to NamedEntity test data.
ENTITY_CONFIDENCE = 0.95
ENTITY_CONFIDENCE_LOW = 0.85
ENTITY_CONFIDENCE_MED = 0.9
class MockContext:
    """Minimal stand-in for a gRPC servicer context.

    Records the status code and details reported by a handler so tests can
    inspect them after the call.
    """

    def __init__(self) -> None:
        """Start with no abort recorded and no status information."""
        self.abort_details: str | None = None
        self.abort_code: grpc.StatusCode | None = None
        self.aborted = False

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Store the abort code/details, flag the abort, then raise.

        Raises:
            _MockRpcError: always, carrying ``details``.
        """
        self.abort_code, self.abort_details = code, details
        self.aborted = True
        raise _MockRpcError(details)

    def set_code(self, code: grpc.StatusCode) -> None:
        """Remember the status code a handler set on the context."""
        self.abort_code = code

    def set_details(self, details: str) -> None:
        """Remember the details string a handler set on the context."""
        self.abort_details = details
class _MockRpcError(grpc.RpcError):
    """``grpc.RpcError`` subclass that keeps the abort details readable in tests."""

    def __init__(self, details: str) -> None:
        """Store ``details`` for later retrieval via ``details()`` or ``str()``."""
        super().__init__()
        self._details = details

    def __str__(self) -> str:
        """Render the error as its details string."""
        return self._details

    def details(self) -> str | None:
        """Return the details string given at construction."""
        return self._details
async def _call_refine(
    servicer: NoteFlowServicer,
    request: _RefineSpeakerDiarizationRequest,
    context: MockContext,
) -> _RefineSpeakerDiarizationResponse:
    """Invoke ``servicer.RefineSpeakerDiarization`` through a typed view.

    The bound method is cast to ``_RefineSpeakerDiarizationCallable`` so the
    awaited result carries the response protocol type.
    """
    typed_rpc = cast(_RefineSpeakerDiarizationCallable, servicer.RefineSpeakerDiarization)
    response = await typed_rpc(request, context)
    return response
async def _call_get_status(
    servicer: NoteFlowServicer,
    request: _DiarizationJobStatusRequest,
    context: MockContext,
) -> _DiarizationJobStatusResponse:
    """Invoke ``servicer.GetDiarizationJobStatus`` through a typed view.

    The bound method is cast to ``_GetDiarizationJobStatusCallable`` so the
    awaited result carries the response protocol type.
    """
    typed_rpc = cast(_GetDiarizationJobStatusCallable, servicer.GetDiarizationJobStatus)
    response = await typed_rpc(request, context)
    return response
async def _call_rename(
    servicer: NoteFlowServicer,
    request: _RenameSpeakerRequest,
    context: MockContext,
) -> _RenameSpeakerResponse:
    """Invoke ``servicer.RenameSpeaker`` through a typed view.

    The bound method is cast to ``_RenameSpeakerCallable`` so the awaited
    result carries the response protocol type.
    """
    typed_rpc = cast(_RenameSpeakerCallable, servicer.RenameSpeaker)
    response = await typed_rpc(request, context)
    return response
async def _create_meeting_with_segments_for_rename(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    segment_count: int = 5,
) -> Meeting:
    """Persist a meeting whose segments are split between two speakers.

    The first ``SPEAKER_RENAME_SEGMENTS_COUNT`` segments are attributed to
    ``"SPEAKER_00"``; every remaining segment is attributed to ``"SPEAKER_01"``.

    Args:
        session_factory: Factory used to open the unit of work.
        meetings_dir: Meetings directory handed to the unit of work.
        segment_count: Total number of segments to create.

    Returns:
        The meeting that was created and committed.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create()
        await uow.meetings.create(meeting)
        for i in range(segment_count):
            # Use the shared constant instead of a bare 3 so the seeded data
            # and the rename expectations cannot drift apart.
            speaker_id = "SPEAKER_00" if i < SPEAKER_RENAME_SEGMENTS_COUNT else "SPEAKER_01"
            segment = Segment(
                segment_id=i,
                text=f"Segment {i}",
                start_time=float(i),
                end_time=float(i + 1),
                speaker_id=speaker_id,
            )
            await uow.segments.add(meeting.id, segment)
        await uow.commit()
    return meeting
async def _create_meeting_with_segments(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
    segment_count: int = SEGMENTS_PER_MEETING_COUNT,
) -> Meeting:
    """Persist a titled meeting plus ``segment_count`` one-second segments.

    Segment ``n`` has text ``"Segment n"`` and spans ``n`` to ``n + 1``.
    """
    pending_segments = [
        Segment(
            segment_id=number,
            text=f"Segment {number}",
            start_time=float(number),
            end_time=float(number + 1),
        )
        for number in range(segment_count)
    ]
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title=title)
        await uow.meetings.create(meeting)
        for pending in pending_segments:
            await uow.segments.add(meeting.id, pending)
        await uow.commit()
    return meeting
async def _create_meetings_with_states(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Persist one created, one recording and one stopped meeting.

    Returns:
        The meeting that was left in the recording state.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        # Untouched meeting: stays in its initial state.
        untouched = Meeting.create(title="Created")
        await uow.meetings.create(untouched)

        # Started but never stopped.
        recording = Meeting.create(title="Recording")
        recording.start_recording()
        await uow.meetings.create(recording)

        # Driven through the full start -> stopping -> stopped sequence.
        finished = Meeting.create(title="Stopped")
        for transition in (
            finished.start_recording,
            finished.begin_stopping,
            finished.stop_recording,
        ):
            transition()
        await uow.meetings.create(finished)

        await uow.commit()
    return recording
async def _create_stopped_meeting(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
) -> Meeting:
    """Persist a meeting that has been started, set to stopping, then stopped."""
    stopped = Meeting.create(title=title)
    stopped.start_recording()
    stopped.begin_stopping()
    stopped.stop_recording()
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        await uow.meetings.create(stopped)
        await uow.commit()
    return stopped
async def _get_diarization_job(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    job_id: str,
) -> DiarizationJob:
    """Load the diarization job with ``job_id``, asserting that it exists."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        found = await uow.diarization_jobs.get(job_id)
    assert found is not None, f"diarization job {job_id} should exist in database"
    return found
def _assert_segment_texts(
segments: Sequence[noteflow_pb2.FinalSegment],
expected_texts: Sequence[str],
) -> None:
"""Assert segment texts match expectations."""
assert len(segments) == len(expected_texts), (
f"expected {len(expected_texts)} segments, got {len(segments)}"
)
for index, expected_text in enumerate(expected_texts):
assert segments[index].text == expected_text, (
f"expected segment {index} text '{expected_text}', got '{segments[index].text}'"
)
@pytest.fixture
async def meeting_with_segments(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Provide a persisted meeting titled "Meeting with Segments" with default segments."""
    seeded = await _create_meeting_with_segments(
        session_factory, meetings_dir, title="Meeting with Segments"
    )
    return seeded
@pytest.fixture
async def recording_meeting_with_states(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Provide the recording meeting, seeded alongside created and stopped meetings."""
    recording = await _create_meetings_with_states(session_factory, meetings_dir)
    return recording
@pytest.fixture
async def stopped_meeting_for_diarization(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> Meeting:
    """Provide a persisted, already-stopped meeting titled "For Diarization"."""
    stopped = await _create_stopped_meeting(
        session_factory, meetings_dir, title="For Diarization"
    )
    return stopped
async def _verify_speaker_rename_results(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    meeting_id: MeetingId,
    expected_alice_count: int,
    expected_other_count: int,
) -> None:
    """Check stored speaker IDs for a meeting after a rename to "Alice".

    Counts segments whose speaker_id is ``"Alice"`` and those still labelled
    ``"SPEAKER_01"`` and compares both totals with the expected values.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        stored = await uow.segments.get_by_meeting(meeting_id)

    alice_total = sum(1 for seg in stored if seg.speaker_id == "Alice")
    other_total = sum(1 for seg in stored if seg.speaker_id == "SPEAKER_01")

    assert alice_total == expected_alice_count, (
        f"expected {expected_alice_count} segments with speaker_id 'Alice', got {alice_total}"
    )
    assert other_total == expected_other_count, (
        f"expected {expected_other_count} segments with speaker_id 'SPEAKER_01', got {other_total}"
    )
async def _create_meeting_with_jobs(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> tuple[Meeting, DiarizationJob, DiarizationJob, DiarizationJob]:
    """Persist a meeting with one queued, one running and one completed job.

    Returns:
        ``(meeting, queued_job, running_job, completed_job)``.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create()
        await uow.meetings.create(meeting)

        owner_id = str(meeting.id)
        jobs = [
            DiarizationJob(job_id=str(uuid4()), meeting_id=owner_id, status=status)
            for status in (JOB_STATUS_QUEUED, JOB_STATUS_RUNNING, JOB_STATUS_COMPLETED)
        ]
        for job in jobs:
            await uow.diarization_jobs.create(job)
        await uow.commit()

    queued, running, completed = jobs
    return meeting, queued, running, completed
async def _verify_job_statuses_after_shutdown(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    jobs: list[DiarizationJob],
    expected_statuses: list[int],
) -> None:
    """Reload each job and check that its stored status matches the expectation.

    Args:
        session_factory: Factory used to open the unit of work.
        meetings_dir: Meetings directory handed to the unit of work.
        jobs: Jobs to look up again by ``job_id``.
        expected_statuses: Expected status for each job, in the same order.

    Raises:
        AssertionError: if the two lists differ in length, a job is missing,
            or a stored status differs from the expected one.
    """
    # Fail before any query if the lists are unequal: zipping them non-strictly
    # would silently leave the extra jobs unchecked.
    assert len(jobs) == len(expected_statuses), (
        f"got {len(jobs)} jobs but {len(expected_statuses)} expected statuses"
    )
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        for job, expected_status in zip(jobs, expected_statuses, strict=True):
            retrieved = await uow.diarization_jobs.get(job.job_id)
            assert retrieved is not None, f"job {job.job_id} should exist"
            assert retrieved.status == expected_status, (
                f"job {job.job_id} should have status {expected_status}, got {retrieved.status}"
            )
async def _create_meeting_with_two_entities(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> tuple[Meeting, UUID, UUID]:
    """Persist a meeting with a company entity and a person entity.

    Returns:
        ``(meeting, first_entity_db_id, second_entity_db_id)``.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title="Multi-Entity Meeting")
        await uow.meetings.create(meeting)

        company = NamedEntity.create(
            text="Entity One",
            category=EntityCategory.COMPANY,
            segment_ids=[0],
            confidence=ENTITY_CONFIDENCE_MED,
            meeting_id=meeting.id,
        )
        person = NamedEntity.create(
            text="Entity Two",
            category=EntityCategory.PERSON,
            segment_ids=[1],
            confidence=ENTITY_CONFIDENCE_LOW,
            meeting_id=meeting.id,
        )

        stored_company = await uow.entities.save(company)
        stored_person = await uow.entities.save(person)
        await uow.commit()

    first_id, second_id = stored_company.db_id, stored_person.db_id
    assert first_id is not None and second_id is not None
    return meeting, first_id, second_id
async def _verify_entity_deletion_result(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    deleted_id: UUID,
    kept_id: UUID,
    expected_kept_text: str,
) -> None:
    """Check that one entity is gone and the other remains with the expected text."""
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        deleted = await uow.entities.get(deleted_id)
        kept = await uow.entities.get(kept_id)

    outcome_ok = deleted is None and kept is not None and kept.text == expected_kept_text
    assert outcome_ok, (
        f"only entity1 should be deleted; entity2 should remain with text='{expected_kept_text}', "
        f"got deleted={deleted is None}, kept={kept is not None}, kept_text='{kept.text if kept else 'None'}'"
    )
@pytest.mark.integration
class TestServicerMeetingOperationsWithDatabase:
    """Integration tests for meeting operations using real database.

    Each test builds a ``NoteFlowServicer`` from the shared ``session_factory``
    and checks both the RPC response and, where relevant, the stored state
    read back through ``SqlAlchemyUnitOfWork``.
    """

    async def test_create_meeting_persists_to_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test CreateMeeting persists meeting to database."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.CreateMeetingRequest(
            title="Database Test Meeting",
            metadata={"source": "integration_test"},
        )

        result = await servicer.CreateMeeting(request, MockContext())

        assert result.id, "CreateMeeting response should include a meeting ID"
        assert result.title == "Database Test Meeting", (
            f"expected title 'Database Test Meeting', got '{result.title}'"
        )
        assert result.state == noteflow_pb2.MEETING_STATE_CREATED, (
            f"expected state CREATED, got {result.state}"
        )

        # MeetingId is already imported at module level; no local import needed.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = await uow.meetings.get(MeetingId(UUID(result.id)))

        assert meeting is not None, f"meeting with ID {result.id} should exist in database"
        assert meeting.title == "Database Test Meeting", (
            f"expected title 'Database Test Meeting', got '{meeting.title}'"
        )
        assert meeting.metadata["source"] == "integration_test", (
            f"expected metadata source 'integration_test', got '{meeting.metadata.get('source')}'"
        )

    async def test_get_meeting_retrieves_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetMeeting retrieves meeting from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Persisted Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        meeting_id_str = str(meeting.id)
        request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id_str)

        result = await servicer.GetMeeting(request, MockContext())

        assert result.id == meeting_id_str, f"expected meeting ID {meeting.id}, got {result.id}"
        assert result.title == "Persisted Meeting", (
            f"expected title 'Persisted Meeting', got '{result.title}'"
        )

    async def test_get_meeting_with_segments(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meeting_with_segments: Meeting,
    ) -> None:
        """Test GetMeeting with include_segments loads segments from database."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetMeetingRequest(
            meeting_id=str(meeting_with_segments.id),
            include_segments=True,
        )

        result: noteflow_pb2.Meeting = await servicer.GetMeeting(request, MockContext())

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi
        # but pyright doesn't resolve generic
        segments: Sequence[noteflow_pb2.FinalSegment] = cast(
            Sequence[noteflow_pb2.FinalSegment], result.segments
        )
        _assert_segment_texts(segments, ["Segment 0", "Segment 1", "Segment 2"])

    async def test_get_nonexistent_meeting_returns_not_found(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test GetMeeting returns NOT_FOUND for nonexistent meeting."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        request = noteflow_pb2.GetMeetingRequest(meeting_id=str(uuid4()))

        with pytest.raises(grpc.RpcError, match=r".*"):
            await servicer.GetMeeting(request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND status for nonexistent meeting, got {context.abort_code}"
        )

    async def test_list_meetings_queries_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test ListMeetings queries meetings from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            for i in range(MEETINGS_LIST_COUNT):
                meeting = Meeting.create(title=f"Meeting {i}")
                await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.ListMeetingsRequest(limit=LIST_MEETINGS_LIMIT)

        response: noteflow_pb2.ListMeetingsResponse = await servicer.ListMeetings(
            request, MockContext()
        )

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi
        # but pyright doesn't resolve generic
        meetings_list: Sequence[noteflow_pb2.Meeting] = cast(
            Sequence[noteflow_pb2.Meeting], response.meetings
        )
        assert response.total_count == MEETINGS_LIST_COUNT, (
            f"expected total_count {MEETINGS_LIST_COUNT}, got {response.total_count}"
        )
        assert len(meetings_list) == MEETINGS_LIST_COUNT, (
            f"expected {MEETINGS_LIST_COUNT} meetings in response, got {len(meetings_list)}"
        )

    async def test_list_meetings_with_state_filter(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        recording_meeting_with_states: Meeting,
    ) -> None:
        """Test ListMeetings filters by state correctly."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.ListMeetingsRequest(
            states=[noteflow_pb2.MEETING_STATE_RECORDING],
        )

        response: noteflow_pb2.ListMeetingsResponse = await servicer.ListMeetings(
            request, MockContext()
        )

        # cast required: protobuf RepeatedCompositeFieldContainer is typed in .pyi
        # but pyright doesn't resolve generic
        meetings_list: Sequence[noteflow_pb2.Meeting] = cast(
            Sequence[noteflow_pb2.Meeting], response.meetings
        )
        assert response.total_count == RECORDING_MEETINGS_FILTERED_COUNT, (
            f"expected {RECORDING_MEETINGS_FILTERED_COUNT} meeting with RECORDING state, got {response.total_count}"
        )
        # Check the list length before indexing so an empty page fails with a
        # readable assertion instead of an IndexError.
        assert len(meetings_list) == RECORDING_MEETINGS_FILTERED_COUNT, (
            f"expected {RECORDING_MEETINGS_FILTERED_COUNT} meeting in response, got {len(meetings_list)}"
        )
        assert meetings_list[0].title == recording_meeting_with_states.title, (
            f"expected meeting title 'Recording', got '{meetings_list[0].title}'"
        )

    async def test_delete_meeting_removes_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test DeleteMeeting removes meeting from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="To Delete")
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.DeleteMeetingRequest(meeting_id=str(meeting.id))

        result = await servicer.DeleteMeeting(request, MockContext())

        assert result.success is True, "DeleteMeeting should return success=True"

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            deleted = await uow.meetings.get(meeting.id)

        assert deleted is None, f"meeting {meeting.id} should have been deleted from database"

    async def test_stop_meeting_updates_database_state(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test StopMeeting transitions state in database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="To Stop")
            meeting.start_recording()
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id))

        result = await servicer.StopMeeting(request, MockContext())

        assert result.state == noteflow_pb2.MEETING_STATE_STOPPED, (
            f"expected STOPPED state in response, got {result.state}"
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            stopped = await uow.meetings.get(meeting.id)

        assert stopped is not None, (
            f"meeting {meeting.id} should exist in database after stopping"
        )
        assert stopped.state == MeetingState.STOPPED, (
            f"expected STOPPED state in database, got {stopped.state}"
        )
@pytest.mark.integration
class TestServicerDiarizationWithDatabase:
    """Integration tests for diarization operations with database.

    Diarization RPCs are invoked through the ``_call_*`` helpers so that the
    responses are typed via the protocol classes defined in this module.
    """

    async def test_refine_speaker_diarization_creates_job_in_database(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        stopped_meeting_for_diarization: Meeting,
    ) -> None:
        """Test RefineSpeakerDiarization creates job record in database."""
        mock_engine = MagicMock()
        servicer = NoteFlowServicer(
            session_factory=session_factory,
            services=ServicesConfig(
                diarization_engine=mock_engine,
                diarization_refinement_enabled=True,
            ),
        )
        meeting_id_str = str(stopped_meeting_for_diarization.id)
        request = noteflow_pb2.RefineSpeakerDiarizationRequest(meeting_id=meeting_id_str)

        result = await _call_refine(servicer, request, MockContext())

        assert result.job_id, "RefineSpeakerDiarization response should include a job ID"
        assert result.status == JOB_STATUS_QUEUED, f"expected QUEUED status, got {result.status}"

        job = await _get_diarization_job(session_factory, meetings_dir, result.job_id)
        assert job.meeting_id == meeting_id_str, (
            f"expected job meeting_id {stopped_meeting_for_diarization.id}, got {job.meeting_id}"
        )
        assert job.status == JOB_STATUS_QUEUED, f"expected job status QUEUED, got {job.status}"

    async def test_get_diarization_job_status_retrieves_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetDiarizationJobStatus retrieves job from database."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(meeting.id),
                status=JOB_STATUS_COMPLETED,
                segments_updated=DIARIZATION_SEGMENTS_UPDATED,
                speaker_ids=["SPEAKER_00", "SPEAKER_01"],
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=job.job_id)

        response = await _call_get_status(servicer, request, MockContext())

        # No cast is needed here: the response protocol already types
        # speaker_ids as Sequence[str]. list() normalises the repeated field
        # for comparison with a plain list.
        speaker_ids = list(response.speaker_ids)

        assert response.job_id == job.job_id, f"expected job_id {job.job_id}, got {response.job_id}"
        assert response.status == JOB_STATUS_COMPLETED, (
            f"expected COMPLETED status, got {response.status}"
        )
        assert response.segments_updated == DIARIZATION_SEGMENTS_UPDATED, (
            f"expected {DIARIZATION_SEGMENTS_UPDATED} segments_updated, got {response.segments_updated}"
        )
        assert speaker_ids == ["SPEAKER_00", "SPEAKER_01"], (
            f"expected speaker_ids ['SPEAKER_00', 'SPEAKER_01'], got {speaker_ids}"
        )

    async def test_get_nonexistent_job_returns_not_found(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Test GetDiarizationJobStatus returns NOT_FOUND for nonexistent job."""
        servicer = NoteFlowServicer(session_factory=session_factory)
        context = MockContext()
        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id="nonexistent")

        with pytest.raises(grpc.RpcError, match=r".*"):
            await _call_get_status(servicer, request, context)

        assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
            f"expected NOT_FOUND status for nonexistent job, got {context.abort_code}"
        )

    async def test_refine_rejects_recording_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test RefineSpeakerDiarization rejects meetings still recording."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Still Recording")
            meeting.start_recording()
            await uow.meetings.create(meeting)
            await uow.commit()

        mock_engine = MagicMock()
        servicer = NoteFlowServicer(
            session_factory=session_factory,
            services=ServicesConfig(
                diarization_engine=mock_engine,
                diarization_refinement_enabled=True,
            ),
        )
        request = noteflow_pb2.RefineSpeakerDiarizationRequest(
            meeting_id=str(meeting.id),
        )

        result = await _call_refine(servicer, request, MockContext())

        # The rejection is reported in the response body, not via context.abort.
        assert result.status == JOB_STATUS_FAILED, (
            f"expected FAILED status for recording meeting, got {result.status}"
        )
        assert "stopped" in result.error_message.lower(), (
            f"expected 'stopped' in error message, got '{result.error_message}'"
        )
@pytest.mark.integration
class TestServicerServerInfoWithDatabase:
    """Integration tests for GetServerInfo with database."""

    async def test_server_info_counts_active_meetings_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test GetServerInfo counts active meetings from database.

        Seeds two recording meetings, one stopping, one created and one
        stopped, then expects ``ACTIVE_MEETINGS_COUNT`` active meetings
        (the recording and stopping ones).
        """
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            for _ in range(2):
                recording = Meeting.create()
                recording.start_recording()
                await uow.meetings.create(recording)

            stopping = Meeting.create()
            stopping.start_recording()
            stopping.begin_stopping()
            await uow.meetings.create(stopping)

            created = Meeting.create()
            await uow.meetings.create(created)

            stopped = Meeting.create()
            stopped.start_recording()
            stopped.begin_stopping()
            stopped.stop_recording()
            await uow.meetings.create(stopped)

            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.ServerInfoRequest()

        result = await servicer.GetServerInfo(request, MockContext())

        # Use the shared constant rather than repeating the literal 3.
        assert result.active_meetings == ACTIVE_MEETINGS_COUNT, (
            f"expected {ACTIVE_MEETINGS_COUNT} active meetings (2 recording + 1 stopping), "
            f"got {result.active_meetings}"
        )
@pytest.mark.integration
class TestServicerShutdownWithDatabase:
    """Integration tests for servicer shutdown with database."""

    async def test_shutdown_marks_running_jobs_as_failed(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Shutdown fails unfinished jobs and leaves completed ones untouched.

        After ``shutdown()`` the queued and running jobs are expected to be
        FAILED while the completed job keeps its COMPLETED status.
        """
        seeded = await _create_meeting_with_jobs(session_factory, meetings_dir)
        queued_job, running_job, completed_job = seeded[1:]

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        checked_jobs = [queued_job, running_job, completed_job]
        wanted_statuses = [JOB_STATUS_FAILED, JOB_STATUS_FAILED, JOB_STATUS_COMPLETED]
        await _verify_job_statuses_after_shutdown(
            session_factory, meetings_dir, checked_jobs, wanted_statuses
        )
@pytest.mark.integration
class TestServicerRenameSpeakerWithDatabase:
    """Integration tests for RenameSpeaker with database."""

    async def test_rename_speaker_updates_segments_in_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test RenameSpeaker updates speaker IDs in database.

        Renames ``SPEAKER_00`` to ``Alice`` and checks both the response count
        and the stored segments; ``SPEAKER_01`` segments must be unaffected.
        """
        meeting = await _create_meeting_with_segments_for_rename(session_factory, meetings_dir)
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.RenameSpeakerRequest(
            meeting_id=str(meeting.id),
            old_speaker_id="SPEAKER_00",
            new_speaker_name="Alice",
        )

        result = await _call_rename(servicer, request, MockContext())

        # Expectations come from the module constants instead of bare 3 / 2.
        assert result.segments_updated == SPEAKER_RENAME_SEGMENTS_COUNT, (
            f"expected {SPEAKER_RENAME_SEGMENTS_COUNT} segments updated, got {result.segments_updated}"
        )
        assert result.success is True, "RenameSpeaker should return success=True"

        await _verify_speaker_rename_results(
            session_factory,
            meetings_dir,
            meeting.id,
            SPEAKER_RENAME_SEGMENTS_COUNT,
            OTHER_SPEAKER_SEGMENTS_COUNT,
        )
@pytest.mark.integration
class TestServicerTransactionIntegrity:
    """Integration tests for transaction integrity in servicer operations."""

    async def test_create_meeting_atomic(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test CreateMeeting commits: the meeting is readable from a new unit of work.

        Only the successful-commit path is exercised here.
        """
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.CreateMeetingRequest(title="Atomic Test")

        result = await servicer.CreateMeeting(request, MockContext())

        # MeetingId is already imported at module level; no local import needed.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = await uow.meetings.get(MeetingId(UUID(result.id)))

        assert meeting is not None, (
            f"meeting {result.id} should exist in database after atomic create"
        )

    async def test_stop_meeting_clears_streaming_turns(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Test StopMeeting clears streaming diarization turns from database."""
        # StreamingTurn is not imported at module level, so import it here.
        from noteflow.infrastructure.persistence.repositories import StreamingTurn

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            meeting.start_recording()
            await uow.meetings.create(meeting)
            turns = [
                StreamingTurn(speaker="S1", start_time=0.0, end_time=1.0),
                StreamingTurn(speaker="S2", start_time=1.0, end_time=2.0),
            ]
            await uow.diarization_jobs.add_streaming_turns(str(meeting.id), turns)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id))

        await servicer.StopMeeting(request, MockContext())

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            remaining = await uow.diarization_jobs.get_streaming_turns(str(meeting.id))

        assert remaining == [], (
            f"expected no streaming turns after StopMeeting, got {len(remaining)} turns"
        )
@pytest.mark.integration
class TestServicerEntityMutationsWithDatabase:
"""Integration tests for entity update/delete gRPC operations with database."""
# Shape shared by the request builders in this class: takes a meeting ID string
# and returns a request object.
# NOTE(review): its consumers are outside the visible part of this class —
# presumably parametrized tests; confirm.
EntityRequestFactory = Callable[[str], object]
@staticmethod
def _build_update_request(meeting_id: str) -> noteflow_pb2.UpdateEntityRequest:
    """Build an UpdateEntity request that targets a freshly generated entity UUID."""
    random_entity_id = str(uuid4())
    return noteflow_pb2.UpdateEntityRequest(
        meeting_id=meeting_id,
        entity_id=random_entity_id,
        text="Won't Matter",
    )
@staticmethod
def _build_delete_request(meeting_id: str) -> noteflow_pb2.DeleteEntityRequest:
    """Build a DeleteEntity request that targets a freshly generated entity UUID."""
    random_entity_id = str(uuid4())
    return noteflow_pb2.DeleteEntityRequest(
        meeting_id=meeting_id,
        entity_id=random_entity_id,
    )
@staticmethod
def _build_update_invalid_request(meeting_id: str) -> noteflow_pb2.UpdateEntityRequest:
    """Build an UpdateEntity request whose entity_id is not a valid UUID string."""
    malformed_entity_id = "not-a-valid-uuid"
    return noteflow_pb2.UpdateEntityRequest(
        meeting_id=meeting_id,
        entity_id=malformed_entity_id,
        text="Won't Matter",
    )
@staticmethod
def _build_delete_invalid_request(meeting_id: str) -> noteflow_pb2.DeleteEntityRequest:
    """Build a DeleteEntity request whose entity_id is not a valid UUID string."""
    malformed_entity_id = "garbage-id"
    return noteflow_pb2.DeleteEntityRequest(
        meeting_id=meeting_id,
        entity_id=malformed_entity_id,
    )
async def _create_meeting_with_entity(
    self,
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    text: str = "Acme Corp",
    category: str = "company",
) -> tuple[str, str]:
    """Persist a meeting together with a single named entity.

    Args:
        session_factory: Factory used to open the unit of work.
        meetings_dir: Meetings directory handed to the unit of work.
        text: Entity text to store.
        category: Category name, converted with ``EntityCategory.from_string``.

    Returns:
        Tuple of (meeting_id, entity_id), both as strings.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create(title="Entity Test Meeting")
        await uow.meetings.create(meeting)

        # Build the domain entity first, then hand it to the repository.
        draft = NamedEntity.create(
            text=text,
            category=EntityCategory.from_string(category),
            segment_ids=[0, 1],
            confidence=ENTITY_CONFIDENCE,
            meeting_id=meeting.id,
        )
        stored = await uow.entities.save(draft)
        await uow.commit()

    entity_id = stored.db_id
    assert entity_id is not None, "saved entity should have db_id"
    return str(meeting.id), str(entity_id)
async def test_update_entity_text_via_grpc(
    self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
    """Test UpdateEntity RPC updates entity text in database.

    Checks the entity returned by the RPC and the row read back through a
    fresh unit of work.
    """
    meeting_id, entity_id = await self._create_meeting_with_entity(
        session_factory, meetings_dir, text="Original Name"
    )
    servicer = NoteFlowServicer(session_factory=session_factory)
    request = noteflow_pb2.UpdateEntityRequest(
        meeting_id=meeting_id, entity_id=entity_id, text="Updated Name"
    )

    result = await servicer.UpdateEntity(request, MockContext())

    assert (result.entity.id, result.entity.text) == (entity_id, "Updated Name"), (
        f"expected entity ({entity_id}, 'Updated Name'), got ({result.entity.id}, '{result.entity.text}')"
    )

    # UUID is already imported at module level; no aliased local import needed.
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        updated = await uow.entities.get(UUID(entity_id))

    assert updated is not None and updated.text == "Updated Name", (
        f"entity text in database should be 'Updated Name', got '{updated.text if updated else 'None'}'"
    )
async def test_update_entity_category_via_grpc(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test UpdateEntity RPC updates entity category in database."""
from uuid import UUID as PyUUID
meeting_id, entity_id = await self._create_meeting_with_entity(
session_factory, meetings_dir, category="person"
)
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.UpdateEntityRequest(
meeting_id=meeting_id, entity_id=entity_id, category="company"
)
result = await servicer.UpdateEntity(request, MockContext())
assert result.entity.category == "company", (
f"expected category 'company' in response, got '{result.entity.category}'"
)
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
updated = await uow.entities.get(PyUUID(entity_id))
assert updated is not None and updated.category.value == "company", (
f"entity category in database should be 'company', got '{updated.category.value if updated else 'None'}'"
)
async def test_update_entity_both_fields_via_grpc(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test UpdateEntity RPC updates both text and category atomically."""
from uuid import UUID as PyUUID
meeting_id, entity_id = await self._create_meeting_with_entity(
session_factory, meetings_dir, text="John Doe", category="person"
)
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.UpdateEntityRequest(
meeting_id=meeting_id, entity_id=entity_id, text="Acme Industries", category="company"
)
result = await servicer.UpdateEntity(request, MockContext())
assert (result.entity.text, result.entity.category) == ("Acme Industries", "company"), (
f"expected ('Acme Industries', 'company'), got ('{result.entity.text}', '{result.entity.category}')"
)
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
updated = await uow.entities.get(PyUUID(entity_id))
assert updated is not None and (updated.text, updated.category.value) == (
"Acme Industries",
"company",
), (
f"entity in database should have text='Acme Industries' and category='company', got text='{updated.text if updated else 'None'}', category='{updated.category.value if updated else 'None'}'"
)
@pytest.mark.parametrize(
("method_name", "request_factory", "label"),
[
pytest.param("UpdateEntity", _build_update_request, "update", id="update"),
pytest.param("DeleteEntity", _build_delete_request, "delete", id="delete"),
],
)
async def test_entity_not_found_grpc_not_found(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
method_name: str,
request_factory: EntityRequestFactory,
label: str,
) -> None:
"""Entity operations return NOT_FOUND for nonexistent entities."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Empty Meeting")
await uow.meetings.create(meeting)
await uow.commit()
meeting_id = str(meeting.id)
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
request = request_factory(meeting_id)
method = getattr(servicer, method_name)
with pytest.raises(grpc.RpcError, match="not found"):
await method(request, context)
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
f"expected NOT_FOUND for {label} nonexistent entity, got {context.abort_code}"
)
@pytest.mark.parametrize(
("method_name", "request_factory", "label"),
[
pytest.param("UpdateEntity", _build_update_invalid_request, "update", id="update"),
pytest.param("DeleteEntity", _build_delete_invalid_request, "delete", id="delete"),
],
)
async def test_entity_invalid_id_returns_invalid_argument(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
method_name: str,
request_factory: EntityRequestFactory,
label: str,
) -> None:
"""Entity operations return INVALID_ARGUMENT for malformed entity_id."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Test Meeting")
await uow.meetings.create(meeting)
await uow.commit()
meeting_id = str(meeting.id)
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
request = request_factory(meeting_id)
method = getattr(servicer, method_name)
with pytest.raises(grpc.RpcError, match="Invalid"):
await method(request, context)
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, (
f"expected INVALID_ARGUMENT for {label} malformed entity_id, got {context.abort_code}"
)
async def test_delete_entity_grpc_removes_from_db(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test DeleteEntity RPC removes entity from database."""
from uuid import UUID as PyUUID
meeting_id, entity_id = await self._create_meeting_with_entity(
session_factory, meetings_dir
)
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.DeleteEntityRequest(meeting_id=meeting_id, entity_id=entity_id)
result = await servicer.DeleteEntity(request, MockContext())
assert result.success is True, "DeleteEntity should return success=True"
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
assert await uow.entities.get(PyUUID(entity_id)) is None, (
f"entity {entity_id} should have been deleted from database"
)
async def test_grpc_delete_preserves_other_entities(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test DeleteEntity only removes the targeted entity."""
meeting, entity1_id, entity2_id = await _create_meeting_with_two_entities(
session_factory, meetings_dir
)
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.DeleteEntityRequest(
meeting_id=str(meeting.id), entity_id=str(entity1_id)
)
await servicer.DeleteEntity(request, MockContext())
await _verify_entity_deletion_result(
session_factory, meetings_dir, entity1_id, entity2_id, "Entity Two"
)
# ============================================================================
# Sprint GAP-004: GetActiveDiarizationJobs Tests
# ============================================================================
class TestGetActiveDiarizationJobs:
    """Tests for GetActiveDiarizationJobs RPC."""

    async def _seed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        status: int,
    ) -> str:
        """Persist a meeting plus one diarization job in the given status.

        Args:
            session_factory: Session factory used to open the unit of work.
            meetings_dir: Directory handed to the unit of work.
            status: One of the ``JOB_STATUS_*`` constants
                (annotated as int — TODO confirm against the constants module).

        Returns:
            The id of the created job.
        """
        job_id = str(uuid4())
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=status,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()
        return job_id

    @pytest.mark.asyncio
    async def test_returns_running_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs includes jobs with RUNNING status."""
        job_id = await self._seed_job(session_factory, meetings_dir, JOB_STATUS_RUNNING)
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()

        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 1, f"expected 1 active job, got {len(response.jobs)}"
        assert response.jobs[0].job_id == job_id, (
            f"expected job_id={job_id}, got {response.jobs[0].job_id}"
        )

    @pytest.mark.asyncio
    async def test_returns_queued_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs includes jobs with QUEUED status."""
        job_id = await self._seed_job(session_factory, meetings_dir, JOB_STATUS_QUEUED)
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()

        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 1, f"expected 1 queued job, got {len(response.jobs)}"
        assert response.jobs[0].job_id == job_id, (
            f"expected job_id={job_id}, got {response.jobs[0].job_id}"
        )

    @pytest.mark.asyncio
    async def test_excludes_completed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs excludes jobs with COMPLETED status."""
        await self._seed_job(session_factory, meetings_dir, JOB_STATUS_COMPLETED)
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()

        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, (
            f"expected 0 active jobs for COMPLETED job, got {len(response.jobs)}"
        )

    @pytest.mark.asyncio
    async def test_excludes_failed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs excludes jobs with FAILED status."""
        await self._seed_job(session_factory, meetings_dir, JOB_STATUS_FAILED)
        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()

        response = await servicer.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, (
            f"expected 0 active jobs for FAILED job, got {len(response.jobs)}"
        )
# ============================================================================
# Sprint GAP-004: Server Restart Job Recovery Tests
# ============================================================================
class TestServerRestartJobRecovery:
    """Tests for server restart marking orphaned jobs as FAILED."""

    async def _seed_job_then_shutdown(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        status: int,
    ) -> str:
        """Persist a meeting plus one diarization job, then shut a servicer down.

        Args:
            session_factory: Session factory used for the unit of work and servicer.
            meetings_dir: Directory handed to the unit of work.
            status: One of the ``JOB_STATUS_*`` constants
                (annotated as int — TODO confirm against the constants module).

        Returns:
            The id of the job that existed when ``shutdown()`` ran.
        """
        job_id = str(uuid4())
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)
            job = DiarizationJob(
                job_id=job_id,
                meeting_id=str(meeting.id),
                status=status,
            )
            await uow.diarization_jobs.create(job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()
        return job_id

    @pytest.mark.asyncio
    async def test_shutdown_marks_running_job_failed(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown marks RUNNING jobs as FAILED."""
        job_id = await self._seed_job_then_shutdown(
            session_factory, meetings_dir, JOB_STATUS_RUNNING
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, f"job {job_id} should exist after shutdown"
            assert job_after.status == JOB_STATUS_FAILED, (
                f"expected FAILED status, got {job_after.status}"
            )

    @pytest.mark.asyncio
    async def test_shutdown_marks_queued_job_failed(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown marks QUEUED jobs as FAILED."""
        job_id = await self._seed_job_then_shutdown(
            session_factory, meetings_dir, JOB_STATUS_QUEUED
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "queued job should exist after shutdown"
            assert job_after.status == JOB_STATUS_FAILED, "queued job should be FAILED"

    @pytest.mark.asyncio
    async def test_shutdown_sets_error_message(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown sets ERR_SERVER_RESTART error message."""
        job_id = await self._seed_job_then_shutdown(
            session_factory, meetings_dir, JOB_STATUS_RUNNING
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "job should exist after shutdown"
            # NOTE(review): literal presumably mirrors the ERR_SERVER_RESTART
            # constant named in the docstring — confirm they stay in sync.
            assert job_after.error_message == "Server restarted", "should have restart error"

    @pytest.mark.asyncio
    async def test_shutdown_preserves_completed_job(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Server shutdown does not modify COMPLETED jobs."""
        job_id = await self._seed_job_then_shutdown(
            session_factory, meetings_dir, JOB_STATUS_COMPLETED
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job_after = await uow.diarization_jobs.get(job_id)
            assert job_after is not None, "completed job should exist after shutdown"
            assert job_after.status == JOB_STATUS_COMPLETED, "completed job should remain COMPLETED"

    @pytest.mark.asyncio
    async def test_get_active_jobs_empty_after_shutdown(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """GetActiveDiarizationJobs returns empty after server shutdown."""
        await self._seed_job_then_shutdown(session_factory, meetings_dir, JOB_STATUS_RUNNING)

        # A second servicer instance stands in for the server after a restart.
        servicer_new = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetActiveDiarizationJobsRequest()
        response = await servicer_new.GetActiveDiarizationJobs(request, MockContext())

        assert len(response.jobs) == 0, "no active jobs after shutdown"

    @pytest.mark.asyncio
    async def test_client_can_query_failed_job_status(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Client can query job status to see it failed with restart error."""
        job_id = await self._seed_job_then_shutdown(
            session_factory, meetings_dir, JOB_STATUS_RUNNING
        )

        # A second servicer instance stands in for the server after a restart.
        servicer_new = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetDiarizationJobStatusRequest(job_id=job_id)
        response = await _call_get_status(servicer_new, request, MockContext())

        assert response.status == JOB_STATUS_FAILED, "job should be FAILED"
        assert response.error_message == "Server restarted", "should have restart error"