- Implemented a new function to detect usage of typing.Any in code, which raises warnings during pre-tool use. - Updated the pretooluse_hook to handle quality issues related to typing.Any and integrate it with existing quality checks. - Modified the response structure to include permissionDecision instead of decision for consistency across hooks. - Enhanced test coverage for typing.Any usage detection in both single and multi-edit scenarios. - Adjusted existing tests to reflect changes in response structure and ensure proper validation of quality checks.
501 lines
16 KiB
Python
501 lines
16 KiB
Python
"""Edge case tests for the code quality hook system."""
|
|
|
|
import os
|
|
import subprocess
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from code_quality_guard import (
|
|
QualityConfig,
|
|
analyze_code_quality,
|
|
detect_internal_duplicates,
|
|
posttooluse_hook,
|
|
pretooluse_hook,
|
|
)
|
|
|
|
|
|
def _perm(response: dict) -> str | None:
|
|
return response.get("hookSpecificOutput", {}).get("permissionDecision")
|
|
|
|
|
|
class TestEdgeCases:
|
|
"""Test edge cases and corner conditions."""
|
|
|
|
def test_massive_file_content(self):
|
|
"""Test handling of very large files."""
|
|
config = QualityConfig()
|
|
# Create a file with 10,000 lines
|
|
massive_content = "\n".join(f"# Line {i}" for i in range(10000))
|
|
massive_content += "\ndef func1():\n pass\n"
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "massive.py",
|
|
"content": massive_content,
|
|
},
|
|
}
|
|
|
|
with patch("code_quality_guard.analyze_code_quality") as mock_analyze:
|
|
mock_analyze.return_value = {}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
# Should still be called despite large file
|
|
mock_analyze.assert_called_once()
|
|
|
|
def test_empty_file_content(self):
|
|
"""Test handling of empty files."""
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "empty.py",
|
|
"content": "",
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
|
|
def test_whitespace_only_content(self):
|
|
"""Test handling of whitespace-only content."""
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "whitespace.py",
|
|
"content": " \n\t\n \n",
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
|
|
def test_malformed_python_syntax(self):
|
|
"""Test handling of syntax errors in Python code."""
|
|
config = QualityConfig()
|
|
malformed_code = """
|
|
def broken_func(
|
|
print("missing closing paren"
|
|
if True
|
|
return
|
|
"""
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "malformed.py",
|
|
"content": malformed_code,
|
|
},
|
|
}
|
|
|
|
# Should gracefully handle syntax errors
|
|
result = pretooluse_hook(hook_data, config)
|
|
decision = _perm(result)
|
|
assert decision in ["allow", "deny", "ask"]
|
|
if decision != "allow":
|
|
text = (result.get("reason") or "") + (result.get("systemMessage") or "")
|
|
assert "error" in text.lower()
|
|
|
|
def test_unicode_content(self):
|
|
"""Test handling of Unicode characters in code."""
|
|
config = QualityConfig()
|
|
unicode_code = """
|
|
# 你好世界 - Hello World in Chinese
|
|
# مرحبا بالعالم - Hello World in Arabic
|
|
# Здравствуй, мир - Hello World in Russian
|
|
|
|
def greet_世界():
|
|
'''Function with unicode name'''
|
|
emoji = "👋🌍"
|
|
return f"Hello {emoji}"
|
|
"""
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "unicode.py",
|
|
"content": unicode_code,
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|
|
|
|
def test_concurrent_hook_calls(self):
|
|
"""Test thread safety with concurrent calls."""
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "concurrent.py",
|
|
"content": "def test(): pass",
|
|
},
|
|
}
|
|
|
|
# Simulate rapid consecutive calls
|
|
results = []
|
|
for _ in range(5):
|
|
with patch("code_quality_guard.analyze_code_quality") as mock_analyze:
|
|
mock_analyze.return_value = {}
|
|
result = pretooluse_hook(hook_data, config)
|
|
results.append(result)
|
|
|
|
# All should have the same decision
|
|
decisions = [_perm(r) for r in results]
|
|
assert all(d == decisions[0] for d in decisions)
|
|
|
|
def test_missing_tool_input_fields(self):
|
|
"""Test handling of missing required fields."""
|
|
config = QualityConfig()
|
|
|
|
# Missing file_path
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {"content": "def test(): pass"},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow" # Should handle gracefully
|
|
|
|
# Missing content for Write
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {"file_path": "test.py"},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow" # Should handle gracefully
|
|
|
|
def test_circular_import_detection(self):
|
|
"""Test detection of circular imports."""
|
|
config = QualityConfig()
|
|
circular_code = """
|
|
from module_a import func_a
|
|
from module_b import func_b
|
|
|
|
def func_c():
|
|
return func_a() + func_b()
|
|
"""
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "module_c.py",
|
|
"content": circular_code,
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|
|
|
|
def test_binary_file_path(self):
|
|
"""Test handling of binary file paths."""
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "image.png", # Non-Python file
|
|
"content": "binary content",
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow" # Should skip non-Python files
|
|
|
|
def test_null_and_none_values(self):
|
|
"""Test handling of null/None values."""
|
|
config = QualityConfig()
|
|
|
|
# None as content
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "test.py",
|
|
"content": None,
|
|
},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
|
|
# None as file_path
|
|
hook_data["tool_input"] = {
|
|
"file_path": None,
|
|
"content": "def test(): pass",
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
|
|
def test_path_traversal_attempts(self):
|
|
"""Test handling of path traversal attempts."""
|
|
config = QualityConfig()
|
|
dangerous_paths = [
|
|
"../../../etc/passwd",
|
|
"..\\..\\..\\windows\\system32\\config.sys",
|
|
"/etc/shadow",
|
|
"~/../../root/.ssh/id_rsa",
|
|
]
|
|
|
|
for path in dangerous_paths:
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": path,
|
|
"content": "malicious content",
|
|
},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|
|
|
|
def test_extreme_thresholds(self):
|
|
"""Test with extreme threshold values."""
|
|
# Zero thresholds
|
|
config = QualityConfig(
|
|
duplicate_threshold=0.0,
|
|
complexity_threshold=0,
|
|
)
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "test.py",
|
|
"content": "def test(): pass",
|
|
},
|
|
}
|
|
|
|
with patch("code_quality_guard.analyze_code_quality") as mock_analyze:
|
|
mock_analyze.return_value = {
|
|
"complexity": {
|
|
"summary": {"average_cyclomatic_complexity": 1},
|
|
},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "deny"
|
|
|
|
# Maximum thresholds
|
|
config = QualityConfig(
|
|
duplicate_threshold=1.0,
|
|
complexity_threshold=999999,
|
|
enforcement_mode="permissive", # Use permissive mode for high thresholds
|
|
)
|
|
|
|
with patch("code_quality_guard.analyze_code_quality") as mock_analyze:
|
|
mock_analyze.return_value = {
|
|
"complexity": {
|
|
"summary": {"average_cyclomatic_complexity": 50},
|
|
"distribution": {"Extreme": 10},
|
|
},
|
|
}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
|
|
def test_subprocess_timeout(self):
|
|
"""Test handling of subprocess timeouts."""
|
|
config = QualityConfig()
|
|
test_content = "def test(): pass"
|
|
|
|
with patch("subprocess.run") as mock_run:
|
|
# Simulate timeout
|
|
mock_run.side_effect = subprocess.TimeoutExpired("cmd", 30)
|
|
|
|
results = analyze_code_quality(test_content, "test.py", config)
|
|
# Should handle timeout gracefully
|
|
assert isinstance(results, dict)
|
|
|
|
def test_subprocess_command_failure(self):
|
|
"""Test handling of subprocess command failures."""
|
|
config = QualityConfig()
|
|
test_content = "def test(): pass"
|
|
|
|
with patch("subprocess.run") as mock_run:
|
|
# Simulate command failure
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 1
|
|
mock_result.stdout = "Error: command failed"
|
|
mock_run.return_value = mock_result
|
|
|
|
results = analyze_code_quality(test_content, "test.py", config)
|
|
# Should handle failure gracefully
|
|
assert isinstance(results, dict)
|
|
|
|
def test_json_parsing_errors(self):
|
|
"""Test handling of JSON parsing errors from subprocess."""
|
|
config = QualityConfig()
|
|
test_content = "def test(): pass"
|
|
|
|
with patch("subprocess.run") as mock_run:
|
|
# Simulate invalid JSON output
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 0
|
|
mock_result.stdout = "Not valid JSON {broken:"
|
|
mock_run.return_value = mock_result
|
|
|
|
results = analyze_code_quality(test_content, "test.py", config)
|
|
# Should handle JSON errors gracefully
|
|
assert isinstance(results, dict)
|
|
|
|
def test_file_permission_errors(self):
|
|
"""Test handling of file permission errors."""
|
|
config = QualityConfig(state_tracking_enabled=True)
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_output": {
|
|
"file_path": "/root/protected.py",
|
|
},
|
|
}
|
|
|
|
with patch("pathlib.Path.exists", return_value=True):
|
|
with patch("pathlib.Path.read_text") as mock_read:
|
|
mock_read.side_effect = PermissionError("Access denied")
|
|
|
|
result = posttooluse_hook(hook_data, config)
|
|
assert "decision" not in result
|
|
|
|
def test_deeply_nested_code_structure(self):
|
|
"""Test handling of deeply nested code."""
|
|
config = QualityConfig()
|
|
# Create code with 10 levels of nesting
|
|
nested_code = "def func():\n"
|
|
indent = " "
|
|
for i in range(10):
|
|
nested_code += f"{indent * (i + 1)}if condition_{i}:\n"
|
|
nested_code += f"{indent * 11}return True\n"
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "nested.py",
|
|
"content": nested_code,
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|
|
|
|
def test_recursive_function_detection(self):
|
|
"""Test detection of recursive functions."""
|
|
config = QualityConfig()
|
|
recursive_code = """
|
|
def factorial(n):
|
|
if n <= 1:
|
|
return 1
|
|
return n * factorial(n - 1)
|
|
|
|
def infinite_recursion():
|
|
return infinite_recursion()
|
|
"""
|
|
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "recursive.py",
|
|
"content": recursive_code,
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|
|
|
|
def test_multifile_edit_paths(self):
|
|
"""Test MultiEdit with multiple file paths."""
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "MultiEdit",
|
|
"tool_input": {
|
|
"file_path": "main.py",
|
|
"edits": [
|
|
{"old_string": "old1", "new_string": "def func1(): pass"},
|
|
{"old_string": "old2", "new_string": "def func2(): pass"},
|
|
{"old_string": "old3", "new_string": "def func3(): pass"},
|
|
],
|
|
},
|
|
}
|
|
|
|
with patch("code_quality_guard.analyze_code_quality") as mock_analyze:
|
|
mock_analyze.return_value = {}
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) == "allow"
|
|
# Should concatenate all new_strings
|
|
call_args = mock_analyze.call_args[0][0]
|
|
assert "func1" in call_args
|
|
assert "func2" in call_args
|
|
assert "func3" in call_args
|
|
|
|
def test_environment_variable_injection(self):
|
|
"""Test handling of environment variable injection attempts."""
|
|
malicious_envs = {
|
|
"QUALITY_ENFORCEMENT": "permissive; rm -rf /",
|
|
"QUALITY_COMPLEXITY_THRESHOLD": "-1; echo hacked",
|
|
"QUALITY_DUP_THRESHOLD": "0.5 && malicious_command",
|
|
}
|
|
|
|
for key, value in malicious_envs.items():
|
|
os.environ[key] = value
|
|
try:
|
|
config = QualityConfig()
|
|
# Should handle malicious env vars safely
|
|
assert isinstance(config, QualityConfig)
|
|
finally:
|
|
del os.environ[key]
|
|
|
|
def test_memory_efficient_large_duplicates(self):
|
|
"""Test memory efficiency with large duplicate blocks."""
|
|
# Create a large function that's duplicated
|
|
large_func = """
|
|
def process_data(data):
|
|
''' Large function with many lines '''
|
|
result = []
|
|
""" + "\n".join(
|
|
f" # Processing step {i}\n result.append(data[{i}])"
|
|
for i in range(100)
|
|
)
|
|
|
|
# Duplicate the function
|
|
code_with_duplicates = (
|
|
large_func + "\n\n" + large_func.replace("process_data", "process_data2")
|
|
)
|
|
|
|
duplicates = detect_internal_duplicates(code_with_duplicates, threshold=0.8)
|
|
# Should detect duplicates without memory issues
|
|
assert "duplicates" in duplicates
|
|
assert len(duplicates["duplicates"]) > 0
|
|
|
|
def test_special_python_constructs(self):
|
|
"""Test handling of special Python constructs."""
|
|
special_code = """
|
|
# Walrus operator
|
|
if (n := len(data)) > 10:
|
|
print(f"{n} items")
|
|
|
|
# Match statement (Python 3.10+)
|
|
def handle(value):
|
|
match value:
|
|
case 0:
|
|
return "zero"
|
|
case _:
|
|
return "other"
|
|
|
|
# Type hints with unions
|
|
def process(data: list[str | int | None]) -> dict[str, Any]:
|
|
return {}
|
|
|
|
# Async context managers
|
|
async def fetch():
|
|
async with aiohttp.ClientSession() as session:
|
|
pass
|
|
|
|
# Decorators with arguments
|
|
@lru_cache(maxsize=128)
|
|
@deprecated(version='1.0')
|
|
def cached_func():
|
|
pass
|
|
"""
|
|
|
|
config = QualityConfig()
|
|
hook_data = {
|
|
"tool_name": "Write",
|
|
"tool_input": {
|
|
"file_path": "special.py",
|
|
"content": special_code,
|
|
},
|
|
}
|
|
|
|
result = pretooluse_hook(hook_data, config)
|
|
assert _perm(result) in ["allow", "deny", "ask"]
|