"""Database resilience and connection pool tests.
|
|
|
|
Tests database failure scenarios, connection pooling behavior,
|
|
and transaction isolation under concurrent operations.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Meeting, Segment
|
|
from noteflow.domain.value_objects import MeetingId, MeetingState
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
# Test constants
|
|
DEFAULT_POOL_SIZE = 5
|
|
POOL_OVERFLOW_COUNT = 3
|
|
POOL_TIMEOUT_SECONDS = 5.0
|
|
BULK_SEGMENT_COUNT = 50
|
|
SEQUENTIAL_OPS_COUNT = 20
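
# The tests below assume a `session_factory` fixture supplied by the suite's
# conftest (not shown here). A minimal sketch of the kind of engine setup these
# tests expect, using the constants above (names like `database_url` are
# placeholders; the real fixture may differ):
#
#     engine = create_async_engine(
#         database_url,
#         pool_size=DEFAULT_POOL_SIZE,
#         max_overflow=POOL_OVERFLOW_COUNT,
#         pool_pre_ping=True,  # revalidate pooled connections on checkout
#     )
#     session_factory = async_sessionmaker(engine, expire_on_commit=False)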


async def _attempt_concurrent_update(
    session_factory: async_sessionmaker[AsyncSession],
    meeting_id: MeetingId,
    suffix: str,
) -> str:
    """Attempt concurrent update, return outcome string."""
    try:
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            m = await uow.meetings.get(meeting_id)
            if m is None:
                return "not_found"
            m.title = f"Updated-{suffix}"
            await uow.meetings.update(m)
            await uow.commit()
            return "success"
    except ValueError as e:
        if "modified concurrently" in str(e):
            return "conflict"
        raise
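
# NOTE: `_attempt_concurrent_update` assumes the meetings repository reports an
# optimistic-locking conflict by raising ValueError whose message contains
# "modified concurrently"; the actual locking mechanism lives in
# SqlAlchemyUnitOfWork and the repository implementation, not in this module.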


class TestConnectionPoolBehavior:
    """Test behavior of database connection pooling."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_concurrent_operations_within_pool_limit(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify system handles concurrent ops within pool size."""
        pool_size = DEFAULT_POOL_SIZE

        async def concurrent_operation(idx: int) -> int:
            async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
                # Multiple queries to simulate realistic workload
                await uow.meetings.count_by_state(MeetingState.CREATED)
                await uow.meetings.count_by_state(MeetingState.COMPLETED)
                return idx

        # Run exactly pool_size concurrent operations
        tasks = [concurrent_operation(i) for i in range(pool_size)]
        results = await asyncio.gather(*tasks)

        assert len(results) == pool_size, (
            f"expected {pool_size} results, got {len(results)}"
        )
        assert set(results) == set(range(pool_size)), (
            f"expected result indices {set(range(pool_size))}, got {set(results)}"
        )

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_graceful_handling_beyond_pool_limit(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify operations queue when pool is exhausted (not crash)."""
        pool_size = DEFAULT_POOL_SIZE
        overflow = POOL_OVERFLOW_COUNT

        async def long_operation(idx: int) -> int:
            async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
                # Perform multiple queries to extend operation duration naturally
                await uow.meetings.count_by_state(MeetingState.CREATED)
                await uow.meetings.count_by_state(MeetingState.COMPLETED)
                await uow.meetings.count_by_state(MeetingState.RECORDING)
                return idx

        # Run more operations than pool size
        tasks = [
            asyncio.create_task(long_operation(i))
            for i in range(pool_size + overflow)
        ]

        # Should complete (with queueing), not crash
        async with asyncio.timeout(POOL_TIMEOUT_SECONDS):
            results = await asyncio.gather(*tasks)

        assert len(results) == pool_size + overflow, (
            f"expected {pool_size + overflow} results, got {len(results)}"
        )

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_connection_released_after_context_exit(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify connections are properly released after UoW context exits."""
        # Run many sequential operations - should not exhaust pool
        for _ in range(SEQUENTIAL_OPS_COUNT):
            async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
                await uow.meetings.count_by_state(MeetingState.CREATED)

        # If we get here without timeout/error, connections were released


class TestTransactionRollback:
    """Test transaction rollback behavior."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_rollback_on_no_commit(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify rollback when commit is not called."""
        meeting = Meeting.create(title="Rollback Test")
        meeting_id = meeting.id

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            # Don't commit - should rollback on exit

        # Verify not persisted
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            retrieved = await uow.meetings.get(meeting_id)
            assert retrieved is None, (
                f"meeting {meeting_id} should not persist without commit"
            )

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_explicit_rollback(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify explicit rollback reverts changes."""
        meeting = Meeting.create(title="Explicit Rollback")
        meeting_id = meeting.id

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            await uow.rollback()

        # Verify not persisted
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            retrieved = await uow.meetings.get(meeting_id)
            assert retrieved is None, (
                f"meeting {meeting_id} should not persist after explicit rollback"
            )

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_commit_persists_changes(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify commit persists changes."""
        meeting = Meeting.create(title="Commit Test")
        meeting_id = meeting.id

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            await uow.commit()

        # Verify persisted
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            retrieved = await uow.meetings.get(meeting_id)
            assert retrieved is not None, (
                f"meeting {meeting_id} should persist after commit"
            )
            assert retrieved.title == "Commit Test", (
                f"expected title 'Commit Test', got '{retrieved.title}'"
            )


class TestTransactionIsolation:
    """Test transaction isolation under concurrent modifications."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_concurrent_meeting_updates(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify optimistic locking detects concurrent modifications."""
        meeting = Meeting.create(title="Concurrent Test")
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            await uow.commit()

        # Gather concurrent update attempts
        outcomes = await asyncio.gather(
            _attempt_concurrent_update(session_factory, meeting.id, "A"),
            _attempt_concurrent_update(session_factory, meeting.id, "B"),
            _attempt_concurrent_update(session_factory, meeting.id, "C"),
        )
        success_count = outcomes.count("success")
        conflict_count = outcomes.count("conflict")
        assert success_count >= 1, "At least one update should succeed"
        assert success_count + conflict_count == 3, "All attempts should complete"

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            final = await uow.meetings.get(meeting.id)
            assert final is not None, "Meeting should exist"
            assert final.title.startswith("Updated-"), "Title should be updated"

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_concurrent_creates_unique_ids(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify concurrent creates with unique IDs succeed."""

        async def create_meeting(idx: int) -> str:
            meeting = Meeting.create(title=f"Concurrent Create {idx}")
            async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
                await uow.meetings.create(meeting)
                await uow.commit()
            return str(meeting.id)

        # Create 10 meetings concurrently
        ids = await asyncio.gather(*[create_meeting(i) for i in range(10)])

        # All should have unique IDs
        assert len(set(ids)) == 10, (
            f"expected 10 unique IDs, got {len(set(ids))} unique from {ids}"
        )

        # All should be persisted
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            missing_ids = [
                mid
                for mid in ids
                if await uow.meetings.get(MeetingId(UUID(mid))) is None
            ]
            assert not missing_ids, f"meetings not persisted: {missing_ids}"


class TestDatabaseReconnection:
    """Test database reconnection behavior."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_operations_succeed_after_idle(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify operations succeed after connection idle period."""
        # First operation - establish connection
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.count_by_state(MeetingState.CREATED)

        # Connection is released back to pool; pool_pre_ping verifies on reuse
        # Second operation uses a potentially different connection from pool
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            count = await uow.meetings.count_by_state(MeetingState.CREATED)
            assert count >= 0, f"expected non-negative count, got {count}"

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_multiple_uow_sessions_independent(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify multiple UoW sessions are independent."""
        meeting = Meeting.create(title="Independence Test")
        meeting_id = meeting.id

        # Session 1: Add but don't commit
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow1:
            await uow1.meetings.create(meeting)

            # Session 2: Should not see uncommitted changes
            async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow2:
                found = await uow2.meetings.get(meeting_id)
                assert found is None, (
                    f"meeting {meeting_id} should not be visible in other session "
                    "before commit"
                )

            # Still not committed in uow1


class TestSegmentOperations:
    """Test segment repository operations under load."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_bulk_segment_creation(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify bulk segment creation performance."""
        meeting = Meeting.create(title="Bulk Segments")
        meeting_id = meeting.id

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            await uow.commit()

        # Create bulk segments
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            for i in range(BULK_SEGMENT_COUNT):
                segment = Segment(
                    meeting_id=meeting_id,
                    segment_id=i,
                    text=f"Segment {i}",
                    start_time=float(i),
                    end_time=float(i + 1),
                )
                await uow.segments.add(meeting_id, segment)
            await uow.commit()

        # Verify all segments persisted
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            segments = await uow.segments.get_by_meeting(meeting_id)
            assert len(segments) == BULK_SEGMENT_COUNT, (
                f"expected {BULK_SEGMENT_COUNT} segments, got {len(segments)}"
            )


class TestDatabaseFailureChaos:
    """Chaos tests for database connection failures."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_operation_after_rollback(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify session usable after rollback due to error."""
        meeting = Meeting.create(title="Rollback Recovery")
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.create(meeting)
            await uow.rollback()

        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            count = await uow.meetings.count_by_state(MeetingState.CREATED)
            assert count >= 0, "Should query after rollback"

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_operations_succeed_after_pool_dispose(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> None:
        """Verify operations succeed after pool disposal (new connection)."""
        # First operation
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            await uow.meetings.count_by_state(MeetingState.CREATED)

        # Dispose pool (closes all pooled connections)
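        # `async_sessionmaker` keeps its constructor kwargs in `.kw`; "bind" is
        # assumed here to hold the AsyncEngine the factory was created with.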
        engine = session_factory.kw["bind"]
        await engine.dispose()

        # New operation should succeed (creates fresh connection)
        async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
            count = await uow.meetings.count_by_state(MeetingState.CREATED)
            assert count >= 0, "Should succeed with fresh connection"


# NOTE: Webhook repository tests moved to tests/integration/test_webhook_repository.py
# which includes proper workspace fixture setup for foreign key constraints.