338 lines
14 KiB
Python
338 lines
14 KiB
Python
"""Integration tests for server initialization and startup.
|
|
|
|
Tests the complete server initialization workflow:
|
|
- Database connection and session factory setup
|
|
- Preferences loading during startup
|
|
- Recovery service execution
|
|
- Servicer configuration with database
|
|
- Graceful shutdown
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from noteflow.application.services.recovery import RecoveryService
|
|
from noteflow.domain.entities import Meeting
|
|
from noteflow.domain.value_objects import MeetingState
|
|
from noteflow.grpc.proto import noteflow_pb2
|
|
from noteflow.grpc.service import NoteFlowServicer
|
|
from noteflow.infrastructure.persistence.repositories import DiarizationJob
|
|
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
|
|
JOB_STATUS_COMPLETED,
|
|
JOB_STATUS_FAILED,
|
|
JOB_STATUS_QUEUED,
|
|
JOB_STATUS_RUNNING,
|
|
)
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
async def _create_meeting_with_jobs_for_shutdown(
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
) -> tuple[Meeting, DiarizationJob, DiarizationJob]:
|
|
"""Create a meeting with two jobs for shutdown testing."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create()
|
|
await uow.meetings.create(meeting)
|
|
|
|
job1 = DiarizationJob(
|
|
job_id=str(uuid4()),
|
|
meeting_id=str(meeting.id),
|
|
status=JOB_STATUS_QUEUED,
|
|
)
|
|
job2 = DiarizationJob(
|
|
job_id=str(uuid4()),
|
|
meeting_id=str(meeting.id),
|
|
status=JOB_STATUS_RUNNING,
|
|
)
|
|
await uow.diarization_jobs.create(job1)
|
|
await uow.diarization_jobs.create(job2)
|
|
await uow.commit()
|
|
return meeting, job1, job2
|
|
|
|
|
|
async def _verify_jobs_marked_failed_after_shutdown(
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
jobs: list[DiarizationJob],
|
|
) -> None:
|
|
"""Verify jobs are marked as failed after shutdown."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
for job in jobs:
|
|
retrieved = await uow.diarization_jobs.get(job.job_id)
|
|
assert retrieved is not None, f"Job {job.job_id} should exist after shutdown"
|
|
assert retrieved.status == JOB_STATUS_FAILED, (
|
|
f"Job should be marked failed on shutdown, got {retrieved.status}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestServerStartupPreferences:
|
|
"""Integration tests for preferences loading during server startup."""
|
|
|
|
async def test_servicer_initializes_withsession_factory(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test servicer can be initialized with database session factory."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
assert servicer.session_factory is not None, "Servicer should have session factory set"
|
|
|
|
async def test_preferences_loaded_on_startup(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test preferences can be loaded during startup."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.preferences.set("cloud_consent_granted", True)
|
|
await uow.preferences.set("default_language", "en")
|
|
await uow.commit()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
consent = await uow.preferences.get_bool("cloud_consent_granted", False)
|
|
language = await uow.preferences.get("default_language")
|
|
|
|
assert consent is True, "Cloud consent preference should be True after being set"
|
|
assert language == "en", f"Default language should be 'en', got {language!r}"
|
|
|
|
async def test_preferences_default_when_not_set(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test preferences return defaults when not set."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
consent = await uow.preferences.get_bool("cloud_consent_granted", False)
|
|
|
|
assert consent is False, "Cloud consent should default to False when not set"
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestServerStartupRecovery:
|
|
"""Integration tests for crash recovery during server startup."""
|
|
|
|
async def test_recovery_runs_on_startup_simulation(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test recovery service properly recovers crashed meetings on startup."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
crashed_meeting = Meeting.create(title="Crashed")
|
|
crashed_meeting.start_recording()
|
|
await uow.meetings.create(crashed_meeting)
|
|
|
|
crashed_job = DiarizationJob(
|
|
job_id=str(uuid4()),
|
|
meeting_id=str(crashed_meeting.id),
|
|
status=JOB_STATUS_RUNNING,
|
|
)
|
|
await uow.diarization_jobs.create(crashed_job)
|
|
await uow.commit()
|
|
|
|
recovery_service = RecoveryService(SqlAlchemyUnitOfWork(session_factory, meetings_dir))
|
|
result = await recovery_service.recover_all()
|
|
|
|
assert result.meetings_recovered == 1, "Should recover 1 meeting"
|
|
assert result.diarization_jobs_failed == 1, "Should fail 1 diarization job"
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = await uow.meetings.get(crashed_meeting.id)
|
|
job = await uow.diarization_jobs.get(crashed_job.job_id)
|
|
|
|
assert meeting is not None, "Meeting should exist"
|
|
assert meeting.state == MeetingState.ERROR, "Meeting should be in ERROR state"
|
|
assert job is not None, "Job should exist"
|
|
assert job.status == JOB_STATUS_FAILED, "Job should be failed"
|
|
|
|
async def test_recovery_skips_clean_state(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test recovery doesn't affect clean meeting states."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
clean_meeting = Meeting.create(title="Clean")
|
|
await uow.meetings.create(clean_meeting)
|
|
|
|
completed_job = DiarizationJob(
|
|
job_id=str(uuid4()),
|
|
meeting_id=str(clean_meeting.id),
|
|
status=JOB_STATUS_COMPLETED,
|
|
)
|
|
await uow.diarization_jobs.create(completed_job)
|
|
await uow.commit()
|
|
|
|
recovery_service = RecoveryService(SqlAlchemyUnitOfWork(session_factory, meetings_dir))
|
|
result = await recovery_service.recover_all()
|
|
|
|
assert result.meetings_recovered == 0, (
|
|
f"No meetings should be recovered for clean state, got {result.meetings_recovered}"
|
|
)
|
|
assert result.diarization_jobs_failed == 0, (
|
|
f"No jobs should be failed for clean state, got {result.diarization_jobs_failed}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestServerGracefulShutdown:
|
|
"""Integration tests for graceful server shutdown."""
|
|
|
|
async def test_shutdown_marks_running_jobs_failed(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test shutdown marks all running diarization jobs as failed."""
|
|
_, job1, job2 = await _create_meeting_with_jobs_for_shutdown(session_factory, meetings_dir)
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
await servicer.shutdown()
|
|
|
|
await _verify_jobs_marked_failed_after_shutdown(session_factory, meetings_dir, [job1, job2])
|
|
|
|
async def test_shutdown_preserves_completed_jobs(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test shutdown preserves already completed jobs."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create()
|
|
await uow.meetings.create(meeting)
|
|
|
|
completed_job = DiarizationJob(
|
|
job_id=str(uuid4()),
|
|
meeting_id=str(meeting.id),
|
|
status=JOB_STATUS_COMPLETED,
|
|
segments_updated=10,
|
|
)
|
|
await uow.diarization_jobs.create(completed_job)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
await servicer.shutdown()
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
job = await uow.diarization_jobs.get(completed_job.job_id)
|
|
|
|
assert job is not None, f"Completed job {completed_job.job_id} should exist after shutdown"
|
|
assert job.status == JOB_STATUS_COMPLETED, (
|
|
f"Completed job should remain completed after shutdown, got {job.status}"
|
|
)
|
|
assert job.segments_updated == 10, (
|
|
f"Completed job segments_updated should be preserved, expected 10, got {job.segments_updated}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestServerDatabaseOperations:
|
|
"""Integration tests for server database operations."""
|
|
|
|
async def test_get_server_info_counts_from_database(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test GetServerInfo counts active meetings from database."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
recording1 = Meeting.create(title="Recording 1")
|
|
recording1.start_recording()
|
|
await uow.meetings.create(recording1)
|
|
|
|
recording2 = Meeting.create(title="Recording 2")
|
|
recording2.start_recording()
|
|
await uow.meetings.create(recording2)
|
|
|
|
stopped = Meeting.create(title="Stopped")
|
|
stopped.start_recording()
|
|
stopped.begin_stopping()
|
|
stopped.stop_recording()
|
|
await uow.meetings.create(stopped)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
class MockContext:
|
|
async def abort(self, code: object, details: str) -> None:
|
|
raise Exception(f"{code}: {details}")
|
|
|
|
request = noteflow_pb2.ServerInfoRequest()
|
|
result = await servicer.GetServerInfo(request, MockContext())
|
|
|
|
assert result.active_meetings == 2, (
|
|
f"ServerInfo should report 2 active meetings, got {result.active_meetings}"
|
|
)
|
|
|
|
async def test_multiple_servicer_instances_share_database(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test multiple servicer instances can share the same database."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Shared Meeting")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
servicer1 = NoteFlowServicer(session_factory=session_factory)
|
|
servicer2 = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
class MockContext:
|
|
async def abort(self, code: object, details: str) -> None:
|
|
raise Exception(f"{code}: {details}")
|
|
|
|
request = noteflow_pb2.GetMeetingRequest(meeting_id=str(meeting.id))
|
|
|
|
result1 = await servicer1.GetMeeting(request, MockContext())
|
|
result2 = await servicer2.GetMeeting(request, MockContext())
|
|
|
|
assert result1.id == result2.id, (
|
|
f"Both servicers should return same meeting ID, got {result1.id} vs {result2.id}"
|
|
)
|
|
assert result1.title == "Shared Meeting", (
|
|
f"Meeting title should be 'Shared Meeting', got {result1.title!r}"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestServerDatabasePersistence:
|
|
"""Integration tests for database persistence across operations."""
|
|
|
|
async def test_meeting_survives_servicer_restart(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test meeting data survives servicer instance restart."""
|
|
|
|
class MockContext:
|
|
async def abort(self, code: object, details: str) -> None:
|
|
raise Exception(f"{code}: {details}")
|
|
|
|
servicer1 = NoteFlowServicer(session_factory=session_factory)
|
|
create_request = noteflow_pb2.CreateMeetingRequest(title="Persistent Meeting")
|
|
created = await servicer1.CreateMeeting(create_request, MockContext())
|
|
meeting_id = created.id
|
|
|
|
del servicer1
|
|
|
|
servicer2 = NoteFlowServicer(session_factory=session_factory)
|
|
get_request = noteflow_pb2.GetMeetingRequest(meeting_id=meeting_id)
|
|
result = await servicer2.GetMeeting(get_request, MockContext())
|
|
|
|
assert result.id == meeting_id, (
|
|
f"Meeting ID should persist across servicer restart, expected {meeting_id}, got {result.id}"
|
|
)
|
|
assert result.title == "Persistent Meeting", (
|
|
f"Meeting title should persist across restart, got {result.title!r}"
|
|
)
|
|
|
|
async def test_preferences_survive_servicer_restart(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test preferences survive servicer instance restart."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
await uow.preferences.set("test_setting", "test_value")
|
|
await uow.commit()
|
|
|
|
servicer1 = NoteFlowServicer(session_factory=session_factory)
|
|
del servicer1
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
value = await uow.preferences.get("test_setting")
|
|
assert value == "test_value", (
|
|
f"Preference should persist across servicer restart, expected 'test_value', got {value!r}"
|
|
)
|