Files
disbord/tests/test_consent_manager_fixes.py
Travis Vasceannie 3acb779569 chore: remove .env.example and add new files for project structure
- Deleted .env.example file as it is no longer needed.
- Added .gitignore to manage ignored files and directories.
- Introduced CLAUDE.md for AI provider integration documentation.
- Created dev.sh for development setup and scripts.
- Updated Dockerfile and Dockerfile.production for improved build processes.
- Added multiple test files and directories for comprehensive testing.
- Introduced new utility and service files for enhanced functionality.
- Organized codebase with new directories and files for better maintainability.
2025-08-27 23:00:19 -04:00

568 lines
22 KiB
Python

"""
Comprehensive tests for ConsentManager race condition fixes.
Tests verify thread safety, proper locking mechanisms, background task management,
and resource cleanup functionality implemented to prevent race conditions.
"""
import asyncio
import logging
from typing import Any, AsyncIterator, List
from unittest.mock import AsyncMock, patch

import pytest

from core.consent_manager import ConsentManager
from core.database import DatabaseManager
logger = logging.getLogger(__name__)
class TestConsentManagerRaceConditionFixes:
"""Test suite for ConsentManager race condition fixes."""
@pytest.fixture
def mock_db_manager(self) -> AsyncMock:
    """Build a DatabaseManager stand-in whose operations all report success.

    The fixture is deliberately synchronous: it awaits nothing, and a plain
    ``@pytest.fixture`` coroutine is only awaited when pytest-asyncio runs in
    auto mode. A regular function yields the mock in every mode.

    Returns:
        An ``AsyncMock`` specced against ``DatabaseManager`` with the
        consent-related coroutines stubbed to fixed return values.
    """
    db_manager = AsyncMock(spec=DatabaseManager)

    # Each of these database calls resolves to True.
    for method_name in (
        "execute_query",
        "grant_consent",
        "revoke_consent",
        "check_consent",
        "set_global_opt_out",
    ):
        setattr(db_manager, method_name, AsyncMock(return_value=True))

    # Fixed roster of consenting users.
    db_manager.get_consented_users = AsyncMock(return_value=[123, 456, 789])
    return db_manager
@pytest.fixture
async def consent_manager(
    self, mock_db_manager: AsyncMock
) -> AsyncIterator[ConsentManager]:
    """Provide an initialized ConsentManager and tear it down afterwards.

    ``ConsentTemplates`` and ``ConsentView`` are patched only while the
    manager is constructed and initialized. The cache loaders are replaced
    with ``AsyncMock`` objects so ``initialize()`` does not run the real
    loading code.

    Yields:
        The initialized manager.

    Teardown:
        ``initialize()`` leaves a background task in ``_cleanup_task``. If
        the test did not already finish it, ``cleanup()`` is awaited so the
        task is not left pending when the test ends.
    """
    with (
        patch("core.consent_manager.ConsentTemplates"),
        patch("core.consent_manager.ConsentView"),
    ):
        manager = ConsentManager(mock_db_manager)
        # Mock initialization to avoid actual database calls
        manager._load_consent_cache = AsyncMock()
        manager._load_global_opt_outs = AsyncMock()
        await manager.initialize()

    yield manager

    # Skip teardown when the test already cancelled or cleaned up the task,
    # so cleanup() is never invoked twice on behalf of the fixture.
    background_task = manager._cleanup_task
    if background_task is not None and not background_task.done():
        await manager.cleanup()
# Race Condition Prevention Tests
@pytest.mark.asyncio
async def test_concurrent_consent_granting_no_cache_corruption(
    self, consent_manager: ConsentManager
):
    """Grant consent for five users at once and check each cache entry."""
    guild_id = 789012
    user_ids = [100, 200, 300, 400, 500]

    # Launch every grant together; exceptions are collected, not raised.
    outcomes = await asyncio.gather(
        *(
            consent_manager.grant_consent(uid, guild_id, f"User{uid}")
            for uid in user_ids
        ),
        return_exceptions=True,
    )

    # Every call must have returned exactly True.
    granted = [outcome for outcome in outcomes if outcome is True]
    assert len(granted) == len(user_ids)

    # Each user must now carry a True entry for this guild.
    async with consent_manager._cache_lock:
        cache = consent_manager.consent_cache
        for uid in user_ids:
            assert uid in cache
            assert cache[uid][guild_id] is True
@pytest.mark.asyncio
async def test_concurrent_consent_revoking_works_properly(
    self, consent_manager: ConsentManager
):
    """Revoke consent for five pre-consented users at once and check the cache."""
    guild_id = 789012
    user_ids = [101, 202, 303, 404, 505]

    # Seed the cache so every user starts with consent granted.
    async with consent_manager._cache_lock:
        for uid in user_ids:
            consent_manager.consent_cache[uid] = {guild_id: True}

    # Launch every revocation together; exceptions are collected, not raised.
    outcomes = await asyncio.gather(
        *(consent_manager.revoke_consent(uid, guild_id) for uid in user_ids),
        return_exceptions=True,
    )

    # Every call must have returned exactly True.
    revoked = [outcome for outcome in outcomes if outcome is True]
    assert len(revoked) == len(user_ids)

    # Entries must still exist, now flipped to False.
    async with consent_manager._cache_lock:
        cache = consent_manager.consent_cache
        for uid in user_ids:
            assert uid in cache
            assert cache[uid][guild_id] is False
@pytest.mark.asyncio
async def test_concurrent_cache_access_during_check_consent(
    self, consent_manager: ConsentManager
):
    """Run twenty simultaneous check_consent calls against one cached entry."""
    concurrent_checks = 20
    guild_id = 789012
    user_id = 12345

    # Seed a single granted entry for the user.
    async with consent_manager._cache_lock:
        consent_manager.consent_cache[user_id] = {guild_id: True}

    pending = [
        consent_manager.check_consent(user_id, guild_id)
        for _ in range(concurrent_checks)
    ]
    answers = await asyncio.gather(*pending, return_exceptions=True)

    # One answer per check, and each of them exactly True.
    assert len(answers) == concurrent_checks
    for answer in answers:
        assert answer is True
@pytest.mark.asyncio
async def test_concurrent_global_opt_out_operations(
    self, consent_manager: ConsentManager
):
    """Opt five users out globally at once and check global_opt_outs."""
    user_ids = [111, 222, 333, 444, 555]

    # Launch every opt-out together; exceptions are collected, not raised.
    outcomes = await asyncio.gather(
        *(
            consent_manager.set_global_opt_out(uid, opt_out=True)
            for uid in user_ids
        ),
        return_exceptions=True,
    )

    # Every call must have returned exactly True.
    succeeded = [outcome for outcome in outcomes if outcome is True]
    assert len(succeeded) == len(user_ids)

    # Every user must be recorded in the global opt-out collection.
    async with consent_manager._cache_lock:
        opted_out = consent_manager.global_opt_outs
        for uid in user_ids:
            assert uid in opted_out
# Background Task Management Tests
@pytest.mark.asyncio
async def test_cleanup_task_created_during_initialization(
    self, mock_db_manager: AsyncMock
):
    """initialize() must leave a live asyncio.Task in _cleanup_task."""
    templates_patch = patch("core.consent_manager.ConsentTemplates")
    view_patch = patch("core.consent_manager.ConsentView")
    with templates_patch, view_patch:
        manager = ConsentManager(mock_db_manager)

        # Replace the loaders that initialize() would otherwise run.
        manager._load_consent_cache = AsyncMock()
        manager._load_global_opt_outs = AsyncMock()

        await manager.initialize()

        # The background task must exist, be a real Task, and still be running.
        assert manager._cleanup_task is not None
        assert isinstance(manager._cleanup_task, asyncio.Task)
        assert not manager._cleanup_task.done()

        # Stop the background task before leaving the test.
        await manager.cleanup()
@pytest.mark.asyncio
async def test_cleanup_method_cancels_background_tasks(
    self, consent_manager: ConsentManager
):
    """cleanup() must leave the background task finished or cancelled."""
    # Keep a reference taken before cleanup() runs.
    background_task = consent_manager._cleanup_task
    assert background_task is not None

    await consent_manager.cleanup()

    # The task captured earlier must no longer be running.
    assert background_task.done() or background_task.cancelled()
@pytest.mark.asyncio
async def test_cleanup_handles_already_cancelled_tasks_gracefully(
    self, consent_manager: ConsentManager
):
    """cleanup() must not raise when the task was cancelled beforehand."""
    still_running = bool(
        consent_manager._cleanup_task
    ) and not consent_manager._cleanup_task.done()

    if still_running:
        # Cancel manually, then let the cancellation finish.
        consent_manager._cleanup_task.cancel()
        try:
            await consent_manager._cleanup_task
        except asyncio.CancelledError:
            pass  # Expected when task is cancelled

    # The test passes as long as this call completes without raising.
    await consent_manager.cleanup()
@pytest.mark.asyncio
async def test_cleanup_handles_task_cancellation_exceptions(
    self, mock_db_manager: AsyncMock
):
    """cleanup() must complete when _cleanup_task is a long-sleeping task.

    The manager's background task is swapped for one that sleeps for 100
    seconds. The test passes if ``cleanup()`` returns without raising.
    """
    with (
        patch("core.consent_manager.ConsentTemplates"),
        patch("core.consent_manager.ConsentView"),
    ):
        manager = ConsentManager(mock_db_manager)
        manager._load_consent_cache = AsyncMock()
        manager._load_global_opt_outs = AsyncMock()
        await manager.initialize()

        # initialize() already started a background task. Stop it before
        # overwriting the attribute; otherwise nothing references it any
        # more and it is left pending when the test ends.
        original_task = manager._cleanup_task
        if original_task is not None and not original_task.done():
            original_task.cancel()
            try:
                await original_task
            except asyncio.CancelledError:
                pass  # Expected when task is cancelled

        # Stand-in task that is still sleeping when cleanup() runs.
        async def long_running_cleanup():
            await asyncio.sleep(100)  # Long running task

        manager._cleanup_task = asyncio.create_task(long_running_cleanup())

        # cleanup() should complete without raising.
        await manager.cleanup()
# Lock-Protected Operations Tests
@pytest.mark.asyncio
async def test_cache_updates_are_atomic(self, consent_manager: ConsentManager):
    """Interleave grant, check, and revoke on one user; expect no exceptions."""
    guild_id = 789012
    user_id = 98765

    # Same order as scheduled: grant, check, revoke, check.
    outcomes = await asyncio.gather(
        consent_manager.grant_consent(user_id, guild_id, "TestUser"),
        consent_manager.check_consent(user_id, guild_id),
        consent_manager.revoke_consent(user_id, guild_id),
        consent_manager.check_consent(user_id, guild_id),
        return_exceptions=True,
    )

    # None of the four operations may have produced an exception.
    assert not any(isinstance(outcome, Exception) for outcome in outcomes)

    # If the user ended up cached, the guild entry must be a plain bool.
    async with consent_manager._cache_lock:
        cache = consent_manager.consent_cache
        if user_id in cache:
            assert isinstance(cache[user_id][guild_id], bool)
@pytest.mark.asyncio
async def test_cache_reads_dont_interfere_with_writes(
    self, consent_manager: ConsentManager
):
    """Mix 50 reads with 10 alternating writes; expect no exceptions.

    Writes alternate by position: even-numbered writes grant consent and
    odd-numbered writes revoke it. The earlier selector looked at the size
    of the user's guild dict, which holds one guild here, so every write was
    a revoke and ``grant_consent`` was never exercised. It also read the
    cache outside the lock.
    """
    user_id = 55555
    guild_id = 789012
    read_operations = 50
    write_operations = 10

    # Pre-populate cache
    async with consent_manager._cache_lock:
        consent_manager.consent_cache[user_id] = {guild_id: True}

    async def read_operation() -> bool:
        return await consent_manager.check_consent(user_id, guild_id)

    async def write_operation(sequence: int) -> bool:
        # Alternate between grant and revoke based on the write's position.
        if sequence % 2 == 0:
            return await consent_manager.grant_consent(
                user_id, guild_id, "TestUser"
            )
        return await consent_manager.revoke_consent(user_id, guild_id)

    # Mix read and write operations
    read_tasks = [read_operation() for _ in range(read_operations)]
    write_tasks = [write_operation(index) for index in range(write_operations)]
    results = await asyncio.gather(
        *read_tasks, *write_tasks, return_exceptions=True
    )

    # Verify no exceptions occurred during concurrent access
    exceptions = [result for result in results if isinstance(result, Exception)]
    assert len(exceptions) == 0
@pytest.mark.asyncio
@pytest.mark.parametrize("concurrent_operations", [5, 10, 20])
async def test_performance_doesnt_degrade_significantly_with_locking(
    self, consent_manager: ConsentManager, concurrent_operations: int
):
    """Time grant/check/revoke cycles run concurrently for N users.

    Asserts that the mean per-user cycle stays under 0.1 s and that the
    whole batch finishes in under 5 s.
    """
    import time

    guild_id = 789012
    first_user = 1000
    user_ids = list(range(first_user, first_user + concurrent_operations))

    async def consent_operation(user_id: int) -> float:
        # One full cycle for a single user, returning its elapsed seconds.
        began = time.perf_counter()
        await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
        await consent_manager.check_consent(user_id, guild_id)
        await consent_manager.revoke_consent(user_id, guild_id)
        return time.perf_counter() - began

    # Time the whole batch as well as each individual cycle.
    batch_began = time.perf_counter()
    cycle_durations = await asyncio.gather(
        *(consent_operation(uid) for uid in user_ids)
    )
    batch_duration = time.perf_counter() - batch_began

    mean_cycle = sum(cycle_durations) / len(cycle_durations)
    assert mean_cycle < 0.1  # Each operation should take less than 100ms
    assert batch_duration < 5.0  # Total time should be reasonable
# Edge Cases Tests
@pytest.mark.asyncio
async def test_behavior_when_lock_held_for_extended_time(
    self, consent_manager: ConsentManager
):
    """Issue check_consent calls while another task holds _cache_lock.

    A holder task keeps the lock for 0.1 s and then writes a cache entry.
    Three checks are started 0.05 s in, and each must return a bool.
    """
    guild_id = 789012
    user_id = 77777

    async def long_running_operation() -> None:
        async with consent_manager._cache_lock:
            # Keep the lock occupied before writing the entry.
            await asyncio.sleep(0.1)
            consent_manager.consent_cache[user_id] = {guild_id: True}

    async def quick_operation() -> bool:
        return await consent_manager.check_consent(user_id, guild_id)

    holder = asyncio.create_task(long_running_operation())

    # Let the holder run for a moment before the checks are started.
    await asyncio.sleep(0.05)

    checkers = []
    for _ in range(3):
        checkers.append(asyncio.create_task(quick_operation()))

    # Let the holder finish, then collect the checks.
    await holder
    answers = await asyncio.gather(*checkers)

    for answer in answers:
        assert isinstance(answer, bool)
@pytest.mark.asyncio
async def test_multiple_concurrent_operations_same_user(
    self, consent_manager: ConsentManager
):
    """Run three overlapping batches of eight mixed calls on one user."""
    guild_id = 789012
    user_id = 88888

    async def mixed_operations() -> List[Any]:
        # One batch: grants, checks, a revoke, and an opt-out toggle.
        batch = (
            consent_manager.grant_consent(user_id, guild_id, "TestUser"),
            consent_manager.check_consent(user_id, guild_id),
            consent_manager.revoke_consent(user_id, guild_id),
            consent_manager.check_consent(user_id, guild_id),
            consent_manager.set_global_opt_out(user_id, opt_out=True),
            consent_manager.check_consent(user_id, guild_id),
            consent_manager.set_global_opt_out(user_id, opt_out=False),
            consent_manager.grant_consent(user_id, guild_id, "TestUser2"),
        )
        return await asyncio.gather(*batch, return_exceptions=True)

    # Three batches run side by side.
    batches = await asyncio.gather(*(mixed_operations() for _ in range(3)))

    # No batch may contain an exception among its results.
    for batch_results in batches:
        failures = [r for r in batch_results if isinstance(r, Exception)]
        assert not failures
@pytest.mark.asyncio
async def test_mixed_grant_revoke_check_operations_same_user(
    self, consent_manager: ConsentManager
):
    """Run thirty mixed operations on one user and check the cache shape."""
    operations_count = 30
    guild_id = 789012
    user_id = 99999

    async def random_operation(operation_type: str) -> Any:
        # Guard-style dispatch; anything unrecognised is an opt-in/opt-out.
        if operation_type == "grant":
            return await consent_manager.grant_consent(
                user_id, guild_id, "TestUser"
            )
        if operation_type == "revoke":
            return await consent_manager.revoke_consent(user_id, guild_id)
        if operation_type == "check":
            return await consent_manager.check_consent(user_id, guild_id)
        return await consent_manager.set_global_opt_out(
            user_id, operation_type == "opt_out"
        )

    # Repeat the five-step pattern, then trim to the requested count.
    pattern = ["grant", "revoke", "check", "opt_in", "opt_out"]
    operation_types = pattern * (operations_count // 5)
    scheduled = []
    for op_type in operation_types[:operations_count]:
        scheduled.append(random_operation(op_type))

    outcomes = await asyncio.gather(*scheduled, return_exceptions=True)

    # No operation may have produced an exception.
    failures = [outcome for outcome in outcomes if isinstance(outcome, Exception)]
    assert not failures

    # Whatever is cached for the user must be a dict holding a bool.
    async with consent_manager._cache_lock:
        cache = consent_manager.consent_cache
        if user_id in cache:
            assert isinstance(cache[user_id], dict)
            if guild_id in cache[user_id]:
                assert isinstance(cache[user_id][guild_id], bool)
# Resource Cleanup Tests
@pytest.mark.asyncio
async def test_cleanup_with_multiple_consent_managers(
    self, mock_db_manager: AsyncMock
):
    """Create five managers on one mock database and clean them all up."""
    managers = []
    templates_patch = patch("core.consent_manager.ConsentTemplates")
    view_patch = patch("core.consent_manager.ConsentView")
    with templates_patch, view_patch:
        # Build and initialize five independent managers.
        for _ in range(5):
            created = ConsentManager(mock_db_manager)
            created._load_consent_cache = AsyncMock()
            created._load_global_opt_outs = AsyncMock()
            await created.initialize()
            managers.append(created)

        # Each one must own a background task.
        for created in managers:
            assert created._cleanup_task is not None

        # Run every cleanup concurrently.
        await asyncio.gather(*(created.cleanup() for created in managers))

        # Afterwards no background task may still be running.
        for created in managers:
            assert (
                created._cleanup_task.done() or created._cleanup_task.cancelled()
            )
@pytest.mark.asyncio
async def test_cache_consistency_after_concurrent_modifications(
    self, consent_manager: ConsentManager
):
    """Cycle twenty users through grant/check/revoke/grant concurrently.

    Every user finishes on a grant, so each one must be present in the
    cache with a True entry for the guild. Presence is asserted rather than
    used as a guard; with a guard, a missing entry would skip every check
    and the test would pass without verifying anything.
    """
    user_count = 20
    guild_id = 789012
    user_ids = list(range(20000, 20000 + user_count))

    async def modify_user_consent(user_id: int) -> None:
        # Each user goes through a series of state changes
        await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
        await consent_manager.check_consent(user_id, guild_id)
        await consent_manager.revoke_consent(user_id, guild_id)
        await consent_manager.grant_consent(
            user_id, guild_id, f"User{user_id}Final"
        )

    # Execute modifications for all users concurrently
    await asyncio.gather(*[modify_user_consent(user_id) for user_id in user_ids])

    # Verify final cache consistency
    async with consent_manager._cache_lock:
        for user_id in user_ids:
            # A user whose last operation was a grant must be cached.
            assert user_id in consent_manager.consent_cache
            # Should have consistent guild consent state
            assert guild_id in consent_manager.consent_cache[user_id]
            assert isinstance(
                consent_manager.consent_cache[user_id][guild_id], bool
            )
            # Final operation was grant, so should be True
            assert consent_manager.consent_cache[user_id][guild_id] is True
@pytest.mark.asyncio
async def test_no_deadlocks_under_heavy_concurrent_load(
    self, consent_manager: ConsentManager
):
    """Run 15 users x 10 rounds of mixed calls within a ten-second budget.

    The test fails explicitly if the combined workload has not finished
    when the timeout expires.
    """
    timeout_seconds = 10
    operations_per_user = 10
    guild_id = 789012
    users_count = 15
    user_ids = list(range(30000, 30000 + users_count))

    async def heavy_load_for_user(user_id: int) -> None:
        # Ten rounds of grant, check, revoke, opt-in, check for one user.
        rounds_left = operations_per_user
        while rounds_left > 0:
            await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
            await consent_manager.check_consent(user_id, guild_id)
            await consent_manager.revoke_consent(user_id, guild_id)
            await consent_manager.set_global_opt_out(user_id, opt_out=False)
            await consent_manager.check_consent(user_id, guild_id)
            rounds_left -= 1

    # A timeout here is treated as a possible deadlock.
    workload = asyncio.gather(*(heavy_load_for_user(uid) for uid in user_ids))
    try:
        await asyncio.wait_for(workload, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        pytest.fail(
            f"Operations timed out after {timeout_seconds}s - possible deadlock detected"
        )