"""
|
|
Load Tests and Performance Benchmarks
|
|
|
|
Tests system performance under heavy load, stress testing,
|
|
and performance regression detection.
|
|
"""
|
|
|
|
import asyncio
import random
import statistics
import time
from datetime import datetime
from typing import Dict, Optional
from unittest.mock import AsyncMock

import pytest

from tests.conftest import PerformanceBenchmark, TestConfig, TestUtilities


class TestSystemLoad:
    """Load tests for system-wide performance"""

    @pytest.fixture
    async def load_test_setup(self, mock_db_manager, mock_ai_manager):
        """Setup for load testing"""
        # Configure mocks for high-performance testing
        mock_db_manager.execute_query.return_value = True
        mock_ai_manager.generate_text.return_value = TestConfig.MOCK_AI_RESPONSE

        # Create benchmark instance
        benchmark = PerformanceBenchmark()

        return {
            "db_manager": mock_db_manager,
            "ai_manager": mock_ai_manager,
            "benchmark": benchmark,
        }

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_concurrent_quote_analysis(self, load_test_setup):
        """Test concurrent quote analysis under load"""

        # Mock quote analyzer
        async def analyze_quote(text: str) -> Dict[str, float]:
            # Simulate AI processing time
            await asyncio.sleep(0.1)
            return TestConfig.MOCK_QUOTE_SCORES

        # Test parameters
        concurrent_requests = 50
        quotes = [
            f"Test quote {i} for load testing" for i in range(concurrent_requests)
        ]

        # Benchmark concurrent analysis
        start_time = time.perf_counter()

        tasks = [analyze_quote(quote) for quote in quotes]
        results = await asyncio.gather(*tasks)

        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Performance assertions
        assert len(results) == concurrent_requests
        assert total_time < 15.0, f"Load test too slow: {total_time:.2f}s"

        # Calculate throughput
        throughput = concurrent_requests / total_time
        assert throughput > 5, f"Throughput too low: {throughput:.2f} quotes/sec"

        # Verify all results are valid
        for result in results:
            TestUtilities.assert_quote_scores_valid(result)
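
    # Illustrative addition (not from the original suite): average throughput
    # can hide tail latency, so a variant of the check above could time each
    # request individually and assert on the 95th percentile, e.g.:
    #
    #     async def timed(quote):
    #         t0 = time.perf_counter()
    #         await analyze_quote(quote)
    #         return time.perf_counter() - t0
    #
    #     latencies = await asyncio.gather(*(timed(q) for q in quotes))
    #     p95 = statistics.quantiles(latencies, n=20)[-1]
    #     assert p95 < 0.5, f"p95 latency too high: {p95:.3f}s"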

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_database_connection_pool_stress(self, load_test_setup):
        """Test database connection pool under stress"""
        db_manager = load_test_setup["db_manager"]

        # Simulate high database load
        async def db_operation(operation_id: int):
            # Multiple queries per operation
            queries = [
                f"SELECT * FROM quotes WHERE id = {operation_id}",
                f"INSERT INTO test_table VALUES ({operation_id})",
                f"UPDATE quotes SET processed = true WHERE id = {operation_id}",
            ]

            results = []
            for query in queries:
                result = await db_manager.execute_query(query)
                results.append(result)

            return results

        # Run many concurrent database operations
        num_operations = 100
        start_time = time.perf_counter()

        tasks = [db_operation(i) for i in range(num_operations)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Check for errors
        errors = [r for r in results if isinstance(r, Exception)]
        assert len(errors) == 0, f"Got {len(errors)} database errors during load test"

        # Performance check
        ops_per_second = num_operations / total_time
        assert ops_per_second > 10, f"Database ops/sec too low: {ops_per_second:.2f}"
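
    # Illustrative addition (an assumption, not the project's pool API): a real
    # connection pool caps concurrency, which this mock could approximate with
    # an asyncio.Semaphore so the stress test exercises bounded-pool behavior:
    #
    #     pool_slots = asyncio.Semaphore(10)  # hypothetical pool size
    #
    #     async def bounded_db_operation(operation_id: int):
    #         async with pool_slots:
    #             return await db_operation(operation_id)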

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_memory_system_load(self, load_test_setup):
        """Test memory system under load"""
        # Mock memory manager
        memory_manager = AsyncMock()

        async def store_memory(user_id: int, content: str):
            # Simulate memory storage time
            await asyncio.sleep(0.05)
            return f"memory_{user_id}_{len(content)}"

        async def retrieve_memories(user_id: int, query: str):
            # Simulate memory retrieval time
            await asyncio.sleep(0.03)
            return [{"content": f"Retrieved for {user_id}: {query}"}]

        memory_manager.store_memory = store_memory
        memory_manager.retrieve_memories = retrieve_memories

        # Test concurrent memory operations
        num_users = 20
        operations_per_user = 5

        async def user_memory_operations(user_id: int):
            operations = []

            # Store memories
            for i in range(operations_per_user):
                content = f"Memory content {i} for user {user_id}"
                store_task = memory_manager.store_memory(user_id, content)
                operations.append(store_task)

            # Retrieve memories
            for i in range(operations_per_user):
                query = f"Query {i} from user {user_id}"
                retrieve_task = memory_manager.retrieve_memories(user_id, query)
                operations.append(retrieve_task)

            return await asyncio.gather(*operations)

        # Run concurrent user operations
        start_time = time.perf_counter()

        user_tasks = [user_memory_operations(user_id) for user_id in range(num_users)]
        user_results = await asyncio.gather(*user_tasks)

        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Verify results
        assert len(user_results) == num_users

        total_operations = num_users * operations_per_user * 2  # Store + retrieve
        ops_per_second = total_operations / total_time

        assert ops_per_second > 20, f"Memory ops/sec too low: {ops_per_second:.2f}"
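
    # Illustrative note (added): every store/retrieve above runs concurrently,
    # so wall time is dominated by the slowest 50 ms sleep and the ideal
    # throughput is roughly 200 ops / 0.05 s = 4000 ops/sec; the 20 ops/sec
    # floor mainly guards against gross event-loop regressions.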

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_discord_interaction_load(
        self, load_test_setup, mock_discord_interaction
    ):
        """Test Discord interaction handling under load"""

        # Mock slash command handler
        async def handle_quotes_command(
            interaction, search: Optional[str] = None, limit: int = 5
        ):
            # Simulate command processing
            await asyncio.sleep(0.2)

            # Mock database query
            quotes = [
                {"id": i, "quote": f"Quote {i}", "score": 7.5} for i in range(limit)
            ]

            # Mock Discord response
            await interaction.response.send_message(f"Found {len(quotes)} quotes")

            return quotes

        # Simulate many concurrent slash command uses
        num_concurrent_commands = 30

        start_time = time.perf_counter()

        # Create mock interactions
        interactions = [
            mock_discord_interaction for _ in range(num_concurrent_commands)
        ]

        # Handle commands concurrently
        tasks = [
            handle_quotes_command(interaction, "test", 5)
            for interaction in interactions
        ]
        results = await asyncio.gather(*tasks)

        end_time = time.perf_counter()
        total_time = end_time - start_time

        # Performance checks
        assert len(results) == num_concurrent_commands
        assert total_time < 10.0, f"Command handling too slow: {total_time:.2f}s"

        commands_per_second = num_concurrent_commands / total_time
        assert (
            commands_per_second > 5
        ), f"Command throughput too low: {commands_per_second:.2f} cmd/sec"


class TestStressTests:
    """Stress tests for system breaking points"""

    @pytest.mark.load
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_extended_operation_stress(self, load_test_setup):
        """Test system under extended high-load operation"""
        # Run for extended period
        test_duration = 60  # seconds
        operations_per_second = 10

        async def continuous_operation():
            # Simulate quote analysis operation
            await asyncio.sleep(0.1)
            return TestConfig.MOCK_QUOTE_SCORES

        # Run continuous operations
        start_time = time.perf_counter()
        results = []

        while time.perf_counter() - start_time < test_duration:
            batch_size = 10
            batch_tasks = [continuous_operation() for _ in range(batch_size)]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

            # Small delay to control rate
            await asyncio.sleep(1.0 / operations_per_second)

        end_time = time.perf_counter()
        actual_duration = end_time - start_time

        # Verify system maintained performance
        assert len(results) > 0
        assert actual_duration >= test_duration * 0.9  # Allow some variance

        operations_completed = len(results)
        actual_ops_per_second = operations_completed / actual_duration

        # System should maintain reasonable performance throughout
        assert (
            actual_ops_per_second > operations_per_second * 0.5
        ), f"Performance degraded too much: {actual_ops_per_second:.2f} ops/sec"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_memory_pressure_handling(self, load_test_setup):
        """Test system behavior under memory pressure"""
        import os

        import psutil

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss

        # Create memory pressure with large data structures
        large_datasets = []

        try:
            for i in range(100):
                # Create large mock datasets
                large_data = {
                    "quotes": [f"Large quote {j} " * 100 for j in range(100)],
                    "analysis": [TestConfig.MOCK_QUOTE_SCORES for _ in range(100)],
                    "metadata": {"timestamp": datetime.utcnow(), "size": 100 * 100},
                }
                large_datasets.append(large_data)

                # Process data to simulate real work
                await asyncio.sleep(0.01)

                # Check memory usage periodically
                if i % 10 == 0:
                    current_memory = process.memory_info().rss
                    memory_increase = current_memory - initial_memory

                    # If memory gets too high, system should handle gracefully
                    max_memory_increase = 500 * 1024 * 1024  # 500MB
                    if memory_increase > max_memory_increase:
                        # System should implement memory management here
                        break

        finally:
            # Cleanup
            large_datasets.clear()

        final_memory = process.memory_info().rss
        peak_increase = final_memory - initial_memory

        # Memory should be managed effectively
        reasonable_increase = 200 * 1024 * 1024  # 200MB
        assert (
            peak_increase < reasonable_increase
        ), f"Memory usage too high: {peak_increase / 1024 / 1024:.1f}MB"

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_error_cascade_resilience(self, load_test_setup):
        """Test system resilience against cascading failures"""

        # Simulate progressive system failures
        failure_rate = 0.0

        async def failing_operation(operation_id: int):
            nonlocal failure_rate

            # Increase failure rate over time
            failure_rate = min(0.8, operation_id / 100)

            if random.random() < failure_rate:
                raise Exception(f"Simulated failure {operation_id}")

            await asyncio.sleep(0.05)
            return {"operation_id": operation_id, "success": True}

        # Run operations with increasing failure rate
        num_operations = 200
        results = []
        errors = []

        for i in range(num_operations):
            try:
                result = await failing_operation(i)
                results.append(result)
            except Exception as e:
                errors.append(e)
                # System should continue operating despite errors
                continue

        # System should handle failures gracefully
        success_rate = len(results) / num_operations
        assert (
            success_rate > 0.2
        ), f"Success rate too low under stress: {success_rate:.2%}"

        # Should have some successful operations even with high failure rate
        assert (
            len(results) > 10
        ), "System failed to maintain any operations under stress"


class TestPerformanceRegression:
    """Tests to detect performance regressions"""

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_quote_analysis_performance_baseline(self, load_test_setup):
        """Establish and test quote analysis performance baseline"""
        benchmark = load_test_setup["benchmark"]

        async def quote_analysis_operation(quote: str):
            # Simulate realistic quote analysis time
            await asyncio.sleep(0.15)  # 150ms baseline
            return TestConfig.MOCK_QUOTE_SCORES

        # Benchmark the operation
        result = await benchmark.benchmark_async_function(
            quote_analysis_operation,
            "Test quote for performance baseline",
            iterations=20,
        )

        # Performance thresholds
        max_average_time = 0.20  # 200ms max average
        max_individual_time = 0.30  # 300ms max individual

        benchmark.assert_performance_threshold(result, max_average_time)
        assert (
            result["maximum"] < max_individual_time
        ), f"Individual operation too slow: {result['maximum']:.4f}s"

        # Consistency check
        times_std = statistics.stdev(
            [result["minimum"], result["average"], result["maximum"]]
        )
        assert (
            times_std < 0.05
        ), f"Performance too inconsistent: {times_std:.4f}s std dev"

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_database_query_performance(self, load_test_setup):
        """Test database query performance baselines"""
        db_manager = load_test_setup["db_manager"]

        # Mock realistic database response time
        async def mock_database_query():
            await asyncio.sleep(0.05)  # 50ms baseline
            return [{"id": 1, "quote": "Test quote"}]

        db_manager.execute_query = mock_database_query

        # Benchmark database operations
        start_time = time.perf_counter()

        # Sequential queries
        sequential_results = []
        for _ in range(10):
            result = await db_manager.execute_query()
            sequential_results.append(result)

        sequential_time = time.perf_counter() - start_time

        # Concurrent queries
        start_time = time.perf_counter()

        concurrent_tasks = [db_manager.execute_query() for _ in range(10)]
        await asyncio.gather(*concurrent_tasks)

        concurrent_time = time.perf_counter() - start_time

        # Concurrent should be significantly faster
        speedup = sequential_time / concurrent_time
        assert speedup > 2, f"Insufficient concurrency benefit: {speedup:.2f}x speedup"

        # Both should meet performance thresholds
        assert (
            sequential_time < 1.0
        ), f"Sequential queries too slow: {sequential_time:.3f}s"
        assert (
            concurrent_time < 0.5
        ), f"Concurrent queries too slow: {concurrent_time:.3f}s"

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_memory_system_performance(self, load_test_setup):
        """Test memory system performance characteristics"""

        # Mock memory operations with realistic times
        async def store_memory_operation():
            await asyncio.sleep(0.02)  # 20ms to store
            return "memory_id"

        async def retrieve_memory_operation():
            await asyncio.sleep(0.01)  # 10ms to retrieve
            return [{"content": "memory"}]

        # Benchmark memory operations
        store_times = []
        retrieve_times = []

        for _ in range(50):
            # Store operation
            start = time.perf_counter()
            await store_memory_operation()
            store_times.append(time.perf_counter() - start)

            # Retrieve operation
            start = time.perf_counter()
            await retrieve_memory_operation()
            retrieve_times.append(time.perf_counter() - start)

        # Performance assertions
        avg_store_time = statistics.mean(store_times)
        avg_retrieve_time = statistics.mean(retrieve_times)

        assert avg_store_time < 0.05, f"Memory store too slow: {avg_store_time:.4f}s"
        assert (
            avg_retrieve_time < 0.03
        ), f"Memory retrieve too slow: {avg_retrieve_time:.4f}s"

        # Retrieval should be faster than storage
        assert (
            avg_retrieve_time < avg_store_time
        ), "Memory retrieval should be faster than storage"


class TestScalabilityLimits:
    """Tests to find system scalability limits"""

    @pytest.mark.load
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_maximum_concurrent_users(self, load_test_setup):
        """Test maximum number of concurrent users the system can handle"""
        # Start with baseline and increase
        max_users_tested = 0

        for num_users in [10, 50, 100, 200, 500]:
            try:

                async def simulate_user(user_id: int):
                    # Simulate typical user activity
                    operations = [
                        asyncio.sleep(0.1),  # Processing time
                        asyncio.sleep(0.05),  # Database query
                        asyncio.sleep(0.02),  # Response time
                    ]
                    await asyncio.gather(*operations)
                    return f"user_{user_id}_completed"

                # Test concurrent users
                start_time = time.perf_counter()

                user_tasks = [simulate_user(i) for i in range(num_users)]
                await asyncio.wait_for(
                    asyncio.gather(*user_tasks), timeout=30.0  # 30 second timeout
                )

                end_time = time.perf_counter()

                # Check if performance is still acceptable
                processing_time = end_time - start_time
                users_per_second = num_users / processing_time

                if users_per_second < 5:  # Minimum acceptable throughput
                    break

                max_users_tested = num_users

            except Exception:
                # Hit scalability limit (timeouts included; asyncio.TimeoutError
                # is a subclass of Exception, so the old two-member tuple was
                # redundant)
                break

        # Should handle at least 50 concurrent users
        assert (
            max_users_tested >= 50
        ), f"System can only handle {max_users_tested} concurrent users"

        print(f"System successfully handled {max_users_tested} concurrent users")

    @pytest.mark.load
    @pytest.mark.asyncio
    async def test_quote_volume_limits(self, load_test_setup):
        """Test limits on quote processing volume"""

        async def process_quote(quote):
            await asyncio.sleep(0.01)  # Minimal processing
            return TestConfig.MOCK_QUOTE_SCORES

        # Test increasing volumes of quotes
        successful_volumes = []

        for volume in [100, 500, 1000, 2000, 5000]:
            try:
                quotes = [f"Volume test quote {i}" for i in range(volume)]

                start_time = time.perf_counter()

                # Process in batches to avoid overwhelming system
                batch_size = 50
                results = []

                for i in range(0, len(quotes), batch_size):
                    batch = quotes[i : i + batch_size]
                    batch_tasks = [process_quote(quote) for quote in batch]
                    batch_results = await asyncio.gather(*batch_tasks)
                    results.extend(batch_results)

                end_time = time.perf_counter()
                processing_time = end_time - start_time

                # Check if processing completed successfully and reasonably fast
                if len(results) == volume and processing_time < 60:  # 1 minute max
                    successful_volumes.append(volume)
                else:
                    break

            except Exception:
                break

        # Should handle at least 1000 quotes
        max_volume = max(successful_volumes) if successful_volumes else 0
        assert max_volume >= 1000, f"System can only handle {max_volume} quotes"

        print(f"System successfully processed up to {max_volume} quotes")


if __name__ == "__main__":
    pytest.main([__file__, "-v", "-m", "load"])