Files
noteflow/tests/integration/test_database_resilience.py
Travis Vasceannie 84e0c00b6b chore: update client submodule and enhance quality checks
- Updated the client submodule to the latest commit for improved features and stability.
- Introduced new quality violation reports for better code quality assessment, including checks for duplicates, test smells, and magic numbers.
- Enhanced linting configurations and diagnostics across various files to ensure cleaner code and adherence to standards.
2026-01-06 02:51:21 +00:00

398 lines
16 KiB
Python

"""Database resilience and connection pool tests.
Tests database failure scenarios, connection pooling behavior,
and transaction isolation under concurrent operations.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING
from uuid import UUID
import pytest
from noteflow.domain.entities import Meeting, Segment
from noteflow.domain.value_objects import MeetingId, MeetingState
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
# Test constants
DEFAULT_POOL_SIZE = 5
POOL_OVERFLOW_COUNT = 3
POOL_TIMEOUT_SECONDS = 5.0
BULK_SEGMENT_COUNT = 50
SEQUENTIAL_OPS_COUNT = 20
async def _attempt_concurrent_update(
session_factory: async_sessionmaker[AsyncSession],
meeting_id: MeetingId,
suffix: str,
) -> str:
"""Attempt concurrent update, return outcome string."""
try:
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
m = await uow.meetings.get(meeting_id)
if m is None:
return "not_found"
m.title = f"Updated-{suffix}"
await uow.meetings.update(m)
await uow.commit()
return "success"
except ValueError as e:
if "modified concurrently" in str(e):
return "conflict"
raise
class TestConnectionPoolBehavior:
"""Test behavior of database connection pooling."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_operations_within_pool_limit(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify system handles concurrent ops within pool size."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
pool_size = DEFAULT_POOL_SIZE
async def concurrent_operation(idx: int) -> int:
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
# Multiple queries to simulate realistic workload
await uow.meetings.count_by_state(MeetingState.CREATED)
await uow.meetings.count_by_state(MeetingState.COMPLETED)
return idx
# Run exactly pool_size concurrent operations
tasks = [concurrent_operation(i) for i in range(pool_size)]
results = await asyncio.gather(*tasks)
assert len(results) == pool_size, (
f"expected {pool_size} results, got {len(results)}"
)
assert set(results) == set(range(pool_size)), (
f"expected result indices {set(range(pool_size))}, got {set(results)}"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_graceful_handling_beyond_pool_limit(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify operations queue when pool is exhausted (not crash)."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
pool_size = DEFAULT_POOL_SIZE
overflow = POOL_OVERFLOW_COUNT
async def long_operation(idx: int) -> int:
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
# Perform multiple queries to extend operation duration naturally
await uow.meetings.count_by_state(MeetingState.CREATED)
await uow.meetings.count_by_state(MeetingState.COMPLETED)
await uow.meetings.count_by_state(MeetingState.RECORDING)
return idx
# Run more operations than pool size
tasks = [
asyncio.create_task(long_operation(i))
for i in range(pool_size + overflow)
]
# Should complete (with queueing), not crash
async with asyncio.timeout(POOL_TIMEOUT_SECONDS):
results = await asyncio.gather(*tasks)
assert len(results) == pool_size + overflow, (
f"expected {pool_size + overflow} results, got {len(results)}"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_connection_released_after_context_exit(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify connections are properly released after UoW context exits."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
# Run many sequential operations - should not exhaust pool
for _ in range(SEQUENTIAL_OPS_COUNT):
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.count_by_state(MeetingState.CREATED)
# If we get here without timeout/error, connections were released
class TestTransactionRollback:
"""Test transaction rollback behavior."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_rollback_on_no_commit(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify rollback when commit is not called."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Rollback Test")
meeting_id = meeting.id
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
# Don't commit - should rollback on exit
# Verify not persisted
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
retrieved = await uow.meetings.get(meeting_id)
assert retrieved is None, (
f"meeting {meeting_id} should not persist without commit"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_explicit_rollback(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify explicit rollback reverts changes."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Explicit Rollback")
meeting_id = meeting.id
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.rollback()
# Verify not persisted
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
retrieved = await uow.meetings.get(meeting_id)
assert retrieved is None, (
f"meeting {meeting_id} should not persist after explicit rollback"
)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_commit_persists_changes(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify commit persists changes."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Commit Test")
meeting_id = meeting.id
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.commit()
# Verify persisted
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
retrieved = await uow.meetings.get(meeting_id)
assert retrieved is not None, (
f"meeting {meeting_id} should persist after commit"
)
assert retrieved.title == "Commit Test", (
f"expected title 'Commit Test', got '{retrieved.title}'"
)
class TestTransactionIsolation:
"""Test transaction isolation under concurrent modifications."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_meeting_updates(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify optimistic locking detects concurrent modifications."""
meeting = Meeting.create(title="Concurrent Test")
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.commit()
# Gather concurrent update attempts
outcomes = await asyncio.gather(
_attempt_concurrent_update(session_factory, meeting.id, "A"),
_attempt_concurrent_update(session_factory, meeting.id, "B"),
_attempt_concurrent_update(session_factory, meeting.id, "C"),
)
success_count = outcomes.count("success")
conflict_count = outcomes.count("conflict")
assert success_count >= 1, "At least one update should succeed"
assert success_count + conflict_count == 3, "All attempts should complete"
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
final = await uow.meetings.get(meeting.id)
assert final is not None, "Meeting should exist"
assert final.title.startswith("Updated-"), "Title should be updated"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_concurrent_creates_unique_ids(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify concurrent creates with unique IDs succeed."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
async def create_meeting(idx: int) -> str:
meeting = Meeting.create(title=f"Concurrent Create {idx}")
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.commit()
return str(meeting.id)
# Create 10 meetings concurrently
ids = await asyncio.gather(*[create_meeting(i) for i in range(10)])
# All should have unique IDs
assert len(set(ids)) == 10, (
f"expected 10 unique IDs, got {len(set(ids))} unique from {ids}"
)
# All should be persisted
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
missing_ids = [
mid
for mid in ids
if await uow.meetings.get(MeetingId(UUID(mid))) is None
]
assert not missing_ids, f"meetings not persisted: {missing_ids}"
class TestDatabaseReconnection:
"""Test database reconnection behavior."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_operations_succeed_after_idle(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify operations succeed after connection idle period."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
# First operation - establish connection
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.count_by_state(MeetingState.CREATED)
# Connection is released back to pool; pool_pre_ping verifies on reuse
# Second operation uses a potentially different connection from pool
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
count = await uow.meetings.count_by_state(MeetingState.CREATED)
assert count >= 0, f"expected non-negative count, got {count}"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_multiple_uow_sessions_independent(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify multiple UoW sessions are independent."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Independence Test")
meeting_id = meeting.id
# Session 1: Add but don't commit
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow1:
await uow1.meetings.create(meeting)
# Session 2: Should not see uncommitted changes
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow2:
found = await uow2.meetings.get(meeting_id)
assert found is None, (
f"meeting {meeting_id} should not be visible in other session "
"before commit"
)
# Still not committed in uow1
class TestSegmentOperations:
"""Test segment repository operations under load."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_bulk_segment_creation(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify bulk segment creation performance."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Bulk Segments")
meeting_id = meeting.id
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.commit()
# Create bulk segments
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
for i in range(BULK_SEGMENT_COUNT):
segment = Segment(
meeting_id=meeting_id,
segment_id=i,
text=f"Segment {i}",
start_time=float(i),
end_time=float(i + 1),
)
await uow.segments.add(meeting_id, segment)
await uow.commit()
# Verify all segments persisted
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
segments = await uow.segments.get_by_meeting(meeting_id)
assert len(segments) == BULK_SEGMENT_COUNT, (
f"expected {BULK_SEGMENT_COUNT} segments, got {len(segments)}"
)
class TestDatabaseFailureChaos:
"""Chaos tests for database connection failures."""
@pytest.mark.integration
@pytest.mark.asyncio
async def test_operation_after_rollback(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify session usable after rollback due to error."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
meeting = Meeting.create(title="Rollback Recovery")
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.create(meeting)
await uow.rollback()
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
count = await uow.meetings.count_by_state(MeetingState.CREATED)
assert count >= 0, "Should query after rollback"
@pytest.mark.integration
@pytest.mark.asyncio
async def test_operations_succeed_after_pool_dispose(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Verify operations succeed after pool disposal (new connection)."""
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
# First operation
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
await uow.meetings.count_by_state(MeetingState.CREATED)
# Dispose pool (closes all pooled connections)
engine = session_factory.kw["bind"]
await engine.dispose()
# New operation should succeed (creates fresh connection)
async with SqlAlchemyUnitOfWork(session_factory, Path(".")) as uow:
count = await uow.meetings.count_by_state(MeetingState.CREATED)
assert count >= 0, "Should succeed with fresh connection"
# NOTE: Webhook repository tests moved to tests/integration/test_webhook_repository.py
# which includes proper workspace fixture setup for foreign key constraints.