- Updated the client submodule to the latest commit for enhanced compatibility. - Refactored HTML export logic to utilize a dedicated function for building HTML documents, improving code clarity and maintainability. - Enhanced test assertions across various files to include descriptive messages, aiding in debugging and understanding test failures. All quality checks pass.
152 lines
6.6 KiB
Python
152 lines
6.6 KiB
Python
"""Integration tests for SqlAlchemyUnitOfWork."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Meeting, Segment, Summary
|
|
from noteflow.domain.value_objects import MeetingState
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
@pytest.mark.integration
class TestUnitOfWork:
    """Integration tests exercising SqlAlchemyUnitOfWork end to end.

    Each test takes ``session_factory`` and ``meetings_dir`` arguments
    (expected to be provided by pytest fixtures defined outside this file)
    and builds its own unit-of-work instances from them.
    """

    async def test_uow_context_manager(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Entering the UoW exposes non-None meetings/segments/summaries repositories."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as unit:
            meetings_repo = unit.meetings
            assert meetings_repo is not None, "meetings repository should be initialized"

            segments_repo = unit.segments
            assert segments_repo is not None, "segments repository should be initialized"

            summaries_repo = unit.summaries
            assert summaries_repo is not None, "summaries repository should be initialized"

    async def test_uow_commit(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """A meeting created and committed in one UoW is readable from a second UoW."""
        meeting = Meeting.create(title="Commit Test")

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
            await writer.meetings.create(meeting)
            await writer.commit()

        # Read back through a separate unit of work so the check does not
        # rely on the instance that performed the write.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            retrieved = await reader.meetings.get(meeting.id)
            assert retrieved is not None, f"meeting {meeting.id} should exist after commit"
            assert retrieved.title == "Commit Test", f"expected title 'Commit Test', got '{retrieved.title}'"

    async def test_uow_rollback(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """A meeting created and then explicitly rolled back is absent afterwards."""
        meeting = Meeting.create(title="Rollback Test")

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
            await writer.meetings.create(meeting)
            await writer.rollback()

        # The lookup in a second unit of work must come back empty.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            retrieved = await reader.meetings.get(meeting.id)
            assert retrieved is None, f"meeting {meeting.id} should not exist after rollback"

    async def test_uow_auto_rollback_on_exception(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """An exception escaping the UoW block leaves the created meeting unpersisted."""
        meeting = Meeting.create(title="Exception Test")

        # The ValueError is raised inside the UoW block and must propagate
        # out of it; no commit is ever issued.
        with pytest.raises(ValueError, match="Test exception"):
            async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
                await writer.meetings.create(meeting)
                raise ValueError("Test exception")

        # The lookup in a second unit of work must come back empty.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            retrieved = await reader.meetings.get(meeting.id)
            assert retrieved is None, f"meeting {meeting.id} should not exist after exception rollback"

    async def test_uow_transactional_consistency(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Writes through three repositories followed by one commit are all readable."""
        meeting = Meeting.create(title="Transactional Test")
        first_segment = Segment(
            meeting_id=meeting.id,
            segment_id=0,
            text="Hello",
            start_time=0.0,
            end_time=1.0,
        )
        new_summary = Summary(
            meeting_id=meeting.id,
            executive_summary="Test summary",
            generated_at=datetime.now(UTC),
        )

        # Meeting, segment and summary are written through the same unit of
        # work and followed by a single commit.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as writer:
            await writer.meetings.create(meeting)
            await writer.segments.add(meeting.id, first_segment)
            await writer.summaries.save(new_summary)
            await writer.commit()

        # All three records must be visible from a second unit of work.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as reader:
            m = await reader.meetings.get(meeting.id)
            segs = await reader.segments.get_by_meeting(meeting.id)
            stored_summary = await reader.summaries.get_by_meeting(meeting.id)

            assert m is not None, f"meeting {meeting.id} should exist after transactional commit"
            assert len(segs) == 1, f"expected 1 segment, got {len(segs)}"
            assert stored_summary is not None, f"summary for meeting {meeting.id} should exist after commit"

    async def test_uow_repository_caching(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Repeated access to ``meetings`` / ``segments`` yields the same object each time."""
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as unit:
            meetings_first = unit.meetings
            meetings_second = unit.meetings
            assert meetings_first is meetings_second, "meetings repository should be cached (same instance)"

            segments_first = unit.segments
            segments_second = unit.segments
            assert segments_first is segments_second, "segments repository should be cached (same instance)"

    async def test_uow_multiple_operations(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Create, update and segment-add, each committed separately in one UoW, all persist."""
        meeting = Meeting.create(title="Multi-op Test")

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as unit:
            # Step 1: insert the meeting.
            await unit.meetings.create(meeting)
            await unit.commit()

            # Step 2: change the meeting's state and persist the update.
            meeting.start_recording()
            await unit.meetings.update(meeting)
            await unit.commit()

            # Step 3: attach one segment to the meeting.
            new_segment = Segment(segment_id=0, text="Test", start_time=0.0, end_time=1.0)
            await unit.segments.add(meeting.id, new_segment)
            await unit.commit()

        # Check the combined outcome from a second unit of work.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as probe:
            m = await probe.meetings.get(meeting.id)
            segs = await probe.segments.get_by_meeting(meeting.id)

            assert m is not None, f"meeting {meeting.id} should exist after multiple operations"
            assert m.state == MeetingState.RECORDING, f"expected state RECORDING, got {m.state}"
            assert len(segs) == 1, f"expected 1 segment after operations, got {len(segs)}"
|