Files
noteflow/tests/integration/test_server_initialization.py
2026-01-09 10:13:50 +00:00

338 lines
14 KiB
Python

"""Integration tests for server initialization and startup.
Tests the complete server initialization workflow:
- Database connection and session factory setup
- Preferences loading during startup
- Recovery service execution
- Servicer configuration with database
- Graceful shutdown
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import uuid4
import pytest
from noteflow.application.services.recovery import RecoveryService
from noteflow.domain.entities import Meeting
from noteflow.domain.value_objects import MeetingState
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.repositories import DiarizationJob
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
JOB_STATUS_COMPLETED,
JOB_STATUS_FAILED,
JOB_STATUS_QUEUED,
JOB_STATUS_RUNNING,
)
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def _create_meeting_with_jobs_for_shutdown(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
) -> tuple[Meeting, DiarizationJob, DiarizationJob]:
    """Persist a meeting with one queued and one running diarization job.

    The meeting and both jobs are written and committed through a single
    unit of work.

    Args:
        session_factory: Session factory handed to the unit of work.
        meetings_dir: Meetings directory handed to the unit of work.

    Returns:
        The meeting, the queued job, and the running job, in that order.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        meeting = Meeting.create()
        await uow.meetings.create(meeting)

        # Build the jobs in a fixed order: queued first, running second.
        queued_job, running_job = (
            DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(meeting.id),
                status=initial_status,
            )
            for initial_status in (JOB_STATUS_QUEUED, JOB_STATUS_RUNNING)
        )

        for pending_job in (queued_job, running_job):
            await uow.diarization_jobs.create(pending_job)
        await uow.commit()

    return meeting, queued_job, running_job
async def _verify_jobs_marked_failed_after_shutdown(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    jobs: list[DiarizationJob],
) -> None:
    """Assert that each given job is stored with ``JOB_STATUS_FAILED``.

    Every job is looked up again by its ``job_id`` inside one unit of work.

    Args:
        session_factory: Session factory handed to the unit of work.
        meetings_dir: Meetings directory handed to the unit of work.
        jobs: Jobs whose persisted status should be checked.

    Raises:
        AssertionError: If a job cannot be found or is not marked failed.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        for job in jobs:
            retrieved = await uow.diarization_jobs.get(job.job_id)

            # Existence is checked first so the status check never sees None.
            assert retrieved is not None, f"Job {job.job_id} should exist after shutdown"
            assert retrieved.status == JOB_STATUS_FAILED, (
                f"Job should be marked failed on shutdown, got {retrieved.status}"
            )
@pytest.mark.integration
class TestServerStartupPreferences:
    """Integration tests for preference access around server startup."""

    async def test_servicer_initializes_withsession_factory(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """A servicer built with a session factory exposes a non-None factory."""
        servicer = NoteFlowServicer(session_factory=session_factory)

        assert servicer.session_factory is not None, "Servicer should have session factory set"

    async def test_preferences_loaded_on_startup(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Preferences committed in one unit of work are readable from another."""
        seeded_preferences = (
            ("cloud_consent_granted", True),
            ("default_language", "en"),
        )

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
            for key, value in seeded_preferences:
                await writer.preferences.set(key, value)
            await writer.commit()

        # Read back through a separate unit of work.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            consent = await reader.preferences.get_bool("cloud_consent_granted", False)
            language = await reader.preferences.get("default_language")

        assert consent is True, "Cloud consent preference should be True after being set"
        assert language == "en", f"Default language should be 'en', got {language!r}"

    async def test_preferences_default_when_not_set(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """``get_bool`` returns the supplied default when nothing was stored."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            consent = await reader.preferences.get_bool("cloud_consent_granted", False)

        assert consent is False, "Cloud consent should default to False when not set"
@pytest.mark.integration
class TestServerStartupRecovery:
    """Integration tests for running the recovery service as startup would."""

    async def test_recovery_runs_on_startup_simulation(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Recovery puts a recording meeting in ERROR and fails its running job."""
        # Arrange: a meeting left in the recording state with a running job.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            crashed_meeting = Meeting.create(title="Crashed")
            crashed_meeting.start_recording()
            await uow.meetings.create(crashed_meeting)

            crashed_job = DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(crashed_meeting.id),
                status=JOB_STATUS_RUNNING,
            )
            await uow.diarization_jobs.create(crashed_job)
            await uow.commit()

        # Act: run the full recovery pass against a fresh unit of work.
        recovery_uow = SqlAlchemyUnitOfWork(session_factory, meetings_dir)
        outcome = await RecoveryService(recovery_uow).recover_all()

        # Assert: the counters first, then the persisted state.
        assert outcome.meetings_recovered == 1, "Should recover 1 meeting"
        assert outcome.diarization_jobs_failed == 1, "Should fail 1 diarization job"

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            stored_meeting = await uow.meetings.get(crashed_meeting.id)
            stored_job = await uow.diarization_jobs.get(crashed_job.job_id)

        assert stored_meeting is not None, "Meeting should exist"
        assert stored_meeting.state == MeetingState.ERROR, "Meeting should be in ERROR state"
        assert stored_job is not None, "Job should exist"
        assert stored_job.status == JOB_STATUS_FAILED, "Job should be failed"

    async def test_recovery_skips_clean_state(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Recovery reports zero changes for a new meeting with a completed job."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            idle_meeting = Meeting.create(title="Clean")
            await uow.meetings.create(idle_meeting)

            finished_job = DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(idle_meeting.id),
                status=JOB_STATUS_COMPLETED,
            )
            await uow.diarization_jobs.create(finished_job)
            await uow.commit()

        recovery_uow = SqlAlchemyUnitOfWork(session_factory, meetings_dir)
        result = await RecoveryService(recovery_uow).recover_all()

        assert result.meetings_recovered == 0, (
            f"No meetings should be recovered for clean state, got {result.meetings_recovered}"
        )
        assert result.diarization_jobs_failed == 0, (
            f"No jobs should be failed for clean state, got {result.diarization_jobs_failed}"
        )
@pytest.mark.integration
class TestServerGracefulShutdown:
    """Integration tests for what ``NoteFlowServicer.shutdown`` does to jobs."""

    async def test_shutdown_marks_running_jobs_failed(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Queued and running jobs are stored as failed once shutdown returns."""
        seeded = await _create_meeting_with_jobs_for_shutdown(session_factory, meetings_dir)
        # The helper returns (meeting, job, job); only the jobs are checked.
        unfinished_jobs = list(seeded[1:])

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        await _verify_jobs_marked_failed_after_shutdown(
            session_factory, meetings_dir, unfinished_jobs
        )

    async def test_shutdown_preserves_completed_jobs(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """A completed job keeps its status and segment count across shutdown."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create()
            await uow.meetings.create(meeting)

            completed_job = DiarizationJob(
                job_id=str(uuid4()),
                meeting_id=str(meeting.id),
                status=JOB_STATUS_COMPLETED,
                segments_updated=10,
            )
            await uow.diarization_jobs.create(completed_job)
            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        await servicer.shutdown()

        # Re-read the job to see what shutdown left in the database.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            job = await uow.diarization_jobs.get(completed_job.job_id)

        assert job is not None, f"Completed job {completed_job.job_id} should exist after shutdown"
        assert job.status == JOB_STATUS_COMPLETED, (
            f"Completed job should remain completed after shutdown, got {job.status}"
        )
        assert job.segments_updated == 10, (
            f"Completed job segments_updated should be preserved, expected 10, got {job.segments_updated}"
        )
@pytest.mark.integration
class TestServerDatabaseOperations:
    """Integration tests for servicer RPCs that read from the database."""

    class _AbortRaisingContext:
        """Minimal context stand-in whose ``abort`` raises instead of returning."""

        async def abort(self, code: object, details: str) -> None:
            raise Exception(f"{code}: {details}")

    async def test_get_server_info_counts_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """GetServerInfo reports 2 active meetings for two recording and one stopped."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            # Two meetings that are left in the recording state.
            for title in ("Recording 1", "Recording 2"):
                recording = Meeting.create(title=title)
                recording.start_recording()
                await uow.meetings.create(recording)

            # One meeting driven all the way through to stopped.
            stopped = Meeting.create(title="Stopped")
            stopped.start_recording()
            stopped.begin_stopping()
            stopped.stop_recording()
            await uow.meetings.create(stopped)

            await uow.commit()

        servicer = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.ServerInfoRequest()
        result = await servicer.GetServerInfo(request, self._AbortRaisingContext())

        assert result.active_meetings == 2, (
            f"ServerInfo should report 2 active meetings, got {result.active_meetings}"
        )

    async def test_multiple_servicer_instances_share_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Two servicers on the same session factory return the same meeting."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Shared Meeting")
            await uow.meetings.create(meeting)
            await uow.commit()

        servicer1 = NoteFlowServicer(session_factory=session_factory)
        servicer2 = NoteFlowServicer(session_factory=session_factory)
        request = noteflow_pb2.GetMeetingRequest(meeting_id=str(meeting.id))

        # Each call gets its own context instance, first servicer first.
        result1 = await servicer1.GetMeeting(request, self._AbortRaisingContext())
        result2 = await servicer2.GetMeeting(request, self._AbortRaisingContext())

        assert result1.id == result2.id, (
            f"Both servicers should return same meeting ID, got {result1.id} vs {result2.id}"
        )
        assert result1.title == "Shared Meeting", (
            f"Meeting title should be 'Shared Meeting', got {result1.title!r}"
        )
@pytest.mark.integration
class TestServerDatabasePersistence:
    """Integration tests that stored data outlives a servicer instance."""

    async def test_meeting_survives_servicer_restart(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """A meeting created by one servicer is returned by a later servicer."""

        class AbortRaisingContext:
            """Context stand-in whose ``abort`` raises instead of returning."""

            async def abort(self, code: object, details: str) -> None:
                raise Exception(f"{code}: {details}")

        # Create the meeting, then drop the servicer that created it.
        first_servicer = NoteFlowServicer(session_factory=session_factory)
        created = await first_servicer.CreateMeeting(
            noteflow_pb2.CreateMeetingRequest(title="Persistent Meeting"),
            AbortRaisingContext(),
        )
        meeting_id = created.id
        del first_servicer

        # A new servicer on the same session factory looks the meeting up.
        second_servicer = NoteFlowServicer(session_factory=session_factory)
        result = await second_servicer.GetMeeting(
            noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id),
            AbortRaisingContext(),
        )

        assert result.id == meeting_id, (
            f"Meeting ID should persist across servicer restart, expected {meeting_id}, got {result.id}"
        )
        assert result.title == "Persistent Meeting", (
            f"Meeting title should persist across restart, got {result.title!r}"
        )

    async def test_preferences_survive_servicer_restart(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """A committed preference is still readable after a servicer is dropped."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
            await writer.preferences.set("test_setting", "test_value")
            await writer.commit()

        # Build and immediately discard a servicer between the write and the read.
        short_lived_servicer = NoteFlowServicer(session_factory=session_factory)
        del short_lived_servicer

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            value = await reader.preferences.get("test_setting")

        assert value == "test_value", (
            f"Preference should persist across servicer restart, expected 'test_value', got {value!r}"
        )