Files
claude-scripts/hooks/internal_duplicate_detector.py
2025-09-17 17:01:02 +00:00

486 lines
16 KiB
Python

"""Internal duplicate detection for analyzing code blocks within a single file.
Uses AST analysis and multiple similarity algorithms to detect redundant patterns.
"""
import ast
import difflib
import hashlib
import re
from collections import defaultdict
from dataclasses import dataclass
from typing import Any
# Lifecycle dunder methods that legitimately repeat across classes.
# Duplicate groups consisting solely of these names are suppressed by
# InternalDuplicateDetector._should_ignore_group when they are short
# and simple enough to be boilerplate.
COMMON_DUPLICATE_METHODS = {
    "__init__",
    "__enter__",
    "__exit__",
    "__aenter__",
    "__aexit__",
}
@dataclass
class CodeBlock:
"""Represents a code block (function, method, or class)."""
name: str
type: str # 'function', 'method', 'class'
start_line: int
end_line: int
source: str
ast_node: ast.AST
complexity: int = 0
tokens: list[str] = None
def __post_init__(self):
if self.tokens is None:
self.tokens = self._tokenize()
def _tokenize(self) -> list[str]:
"""Extract meaningful tokens from source code."""
# Remove comments and docstrings
code = re.sub(r"#.*$", "", self.source, flags=re.MULTILINE)
code = re.sub(r'""".*?"""', "", code, flags=re.DOTALL)
code = re.sub(r"'''.*?'''", "", code, flags=re.DOTALL)
# Extract identifiers, keywords, operators
return re.findall(r"\b\w+\b|[=<>!+\-*/]+", code)
@dataclass
class DuplicateGroup:
    """Group of similar code blocks."""

    blocks: list[CodeBlock]  # the mutually similar blocks
    similarity_score: float  # average pairwise similarity in [0.0, 1.0]
    pattern_type: str  # 'exact', 'structural', 'semantic'
    description: str  # human-readable summary for reporting
class InternalDuplicateDetector:
    """Detects duplicate and similar code blocks within a single file.

    Three passes are run over the extracted blocks:

    1. exact (normalized-text) duplicates,
    2. structural (AST-shape) duplicates,
    3. semantic (pattern-hash) duplicates,

    and the resulting groups are filtered by ``similarity_threshold`` and
    a boilerplate allow-list before being reported.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.7,
        min_lines: int = 4,
        min_tokens: int = 20,
    ):
        # Minimum average pairwise similarity (0..1) a group must reach
        # to be reported.
        self.similarity_threshold = similarity_threshold
        # Blocks shorter than min_lines lines or with fewer than
        # min_tokens tokens are excluded from analysis entirely.
        self.min_lines = min_lines
        self.min_tokens = min_tokens
        # NOTE(review): never written to by analyze_code — looks vestigial.
        self.duplicate_groups: list[DuplicateGroup] = []

    def analyze_code(self, source_code: str) -> dict[str, Any]:
        """Analyze source code for internal duplicates.

        Returns a dict with a ``duplicates`` list (one entry per reported
        group) and a ``summary`` dict. On a syntax error the dict carries
        an ``error`` key and empty results instead of raising.
        """
        try:
            tree = ast.parse(source_code)
        except SyntaxError:
            # Unparseable input: report failure instead of propagating.
            return {
                "error": "Failed to parse code",
                "duplicates": [],
                "summary": {"total_duplicates": 0},
            }
        # Extract code blocks
        blocks = self._extract_code_blocks(tree, source_code)
        # Filter blocks by size
        blocks = [
            b
            for b in blocks
            if (b.end_line - b.start_line + 1) >= self.min_lines
            and len(b.tokens) >= self.min_tokens
        ]
        if len(blocks) < 2:
            # Nothing to compare against.
            return {
                "duplicates": [],
                "summary": {
                    "total_duplicates": 0,
                    "blocks_analyzed": len(blocks),
                },
            }
        # Find duplicates
        duplicate_groups = []
        # 1. Check for exact duplicates (normalized)
        exact_groups = self._find_exact_duplicates(blocks)
        duplicate_groups.extend(exact_groups)
        # 2. Check for structural similarity
        structural_groups = self._find_structural_duplicates(blocks)
        duplicate_groups.extend(structural_groups)
        # 3. Check for semantic patterns
        pattern_groups = self._find_pattern_duplicates(blocks)
        duplicate_groups.extend(pattern_groups)
        # Keep only groups above the threshold that are not boilerplate.
        # NOTE(review): the same pair of blocks may be reported by more
        # than one pass; groups are not deduplicated across passes.
        filtered_groups = [
            group
            for group in duplicate_groups
            if group.similarity_score >= self.similarity_threshold
            and not self._should_ignore_group(group)
        ]
        results = [
            {
                "type": group.pattern_type,
                "similarity": group.similarity_score,
                "description": group.description,
                "locations": [
                    {
                        "name": block.name,
                        "type": block.type,
                        "lines": f"{block.start_line}-{block.end_line}",
                    }
                    for block in group.blocks
                ],
            }
            for group in filtered_groups
        ]
        return {
            "duplicates": results,
            "summary": {
                "total_duplicates": len(results),
                "blocks_analyzed": len(blocks),
                # NOTE(review): a block belonging to several groups is
                # counted once per group here, so this can overcount.
                "duplicate_lines": sum(
                    sum(b.end_line - b.start_line + 1 for b in g.blocks)
                    for g in filtered_groups
                ),
            },
        }

    def _extract_code_blocks(self, tree: ast.AST, source: str) -> list[CodeBlock]:
        """Extract functions, methods, and classes from AST."""
        blocks = []
        lines = source.split("\n")

        def create_block(
            node: ast.AST,
            block_type: str,
            lines: list[str],
        ) -> CodeBlock | None:
            # Build a CodeBlock for *node*; any failure (e.g. missing
            # location info) yields None so one bad node cannot abort
            # the whole extraction.
            try:
                start = node.lineno - 1
                end = node.end_lineno - 1 if hasattr(node, "end_lineno") else start
                source = "\n".join(lines[start : end + 1])
                return CodeBlock(
                    name=node.name,
                    type=block_type,
                    start_line=node.lineno,
                    end_line=node.end_lineno
                    if hasattr(node, "end_lineno")
                    else node.lineno,
                    source=source,
                    ast_node=node,
                    complexity=calculate_complexity(node),
                )
            except Exception:  # noqa: BLE001
                return None

        def calculate_complexity(node: ast.AST) -> int:
            """Simple cyclomatic complexity calculation."""
            # Start at 1; each branch point adds one path.
            complexity = 1
            for child in ast.walk(node):
                if isinstance(
                    child,
                    (ast.If, ast.While, ast.For, ast.ExceptHandler),
                ):
                    complexity += 1
                elif isinstance(child, ast.BoolOp):
                    # and/or chains add one path per extra operand.
                    complexity += len(child.values) - 1
            return complexity

        def extract_blocks_from_node(
            node: ast.AST,
            parent: ast.AST | None = None,
        ) -> None:
            """Recursively extract code blocks from AST nodes."""
            if isinstance(node, ast.ClassDef):
                if block := create_block(node, "class", lines):
                    blocks.append(block)
                # Recurse into the class body so methods are captured too.
                for item in node.body:
                    extract_blocks_from_node(item, node)
                return
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # A def whose direct parent is a class is a method.
                block_type = (
                    "method" if isinstance(parent, ast.ClassDef) else "function"
                )
                if block := create_block(node, block_type, lines):
                    blocks.append(block)
            # Generic recursion covers nested defs and other statements.
            for child in ast.iter_child_nodes(node):
                extract_blocks_from_node(child, node)

        extract_blocks_from_node(tree)
        return blocks

    def _find_exact_duplicates(self, blocks: list[CodeBlock]) -> list[DuplicateGroup]:
        """Find exact or near-exact duplicate blocks."""
        groups = []
        processed = set()  # indices already assigned to a group
        for i, block1 in enumerate(blocks):
            if i in processed:
                continue
            similar = [block1]
            norm1 = self._normalize_code(block1.source)
            for j, block2 in enumerate(blocks[i + 1 :], i + 1):
                if j in processed:
                    continue
                norm2 = self._normalize_code(block2.source)
                # Check if normalized versions are very similar
                similarity = difflib.SequenceMatcher(None, norm1, norm2).ratio()
                if similarity >= 0.85:  # High threshold for "exact" duplicates
                    similar.append(block2)
                    processed.add(j)
            if len(similar) > 1:
                # Calculate actual similarity on normalized code
                # (average over all pairs in the group).
                total_sim = 0
                count = 0
                for k in range(len(similar)):
                    for idx in range(k + 1, len(similar)):
                        norm_k = self._normalize_code(similar[k].source)
                        norm_idx = self._normalize_code(similar[idx].source)
                        sim = difflib.SequenceMatcher(None, norm_k, norm_idx).ratio()
                        total_sim += sim
                        count += 1
                avg_similarity = total_sim / count if count > 0 else 1.0
                groups.append(
                    DuplicateGroup(
                        blocks=similar,
                        similarity_score=avg_similarity,
                        pattern_type="exact",
                        description=f"Nearly identical {similar[0].type}s",
                    ),
                )
            processed.add(i)
        return groups

    def _normalize_code(self, code: str) -> str:
        """Normalize code for comparison (replace variable names, etc.)."""
        # Remove comments and docstrings
        code = re.sub(r"#.*$", "", code, flags=re.MULTILINE)
        code = re.sub(r'""".*?"""', "", code, flags=re.DOTALL)
        code = re.sub(r"'''.*?'''", "", code, flags=re.DOTALL)
        # Replace string literals
        # NOTE(review): these patterns do not handle escaped quotes.
        code = re.sub(r'"[^"]*"', '"STR"', code)
        code = re.sub(r"'[^']*'", "'STR'", code)
        # Replace numbers
        code = re.sub(r"\b\d+\.?\d*\b", "NUM", code)
        # Normalize whitespace
        code = re.sub(r"\s+", " ", code)
        return code.strip()

    def _find_structural_duplicates(
        self,
        blocks: list[CodeBlock],
    ) -> list[DuplicateGroup]:
        """Find structurally similar blocks using AST comparison."""
        groups = []
        processed = set()  # indices already assigned to a group
        for i, block1 in enumerate(blocks):
            if i in processed:
                continue
            similar_blocks = [block1]
            for j, block2 in enumerate(blocks[i + 1 :], i + 1):
                if j in processed:
                    continue
                similarity = self._ast_similarity(block1.ast_node, block2.ast_node)
                if similarity >= self.similarity_threshold:
                    similar_blocks.append(block2)
                    processed.add(j)
            if len(similar_blocks) > 1:
                # Calculate average similarity
                total_sim = 0
                count = 0
                for k in range(len(similar_blocks)):
                    for idx in range(k + 1, len(similar_blocks)):
                        total_sim += self._ast_similarity(
                            similar_blocks[k].ast_node,
                            similar_blocks[idx].ast_node,
                        )
                        count += 1
                avg_similarity = total_sim / count if count > 0 else 0
                groups.append(
                    DuplicateGroup(
                        blocks=similar_blocks,
                        similarity_score=avg_similarity,
                        pattern_type="structural",
                        description=f"Structurally similar {similar_blocks[0].type}s",
                    ),
                )
            processed.add(i)
        return groups

    def _ast_similarity(self, node1: ast.AST, node2: ast.AST) -> float:
        """Calculate structural similarity between two AST nodes."""

        def get_structure(node: ast.AST) -> list[str]:
            """Extract structural pattern from AST node."""
            # The "shape" is just the walk-order sequence of node class
            # names; identifiers and literals are ignored.
            structure = []
            for child in ast.walk(node):
                structure.append(child.__class__.__name__)
            return structure

        struct1 = get_structure(node1)
        struct2 = get_structure(node2)
        if not struct1 or not struct2:
            return 0.0
        # Use sequence matcher for structural similarity
        matcher = difflib.SequenceMatcher(None, struct1, struct2)
        return matcher.ratio()

    def _find_pattern_duplicates(self, blocks: list[CodeBlock]) -> list[DuplicateGroup]:
        """Find blocks with similar patterns (e.g., similar loops, conditions)."""
        groups = []
        # Bucket blocks by (pattern type, pattern hash); a bucket with
        # more than one block is a candidate group.
        pattern_groups = defaultdict(list)
        for block in blocks:
            patterns = self._extract_patterns(block)
            for pattern_type, pattern_hash in patterns:
                pattern_groups[(pattern_type, pattern_hash)].append(block)
        for (pattern_type, _), similar_blocks in pattern_groups.items():
            if len(similar_blocks) > 1:
                # Calculate token-based similarity
                total_sim = 0
                count = 0
                for i in range(len(similar_blocks)):
                    for j in range(i + 1, len(similar_blocks)):
                        sim = self._token_similarity(
                            similar_blocks[i].tokens,
                            similar_blocks[j].tokens,
                        )
                        total_sim += sim
                        count += 1
                # NOTE(review): 0.7 fallback equals the default threshold,
                # so an empty pairing would still pass the check below.
                avg_similarity = total_sim / count if count > 0 else 0.7
                if avg_similarity >= self.similarity_threshold:
                    groups.append(
                        DuplicateGroup(
                            blocks=similar_blocks,
                            similarity_score=avg_similarity,
                            pattern_type="semantic",
                            description=f"Similar {pattern_type} patterns",
                        ),
                    )
        return groups

    def _extract_patterns(self, block: CodeBlock) -> list[tuple[str, str]]:
        """Extract semantic patterns from code block.

        Returns (pattern_type, 8-hex-char sha256 digest) tuples used as
        bucketing keys by _find_pattern_duplicates.
        """
        patterns = []
        # Pattern: for-if combination
        if "for " in block.source and "if " in block.source:
            # Replace every identifier with VAR and strip whitespace so
            # only the literal shape of the text is hashed.
            pattern = re.sub(r"\b\w+\b", "VAR", block.source)
            pattern = re.sub(r"\s+", "", pattern)
            patterns.append(
                ("loop-condition", hashlib.sha256(pattern.encode()).hexdigest()[:8]),
            )
        # Pattern: multiple similar operations
        operations = re.findall(r"(\w+)\s*[=+\-*/]+\s*(\w+)", block.source)
        if len(operations) > 2:
            # Sort so ordering of the operations does not affect the hash.
            op_pattern = "".join(sorted(op[0] for op in operations))
            patterns.append(
                ("repetitive-ops", hashlib.sha256(op_pattern.encode()).hexdigest()[:8]),
            )
        # Pattern: similar function calls
        calls = re.findall(r"(\w+)\s*\([^)]*\)", block.source)
        if len(calls) > 2:
            call_pattern = "".join(sorted(set(calls)))
            patterns.append(
                (
                    "similar-calls",
                    hashlib.sha256(call_pattern.encode()).hexdigest()[:8],
                ),
            )
        return patterns

    def _token_similarity(self, tokens1: list[str], tokens2: list[str]) -> float:
        """Calculate similarity between token sequences."""
        if not tokens1 or not tokens2:
            return 0.0
        # Use Jaccard similarity on token sets
        set1 = set(tokens1)
        set2 = set(tokens2)
        intersection = len(set1 & set2)
        union = len(set1 | set2)
        if union == 0:
            return 0.0
        jaccard = intersection / union
        # Also consider sequence similarity
        sequence_sim = difflib.SequenceMatcher(None, tokens1, tokens2).ratio()
        # Weighted combination
        return 0.6 * jaccard + 0.4 * sequence_sim

    def _should_ignore_group(self, group: DuplicateGroup) -> bool:
        """Drop duplicate groups that match common boilerplate patterns."""
        if not group.blocks:
            return False
        # Only suppress when EVERY block is a known lifecycle dunder.
        if all(block.name in COMMON_DUPLICATE_METHODS for block in group.blocks):
            max_lines = max(
                block.end_line - block.start_line + 1 for block in group.blocks
            )
            max_complexity = max(block.complexity for block in group.blocks)
            # Allow simple lifecycle dunder methods to repeat across classes.
            if max_lines <= 12 and max_complexity <= 3:
                return True
        return False
def detect_internal_duplicates(
    source_code: str,
    threshold: float = 0.7,
    min_lines: int = 4,
    min_tokens: int = 20,
) -> dict[str, Any]:
    """Main function to detect internal duplicates in code.

    Args:
        source_code: Python source text to analyze.
        threshold: Minimum average similarity (0..1) a group must reach
            to be reported.
        min_lines: Blocks spanning fewer lines than this are ignored.
        min_tokens: Blocks with fewer meaningful tokens than this are
            ignored (newly exposed here; defaults to the detector's
            previous hard-coded value of 20, so existing callers see
            identical behavior).

    Returns:
        Dict with ``duplicates`` (list of reported groups) and
        ``summary`` keys, plus ``error`` when the source fails to parse.
    """
    detector = InternalDuplicateDetector(
        similarity_threshold=threshold,
        min_lines=min_lines,
        min_tokens=min_tokens,
    )
    return detector.analyze_code(source_code)