* refactor: replace module-level config caching with thread-safe lazy loading * refactor: migrate to registry-based architecture with new validation system * Merge branch 'main' into cleanup * feat: add secure graph routing with comprehensive security controls * fix: add cross-package dependencies to pyrefly search paths - Fix import resolution errors in business-buddy-tools package by adding ../business-buddy-core/src and ../business-buddy-extraction/src to search_path - Fix import resolution errors in business-buddy-extraction package by adding ../business-buddy-core/src to search_path - Resolves all 86 pyrefly import errors that were failing in CI/CD pipeline - All packages now pass pyrefly type checking with 0 errors The issue was that packages import from bb_core but pyrefly was only looking in local src directories, not in sibling package directories. * fix: resolve async function and security import issues Research.py fixes: - Create separate async config loader using load_config_async - Fix _get_cached_config_async to properly await async lazy loader - Prevents blocking event loop during config loading Planner.py fixes: - Move get_secure_router and execute_graph_securely imports to module level - Remove imports from exception handlers to prevent cascade failures - Improves reliability during security incident handling Both fixes ensure proper async behavior and more robust error handling.
433 lines
17 KiB
Python
Executable File
433 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Check for modern typing patterns and Pydantic v2 usage across the codebase.
|
|
|
|
This script validates that the codebase uses modern Python 3.12+ typing patterns
|
|
and Pydantic v2 features, while ignoring legitimate compatibility-related type ignores.
|
|
|
|
Usage:
|
|
python scripts/checks/typing_modernization_check.py # Check src/ and packages/
|
|
python scripts/checks/typing_modernization_check.py --tests # Include tests/
|
|
python scripts/checks/typing_modernization_check.py --verbose # Detailed output
|
|
    python scripts/checks/typing_modernization_check.py --fix      # Auto-fix simple issues (not implemented yet; exits with status 1)
|
|
"""
|
|
|
|
import argparse
|
|
import ast
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, NamedTuple
|
|
|
|
# Repository root, derived from this script's own location: the usage examples
# in the module docstring place the script at scripts/checks/<name>.py, i.e.
# three path components below the root.
_THIS_FILE = Path(__file__)
PROJECT_ROOT = _THIS_FILE.parent.parent.parent
|
|
|
|
|
|
class Issue(NamedTuple):
|
|
"""Represents a typing/Pydantic issue found in the code."""
|
|
file_path: Path
|
|
line_number: int
|
|
issue_type: str
|
|
description: str
|
|
suggestion: str | None = None
|
|
|
|
|
|
class TypingChecker:
|
|
"""Main checker class for typing and Pydantic patterns."""
|
|
|
|
def __init__(self, include_tests: bool = False, verbose: bool = False, fix: bool = False):
|
|
self.include_tests = include_tests
|
|
self.verbose = verbose
|
|
self.fix = fix
|
|
self.issues: list[Issue] = []
|
|
|
|
# Paths to check
|
|
self.check_paths = [
|
|
PROJECT_ROOT / "src",
|
|
PROJECT_ROOT / "packages",
|
|
]
|
|
if include_tests:
|
|
self.check_paths.append(PROJECT_ROOT / "tests")
|
|
|
|
def check_all(self) -> list[Issue]:
|
|
"""Run all checks and return found issues."""
|
|
print(f"🔍 Checking typing modernization in: {', '.join(str(p.name) for p in self.check_paths)}")
|
|
|
|
for path in self.check_paths:
|
|
if path.exists():
|
|
self._check_directory(path)
|
|
|
|
return self.issues
|
|
|
|
def _check_directory(self, directory: Path) -> None:
|
|
"""Recursively check all Python files in a directory."""
|
|
for py_file in directory.rglob("*.py"):
|
|
# Skip certain files that may have legitimate old patterns
|
|
if self._should_skip_file(py_file):
|
|
continue
|
|
|
|
self._check_file(py_file)
|
|
|
|
def _should_skip_file(self, file_path: Path) -> bool:
|
|
"""Determine if a file should be skipped from checking."""
|
|
# Skip files in __pycache__ or .git directories
|
|
if any(part.startswith('.') or part == '__pycache__' for part in file_path.parts):
|
|
return True
|
|
|
|
# Skip migration files or generated code
|
|
if 'migrations' in str(file_path) or 'generated' in str(file_path):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _check_file(self, file_path: Path) -> None:
|
|
"""Check a single Python file for typing and Pydantic issues."""
|
|
try:
|
|
content = file_path.read_text(encoding='utf-8')
|
|
lines = content.splitlines()
|
|
|
|
# Check each line for patterns
|
|
for line_num, line in enumerate(lines, 1):
|
|
self._check_line(file_path, line_num, line, content)
|
|
|
|
# Parse AST for more complex checks
|
|
try:
|
|
tree = ast.parse(content)
|
|
self._check_ast(file_path, tree, lines)
|
|
except SyntaxError:
|
|
# Skip files with syntax errors
|
|
pass
|
|
|
|
except (UnicodeDecodeError, PermissionError) as e:
|
|
if self.verbose:
|
|
print(f"⚠️ Could not read {file_path}: {e}")
|
|
|
|
def _check_line(self, file_path: Path, line_num: int, line: str, full_content: str) -> None:
|
|
"""Check a single line for typing and Pydantic issues."""
|
|
stripped_line = line.strip()
|
|
|
|
# Skip comments and docstrings (unless they contain actual code)
|
|
if stripped_line.startswith('#') or stripped_line.startswith('"""') or stripped_line.startswith("'''"):
|
|
return
|
|
|
|
# Skip legitimate type ignore comments for compatibility
|
|
if self._is_legitimate_type_ignore(line):
|
|
return
|
|
|
|
# Check for old typing imports
|
|
self._check_old_typing_imports(file_path, line_num, line)
|
|
|
|
# Check for old typing usage patterns
|
|
self._check_old_typing_patterns(file_path, line_num, line)
|
|
|
|
# Check for Pydantic v1 patterns
|
|
self._check_pydantic_v1_patterns(file_path, line_num, line)
|
|
|
|
# Check for specific modernization opportunities
|
|
self._check_modernization_opportunities(file_path, line_num, line)
|
|
|
|
def _check_ast(self, file_path: Path, tree: ast.AST, lines: list[str]) -> None:
|
|
"""Perform AST-based checks for more complex patterns."""
|
|
for node in ast.walk(tree):
|
|
# Check function annotations
|
|
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
|
self._check_function_annotations(file_path, node, lines)
|
|
|
|
# Check class definitions
|
|
elif isinstance(node, ast.ClassDef):
|
|
self._check_class_definition(file_path, node, lines)
|
|
|
|
# Check variable annotations
|
|
elif isinstance(node, ast.AnnAssign):
|
|
self._check_variable_annotation(file_path, node, lines)
|
|
|
|
def _is_legitimate_type_ignore(self, line: str) -> bool:
|
|
"""Check if a type ignore comment is for legitimate compatibility reasons."""
|
|
if '# type: ignore' not in line:
|
|
return False
|
|
|
|
# Common legitimate type ignores for compatibility
|
|
legitimate_patterns = [
|
|
'import', # Import compatibility issues
|
|
'TCH', # TYPE_CHECKING related ignores
|
|
'overload', # Function overload issues
|
|
'protocol', # Protocol compatibility
|
|
'mypy', # Specific mypy version issues
|
|
'pyright', # Specific pyright issues
|
|
]
|
|
|
|
return any(pattern in line.lower() for pattern in legitimate_patterns)
|
|
|
|
def _check_old_typing_imports(self, file_path: Path, line_num: int, line: str) -> None:
|
|
"""Check for old typing imports that should be modernized."""
|
|
# Pattern: from typing import Union, Optional, Dict, List, etc.
|
|
if 'from typing import' in line:
|
|
old_imports = ['Union', 'Optional', 'Dict', 'List', 'Set', 'Tuple']
|
|
found_old = []
|
|
|
|
for imp in old_imports:
|
|
# Check for exact word boundaries to avoid false positives like "TypedDict" containing "Dict"
|
|
import re
|
|
# Match the import name with word boundaries or specific delimiters
|
|
pattern = rf'\b{imp}\b'
|
|
if re.search(pattern, line):
|
|
# Additional check to ensure it's not part of a longer word like "TypedDict"
|
|
# Check for common patterns: " Dict", "Dict,", "Dict)", "(Dict", "Dict\n"
|
|
if (f' {imp}' in line or f'{imp},' in line or f'{imp})' in line or
|
|
f'({imp}' in line or line.strip().endswith(imp)):
|
|
# Exclude cases where it's part of a longer identifier
|
|
if not any(longer in line for longer in [f'Typed{imp}', f'{imp}Type', f'_{imp}', f'{imp}_']):
|
|
found_old.append(imp)
|
|
|
|
if found_old:
|
|
suggestion = self._suggest_import_fix(line, found_old)
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="old_typing_import",
|
|
description=f"Old typing imports: {', '.join(found_old)}",
|
|
suggestion=suggestion
|
|
))
|
|
|
|
def _check_old_typing_patterns(self, file_path: Path, line_num: int, line: str) -> None:
|
|
"""Check for old typing usage patterns."""
|
|
# Union[X, Y] should be X | Y
|
|
union_pattern = re.search(r'Union\[([^\]]+)\]', line)
|
|
if union_pattern:
|
|
suggestion = union_pattern.group(1).replace(', ', ' | ')
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="old_union_syntax",
|
|
description=f"Use '|' syntax instead of Union: {union_pattern.group(0)}",
|
|
suggestion=suggestion
|
|
))
|
|
|
|
# Optional[X] should be X | None
|
|
optional_pattern = re.search(r'Optional\[([^\]]+)\]', line)
|
|
if optional_pattern:
|
|
suggestion = f"{optional_pattern.group(1)} | None"
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="old_optional_syntax",
|
|
description=f"Use '| None' syntax instead of Optional: {optional_pattern.group(0)}",
|
|
suggestion=suggestion
|
|
))
|
|
|
|
# Dict[K, V] should be dict[K, V]
|
|
for old_type in ['Dict', 'List', 'Set', 'Tuple']:
|
|
pattern = re.search(rf'{old_type}\[([^\]]+)\]', line)
|
|
if pattern:
|
|
suggestion = f"{old_type.lower()}[{pattern.group(1)}]"
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="old_generic_syntax",
|
|
description=f"Use built-in generic: {pattern.group(0)}",
|
|
suggestion=suggestion
|
|
))
|
|
|
|
def _check_pydantic_v1_patterns(self, file_path: Path, line_num: int, line: str) -> None:
|
|
"""Check for Pydantic v1 patterns that should be v2."""
|
|
# Config class instead of model_config
|
|
if 'class Config:' in line:
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="pydantic_v1_config",
|
|
description="Use model_config = ConfigDict(...) instead of Config class",
|
|
suggestion="model_config = ConfigDict(...)"
|
|
))
|
|
|
|
# Old field syntax
|
|
if re.search(r'Field\([^)]*allow_mutation\s*=', line):
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="pydantic_v1_field",
|
|
description="'allow_mutation' is deprecated, use 'frozen' on model",
|
|
suggestion="Use frozen=True in model_config"
|
|
))
|
|
|
|
# Old validator syntax
|
|
if '@validator' in line:
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="pydantic_v1_validator",
|
|
description="Use @field_validator instead of @validator",
|
|
suggestion="@field_validator('field_name')"
|
|
))
|
|
|
|
# Old root_validator syntax
|
|
if '@root_validator' in line:
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="pydantic_v1_root_validator",
|
|
description="Use @model_validator instead of @root_validator",
|
|
suggestion="@model_validator(mode='before')"
|
|
))
|
|
|
|
def _check_modernization_opportunities(self, file_path: Path, line_num: int, line: str) -> None:
|
|
"""Check for other modernization opportunities."""
|
|
# typing_extensions imports that can be replaced
|
|
if 'from typing_extensions import' in line:
|
|
modern_imports = ['NotRequired', 'Required', 'TypedDict', 'Literal']
|
|
found_modern = [imp for imp in modern_imports if f' {imp}' in line or f'{imp},' in line]
|
|
|
|
if found_modern:
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="typing_extensions_modernizable",
|
|
description=f"These can be imported from typing: {', '.join(found_modern)}",
|
|
suggestion=f"from typing import {', '.join(found_modern)}"
|
|
))
|
|
|
|
# Old try/except for typing imports
|
|
if 'try:' in line and 'from typing import' in line:
|
|
self.issues.append(Issue(
|
|
file_path=file_path,
|
|
line_number=line_num,
|
|
issue_type="unnecessary_typing_try_except",
|
|
description="Try/except for typing imports may be unnecessary in Python 3.12+",
|
|
suggestion="Direct import should work"
|
|
))
|
|
|
|
def _check_function_annotations(self, file_path: Path, node: ast.FunctionDef | ast.AsyncFunctionDef, lines: list[str]) -> None:
|
|
"""Check function annotations for modernization opportunities."""
|
|
# This could be expanded to check function signature patterns
|
|
pass
|
|
|
|
def _check_class_definition(self, file_path: Path, node: ast.ClassDef, lines: list[str]) -> None:
|
|
"""Check class definitions for modernization opportunities."""
|
|
# Check for TypedDict with total=False patterns that could be simplified
|
|
if any(isinstance(base, ast.Name) and base.id == 'TypedDict' for base in node.bases):
|
|
# Could check for NotRequired vs total=False patterns
|
|
pass
|
|
|
|
def _check_variable_annotation(self, file_path: Path, node: ast.AnnAssign, lines: list[str]) -> None:
|
|
"""Check variable annotations for modernization opportunities."""
|
|
# This could check for specific annotation patterns
|
|
pass
|
|
|
|
def _suggest_import_fix(self, line: str, old_imports: list[str]) -> str:
|
|
"""Suggest how to fix old typing imports."""
|
|
# Remove old imports and suggest modern alternatives
|
|
suggestions = []
|
|
if 'Union' in old_imports:
|
|
suggestions.append("Use 'X | Y' syntax instead of Union")
|
|
if 'Optional' in old_imports:
|
|
suggestions.append("Use 'X | None' instead of Optional")
|
|
if any(imp in old_imports for imp in ['Dict', 'List', 'Set', 'Tuple']):
|
|
suggestions.append("Use built-in generics (dict, list, set, tuple)")
|
|
|
|
return "; ".join(suggestions)
|
|
|
|
def print_results(self) -> None:
|
|
"""Print the results of the check."""
|
|
if not self.issues:
|
|
print("✅ No typing modernization issues found!")
|
|
return
|
|
|
|
# Group issues by type
|
|
issues_by_type: dict[str, list[Issue]] = {}
|
|
for issue in self.issues:
|
|
issues_by_type.setdefault(issue.issue_type, []).append(issue)
|
|
|
|
print(f"\n❌ Found {len(self.issues)} typing modernization issues:")
|
|
print("=" * 60)
|
|
|
|
for issue_type, type_issues in issues_by_type.items():
|
|
print(f"\n🔸 {issue_type.replace('_', ' ').title()} ({len(type_issues)} issues)")
|
|
print("-" * 40)
|
|
|
|
for issue in type_issues:
|
|
rel_path = issue.file_path.relative_to(PROJECT_ROOT)
|
|
print(f" 📁 {rel_path}:{issue.line_number}")
|
|
print(f" {issue.description}")
|
|
if issue.suggestion and self.verbose:
|
|
print(f" 💡 Suggestion: {issue.suggestion}")
|
|
print()
|
|
|
|
# Summary
|
|
print("=" * 60)
|
|
print(f"Summary: {len(self.issues)} issues across {len(set(i.file_path for i in self.issues))} files")
|
|
|
|
# Recommendations
|
|
print("\n📝 Quick fixes:")
|
|
print("1. Replace Union[X, Y] with X | Y")
|
|
print("2. Replace Optional[X] with X | None")
|
|
print("3. Replace Dict/List/Set/Tuple with dict/list/set/tuple")
|
|
print("4. Update Pydantic v1 patterns to v2")
|
|
print("5. Use direct imports from typing instead of typing_extensions")
|
|
|
|
|
|
def main() -> int:
|
|
"""Main entry point for the script."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Check for modern typing patterns and Pydantic v2 usage",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
python scripts/checks/typing_modernization_check.py
|
|
python scripts/checks/typing_modernization_check.py --tests --verbose
|
|
python scripts/checks/typing_modernization_check.py --fix
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--tests',
|
|
action='store_true',
|
|
help='Include tests/ directory in checks'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--verbose', '-v',
|
|
action='store_true',
|
|
help='Show detailed output including suggestions'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--fix',
|
|
action='store_true',
|
|
help='Attempt to auto-fix simple issues (not implemented yet)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--quiet', '-q',
|
|
action='store_true',
|
|
help='Only show summary, no detailed issues'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.fix:
|
|
print("⚠️ Auto-fix functionality not implemented yet")
|
|
return 1
|
|
|
|
# Run the checker
|
|
checker = TypingChecker(
|
|
include_tests=args.tests,
|
|
verbose=args.verbose and not args.quiet,
|
|
fix=args.fix
|
|
)
|
|
|
|
issues = checker.check_all()
|
|
|
|
if not args.quiet:
|
|
checker.print_results()
|
|
else:
|
|
if issues:
|
|
print(f"❌ Found {len(issues)} typing modernization issues")
|
|
else:
|
|
print("✅ No typing modernization issues found!")
|
|
|
|
# Return exit code
|
|
return 1 if issues else 0
|
|
|
|
|
|
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|