- Deleted .env.example file as it is no longer needed. - Added .gitignore to manage ignored files and directories. - Introduced CLAUDE.md for AI provider integration documentation. - Created dev.sh for development setup and scripts. - Updated Dockerfile and Dockerfile.production for improved build processes. - Added multiple test files and directories for comprehensive testing. - Introduced new utility and service files for enhanced functionality. - Organized codebase with new directories and files for better maintainability.
568 lines
22 KiB
Python
568 lines
22 KiB
Python
"""
Comprehensive tests for ConsentManager race condition fixes.

Tests verify thread safety, proper locking mechanisms, background task management,
and resource cleanup functionality implemented to prevent race conditions.
"""
|
|
|
|
import asyncio
import logging
import time
from typing import Any, AsyncIterator, List
from unittest.mock import AsyncMock, patch

import pytest

from core.consent_manager import ConsentManager
from core.database import DatabaseManager
|
|
|
|
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TestConsentManagerRaceConditionFixes:
    """Test suite for ConsentManager race condition fixes.

    Each test drives a ``ConsentManager`` backed by a mocked
    ``DatabaseManager`` from several concurrent asyncio tasks and then checks
    the returned values and/or the manager's in-memory caches.
    """

    # Helpers

    @staticmethod
    async def _build_initialized_manager(db_manager: AsyncMock) -> ConsentManager:
        """Return an initialized ConsentManager with its cache loaders stubbed.

        ``ConsentTemplates`` and ``ConsentView`` are patched while the manager
        is constructed and initialized, and the two cache loaders are replaced
        with mocks to avoid actual database calls.
        """
        with (
            patch("core.consent_manager.ConsentTemplates"),
            patch("core.consent_manager.ConsentView"),
        ):
            manager = ConsentManager(db_manager)
            manager._load_consent_cache = AsyncMock()
            manager._load_global_opt_outs = AsyncMock()
            await manager.initialize()
        return manager

    @staticmethod
    async def _cancel_and_wait(task: asyncio.Task) -> None:
        """Cancel *task* and wait until the cancellation has been processed."""
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # Expected when task is cancelled

    @staticmethod
    def _raised(results: List[Any]) -> List[BaseException]:
        """Return the exception objects found in a ``gather`` result list.

        ``BaseException`` is checked instead of ``Exception`` because
        ``asyncio.CancelledError`` derives from ``BaseException`` on
        Python 3.8+ and would otherwise slip through unnoticed.
        """
        return [result for result in results if isinstance(result, BaseException)]

    # Fixtures

    @pytest.fixture
    async def mock_db_manager(self) -> AsyncMock:
        """Create mock database manager with consistent behavior."""
        db_manager = AsyncMock(spec=DatabaseManager)

        # Mock database operations with consistent return values
        db_manager.execute_query = AsyncMock(return_value=True)
        db_manager.grant_consent = AsyncMock(return_value=True)
        db_manager.revoke_consent = AsyncMock(return_value=True)
        db_manager.check_consent = AsyncMock(return_value=True)
        db_manager.set_global_opt_out = AsyncMock(return_value=True)
        db_manager.get_consented_users = AsyncMock(return_value=[123, 456, 789])

        return db_manager

    @pytest.fixture
    async def consent_manager(
        self, mock_db_manager: AsyncMock
    ) -> AsyncIterator[ConsentManager]:
        """Provide an initialized ConsentManager and shut it down afterwards.

        The manager's background cleanup task is stopped on teardown so it does
        not outlive the test. Teardown is skipped when the test has already
        shut the manager down (the task is done), so ``cleanup()`` is not
        invoked a second time.
        """
        manager = await self._build_initialized_manager(mock_db_manager)

        yield manager

        background_task = manager._cleanup_task
        if background_task is not None and not background_task.done():
            await manager.cleanup()

    # Race Condition Prevention Tests

    @pytest.mark.asyncio
    async def test_concurrent_consent_granting_no_cache_corruption(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that concurrent consent granting operations don't cause cache corruption."""
        user_ids = [100, 200, 300, 400, 500]
        guild_id = 789012

        # Execute concurrent consent grants
        results = await asyncio.gather(
            *[
                consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
                for user_id in user_ids
            ],
            return_exceptions=True,
        )

        # Verify all operations succeeded
        successful_operations = sum(1 for result in results if result is True)
        assert successful_operations == len(user_ids)

        # Verify cache consistency - all users should have consent granted
        async with consent_manager._cache_lock:
            for user_id in user_ids:
                assert user_id in consent_manager.consent_cache
                assert consent_manager.consent_cache[user_id][guild_id] is True

    @pytest.mark.asyncio
    async def test_concurrent_consent_revoking_works_properly(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that concurrent consent revoking operations work correctly."""
        user_ids = [101, 202, 303, 404, 505]
        guild_id = 789012

        # Pre-populate cache with consent granted
        async with consent_manager._cache_lock:
            for user_id in user_ids:
                consent_manager.consent_cache[user_id] = {guild_id: True}

        # Execute concurrent consent revocations
        results = await asyncio.gather(
            *[consent_manager.revoke_consent(user_id, guild_id) for user_id in user_ids],
            return_exceptions=True,
        )

        # Verify all operations succeeded
        successful_operations = sum(1 for result in results if result is True)
        assert successful_operations == len(user_ids)

        # Verify cache consistency - all users should have consent revoked
        async with consent_manager._cache_lock:
            for user_id in user_ids:
                assert user_id in consent_manager.consent_cache
                assert consent_manager.consent_cache[user_id][guild_id] is False

    @pytest.mark.asyncio
    async def test_concurrent_cache_access_during_check_consent(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that concurrent cache access during check_consent operations is safe."""
        user_id = 12345
        guild_id = 789012
        concurrent_checks = 20

        # Pre-populate cache
        async with consent_manager._cache_lock:
            consent_manager.consent_cache[user_id] = {guild_id: True}

        # Execute concurrent consent checks
        results = await asyncio.gather(
            *[
                consent_manager.check_consent(user_id, guild_id)
                for _ in range(concurrent_checks)
            ],
            return_exceptions=True,
        )

        # Verify all operations returned consistent results
        assert len(results) == concurrent_checks
        assert all(result is True for result in results)

    @pytest.mark.asyncio
    async def test_concurrent_global_opt_out_operations(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that concurrent global opt-out operations are thread-safe."""
        user_ids = [111, 222, 333, 444, 555]

        # Execute concurrent global opt-out operations
        results = await asyncio.gather(
            *[
                consent_manager.set_global_opt_out(user_id, opt_out=True)
                for user_id in user_ids
            ],
            return_exceptions=True,
        )

        # Verify all operations succeeded
        successful_operations = sum(1 for result in results if result is True)
        assert successful_operations == len(user_ids)

        # Verify global opt-out set consistency
        async with consent_manager._cache_lock:
            for user_id in user_ids:
                assert user_id in consent_manager.global_opt_outs

    # Background Task Management Tests

    @pytest.mark.asyncio
    async def test_cleanup_task_created_during_initialization(
        self, mock_db_manager: AsyncMock
    ) -> None:
        """Test that cleanup task is properly created during initialization."""
        manager = await self._build_initialized_manager(mock_db_manager)

        try:
            assert manager._cleanup_task is not None
            assert isinstance(manager._cleanup_task, asyncio.Task)
            assert not manager._cleanup_task.done()
        finally:
            # Always stop the background task, even when an assertion fails
            await manager.cleanup()

    @pytest.mark.asyncio
    async def test_cleanup_method_cancels_background_tasks(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that cleanup method properly cancels background tasks."""
        # Verify cleanup task exists
        assert consent_manager._cleanup_task is not None
        cleanup_task = consent_manager._cleanup_task

        # Call cleanup
        await consent_manager.cleanup()

        # Verify task was cancelled
        assert cleanup_task.cancelled() or cleanup_task.done()

    @pytest.mark.asyncio
    async def test_cleanup_handles_already_cancelled_tasks_gracefully(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that cleanup handles already cancelled tasks without errors."""
        # Cancel the task manually first and wait for cancellation to complete
        background_task = consent_manager._cleanup_task
        if background_task is not None and not background_task.done():
            await self._cancel_and_wait(background_task)

        # Cleanup should handle this gracefully
        await consent_manager.cleanup()

    @pytest.mark.asyncio
    async def test_cleanup_handles_task_cancellation_exceptions(
        self, mock_db_manager: AsyncMock
    ) -> None:
        """Test that cleanup copes with a long-running task it has to cancel."""
        manager = await self._build_initialized_manager(mock_db_manager)

        # Stop the task created by initialize() before replacing it, otherwise
        # it would be left pending with nothing holding a reference to it.
        original_task = manager._cleanup_task
        if original_task is not None and not original_task.done():
            await self._cancel_and_wait(original_task)

        async def long_running_cleanup() -> None:
            await asyncio.sleep(100)  # Long running task

        replacement_task = asyncio.create_task(long_running_cleanup())
        manager._cleanup_task = replacement_task

        # Cleanup should handle the cancellation gracefully
        await manager.cleanup()

        # A cancelled task also reports done()
        assert replacement_task.done()

    # Lock-Protected Operations Tests

    @pytest.mark.asyncio
    async def test_cache_updates_are_atomic(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that cache updates are atomic and protected by locks."""
        user_id = 98765
        guild_id = 789012

        # Execute mixed operations that modify and read the cache concurrently
        operations = [
            consent_manager.grant_consent(user_id, guild_id, "TestUser"),
            consent_manager.check_consent(user_id, guild_id),
            consent_manager.revoke_consent(user_id, guild_id),
            consent_manager.check_consent(user_id, guild_id),
        ]
        results = await asyncio.gather(*operations, return_exceptions=True)

        # Verify no exceptions occurred
        assert not self._raised(results)

        # Verify final cache state is consistent
        async with consent_manager._cache_lock:
            if user_id in consent_manager.consent_cache:
                assert isinstance(
                    consent_manager.consent_cache[user_id][guild_id], bool
                )

    @pytest.mark.asyncio
    async def test_cache_reads_dont_interfere_with_writes(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that cache reads don't interfere with write operations."""
        user_id = 55555
        guild_id = 789012
        read_operations = 50
        write_operations = 10

        # Pre-populate cache
        async with consent_manager._cache_lock:
            consent_manager.consent_cache[user_id] = {guild_id: True}

        async def read_operation() -> bool:
            return await consent_manager.check_consent(user_id, guild_id)

        async def write_operation(index: int) -> bool:
            # Alternate between grant and revoke. The operation index is used
            # because the size of the per-user cache entry does not change
            # between writes and therefore cannot drive the alternation.
            if index % 2 == 0:
                return await consent_manager.grant_consent(
                    user_id, guild_id, "TestUser"
                )
            return await consent_manager.revoke_consent(user_id, guild_id)

        # Mix read and write operations
        read_tasks = [read_operation() for _ in range(read_operations)]
        write_tasks = [write_operation(index) for index in range(write_operations)]
        results = await asyncio.gather(
            *(read_tasks + write_tasks), return_exceptions=True
        )

        # Verify no exceptions occurred during concurrent access
        assert not self._raised(results)

    @pytest.mark.asyncio
    @pytest.mark.parametrize("concurrent_operations", [5, 10, 20])
    async def test_performance_doesnt_degrade_significantly_with_locking(
        self, consent_manager: ConsentManager, concurrent_operations: int
    ) -> None:
        """Test that performance doesn't degrade significantly with locking mechanisms."""
        user_ids = list(range(1000, 1000 + concurrent_operations))
        guild_id = 789012

        async def consent_operation(user_id: int) -> float:
            """Run one grant/check/revoke cycle and return its duration in seconds."""
            started = time.perf_counter()
            await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
            await consent_manager.check_consent(user_id, guild_id)
            await consent_manager.revoke_consent(user_id, guild_id)
            return time.perf_counter() - started

        # Execute operations and measure performance
        start_time = time.perf_counter()
        operation_times = await asyncio.gather(
            *[consent_operation(user_id) for user_id in user_ids]
        )
        total_time = time.perf_counter() - start_time

        # Verify reasonable performance (operations should complete quickly)
        avg_operation_time = sum(operation_times) / len(operation_times)
        assert avg_operation_time < 0.1  # Each operation should take less than 100ms
        assert total_time < 5.0  # Total time should be reasonable

    # Edge Cases Tests

    @pytest.mark.asyncio
    async def test_behavior_when_lock_held_for_extended_time(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test system behavior when lock is held for an extended time."""
        user_id = 77777
        guild_id = 789012

        async def long_running_operation() -> None:
            async with consent_manager._cache_lock:
                # Simulate long-running operation
                await asyncio.sleep(0.1)
                consent_manager.consent_cache[user_id] = {guild_id: True}

        async def quick_operation() -> bool:
            return await consent_manager.check_consent(user_id, guild_id)

        # Start long-running operation
        long_task = asyncio.create_task(long_running_operation())

        # Wait a bit so the lock is held, then start the quick operations
        await asyncio.sleep(0.05)
        quick_tasks = [asyncio.create_task(quick_operation()) for _ in range(3)]

        # Wait for all operations to complete
        await long_task
        results = await asyncio.gather(*quick_tasks)

        # Verify operations completed successfully
        assert all(isinstance(result, bool) for result in results)

    @pytest.mark.asyncio
    async def test_multiple_concurrent_operations_same_user(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test multiple concurrent operations on the same user."""
        user_id = 88888
        guild_id = 789012

        async def mixed_operations() -> List[Any]:
            operations = [
                consent_manager.grant_consent(user_id, guild_id, "TestUser"),
                consent_manager.check_consent(user_id, guild_id),
                consent_manager.revoke_consent(user_id, guild_id),
                consent_manager.check_consent(user_id, guild_id),
                consent_manager.set_global_opt_out(user_id, opt_out=True),
                consent_manager.check_consent(user_id, guild_id),
                consent_manager.set_global_opt_out(user_id, opt_out=False),
                consent_manager.grant_consent(user_id, guild_id, "TestUser2"),
            ]
            return await asyncio.gather(*operations, return_exceptions=True)

        # Execute mixed operations multiple times concurrently
        all_results = await asyncio.gather(*[mixed_operations() for _ in range(3)])

        # Verify no exceptions occurred
        for results in all_results:
            assert not self._raised(results)

    @pytest.mark.asyncio
    async def test_mixed_grant_revoke_check_operations_same_user(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test mixed grant/revoke/check operations on the same user are handled safely."""
        user_id = 99999
        guild_id = 789012
        operations_count = 30

        async def run_operation(operation_type: str) -> Any:
            if operation_type == "grant":
                return await consent_manager.grant_consent(
                    user_id, guild_id, "TestUser"
                )
            if operation_type == "revoke":
                return await consent_manager.revoke_consent(user_id, guild_id)
            if operation_type == "check":
                return await consent_manager.check_consent(user_id, guild_id)
            # Remaining types are "opt_in" / "opt_out"
            return await consent_manager.set_global_opt_out(
                user_id, opt_out=operation_type == "opt_out"
            )

        # Create mixed operations
        operation_types = ["grant", "revoke", "check", "opt_in", "opt_out"] * (
            operations_count // 5
        )
        tasks = [
            run_operation(op_type) for op_type in operation_types[:operations_count]
        ]

        # Execute all operations concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Verify no exceptions occurred
        assert not self._raised(results)

        # Verify final cache state is consistent (no corruption)
        async with consent_manager._cache_lock:
            if user_id in consent_manager.consent_cache:
                user_entry = consent_manager.consent_cache[user_id]
                assert isinstance(user_entry, dict)
                if guild_id in user_entry:
                    assert isinstance(user_entry[guild_id], bool)

    # Resource Cleanup Tests

    @pytest.mark.asyncio
    async def test_cleanup_with_multiple_consent_managers(
        self, mock_db_manager: AsyncMock
    ) -> None:
        """Test resource cleanup when multiple consent managers are created and cleaned up."""
        # Create multiple managers
        managers = [
            await self._build_initialized_manager(mock_db_manager) for _ in range(5)
        ]

        # Verify all have cleanup tasks
        for manager in managers:
            assert manager._cleanup_task is not None

        # Cleanup all managers
        await asyncio.gather(*[manager.cleanup() for manager in managers])

        # Verify all cleanup tasks are done
        for manager in managers:
            assert manager._cleanup_task.done() or manager._cleanup_task.cancelled()

    @pytest.mark.asyncio
    async def test_cache_consistency_after_concurrent_modifications(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that cache maintains consistency after numerous concurrent modifications."""
        user_count = 20
        guild_id = 789012
        user_ids = list(range(20000, 20000 + user_count))

        async def modify_user_consent(user_id: int) -> None:
            # Each user goes through a series of state changes
            await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
            await consent_manager.check_consent(user_id, guild_id)
            await consent_manager.revoke_consent(user_id, guild_id)
            await consent_manager.grant_consent(
                user_id, guild_id, f"User{user_id}Final"
            )

        # Execute modifications for all users concurrently
        await asyncio.gather(*[modify_user_consent(user_id) for user_id in user_ids])

        # Verify final cache consistency
        async with consent_manager._cache_lock:
            for user_id in user_ids:
                # Presence is asserted (not merely tolerated) so an empty cache
                # cannot make this test pass vacuously.
                assert user_id in consent_manager.consent_cache
                user_entry = consent_manager.consent_cache[user_id]

                # Should have consistent guild consent state
                assert guild_id in user_entry
                assert isinstance(user_entry[guild_id], bool)

                # Final operation was grant, so should be True
                assert user_entry[guild_id] is True

    @pytest.mark.asyncio
    async def test_no_deadlocks_under_heavy_concurrent_load(
        self, consent_manager: ConsentManager
    ) -> None:
        """Test that the system doesn't deadlock under heavy concurrent load."""
        users_count = 15
        guild_id = 789012
        operations_per_user = 10
        timeout_seconds = 10

        async def heavy_load_for_user(user_id: int) -> None:
            for _ in range(operations_per_user):
                # Mix of different operations
                await consent_manager.grant_consent(user_id, guild_id, f"User{user_id}")
                await consent_manager.check_consent(user_id, guild_id)
                await consent_manager.revoke_consent(user_id, guild_id)
                await consent_manager.set_global_opt_out(user_id, opt_out=False)
                await consent_manager.check_consent(user_id, guild_id)

        user_ids = list(range(30000, 30000 + users_count))

        # Execute heavy load with timeout to detect deadlocks
        try:
            await asyncio.wait_for(
                asyncio.gather(*[heavy_load_for_user(user_id) for user_id in user_ids]),
                timeout=timeout_seconds,
            )
        except asyncio.TimeoutError:
            pytest.fail(
                f"Operations timed out after {timeout_seconds}s - possible deadlock detected"
            )
|