Files
disbord/tests/test_load_performance.py
Travis Vasceannie 3acb779569 chore: remove .env.example and add new files for project structure
- Deleted .env.example file as it is no longer needed.
- Added .gitignore to manage ignored files and directories.
- Introduced CLAUDE.md for AI provider integration documentation.
- Created dev.sh for development setup and scripts.
- Updated Dockerfile and Dockerfile.production for improved build processes.
- Added multiple test files and directories for comprehensive testing.
- Introduced new utility and service files for enhanced functionality.
- Organized codebase with new directories and files for better maintainability.
2025-08-27 23:00:19 -04:00

608 lines
21 KiB
Python

"""
Load Tests and Performance Benchmarks
Tests system performance under heavy load, stress testing,
and performance regression detection.
"""
import asyncio
import statistics
import time
from datetime import datetime, timezone
from typing import Dict
from unittest.mock import AsyncMock

import pytest

from tests.conftest import PerformanceBenchmark, TestConfig, TestUtilities
class TestSystemLoad:
    """Load tests for system-wide performance."""

    @pytest.fixture
    async def load_test_setup(self, mock_db_manager, mock_ai_manager):
        """Build the shared mocks and benchmark harness for load testing.

        NOTE(review): an async fixture under plain ``@pytest.fixture`` is only
        awaited when pytest-asyncio runs in auto mode (otherwise it needs
        ``@pytest_asyncio.fixture``) — confirm the project's configuration.
        """
        # Configure mocks to resolve instantly so they don't dominate timings.
        mock_db_manager.execute_query.return_value = True
        mock_ai_manager.generate_text.return_value = TestConfig.MOCK_AI_RESPONSE
        return {
            "db_manager": mock_db_manager,
            "ai_manager": mock_ai_manager,
            "benchmark": PerformanceBenchmark(),
        }

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_concurrent_quote_analysis(self, load_test_setup):
        """Analyze many quotes concurrently and assert latency/throughput."""

        # Stand-in for the real analyzer: fixed simulated AI processing time.
        async def analyze_quote(text: str) -> Dict[str, float]:
            await asyncio.sleep(0.1)
            return TestConfig.MOCK_QUOTE_SCORES

        concurrent_requests = 50
        quotes = [
            f"Test quote {i} for load testing" for i in range(concurrent_requests)
        ]

        # All requests are issued at once; wall time should approach the
        # single-request latency, not the sum of the latencies.
        start_time = time.perf_counter()
        tasks = [analyze_quote(quote) for quote in quotes]
        results = await asyncio.gather(*tasks)
        total_time = time.perf_counter() - start_time

        assert len(results) == concurrent_requests
        assert total_time < 15.0, f"Load test too slow: {total_time:.2f}s"

        throughput = concurrent_requests / total_time
        assert throughput > 5, f"Throughput too low: {throughput:.2f} quotes/sec"

        # Every concurrent result must still be a valid score payload.
        for result in results:
            TestUtilities.assert_quote_scores_valid(result)

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_database_connection_pool_stress(self, load_test_setup):
        """Hammer the (mocked) database with many concurrent operations."""
        db_manager = load_test_setup["db_manager"]

        async def db_operation(operation_id: int):
            # Each logical operation issues several sequential queries, as a
            # real request handler would.
            queries = [
                f"SELECT * FROM quotes WHERE id = {operation_id}",
                f"INSERT INTO test_table VALUES ({operation_id})",
                f"UPDATE quotes SET processed = true WHERE id = {operation_id}",
            ]
            results = []
            for query in queries:
                result = await db_manager.execute_query(query)
                results.append(result)
            return results

        num_operations = 100
        start_time = time.perf_counter()
        tasks = [db_operation(i) for i in range(num_operations)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.perf_counter() - start_time

        # No operation may fail under load.
        errors = [r for r in results if isinstance(r, Exception)]
        assert len(errors) == 0, f"Got {len(errors)} database errors during load test"

        ops_per_second = num_operations / total_time
        assert ops_per_second > 10, f"Database ops/sec too low: {ops_per_second:.2f}"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_memory_system_load(self, load_test_setup):
        """Exercise concurrent store/retrieve traffic on the memory system."""
        memory_manager = AsyncMock()

        async def store_memory(user_id: int, content: str):
            await asyncio.sleep(0.05)  # simulated storage latency
            return f"memory_{user_id}_{len(content)}"

        async def retrieve_memories(user_id: int, query: str):
            await asyncio.sleep(0.03)  # simulated retrieval latency
            return [{"content": f"Retrieved for {user_id}: {query}"}]

        memory_manager.store_memory = store_memory
        memory_manager.retrieve_memories = retrieve_memories

        num_users = 20
        operations_per_user = 5

        async def user_memory_operations(user_id: int):
            # Queue all stores and retrieves for one user, then run them
            # concurrently.
            operations = []
            for i in range(operations_per_user):
                content = f"Memory content {i} for user {user_id}"
                operations.append(memory_manager.store_memory(user_id, content))
            for i in range(operations_per_user):
                query = f"Query {i} from user {user_id}"
                operations.append(memory_manager.retrieve_memories(user_id, query))
            return await asyncio.gather(*operations)

        start_time = time.perf_counter()
        user_tasks = [user_memory_operations(user_id) for user_id in range(num_users)]
        user_results = await asyncio.gather(*user_tasks)
        total_time = time.perf_counter() - start_time

        assert len(user_results) == num_users
        total_operations = num_users * operations_per_user * 2  # store + retrieve
        ops_per_second = total_operations / total_time
        assert ops_per_second > 20, f"Memory ops/sec too low: {ops_per_second:.2f}"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_discord_interaction_load(
        self, load_test_setup, mock_discord_interaction
    ):
        """Handle many concurrent slash commands and assert throughput."""

        async def handle_quotes_command(interaction, search=None, limit: int = 5):
            # Simulate command processing, a database query, and the reply.
            await asyncio.sleep(0.2)
            quotes = [
                {"id": i, "quote": f"Quote {i}", "score": 7.5} for i in range(limit)
            ]
            await interaction.response.send_message(f"Found {len(quotes)} quotes")
            return quotes

        num_concurrent_commands = 30
        start_time = time.perf_counter()
        # The same mock interaction object is reused for every command.
        interactions = [
            mock_discord_interaction for _ in range(num_concurrent_commands)
        ]
        tasks = [
            handle_quotes_command(interaction, "test", 5)
            for interaction in interactions
        ]
        results = await asyncio.gather(*tasks)
        total_time = time.perf_counter() - start_time

        assert len(results) == num_concurrent_commands
        assert total_time < 10.0, f"Command handling too slow: {total_time:.2f}s"
        commands_per_second = num_concurrent_commands / total_time
        assert (
            commands_per_second > 5
        ), f"Command throughput too low: {commands_per_second:.2f} cmd/sec"
class TestStressTests:
    """Stress tests for system breaking points."""

    @pytest.mark.load
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_extended_operation_stress(self, load_test_setup):
        """Run a sustained high-load workload and check throughput holds up."""
        test_duration = 60  # seconds
        operations_per_second = 10

        async def single_operation():
            # One simulated quote-analysis operation (the original wrapped
            # this in a `while True` that returned on the first iteration).
            await asyncio.sleep(0.1)
            return TestConfig.MOCK_QUOTE_SCORES

        start_time = time.perf_counter()
        results = []
        while time.perf_counter() - start_time < test_duration:
            batch_size = 10
            batch_tasks = [single_operation() for _ in range(batch_size)]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)
            # Small delay paces the batches to the target rate.
            await asyncio.sleep(1.0 / operations_per_second)
        actual_duration = time.perf_counter() - start_time

        assert len(results) > 0
        assert actual_duration >= test_duration * 0.9  # allow some variance

        actual_ops_per_second = len(results) / actual_duration
        # Throughput must not collapse over the course of the run.
        assert (
            actual_ops_per_second > operations_per_second * 0.5
        ), f"Performance degraded too much: {actual_ops_per_second:.2f} ops/sec"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_memory_pressure_handling(self, load_test_setup):
        """Allocate large datasets and verify memory usage stays bounded."""
        import os

        import psutil

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss

        large_datasets = []
        try:
            for i in range(100):
                # Each iteration allocates a large mock dataset.
                large_data = {
                    "quotes": [f"Large quote {j} " * 100 for j in range(100)],
                    "analysis": [TestConfig.MOCK_QUOTE_SCORES for _ in range(100)],
                    "metadata": {
                        # timezone-aware now(); datetime.utcnow() is deprecated
                        "timestamp": datetime.now(timezone.utc),
                        "size": 100 * 100,
                    },
                }
                large_datasets.append(large_data)
                await asyncio.sleep(0.01)  # simulate real work between allocations

                # Sample RSS every 10 iterations and bail out gracefully
                # before the pressure becomes pathological.
                if i % 10 == 0:
                    current_memory = process.memory_info().rss
                    memory_increase = current_memory - initial_memory
                    max_memory_increase = 500 * 1024 * 1024  # 500MB
                    if memory_increase > max_memory_increase:
                        break
        finally:
            large_datasets.clear()

        # NOTE(review): RSS may not shrink immediately after clear(); this
        # assertion can be sensitive to allocator/GC behavior.
        final_memory = process.memory_info().rss
        peak_increase = final_memory - initial_memory
        reasonable_increase = 200 * 1024 * 1024  # 200MB
        assert (
            peak_increase < reasonable_increase
        ), f"Memory usage too high: {peak_increase / 1024 / 1024:.1f}MB"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_error_cascade_resilience(self, load_test_setup):
        """Verify the system keeps operating as the failure rate climbs."""
        import random

        # Seeded RNG keeps this stress test deterministic across runs.
        rng = random.Random(42)

        async def failing_operation(operation_id: int):
            # Failure probability ramps linearly up to a ceiling of 80%.
            failure_rate = min(0.8, operation_id / 100)
            if rng.random() < failure_rate:
                raise Exception(f"Simulated failure {operation_id}")
            await asyncio.sleep(0.05)
            return {"operation_id": operation_id, "success": True}

        num_operations = 200
        results = []
        errors = []
        for i in range(num_operations):
            try:
                result = await failing_operation(i)
                results.append(result)
            except Exception as e:
                # Record the failure and keep operating.
                errors.append(e)

        success_rate = len(results) / num_operations
        assert (
            success_rate > 0.2
        ), f"Success rate too low under stress: {success_rate:.2%}"
        assert (
            len(results) > 10
        ), "System failed to maintain any operations under stress"
class TestPerformanceRegression:
    """Tests to detect performance regressions"""

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_quote_analysis_performance_baseline(self, load_test_setup):
        """Establish and test quote analysis performance baseline"""
        benchmark = load_test_setup["benchmark"]

        async def quote_analysis_operation(quote: str):
            # Simulate a realistic analysis latency of ~150ms.
            await asyncio.sleep(0.15)
            return TestConfig.MOCK_QUOTE_SCORES

        # Run the benchmark harness over 20 iterations.
        result = await benchmark.benchmark_async_function(
            quote_analysis_operation,
            "Test quote for performance baseline",
            iterations=20,
        )

        # Thresholds: 200ms average, 300ms worst case.
        max_average_time = 0.20
        max_individual_time = 0.30
        benchmark.assert_performance_threshold(result, max_average_time)
        assert (
            result["maximum"] < max_individual_time
        ), f"Individual operation too slow: {result['maximum']:.4f}s"

        # Spread of the min/avg/max summary acts as a crude consistency check.
        summary_times = [result["minimum"], result["average"], result["maximum"]]
        times_std = statistics.stdev(summary_times)
        assert (
            times_std < 0.05
        ), f"Performance too inconsistent: {times_std:.4f}s std dev"

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_database_query_performance(self, load_test_setup):
        """Test database query performance baselines"""
        db_manager = load_test_setup["db_manager"]

        async def mock_database_query():
            # ~50ms baseline per query.
            await asyncio.sleep(0.05)
            return [{"id": 1, "quote": "Test quote"}]

        db_manager.execute_query = mock_database_query

        # Ten queries issued strictly one after another.
        started = time.perf_counter()
        sequential_results = [await db_manager.execute_query() for _ in range(10)]
        sequential_time = time.perf_counter() - started

        # The same ten queries issued all at once.
        started = time.perf_counter()
        await asyncio.gather(*(db_manager.execute_query() for _ in range(10)))
        concurrent_time = time.perf_counter() - started

        # Concurrency should buy a large speedup over sequential execution.
        speedup = sequential_time / concurrent_time
        assert speedup > 2, f"Insufficient concurrency benefit: {speedup:.2f}x speedup"

        assert (
            sequential_time < 1.0
        ), f"Sequential queries too slow: {sequential_time:.3f}s"
        assert (
            concurrent_time < 0.5
        ), f"Concurrent queries too slow: {concurrent_time:.3f}s"

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_memory_system_performance(self, load_test_setup):
        """Test memory system performance characteristics"""

        async def store_memory_operation():
            await asyncio.sleep(0.02)  # ~20ms to store
            return "memory_id"

        async def retrieve_memory_operation():
            await asyncio.sleep(0.01)  # ~10ms to retrieve
            return [{"content": "memory"}]

        # Time 50 store/retrieve pairs individually.
        store_times, retrieve_times = [], []
        for _ in range(50):
            t0 = time.perf_counter()
            await store_memory_operation()
            store_times.append(time.perf_counter() - t0)

            t0 = time.perf_counter()
            await retrieve_memory_operation()
            retrieve_times.append(time.perf_counter() - t0)

        avg_store_time = statistics.mean(store_times)
        avg_retrieve_time = statistics.mean(retrieve_times)
        assert avg_store_time < 0.05, f"Memory store too slow: {avg_store_time:.4f}s"
        assert (
            avg_retrieve_time < 0.03
        ), f"Memory retrieve too slow: {avg_retrieve_time:.4f}s"
        # Retrieval is expected to be the cheaper operation.
        assert (
            avg_retrieve_time < avg_store_time
        ), "Memory retrieval should be faster than storage"
class TestScalabilityLimits:
"""Tests to find system scalability limits"""
@pytest.mark.load
@pytest.mark.slow
@pytest.mark.asyncio
async def test_maximum_concurrent_users(self, load_test_setup):
"""Test maximum number of concurrent users the system can handle"""
# Start with baseline and increase
max_users_tested = 0
for num_users in [10, 50, 100, 200, 500]:
try:
async def simulate_user(user_id: int):
# Simulate typical user activity
operations = [
asyncio.sleep(0.1), # Processing time
asyncio.sleep(0.05), # Database query
asyncio.sleep(0.02), # Response time
]
await asyncio.gather(*operations)
return f"user_{user_id}_completed"
# Test concurrent users
start_time = time.perf_counter()
user_tasks = [simulate_user(i) for i in range(num_users)]
await asyncio.wait_for(
asyncio.gather(*user_tasks), timeout=30.0 # 30 second timeout
)
end_time = time.perf_counter()
# Check if performance is still acceptable
processing_time = end_time - start_time
users_per_second = num_users / processing_time
if users_per_second < 5: # Minimum acceptable throughput
break
max_users_tested = num_users
except (asyncio.TimeoutError, Exception):
# Hit scalability limit
break
# Should handle at least 50 concurrent users
assert (
max_users_tested >= 50
), f"System can only handle {max_users_tested} concurrent users"
print(f"System successfully handled {max_users_tested} concurrent users")
@pytest.mark.load
@pytest.mark.asyncio
async def test_quote_volume_limits(self, load_test_setup):
"""Test limits on quote processing volume"""
# Test increasing volumes of quotes
successful_volumes = []
for volume in [100, 500, 1000, 2000, 5000]:
try:
quotes = [f"Volume test quote {i}" for i in range(volume)]
start_time = time.perf_counter()
# Process in batches to avoid overwhelming system
batch_size = 50
results = []
for i in range(0, len(quotes), batch_size):
batch = quotes[i : i + batch_size]
async def process_quote(quote):
await asyncio.sleep(0.01) # Minimal processing
return TestConfig.MOCK_QUOTE_SCORES
batch_tasks = [process_quote(quote) for quote in batch]
batch_results = await asyncio.gather(*batch_tasks)
results.extend(batch_results)
end_time = time.perf_counter()
processing_time = end_time - start_time
# Check if processing completed successfully and reasonably fast
if len(results) == volume and processing_time < 60: # 1 minute max
successful_volumes.append(volume)
else:
break
except Exception:
break
# Should handle at least 1000 quotes
max_volume = max(successful_volumes) if successful_volumes else 0
assert max_volume >= 1000, f"System can only handle {max_volume} quotes"
print(f"System successfully processed up to {max_volume} quotes")
if __name__ == "__main__":
    # Run only the load-marked tests when this file is executed directly.
    pytest_args = [__file__, "-v", "-m", "load"]
    pytest.main(pytest_args)