"""Tests targeting internal helpers for code_quality_guard."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
|
|
import code_quality_guard as guard
|
|
import pytest
|
|
|
|
|
|


@pytest.mark.parametrize(
    ("env_key", "value", "attr", "expected"),
    (
        ("QUALITY_DUP_THRESHOLD", "0.9", "duplicate_threshold", 0.9),
        ("QUALITY_DUP_ENABLED", "false", "duplicate_enabled", False),
        ("QUALITY_COMPLEXITY_THRESHOLD", "7", "complexity_threshold", 7),
        ("QUALITY_ENFORCEMENT", "warn", "enforcement_mode", "warn"),
        ("QUALITY_STATE_TRACKING", "true", "state_tracking_enabled", True),
    ),
)
def test_quality_config_from_env_parsing(
    monkeypatch: pytest.MonkeyPatch,
    env_key: str,
    value: str,
    attr: str,
    expected: object,
) -> None:
    """Ensure QualityConfig.from_env correctly parses environment overrides."""
    monkeypatch.setenv(env_key, value)
    config = guard.QualityConfig.from_env()
    assert getattr(config, attr) == expected


@pytest.mark.parametrize(
    ("tool_exists", "install_behavior", "expected"),
    (
        (True, None, True),
        (False, "success", True),
        (False, "failure", False),
        (False, "timeout", False),
    ),
)
def test_ensure_tool_installed(
    monkeypatch: pytest.MonkeyPatch,
    tool_exists: bool,
    install_behavior: str | None,
    expected: bool,
) -> None:
    """_ensure_tool_installed handles existing tools and installs via uv."""
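
    # Fake filesystem: the requested tool "exists" only in the parametrized
    # case, while the uv binary is visible only when the tool is missing, so
    # the install path runs exactly when it should.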
    def fake_exists(path: Path) -> bool:
        suffix = str(path)
        if suffix.endswith("basedpyright"):
            return tool_exists
        if suffix.endswith("uv"):
            return not tool_exists
        return False

    monkeypatch.setattr(guard.Path, "exists", fake_exists, raising=False)

    def fake_run(cmd: Iterable[str], **_: object) -> subprocess.CompletedProcess[bytes]:
        if install_behavior is None:
            raise AssertionError("uv install should not run when tool already exists")
        if install_behavior == "timeout":
            raise subprocess.TimeoutExpired(cmd=list(cmd), timeout=60)
        return subprocess.CompletedProcess(list(cmd), 0 if install_behavior == "success" else 1)

    monkeypatch.setattr(guard.subprocess, "run", fake_run)

    assert guard._ensure_tool_installed("basedpyright") is expected


@pytest.mark.parametrize(
    ("tool_name", "run_payload", "expected_success", "expected_fragment"),
    (
        ("basedpyright", {"returncode": 0, "stdout": ""}, True, ""),
        ("basedpyright", {"returncode": 1, "stdout": ""}, False, "Type errors found"),
        ("sourcery", {"returncode": 0, "stdout": "3 issues detected"}, False, "3 issues detected"),
        ("pyrefly", {"returncode": 1, "stdout": "pyrefly issue"}, False, "pyrefly issue"),
    ),
)
def test_run_type_checker_known_tools(
    monkeypatch: pytest.MonkeyPatch,
    tool_name: str,
    run_payload: dict[str, object],
    expected_success: bool,
    expected_fragment: str,
) -> None:
    """_run_type_checker evaluates tool results correctly."""
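
    # Pretend the tool binary exists so only the stubbed subprocess result
    # drives the outcome.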
    monkeypatch.setattr(guard.Path, "exists", lambda _path: True, raising=False)

    def fake_run(cmd: Iterable[str], **_: object) -> subprocess.CompletedProcess[str]:
        return subprocess.CompletedProcess(list(cmd), int(run_payload["returncode"]), run_payload.get("stdout", ""), "")

    monkeypatch.setattr(guard.subprocess, "run", fake_run)

    success, message = guard._run_type_checker(tool_name, "tmp.py", guard.QualityConfig())
    assert success is expected_success
    if expected_fragment:
        assert expected_fragment in message
    else:
        assert message == ""


@pytest.mark.parametrize(
    ("exception", "expected_fragment"),
    (
        (subprocess.TimeoutExpired(cmd=["tool"], timeout=30), "timeout"),
        (OSError("boom"), "execution error"),
    ),
)
def test_run_type_checker_runtime_exceptions(
    monkeypatch: pytest.MonkeyPatch,
    exception: Exception,
    expected_fragment: str,
) -> None:
    """Timeouts and OS errors surface as warnings."""

    monkeypatch.setattr(guard.Path, "exists", lambda _path: True, raising=False)

    def raise_exc(*_args: object, **_kwargs: object) -> None:
        raise exception

    monkeypatch.setattr(guard.subprocess, "run", raise_exc)

    success, message = guard._run_type_checker("sourcery", "tmp.py", guard.QualityConfig())
    assert success is True
    assert expected_fragment in message


def test_run_type_checker_tool_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Missing tool reports warning without attempting execution."""

    monkeypatch.setattr(guard.Path, "exists", lambda _path: False, raising=False)
    monkeypatch.setattr(guard, "_ensure_tool_installed", lambda _name: False)

    success, message = guard._run_type_checker("pyrefly", "tmp.py", guard.QualityConfig())

    assert success is True
    assert "not available" in message


def test_run_type_checker_unknown_tool(monkeypatch: pytest.MonkeyPatch) -> None:
    """Unknown tools return a warning without subprocess execution."""

    monkeypatch.setattr(guard.Path, "exists", lambda _path: True, raising=False)

    success, message = guard._run_type_checker("unknown", "tmp.py", guard.QualityConfig())
    assert success is True
    assert "Unknown tool" in message


def test_run_quality_analyses_invokes_cli(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """_run_quality_analyses aggregates CLI outputs and duplicates."""

    script_path = tmp_path / "module.py"
    script_path.write_text("def sample() -> None:\n pass\n", encoding="utf-8")
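
    # Enable only the CLI-backed analyses; the external type checkers are
    # switched off so the test never needs real tools.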
    dummy_config = guard.QualityConfig(
        duplicate_enabled=True,
        complexity_enabled=True,
        modernization_enabled=True,
        sourcery_enabled=False,
        basedpyright_enabled=False,
        pyrefly_enabled=False,
    )

    monkeypatch.setattr(guard, "get_claude_quality_command", lambda: ["cli"])
    monkeypatch.setattr(
        guard,
        "detect_internal_duplicates",
        lambda *_args, **_kwargs: {
            "duplicates": [
                {
                    "similarity": 0.92,
                    "description": "duplicate block",
                    "locations": [{"name": "sample", "lines": "1-4"}],
                },
            ],
        },
    )
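
    # Stub the quality CLI: return canned JSON for the complexity and
    # modernization subcommands and fail loudly on anything unexpected.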
    def fake_run(cmd: Iterable[str], **_: object) -> subprocess.CompletedProcess[str]:
        if "complexity" in cmd:
            payload = json.dumps(
                {
                    "summary": {"average_cyclomatic_complexity": 14.0},
                    "distribution": {"High": 1},
                },
            )
        elif "modernization" in cmd:
            payload = json.dumps(
                {
                    "files": {
                        str(script_path): [
                            {"issue_type": "missing_return_type"},
                        ],
                    },
                },
            )
        else:
            raise AssertionError(f"Unexpected command: {cmd}")
        return subprocess.CompletedProcess(list(cmd), 0, payload, "")

    monkeypatch.setattr(guard.subprocess, "run", fake_run)

    results = guard._run_quality_analyses(
        content=script_path.read_text(encoding="utf-8"),
        tmp_path=str(script_path),
        config=dummy_config,
        enable_type_checks=False,
    )

    assert "internal_duplicates" in results
    assert "complexity" in results
    assert "modernization" in results


@pytest.mark.parametrize(
    ("content", "expected"),
    (
        ("from typing import Any\n\nAny\n", True),
        ("def broken(:\n Any\n", True),
        ("def clean() -> None:\n return None\n", False),
    ),
)
def test_detect_any_usage(content: str, expected: bool) -> None:
    """_detect_any_usage flags Any usage even on syntax errors."""

    result = guard._detect_any_usage(content)
    assert (len(result) > 0) is expected


@pytest.mark.parametrize(
    ("mode", "forced", "expected_permission"),
    (
        ("strict", None, "deny"),
        ("warn", None, "ask"),
        ("permissive", None, "allow"),
        ("strict", "allow", "allow"),
    ),
)
def test_handle_quality_issues_modes(
    mode: str,
    forced: str | None,
    expected_permission: str,
) -> None:
    """_handle_quality_issues honors enforcement modes and overrides."""

    config = guard.QualityConfig(enforcement_mode=mode)
    issues = ["Issue one", "Issue two"]

    response = guard._handle_quality_issues("example.py", issues, config, forced_permission=forced)
    assert response["permissionDecision"] == expected_permission
    if forced is None:
        assert any(issue in response.get("reason", "") for issue in issues)


def test_perform_quality_check_with_state_tracking(monkeypatch: pytest.MonkeyPatch) -> None:
    """_perform_quality_check stores state and reports detected issues."""

    tracked_calls: list[str] = []
    monkeypatch.setattr(guard, "store_pre_state", lambda path, content: tracked_calls.append(path))
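
    # Fake analysis result: a single modernization finding is enough for the
    # helper to report issues for the file.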
    def fake_analyze(*_args: object, **_kwargs: object) -> guard.AnalysisResults:
        return {
            "modernization": {
                "files": {"example.py": [{"issue_type": "use_enumerate"}]},
            },
        }

    monkeypatch.setattr(guard, "analyze_code_quality", fake_analyze)

    config = guard.QualityConfig(state_tracking_enabled=True)

    has_issues, issues = guard._perform_quality_check("example.py", "def old(): pass", config)

    assert tracked_calls == ["example.py"]
    assert has_issues is True
    assert any("Modernization" in issue or "modernization" in issue.lower() for issue in issues)


def test_check_cross_file_duplicates_command(monkeypatch: pytest.MonkeyPatch) -> None:
    """check_cross_file_duplicates uses CLI list from get_claude_quality_command."""

    captured_cmds: list[list[str]] = []
    monkeypatch.setattr(guard, "get_claude_quality_command", lambda: ["cli", "--flag"])
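
    # Record every invocation so the assertions below can verify that the CLI
    # prefix is preserved and a duplicates subcommand is issued.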
    def fake_run(cmd: Iterable[str], **_: object) -> subprocess.CompletedProcess[str]:
        captured_cmds.append(list(cmd))
        payload = json.dumps({"duplicates": ["/repo/example.py"]})
        return subprocess.CompletedProcess(list(cmd), 0, payload, "")

    monkeypatch.setattr(guard.subprocess, "run", fake_run)

    issues = guard.check_cross_file_duplicates("/repo/example.py", guard.QualityConfig())

    assert issues
    assert "duplicates" in captured_cmds[0]
    assert captured_cmds[0][:2] == ["cli", "--flag"]


def test_create_hook_response_includes_reason() -> None:
    """_create_hook_response embeds permission, reason, and system message."""

    response = guard._create_hook_response(
        "PreToolUse",
        permission="deny",
        reason="Testing",
        system_message="System",
        additional_context="context",
        decision="block",
    )
    assert response["permissionDecision"] == "deny"
    assert response["reason"] == "Testing"
    assert response["systemMessage"] == "System"
    assert response["hookSpecificOutput"]["additionalContext"] == "context"
    assert response["decision"] == "block"