"""Edge case tests for the code quality hook system.""" import os import subprocess from unittest.mock import MagicMock, patch from code_quality_guard import ( QualityConfig, analyze_code_quality, detect_internal_duplicates, posttooluse_hook, pretooluse_hook, ) def _perm(response: dict) -> str | None: return response.get("hookSpecificOutput", {}).get("permissionDecision") class TestEdgeCases: """Test edge cases and corner conditions.""" def test_massive_file_content(self): """Test handling of very large files.""" config = QualityConfig() # Create a file with 10,000 lines massive_content = "\n".join(f"# Line {i}" for i in range(10000)) massive_content += "\ndef func1():\n pass\n" hook_data = { "tool_name": "Write", "tool_input": { "file_path": "massive.py", "content": massive_content, }, } with patch("code_quality_guard.analyze_code_quality") as mock_analyze: mock_analyze.return_value = {} result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # Should still be called despite large file mock_analyze.assert_called_once() def test_empty_file_content(self): """Test handling of empty files.""" config = QualityConfig() hook_data = { "tool_name": "Write", "tool_input": { "file_path": "empty.py", "content": "", }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" def test_whitespace_only_content(self): """Test handling of whitespace-only content.""" config = QualityConfig() hook_data = { "tool_name": "Write", "tool_input": { "file_path": "whitespace.py", "content": " \n\t\n \n", }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" def test_malformed_python_syntax(self): """Test handling of syntax errors in Python code.""" config = QualityConfig() malformed_code = """ def broken_func( print("missing closing paren" if True return """ hook_data = { "tool_name": "Write", "tool_input": { "file_path": "malformed.py", "content": malformed_code, }, } # Should gracefully handle syntax errors result = pretooluse_hook(hook_data, config) decision = _perm(result) assert decision in ["allow", "deny", "ask"] if decision != "allow": text = (result.get("reason") or "") + (result.get("systemMessage") or "") assert "error" in text.lower() def test_unicode_content(self): """Test handling of Unicode characters in code.""" config = QualityConfig() unicode_code = """ # 你好世界 - Hello World in Chinese # مرحبا بالعالم - Hello World in Arabic # Здравствуй, мир - Hello World in Russian def greet_世界(): '''Function with unicode name''' emoji = "👋🌍" return f"Hello {emoji}" """ hook_data = { "tool_name": "Write", "tool_input": { "file_path": "unicode.py", "content": unicode_code, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"] def test_concurrent_hook_calls(self): """Test thread safety with concurrent calls.""" config = QualityConfig() hook_data = { "tool_name": "Write", "tool_input": { "file_path": "concurrent.py", "content": "def test(): pass", }, } # Simulate rapid consecutive calls results = [] for _ in range(5): with patch("code_quality_guard.analyze_code_quality") as mock_analyze: mock_analyze.return_value = {} result = pretooluse_hook(hook_data, config) results.append(result) # All should have the same decision decisions = [_perm(r) for r in results] assert all(d == decisions[0] for d in decisions) def test_missing_tool_input_fields(self): """Test handling of missing required fields.""" config = QualityConfig() # Missing file_path hook_data = { "tool_name": "Write", "tool_input": {"content": "def test(): pass"}, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # Should handle gracefully # Missing content for Write hook_data = { "tool_name": "Write", "tool_input": {"file_path": "test.py"}, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # Should handle gracefully def test_circular_import_detection(self): """Test detection of circular imports.""" config = QualityConfig() circular_code = """ from module_a import func_a from module_b import func_b def func_c(): return func_a() + func_b() """ hook_data = { "tool_name": "Write", "tool_input": { "file_path": "module_c.py", "content": circular_code, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"] def test_binary_file_path(self): """Test handling of binary file paths.""" config = QualityConfig() hook_data = { "tool_name": "Write", "tool_input": { "file_path": "image.png", # Non-Python file "content": "binary content", }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # Should skip non-Python files def test_null_and_none_values(self): """Test handling of null/None values.""" config = QualityConfig() # None as content hook_data = { "tool_name": "Write", "tool_input": { "file_path": "test.py", "content": None, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # None as file_path hook_data["tool_input"] = { "file_path": None, "content": "def test(): pass", } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" def test_path_traversal_attempts(self): """Test handling of path traversal attempts.""" config = QualityConfig() dangerous_paths = [ "../../../etc/passwd", "..\\..\\..\\windows\\system32\\config.sys", "/etc/shadow", "~/../../root/.ssh/id_rsa", ] for path in dangerous_paths: hook_data = { "tool_name": "Write", "tool_input": { "file_path": path, "content": "malicious content", }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"] def test_extreme_thresholds(self): """Test with extreme threshold values.""" # Zero thresholds config = QualityConfig( duplicate_threshold=0.0, complexity_threshold=0, ) hook_data = { "tool_name": "Write", "tool_input": { "file_path": "test.py", "content": "def test(): pass", }, } with patch("code_quality_guard.analyze_code_quality") as mock_analyze: mock_analyze.return_value = { "complexity": { "summary": {"average_cyclomatic_complexity": 1}, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "deny" # Maximum thresholds config = QualityConfig( duplicate_threshold=1.0, complexity_threshold=999999, enforcement_mode="permissive", # Use permissive mode for high thresholds ) with patch("code_quality_guard.analyze_code_quality") as mock_analyze: mock_analyze.return_value = { "complexity": { "summary": {"average_cyclomatic_complexity": 50}, "distribution": {"Extreme": 10}, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" def test_subprocess_timeout(self): """Test handling of subprocess timeouts.""" config = QualityConfig() test_content = "def test(): pass" with patch("subprocess.run") as mock_run: # Simulate timeout mock_run.side_effect = subprocess.TimeoutExpired("cmd", 30) results = analyze_code_quality(test_content, "test.py", config) # Should handle timeout gracefully assert isinstance(results, dict) def test_subprocess_command_failure(self): """Test handling of subprocess command failures.""" config = QualityConfig() test_content = "def test(): pass" with patch("subprocess.run") as mock_run: # Simulate command failure mock_result = MagicMock() mock_result.returncode = 1 mock_result.stdout = "Error: command failed" mock_run.return_value = mock_result results = analyze_code_quality(test_content, "test.py", config) # Should handle failure gracefully assert isinstance(results, dict) def test_json_parsing_errors(self): """Test handling of JSON parsing errors from subprocess.""" config = QualityConfig() test_content = "def test(): pass" with patch("subprocess.run") as mock_run: # Simulate invalid JSON output mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "Not valid JSON {broken:" mock_run.return_value = mock_result results = analyze_code_quality(test_content, "test.py", config) # Should handle JSON errors gracefully assert isinstance(results, dict) def test_file_permission_errors(self): """Test handling of file permission errors.""" config = QualityConfig(state_tracking_enabled=True) hook_data = { "tool_name": "Write", "tool_output": { "file_path": "/root/protected.py", }, } with patch("pathlib.Path.exists", return_value=True): with patch("pathlib.Path.read_text") as mock_read: mock_read.side_effect = PermissionError("Access denied") result = posttooluse_hook(hook_data, config) assert "decision" not in result def test_deeply_nested_code_structure(self): """Test handling of deeply nested code.""" config = QualityConfig() # Create code with 10 levels of nesting nested_code = "def func():\n" indent = " " for i in range(10): nested_code += f"{indent * (i + 1)}if condition_{i}:\n" nested_code += f"{indent * 11}return True\n" hook_data = { "tool_name": "Write", "tool_input": { "file_path": "nested.py", "content": nested_code, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"] def test_recursive_function_detection(self): """Test detection of recursive functions.""" config = QualityConfig() recursive_code = """ def factorial(n): if n <= 1: return 1 return n * factorial(n - 1) def infinite_recursion(): return infinite_recursion() """ hook_data = { "tool_name": "Write", "tool_input": { "file_path": "recursive.py", "content": recursive_code, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"] def test_multifile_edit_paths(self): """Test MultiEdit with multiple file paths.""" config = QualityConfig() hook_data = { "tool_name": "MultiEdit", "tool_input": { "file_path": "main.py", "edits": [ {"old_string": "old1", "new_string": "def func1(): pass"}, {"old_string": "old2", "new_string": "def func2(): pass"}, {"old_string": "old3", "new_string": "def func3(): pass"}, ], }, } with patch("code_quality_guard.analyze_code_quality") as mock_analyze: mock_analyze.return_value = {} result = pretooluse_hook(hook_data, config) assert _perm(result) == "allow" # Should concatenate all new_strings call_args = mock_analyze.call_args[0][0] assert "func1" in call_args assert "func2" in call_args assert "func3" in call_args def test_environment_variable_injection(self): """Test handling of environment variable injection attempts.""" malicious_envs = { "QUALITY_ENFORCEMENT": "permissive; rm -rf /", "QUALITY_COMPLEXITY_THRESHOLD": "-1; echo hacked", "QUALITY_DUP_THRESHOLD": "0.5 && malicious_command", } for key, value in malicious_envs.items(): os.environ[key] = value try: config = QualityConfig() # Should handle malicious env vars safely assert isinstance(config, QualityConfig) finally: del os.environ[key] def test_memory_efficient_large_duplicates(self): """Test memory efficiency with large duplicate blocks.""" # Create a large function that's duplicated large_func = """ def process_data(data): ''' Large function with many lines ''' result = [] """ + "\n".join( f" # Processing step {i}\n result.append(data[{i}])" for i in range(100) ) # Duplicate the function code_with_duplicates = ( large_func + "\n\n" + large_func.replace("process_data", "process_data2") ) duplicates = detect_internal_duplicates(code_with_duplicates, threshold=0.8) # Should detect duplicates without memory issues assert "duplicates" in duplicates assert len(duplicates["duplicates"]) > 0 def test_special_python_constructs(self): """Test handling of special Python constructs.""" special_code = """ # Walrus operator if (n := len(data)) > 10: print(f"{n} items") # Match statement (Python 3.10+) def handle(value): match value: case 0: return "zero" case _: return "other" # Type hints with unions def process(data: list[str | int | None]) -> dict[str, Any]: return {} # Async context managers async def fetch(): async with aiohttp.ClientSession() as session: pass # Decorators with arguments @lru_cache(maxsize=128) @deprecated(version='1.0') def cached_func(): pass """ config = QualityConfig() hook_data = { "tool_name": "Write", "tool_input": { "file_path": "special.py", "content": special_code, }, } result = pretooluse_hook(hook_data, config) assert _perm(result) in ["allow", "deny", "ask"]