Restore hooks/analyzers and enhance quality_guard with comprehensive checks
This commit is contained in:
0
hooks/analyzers/py.typed
Normal file
0
hooks/analyzers/py.typed
Normal file
@@ -33,7 +33,7 @@ class TypeInferenceHelper:
|
||||
r"\.items\(\)": "ItemsView",
|
||||
r"\.keys\(\)": "KeysView",
|
||||
r"\.values\(\)": "ValuesView",
|
||||
r"json\.loads\(": "dict[str, Any]", # Still Any but documented
|
||||
r"json\.loads\(": "dict[str, object]",
|
||||
r"json\.dumps\(": "str",
|
||||
r"Path\(": "Path",
|
||||
r"open\(": "TextIOWrapper | BufferedReader",
|
||||
@@ -80,10 +80,10 @@ class TypeInferenceHelper:
|
||||
value_node: ast.expr = assignments[0]
|
||||
suggested_type: str = TypeInferenceHelper._infer_from_node(value_node)
|
||||
|
||||
if suggested_type and suggested_type != "Any":
|
||||
if suggested_type and suggested_type != "object":
|
||||
return TypeSuggestion(
|
||||
element_name=variable_name,
|
||||
current_type="Any",
|
||||
current_type="object",
|
||||
suggested_type=suggested_type,
|
||||
confidence=0.8,
|
||||
reason=f"Inferred from assignment: {ast.unparse(value_node)[:50]}",
|
||||
@@ -105,28 +105,28 @@ class TypeInferenceHelper:
|
||||
"str": "str",
|
||||
"bytes": "bytes",
|
||||
}
|
||||
return type_map.get(value_type, "Any")
|
||||
return type_map.get(value_type, "object")
|
||||
|
||||
if isinstance(node, ast.List):
|
||||
if not node.elts:
|
||||
return "list[Any]"
|
||||
return "list[object]"
|
||||
# Try to infer element type from first element
|
||||
first_type: str = TypeInferenceHelper._infer_from_node(node.elts[0])
|
||||
return f"list[{first_type}]"
|
||||
|
||||
if isinstance(node, ast.Dict):
|
||||
if not node.keys or not node.values:
|
||||
return "dict[Any, Any]"
|
||||
return "dict[object, object]"
|
||||
first_key: ast.expr | None = node.keys[0]
|
||||
if first_key is None:
|
||||
return "dict[Any, Any]"
|
||||
return "dict[object, object]"
|
||||
key_type: str = TypeInferenceHelper._infer_from_node(first_key)
|
||||
dict_value_type: str = TypeInferenceHelper._infer_from_node(node.values[0])
|
||||
return f"dict[{key_type}, {dict_value_type}]"
|
||||
|
||||
if isinstance(node, ast.Set):
|
||||
if not node.elts:
|
||||
return "set[Any]"
|
||||
return "set[object]"
|
||||
element_type: str = TypeInferenceHelper._infer_from_node(node.elts[0])
|
||||
return f"set[{element_type}]"
|
||||
|
||||
@@ -152,7 +152,7 @@ class TypeInferenceHelper:
|
||||
if func.attr == "readlines":
|
||||
return "list[str]"
|
||||
|
||||
return "Any"
|
||||
return "object"
|
||||
|
||||
@staticmethod
|
||||
def suggest_function_return_type(
|
||||
@@ -258,11 +258,11 @@ class TypeInferenceHelper:
|
||||
):
|
||||
return "TextIOWrapper | BufferedReader"
|
||||
if attr_name in ("items", "keys", "values", "get"):
|
||||
return "dict[str, Any]"
|
||||
return "dict[str, object]"
|
||||
if attr_name in ("append", "extend", "pop", "remove"):
|
||||
return "list[Any]"
|
||||
return "list[object]"
|
||||
if attr_name in ("add", "remove", "discard"):
|
||||
return "set[Any]"
|
||||
return "set[object]"
|
||||
|
||||
if (
|
||||
isinstance(node, ast.Subscript)
|
||||
@@ -270,7 +270,7 @@ class TypeInferenceHelper:
|
||||
and node.value.id == param_name
|
||||
):
|
||||
# Parameter is subscripted - likely a sequence or mapping
|
||||
return "Sequence[Any] | Mapping[str, Any]"
|
||||
return "Sequence[object] | Mapping[str, object]"
|
||||
|
||||
if (
|
||||
isinstance(node, (ast.For, ast.AsyncFor))
|
||||
@@ -278,7 +278,7 @@ class TypeInferenceHelper:
|
||||
and node.iter.id == param_name
|
||||
):
|
||||
# Parameter is iterated over
|
||||
return "Iterable[Any]"
|
||||
return "Iterable[object]"
|
||||
|
||||
if (
|
||||
isinstance(node, ast.Call)
|
||||
@@ -286,7 +286,7 @@ class TypeInferenceHelper:
|
||||
and node.func.id == param_name
|
||||
):
|
||||
# Check if param is called (callable)
|
||||
return "Callable[..., Any]"
|
||||
return "Callable[..., object]"
|
||||
|
||||
return None
|
||||
|
||||
@@ -356,7 +356,7 @@ class TypeInferenceHelper:
|
||||
target_name = node.target.id
|
||||
|
||||
# Try to infer better type from value
|
||||
better_type: str = "Any"
|
||||
better_type: str = "object"
|
||||
if node.value:
|
||||
better_type = TypeInferenceHelper._infer_from_node(node.value)
|
||||
|
||||
@@ -394,7 +394,7 @@ class TypeInferenceHelper:
|
||||
)
|
||||
)
|
||||
suggested_type: str = (
|
||||
suggestion.suggested_type if suggestion else "Any"
|
||||
suggestion.suggested_type if suggestion else "object"
|
||||
)
|
||||
results.append(
|
||||
{
|
||||
|
||||
@@ -34,6 +34,8 @@ class PayloadValidator(BaseModel):
|
||||
tool_input: dict[str, object] = {}
|
||||
tool_response: object = None
|
||||
tool_output: object = None
|
||||
content: str = ""
|
||||
file_path: str = ""
|
||||
|
||||
class Config:
|
||||
"""Pydantic config."""
|
||||
|
||||
@@ -1,58 +1,404 @@
|
||||
"""Code quality guard for Claude Code PreToolUse/PostToolUse hooks.
|
||||
|
||||
Enforces quality standards by preventing duplicate, complex, or non-modernized code.
|
||||
Note: Currently provides minimal pass-through validation.
|
||||
Full code quality analysis can be added by integrating the claude-quality toolkit.
|
||||
Integrates with hooks/analyzers and src/quality analyzers to enforce quality
|
||||
standards by detecting duplicate code, high complexity, type safety issues,
|
||||
and code style violations.
|
||||
"""
|
||||
|
||||
import ast
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import TypeGuard
|
||||
|
||||
# Setup path for imports - try both relative and absolute
|
||||
# Setup path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
|
||||
|
||||
from models import HookResponse
|
||||
|
||||
# Optionally import analyzer modules (graceful degradation if not available)
|
||||
message_enrichment_module: object = None
|
||||
type_inference_module: object = None
|
||||
|
||||
# Fallback: define minimal versions
|
||||
def _pretooluse_hook(_hook_data: dict[str, object]) -> HookResponse:
    """Fallback PreToolUse handler that unconditionally allows the tool call."""
    hook_output: dict[str, str] = {
        "hookEventName": "PreToolUse",
        "permissionDecision": "allow",
    }
    return {"hookSpecificOutput": hook_output}
|
||||
try:
|
||||
from analyzers import message_enrichment as message_enrichment_module
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from analyzers import type_inference as type_inference_module
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def _posttooluse_hook(_hook_data: dict[str, object]) -> HookResponse:
    """Fallback PostToolUse handler that reports the event and nothing else."""
    hook_output: dict[str, str] = {"hookEventName": "PostToolUse"}
    return {"hookSpecificOutput": hook_output}
|
||||
def _is_dict_str_obj(value: object) -> TypeGuard[dict[str, object]]:
|
||||
"""Type guard for dict with string keys and object values."""
|
||||
return isinstance(value, dict)
|
||||
|
||||
|
||||
def _safe_dict_get(d: object, key: str) -> object | None:
|
||||
"""Safely get a value from a dict, narrowing through isinstance checks."""
|
||||
if isinstance(d, dict):
|
||||
result = d.get(key)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
def _safe_get_int(d: object, key: str, default: int = 0) -> int:
    """Safely get an int value from a dict.

    Args:
        d: Candidate mapping (any object is accepted).
        key: Key to look up.
        default: Returned when the key is absent or the value is not an int.

    Returns:
        The stored int, or *default* when unavailable.
    """
    val = _safe_dict_get(d, key)
    # Exclude bool explicitly: bool is a subclass of int, so a bare
    # isinstance(val, int) would silently turn True/False into 1/0 and
    # mask malformed analyzer payloads (e.g. a bool where a line number
    # was expected).
    if isinstance(val, int) and not isinstance(val, bool):
        return val
    return default
|
||||
|
||||
|
||||
def _safe_get_str(d: object, key: str, default: str = "") -> str:
    """Safely get a str value from a dict, falling back to *default*."""
    candidate = _safe_dict_get(d, key)
    return candidate if isinstance(candidate, str) else default
|
||||
|
||||
|
||||
def _safe_get_float(d: object, key: str, default: float = 0.0) -> float:
    """Safely get a float value from a dict.

    Ints are accepted and coerced to float; bools are rejected because
    bool is an int subclass and True/False are never meaningful as a
    similarity score or other numeric payload field.

    Args:
        d: Candidate mapping (any object is accepted).
        key: Key to look up.
        default: Returned when the key is absent or the value is not numeric.

    Returns:
        The stored value as float, or *default* when unavailable.
    """
    val = _safe_dict_get(d, key)
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        return float(val)
    return default
|
||||
|
||||
|
||||
def _safe_get_list(d: object, key: str) -> list[object]:
    """Safely get a list value from a dict; anything else becomes ``[]``."""
    val = _safe_dict_get(d, key)
    # Return a shallow copy so callers cannot mutate the source payload
    # through the result; list(val) also widens the element type to object.
    return list(val) if isinstance(val, list) else []
|
||||
|
||||
|
||||
class CodeQualityGuard:
|
||||
"""Validates code quality through duplicate, complexity, modernization checks."""
|
||||
"""Validates code quality through comprehensive checks.
|
||||
|
||||
Checks for:
|
||||
- Duplicate code blocks (structural and semantic)
|
||||
- High cyclomatic complexity
|
||||
- Any type usage without justification
|
||||
- Type suppression comments (# type: ignore, # noqa)
|
||||
"""
|
||||
|
||||
COMPLEXITY_THRESHOLD: int = 15
|
||||
"""Maximum allowed cyclomatic complexity per function."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize quality analyzers from src/quality."""
|
||||
self.dup_engine: object = None
|
||||
self.complexity_analyzer: object = None
|
||||
|
||||
try:
|
||||
from quality.detection.engine import DuplicateDetectionEngine
|
||||
from quality.complexity.analyzer import ComplexityAnalyzer
|
||||
from quality.config.schemas import QualityConfig
|
||||
|
||||
config = QualityConfig()
|
||||
self.dup_engine = DuplicateDetectionEngine(config)
|
||||
self.complexity_analyzer = ComplexityAnalyzer(
|
||||
config.complexity, config
|
||||
)
|
||||
except ImportError:
|
||||
# Quality package not available, analyzers remain None
|
||||
pass
|
||||
|
||||
def pretooluse(self, hook_data: dict[str, object]) -> HookResponse:
    """Handle PreToolUse hook for quality analysis.

    Full analysis happens in posttooluse after code is written, so the
    pre-write stage always permits the tool call.

    Args:
        hook_data: Hook input data containing tool_name and tool_input.
            Currently unused; kept for interface symmetry with posttooluse.

    Returns:
        Hook response with permission decision (always allow pre-write).
    """
    return {
        "hookSpecificOutput": {
            "hookEventName": "PreToolUse",
            "permissionDecision": "allow",
        },
    }
|
||||
|
||||
def _extract_content(self, hook_data: dict[str, object]) -> str:
    """Extract code content from hook data.

    Prefers ``tool_input["content"]`` and falls back to
    ``hook_data["content"]``; blank or whitespace-only strings are
    treated as absent.

    Args:
        hook_data: Hook payload data.

    Returns:
        Extracted code content or empty string.
    """
    tool_input = hook_data.get("tool_input")
    if _is_dict_str_obj(tool_input):
        nested = tool_input.get("content")
        if isinstance(nested, str) and nested.strip():
            return nested

    top_level = hook_data.get("content")
    if isinstance(top_level, str) and top_level.strip():
        return top_level

    return ""
|
||||
|
||||
def _check_any_usage(self, content: str) -> list[str]:
    """Check for typing.Any usage without justification.

    Delegates to the optional ``type_inference_module`` module global; when
    that analyzer (or any attribute on it) is missing, the check silently
    reports no violations so the hook keeps working without the analyzers
    package installed.

    Args:
        content: Source code to analyze.

    Returns:
        List of violation messages with guidance.
    """
    violations: list[str] = []
    # Module-level global set at import time; None means the analyzer
    # package could not be imported, so there is nothing to run.
    if type_inference_module is None:
        return violations

    try:
        # getattr chains (rather than direct attribute access) keep this
        # resilient to older/partial analyzer versions.
        helper = getattr(type_inference_module, "TypeInferenceHelper", None)
        if helper is None:
            return violations

        find_method = getattr(helper, "find_any_usage_with_context", None)
        if find_method is None:
            return violations

        any_usages = find_method(content)
        for usage_item in any_usages:
            # Analyzer output is untyped; skip anything that is not a dict.
            if not isinstance(usage_item, dict):
                continue

            # Cast to the expected type after isinstance check
            usage_dict = usage_item

            # _safe_get_* tolerate missing/mistyped fields in the payload.
            line_num = _safe_get_int(usage_dict, "line", 0)
            element = _safe_get_str(usage_dict, "element", "unknown")
            context = _safe_get_str(usage_dict, "context", "")
            suggested = _safe_get_str(usage_dict, "suggested", "")

            msg = (
                f"❌ Line {line_num}: Found `Any` type in {context}\n"
                f"   Element: {element}\n"
                f"   Suggested: {suggested}\n"
                f"   Why: Using specific types prevents bugs and improves IDE support"
            )
            violations.append(msg)
    except Exception:  # noqa: BLE001
        # Deliberate best-effort: an analyzer crash must never block the
        # hook itself, so any failure degrades to "no violations".
        pass

    return violations
|
||||
|
||||
def _check_type_suppression(self, content: str) -> list[str]:
|
||||
"""Check for type: ignore and # noqa suppression comments.
|
||||
|
||||
Args:
|
||||
content: Source code to analyze.
|
||||
|
||||
Returns:
|
||||
List of violation messages with explanations.
|
||||
"""
|
||||
violations: list[str] = []
|
||||
lines = content.splitlines()
|
||||
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
# Check for # type: ignore comments
|
||||
if re.search(r"#\s*type:\s*ignore", line):
|
||||
code = line.split("#")[0].strip()
|
||||
msg = (
|
||||
f"🚫 Line {line_num}: Found `# type: ignore` suppression\n"
|
||||
f" Code: {code}\n"
|
||||
f" Why: Type suppression hides real type errors and prevents proper typing\n"
|
||||
f" Fix: Use proper type annotations or TypeGuard/Protocol instead"
|
||||
)
|
||||
violations.append(msg)
|
||||
|
||||
# Check for # noqa comments
|
||||
if re.search(r"#\s*noqa", line):
|
||||
code = line.split("#")[0].strip()
|
||||
msg = (
|
||||
f"⚠️ Line {line_num}: Found `# noqa` linting suppression\n"
|
||||
f" Code: {code}\n"
|
||||
f" Why: Suppressing linting hides code quality issues\n"
|
||||
f" Fix: Address the linting issue directly or document why it's necessary"
|
||||
)
|
||||
violations.append(msg)
|
||||
|
||||
return violations
|
||||
|
||||
def _check_complexity(self, content: str) -> list[str]:
|
||||
"""Check for high cyclomatic complexity.
|
||||
|
||||
Args:
|
||||
content: Source code to analyze.
|
||||
|
||||
Returns:
|
||||
List of violation messages with refactoring guidance.
|
||||
"""
|
||||
violations: list[str] = []
|
||||
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
except SyntaxError:
|
||||
return violations
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
complexity = self._calculate_complexity(node)
|
||||
if complexity > self.COMPLEXITY_THRESHOLD:
|
||||
line_num = getattr(node, "lineno", 0)
|
||||
msg = (
|
||||
f"⚠️ Line {line_num}: High complexity in `{node.name}` "
|
||||
f"(complexity: {complexity}, threshold: {self.COMPLEXITY_THRESHOLD})\n"
|
||||
f" Refactoring suggestions:\n"
|
||||
f" • Extract nested conditions into separate functions\n"
|
||||
f" • Use guard clauses to reduce nesting\n"
|
||||
f" • Replace complex conditionals with polymorphism/strategy pattern\n"
|
||||
f" • Break into smaller, focused functions\n"
|
||||
f" Why: Complex code is harder to understand, test, and maintain"
|
||||
)
|
||||
violations.append(msg)
|
||||
|
||||
return violations
|
||||
|
||||
def _calculate_complexity(self, node: ast.AST) -> int:
|
||||
"""Calculate cyclomatic complexity for a function.
|
||||
|
||||
Args:
|
||||
node: AST node to analyze.
|
||||
|
||||
Returns:
|
||||
Cyclomatic complexity value.
|
||||
"""
|
||||
complexity = 1
|
||||
for child in ast.walk(node):
|
||||
if isinstance(
|
||||
child,
|
||||
(ast.If, ast.While, ast.For, ast.ExceptHandler),
|
||||
):
|
||||
complexity += 1
|
||||
elif isinstance(child, ast.BoolOp):
|
||||
complexity += len(child.values) - 1
|
||||
return complexity
|
||||
|
||||
def _check_duplicates(self, content: str) -> list[str]:
    """Check for duplicate code blocks.

    Runs the optional duplicate-detection engine (set up in ``__init__``)
    over the code blocks extracted from *content*, and formats any hits
    with the optional ``message_enrichment_module``. Every collaborator is
    reached via getattr with a None fallback so the check degrades to "no
    violations" whenever an analyzer is missing or partial.

    Args:
        content: Source code to analyze.

    Returns:
        List of violation messages with context.
    """
    violations: list[str] = []
    # Engine is None when the quality package failed to import.
    if self.dup_engine is None:
        return violations

    try:
        ast_analyzer = getattr(self.dup_engine, "ast_analyzer", None)
        if ast_analyzer is None:
            return violations

        blocks_method = getattr(ast_analyzer, "extract_code_blocks", None)
        if blocks_method is None:
            return violations

        code_blocks = blocks_method(content)
        # A single block cannot duplicate anything; skip detection.
        if not code_blocks or len(code_blocks) <= 1:
            return violations

        detect_method = getattr(
            self.dup_engine, "detect_duplicates_in_blocks", None
        )
        if detect_method is None:
            return violations

        duplicates = detect_method(code_blocks)
        # Messages are only produced when the enrichment module is present;
        # without it, detected duplicates are silently dropped.
        if duplicates and message_enrichment_module is not None:
            formatter = getattr(message_enrichment_module, "EnhancedMessageFormatter", None)
            if formatter is not None:
                format_method = getattr(formatter, "format_duplicate_message", None)
                if format_method is not None:
                    for dup in duplicates:
                        # Engine output is untyped; skip non-dict entries.
                        if not isinstance(dup, dict):
                            continue

                        # Cast after isinstance check
                        dup_dict = dup

                        # _safe_get_* tolerate missing/mistyped fields.
                        dup_type = _safe_get_str(dup_dict, "type", "unknown")
                        similarity = _safe_get_float(dup_dict, "similarity", 0.0)
                        locations = _safe_get_list(dup_dict, "locations")

                        msg = format_method(
                            dup_type,
                            similarity,
                            locations,
                            content,
                            include_refactoring=True,
                        )
                        if isinstance(msg, str):
                            violations.append(msg)
    except Exception:  # noqa: BLE001
        # Deliberate best-effort: analyzer failures must never block the
        # hook itself, so any crash degrades to "no violations".
        pass

    return violations
|
||||
|
||||
def posttooluse(self, hook_data: dict[str, object]) -> HookResponse:
    """Handle PostToolUse hook for quality verification.

    Checks for:
    - Type: ignore and # noqa suppression comments
    - Typing.Any usage
    - High cyclomatic complexity
    - Duplicate code blocks

    Args:
        hook_data: Hook output data containing written code.

    Returns:
        Hook response with approval or block decision.
    """
    content = self._extract_content(hook_data)
    if not content:
        # Nothing was written (or content is not a string): approve.
        return {"hookSpecificOutput": {"hookEventName": "PostToolUse"}}

    violations: list[str] = []

    # Check for suppressions first (highest priority)
    violations.extend(self._check_type_suppression(content))

    # Check for Any type usage
    violations.extend(self._check_any_usage(content))

    # Check complexity
    violations.extend(self._check_complexity(content))

    # Check duplicates
    violations.extend(self._check_duplicates(content))

    if violations:
        message = (
            "🚫 Code Quality Issues Detected\n\n"
            + "\n\n".join(violations)
            + "\n\n"
            "📚 Learn more: Use specific types, remove suppressions, reduce complexity"
        )
        return {
            "hookSpecificOutput": {"hookEventName": "PostToolUse"},
            "decision": "block",
            "reason": message,
        }

    return {"hookSpecificOutput": {"hookEventName": "PostToolUse"}}
|
||||
|
||||
12078
logs/status_line.json
12078
logs/status_line.json
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user