#!/usr/bin/env python3
"""Check for modern typing patterns and Pydantic v2 usage across the codebase.

This script validates that the codebase uses modern Python 3.12+ typing patterns
and Pydantic v2 features, while ignoring legitimate compatibility-related type ignores.

Usage:
    python scripts/checks/typing_modernization_check.py            # Check src/
    python scripts/checks/typing_modernization_check.py --tests    # Include tests/
    python scripts/checks/typing_modernization_check.py --verbose  # Detailed output
    python scripts/checks/typing_modernization_check.py --fix      # Auto-fix simple issues
"""

import argparse
import ast
import re
import sys
from pathlib import Path
from typing import NamedTuple

# Define the project root
PROJECT_ROOT = Path(__file__).parent.parent.parent

# --- Constants for Pattern Matching ---

# Old typing imports that should be modernized
OLD_TYPING_IMPORTS = ["Union", "Optional", "Dict", "List", "Set", "Tuple"]

# Modern imports that can be moved from typing_extensions to typing
MODERNIZABLE_TYPING_EXTENSIONS = ["NotRequired", "Required", "TypedDict", "Literal"]

# Legitimate type ignore patterns for compatibility
LEGITIMATE_TYPE_IGNORE_PATTERNS = [
    "import",  # Import compatibility issues
    "TCH",  # TYPE_CHECKING related ignores
    "overload",  # Function overload issues
    "protocol",  # Protocol compatibility
    "mypy",  # Specific mypy version issues
    "pyright",  # Specific pyright issues
]

# File path patterns to skip
SKIP_PATH_PATTERNS = ["__pycache__", "migrations", "generated"]

# --- Error Messages ---

# Pydantic v1 to v2 migration messages
PYDANTIC_CONFIG_MESSAGE = "Use model_config = ConfigDict(...) instead of Config class"
PYDANTIC_CONFIG_SUGGESTION = "model_config = ConfigDict(...)"

PYDANTIC_MUTATION_MESSAGE = "'allow_mutation' is deprecated, use 'frozen' on model"
PYDANTIC_MUTATION_SUGGESTION = "Use frozen=True in model_config"

PYDANTIC_VALIDATOR_MESSAGE = "Use @field_validator instead of @validator"
PYDANTIC_VALIDATOR_SUGGESTION = "@field_validator('field_name')"

PYDANTIC_ROOT_VALIDATOR_MESSAGE = "Use @model_validator instead of @root_validator"
PYDANTIC_ROOT_VALIDATOR_SUGGESTION = "@model_validator(mode='before')"

# Typing modernization messages
UNION_SYNTAX_MESSAGE = "Use '|' syntax instead of Union"
OPTIONAL_SYNTAX_MESSAGE = "Use '| None' syntax instead of Optional"
BUILTIN_GENERIC_MESSAGE = "Use built-in generic"

TYPING_EXTENSIONS_MESSAGE = "These can be imported from typing"
UNNECESSARY_TRY_EXCEPT_MESSAGE = "Try/except for typing imports may be unnecessary in Python 3.12+"
UNNECESSARY_TRY_EXCEPT_SUGGESTION = "Direct import should work"

# --- Type checking constants ---
TYPING_MODULE = "typing"
TYPING_EXTENSIONS_MODULE = "typing_extensions"
DIRECT_IMPORT = "direct_import"


class Issue(NamedTuple):
    """Represents a typing/Pydantic issue found in the code."""

    file_path: Path
    line_number: int
    issue_type: str
    description: str
    suggestion: str | None = None

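# Illustrative example of a reported issue (hypothetical values):
#   Issue(
#       file_path=Path("src/example.py"),
#       line_number=10,
#       issue_type="old_union_syntax",
#       description="Use '|' syntax instead of Union: Union[int, str]",
#       suggestion="int | str",
#   )
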
class TypingChecker:
    """Main checker class for typing and Pydantic patterns."""

    def __init__(
        self, include_tests: bool = False, verbose: bool = False, fix: bool = False
    ):
        """Initialize the typing modernization checker."""
        self.include_tests = include_tests
        self.verbose = verbose
        self.fix = fix
        self.issues: list[Issue] = []
        # Track imports per file to avoid false positives
        self._file_imports: dict[str, dict[str, str]] = {}
        # Track seen issues to avoid duplicates between regex and AST analysis
        self._seen_issues: set[tuple[str, int, str]] = set()

        # Paths to check
        self.check_paths = [
            PROJECT_ROOT / "src",
        ]
        if include_tests:
            self.check_paths.append(PROJECT_ROOT / "tests")

    def check_all(self) -> list[Issue]:
        """Run all checks and return found issues."""
        print(
            f"🔍 Checking typing modernization in: {', '.join(str(p.name) for p in self.check_paths)}"
        )

        for path in self.check_paths:
            if path.exists():
                self._check_directory(path)

        return self.issues

    def _check_directory(self, path: Path) -> None:
        """Check a directory or individual file."""
        if path.is_file():
            # Single file
            if not self._should_skip_file(path):
                self._check_file(path)
        else:
            # Directory - recursively check all Python files
            for py_file in path.rglob("*.py"):
                if self._should_skip_file(py_file):
                    continue
                self._check_file(py_file)

    def _should_skip_file(self, file_path: Path) -> bool:
        """Determine if a file should be skipped from checking."""
        # Skip files under hidden directories (e.g. .git, .venv)
        if any(part.startswith(".") for part in file_path.parts):
            return True

        # Skip caches, migration files, and generated code
        return any(pattern in str(file_path) for pattern in SKIP_PATH_PATTERNS)

    def _check_file(self, file_path: Path) -> None:
        """Check a single Python file for typing and Pydantic issues."""
        try:
            content = file_path.read_text(encoding="utf-8")
            lines = content.splitlines()

            # Parse AST first to get import context
            try:
                tree = ast.parse(content)
                self._analyze_imports(file_path, tree)
                self._check_ast(file_path, tree, lines)
            except SyntaxError:
                # Skip AST-based checks for unparsable files;
                # the line-based checks below still run
                pass

            # Check each line for patterns with import context
            for line_num, line in enumerate(lines, 1):
                self._check_line(file_path, line_num, line, content)

        except (UnicodeDecodeError, PermissionError) as e:
            if self.verbose:
                print(f"⚠️ Could not read {file_path}: {e}")

    def _check_line(
        self, file_path: Path, line_num: int, line: str, full_content: str
    ) -> None:
        """Check a single line for typing and Pydantic issues."""
        stripped_line = line.strip()

        # Skip comments and docstrings (unless they contain actual code)
        if (
            stripped_line.startswith("#")
            or stripped_line.startswith('"""')
            or stripped_line.startswith("'''")
        ):
            return

        # Skip legitimate type ignore comments for compatibility
        if self._is_legitimate_type_ignore(line):
            return

        # Check for old typing imports
        self._check_old_typing_imports(file_path, line_num, line)

        # Check for old typing usage patterns
        self._check_old_typing_patterns(file_path, line_num, line)

        # Check for Pydantic v1 patterns
        self._check_pydantic_v1_patterns(file_path, line_num, line)

        # Check for specific modernization opportunities
        self._check_modernization_opportunities(file_path, line_num, line)

    def _check_ast(self, file_path: Path, tree: ast.AST, lines: list[str]) -> None:
        """Perform AST-based checks for more complex patterns."""
        for node in ast.walk(tree):
            # Check function annotations
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                self._check_function_annotations(file_path, node, lines)

            # Check class definitions
            elif isinstance(node, ast.ClassDef):
                self._check_class_definition(file_path, node, lines)

            # Check variable annotations
            elif isinstance(node, ast.AnnAssign):
                self._check_variable_annotation(file_path, node, lines)

    def _is_legitimate_type_ignore(self, line: str) -> bool:
        """Check if a type ignore comment is for legitimate compatibility reasons."""
        if "# type: ignore" not in line:
            return False

        return any(pattern in line.lower() for pattern in LEGITIMATE_TYPE_IGNORE_PATTERNS)

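    # Illustrative example (hypothetical line, not from the codebase):
    #   "from foo import bar  # type: ignore[import-untyped]"
    # contains "# type: ignore" and the substring "import", which appears in
    # LEGITIMATE_TYPE_IGNORE_PATTERNS, so the line is treated as a legitimate
    # compatibility ignore and skipped by _check_line.
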
    def _is_valid_old_import(self, line: str, import_name: str) -> bool:
        """Check if an import name is a valid old typing import to flag."""
        # Strip longer identifiers that merely contain the name (e.g. "TypedDict"
        # contains "Dict") so they can neither trigger a match nor mask a
        # genuine standalone occurrence elsewhere on the same line.
        excluded_patterns = [
            f"Typed{import_name}",
            f"{import_name}Type",
            f"_{import_name}",
            f"{import_name}_",
        ]
        reduced = line
        for excluded in excluded_patterns:
            reduced = reduced.replace(excluded, "")

        # Match on exact word boundaries to avoid remaining false positives
        pattern = rf"\b{import_name}\b"
        return re.search(pattern, reduced) is not None

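    # Illustrative example: for the hypothetical line
    #   "from typing import TypedDict, Dict"
    # _is_valid_old_import(line, "Dict") strips "TypedDict" first and then
    # finds the standalone "Dict", so Dict is flagged; a line importing only
    # TypedDict yields no standalone match and is left alone.
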
    def _check_old_typing_imports(
        self, file_path: Path, line_num: int, line: str
    ) -> None:
        """Check for old typing imports that should be modernized."""
        # Pattern: from typing import Union, Optional, Dict, List, etc.
        if "from typing import" not in line:
            return

        found_old = [
            imp for imp in OLD_TYPING_IMPORTS
            if self._is_valid_old_import(line, imp)
        ]

        if found_old:
            suggestion = self._suggest_import_fix(line, found_old)
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="old_typing_import",
                    description=f"Old typing imports: {', '.join(found_old)}",
                    suggestion=suggestion,
                )
            )

    def _add_union_issue(self, file_path: Path, line_num: int, inner_content: str) -> None:
        """Add a Union syntax issue if valid."""
        type_parts = self._parse_comma_separated_types(inner_content)
        if len(type_parts) <= 1:
            return

        suggestion = " | ".join(type_parts)
        if not self._validate_suggestion(suggestion):
            return

        issue_key = (str(file_path), line_num, suggestion)
        if issue_key in self._seen_issues:
            return

        self._seen_issues.add(issue_key)
        self.issues.append(
            Issue(
                file_path=file_path,
                line_number=line_num,
                issue_type="old_union_syntax",
                description=f"{UNION_SYNTAX_MESSAGE}: Union[{inner_content}]",
                suggestion=suggestion,
            )
        )

    def _add_optional_issue(self, file_path: Path, line_num: int, inner_content: str) -> None:
        """Add an Optional syntax issue if valid."""
        suggestion = f"{inner_content} | None"
        if not self._validate_suggestion(suggestion):
            return

        # Deduplicate against the AST-based pass, mirroring _add_union_issue
        issue_key = (str(file_path), line_num, suggestion)
        if issue_key in self._seen_issues:
            return

        self._seen_issues.add(issue_key)
        self.issues.append(
            Issue(
                file_path=file_path,
                line_number=line_num,
                issue_type="old_optional_syntax",
                description=f"{OPTIONAL_SYNTAX_MESSAGE}: Optional[{inner_content}]",
                suggestion=suggestion,
            )
        )

    def _add_generic_issue(self, file_path: Path, line_num: int, old_type: str, inner_content: str) -> None:
        """Add a generic type issue if valid."""
        suggestion = f"{old_type.lower()}[{inner_content}]"
        if not self._validate_suggestion(suggestion):
            return

        # Deduplicate against the AST-based pass, mirroring _add_union_issue
        issue_key = (str(file_path), line_num, suggestion)
        if issue_key in self._seen_issues:
            return

        self._seen_issues.add(issue_key)
        self.issues.append(
            Issue(
                file_path=file_path,
                line_number=line_num,
                issue_type="old_generic_syntax",
                description=f"{BUILTIN_GENERIC_MESSAGE}: {old_type}[{inner_content}]",
                suggestion=suggestion,
            )
        )

    def _check_old_typing_patterns(
        self, file_path: Path, line_num: int, line: str
    ) -> None:
        """Check for old typing usage patterns."""
        file_key = str(file_path)
        imports = self._file_imports.get(file_key, {})

        # Union[X, Y] should be X | Y - handle nested brackets properly
        union_matches = self._find_balanced_brackets(line, "Union")
        for match in union_matches:
            self._add_union_issue(file_path, line_num, match["content"])

        # Optional[X] should be X | None - handle nested brackets properly
        optional_matches = self._find_balanced_brackets(line, "Optional")
        for match in optional_matches:
            self._add_optional_issue(file_path, line_num, match["content"])

        # Dict[K, V] should be dict[K, V] - only if imported from typing
        for old_type in ["Dict", "List", "Set", "Tuple"]:
            # Check if this type is imported from typing
            if old_type in imports and imports[old_type] == TYPING_MODULE:
                matches = self._find_balanced_brackets(line, old_type)
                for match in matches:
                    self._add_generic_issue(file_path, line_num, old_type, match["content"])

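    # Illustrative walk-through (hypothetical line, assuming Dict was imported
    # from typing in that file):
    #   "def f(x: Optional[int], y: Dict[str, Union[int, str]]) -> None:"
    # produces an old_optional_syntax issue suggesting "int | None", an
    # old_union_syntax issue suggesting "int | str", and an old_generic_syntax
    # issue suggesting "dict[str, Union[int, str]]".
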
    def _check_pydantic_v1_patterns(
        self, file_path: Path, line_num: int, line: str
    ) -> None:
        """Check for Pydantic v1 patterns that should be v2."""
        # Config class instead of model_config
        if "class Config:" in line:
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="pydantic_v1_config",
                    description=PYDANTIC_CONFIG_MESSAGE,
                    suggestion=PYDANTIC_CONFIG_SUGGESTION,
                )
            )

        # Old field syntax
        if re.search(r"Field\([^)]*allow_mutation\s*=", line):
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="pydantic_v1_field",
                    description=PYDANTIC_MUTATION_MESSAGE,
                    suggestion=PYDANTIC_MUTATION_SUGGESTION,
                )
            )

        # Old validator syntax
        if "@validator" in line:
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="pydantic_v1_validator",
                    description=PYDANTIC_VALIDATOR_MESSAGE,
                    suggestion=PYDANTIC_VALIDATOR_SUGGESTION,
                )
            )

        # Old root_validator syntax
        if "@root_validator" in line:
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="pydantic_v1_root_validator",
                    description=PYDANTIC_ROOT_VALIDATOR_MESSAGE,
                    suggestion=PYDANTIC_ROOT_VALIDATOR_SUGGESTION,
                )
            )

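    # Illustrative example (hypothetical Pydantic v1 model lines):
    #   "    class Config:"       -> pydantic_v1_config issue
    #   "    @validator('name')"  -> pydantic_v1_validator issue
    #   "    @root_validator"     -> pydantic_v1_root_validator issue
    # Each line is matched independently by the substring checks above.
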
    def _check_modernization_opportunities(
        self, file_path: Path, line_num: int, line: str
    ) -> None:
        """Check for other modernization opportunities."""
        # typing_extensions imports that can be replaced
        if "from typing_extensions import" in line:
            found_modern = [
                imp for imp in MODERNIZABLE_TYPING_EXTENSIONS
                if f" {imp}" in line or f"{imp}," in line
            ]

            if found_modern:
                self.issues.append(
                    Issue(
                        file_path=file_path,
                        line_number=line_num,
                        issue_type="typing_extensions_modernizable",
                        description=f"{TYPING_EXTENSIONS_MESSAGE}: {', '.join(found_modern)}",
                        suggestion=f"from typing import {', '.join(found_modern)}",
                    )
                )

        # Old try/except for typing imports; note this heuristic only catches
        # the single-line form "try: from typing import ..."
        if "try:" in line and "from typing import" in line:
            self.issues.append(
                Issue(
                    file_path=file_path,
                    line_number=line_num,
                    issue_type="unnecessary_typing_try_except",
                    description=UNNECESSARY_TRY_EXCEPT_MESSAGE,
                    suggestion=UNNECESSARY_TRY_EXCEPT_SUGGESTION,
                )
            )

    def _check_function_annotations(
        self,
        file_path: Path,
        node: ast.FunctionDef | ast.AsyncFunctionDef,
        lines: list[str],
    ) -> None:
        """Check function annotations for modernization opportunities."""
        # Check return type annotation
        if node.returns:
            self._check_annotation_node(file_path, node.returns, lines, node.lineno, "return type")

        # Check argument annotations
        for arg in node.args.args:
            if arg.annotation:
                self._check_annotation_node(file_path, arg.annotation, lines, arg.lineno, f"parameter '{arg.arg}'")

    def _check_class_definition(
        self, file_path: Path, node: ast.ClassDef, lines: list[str]
    ) -> None:
        """Check class definitions for modernization opportunities."""
        # Check for TypedDict with total=False patterns that could be simplified
        if any(
            isinstance(base, ast.Name) and base.id == "TypedDict" for base in node.bases
        ):
            # Could check for NotRequired vs total=False patterns
            pass

    def _check_variable_annotation(
        self, file_path: Path, node: ast.AnnAssign, lines: list[str]
    ) -> None:
        """Check variable annotations for modernization opportunities."""
        if node.annotation:
            self._check_annotation_node(file_path, node.annotation, lines, node.lineno, "variable annotation")

    def _check_annotation_node(
        self, file_path: Path, annotation: ast.AST, lines: list[str], line_num: int, context: str
    ) -> None:
        """Check a specific annotation AST node for modernization opportunities."""
        file_key = str(file_path)
        imports = self._file_imports.get(file_key, {})

        # Convert AST node back to string for analysis; ast is imported at
        # module level and ast.unparse is available on Python 3.9+
        try:
            annotation_str = ast.unparse(annotation)
        except Exception:
            # Bail out on nodes that cannot be unparsed
            return

        # Check for Union patterns in AST
        if isinstance(annotation, ast.Subscript):
            if isinstance(annotation.value, ast.Name):
                type_name = annotation.value.id

                # Union[X, Y] -> X | Y (only if Union imported from typing)
                if type_name == "Union" and imports.get(type_name) == TYPING_MODULE:
                    self._handle_union_ast(file_path, line_num, annotation, context, annotation_str)

                # Optional[X] -> X | None (only if Optional imported from typing)
                elif type_name == "Optional" and imports.get(type_name) == TYPING_MODULE:
                    self._handle_optional_ast(file_path, line_num, annotation, context, annotation_str)

                # Dict/List/Set/Tuple[...] -> dict/list/set/tuple[...] (only if imported from typing)
                elif type_name in ("Dict", "List", "Set", "Tuple") and imports.get(type_name) == TYPING_MODULE:
                    self._handle_generic_ast(file_path, line_num, annotation, context, annotation_str, type_name)

    def _handle_union_ast(self, file_path: Path, line_num: int, annotation: ast.Subscript,
                          context: str, annotation_str: str) -> None:
        """Handle Union annotation AST processing."""
        # Extract the union types from the subscript
        if isinstance(annotation.slice, ast.Tuple):
            type_parts = []
            for elt in annotation.slice.elts:
                try:
                    part_str = ast.unparse(elt)
                    type_parts.append(part_str)
                except Exception:
                    continue

            if len(type_parts) > 1:
                suggestion = " | ".join(type_parts)
                issue_key = (str(file_path), line_num, suggestion)
                if issue_key not in self._seen_issues:
                    self._seen_issues.add(issue_key)
                    self.issues.append(
                        Issue(
                            file_path=file_path,
                            line_number=line_num,
                            issue_type="old_union_syntax_ast",
                            description=f"{UNION_SYNTAX_MESSAGE} in {context}: {annotation_str}",
                            suggestion=suggestion,
                        )
                    )

    def _handle_optional_ast(self, file_path: Path, line_num: int, annotation: ast.Subscript,
                             context: str, annotation_str: str) -> None:
        """Handle Optional annotation AST processing."""
        try:
            inner_type = ast.unparse(annotation.slice)
        except Exception:
            return

        suggestion = f"{inner_type} | None"
        # Deduplicate against the regex-based pass
        issue_key = (str(file_path), line_num, suggestion)
        if issue_key in self._seen_issues:
            return

        self._seen_issues.add(issue_key)
        self.issues.append(
            Issue(
                file_path=file_path,
                line_number=line_num,
                issue_type="old_optional_syntax_ast",
                description=f"{OPTIONAL_SYNTAX_MESSAGE} in {context}: {annotation_str}",
                suggestion=suggestion,
            )
        )

    def _handle_generic_ast(self, file_path: Path, line_num: int, annotation: ast.Subscript,
                            context: str, annotation_str: str, type_name: str) -> None:
        """Handle generic type annotation AST processing."""
        try:
            inner_type = ast.unparse(annotation.slice)
        except Exception:
            return

        suggestion = f"{type_name.lower()}[{inner_type}]"
        # Deduplicate against the regex-based pass
        issue_key = (str(file_path), line_num, suggestion)
        if issue_key in self._seen_issues:
            return

        self._seen_issues.add(issue_key)
        self.issues.append(
            Issue(
                file_path=file_path,
                line_number=line_num,
                issue_type="old_generic_syntax_ast",
                description=f"{BUILTIN_GENERIC_MESSAGE} in {context}: {annotation_str}",
                suggestion=suggestion,
            )
        )

    def _analyze_imports(self, file_path: Path, tree: ast.AST) -> None:
        """Analyze imports in the file to track where types come from."""
        file_key = str(file_path)
        self._file_imports[file_key] = {}

        for node in ast.walk(tree):
            if isinstance(node, ast.ImportFrom) and node.module in (TYPING_MODULE, TYPING_EXTENSIONS_MODULE):
                # node.module is narrowed to one of the two module names here
                for alias in node.names:
                    name = alias.asname or alias.name
                    self._file_imports[file_key][name] = node.module
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    name = alias.asname or alias.name
                    self._file_imports[file_key][name] = DIRECT_IMPORT

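    # Illustrative example: a file containing
    #   "from typing import Optional as Opt" and "import typing_extensions"
    # is recorded as {"Opt": "typing", "typing_extensions": "direct_import"},
    # so later checks know exactly where each name came from.
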
    def _find_balanced_brackets(self, text: str, type_name: str) -> list[dict[str, str]]:
        """Find all occurrences of Type[...] with properly balanced brackets."""
        matches = []
        pattern = f"{type_name}["
        start = 0

        while True:
            pos = text.find(pattern, start)
            if pos == -1:
                break

            # Ensure this is a word boundary (not part of a longer identifier)
            if pos > 0 and (text[pos - 1].isalnum() or text[pos - 1] == "_"):
                start = pos + 1
                continue

            # Find the matching closing bracket
            bracket_pos = pos + len(pattern) - 1
            bracket_count = 1
            i = bracket_pos + 1

            while i < len(text) and bracket_count > 0:
                if text[i] == "[":
                    bracket_count += 1
                elif text[i] == "]":
                    bracket_count -= 1
                i += 1

            if bracket_count == 0:
                content = text[bracket_pos + 1:i - 1]
                matches.append({
                    "full_match": text[pos:i],
                    "content": content,
                    "start": pos,
                    "end": i,
                })

            start = pos + 1

        return matches

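    # Illustrative example of the bracket matching:
    #   _find_balanced_brackets("x: Union[dict[str, int], None]", "Union")
    # returns one match whose "content" is "dict[str, int], None"; the nested
    # brackets are kept together instead of stopping at the first "]".
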
    def _parse_comma_separated_types(self, content: str) -> list[str]:
        """Parse comma-separated types handling nested brackets."""
        types = []
        current_type = ""
        bracket_count = 0

        for char in content:
            if char == "[":
                bracket_count += 1
                current_type += char
            elif char == "]":
                bracket_count -= 1
                current_type += char
            elif char == "," and bracket_count == 0:
                types.append(current_type.strip())
                current_type = ""
            else:
                current_type += char

        if current_type.strip():
            types.append(current_type.strip())

        return types

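    # Illustrative example: splitting only on top-level commas means
    #   _parse_comma_separated_types("dict[str, int], list[str], None")
    # returns ["dict[str, int]", "list[str]", "None"] rather than breaking
    # apart the comma inside "dict[str, int]".
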
    def _validate_suggestion(self, suggestion: str) -> bool:
        """Validate that a suggested fix is syntactically correct."""
        try:
            # Try to parse the suggestion as a type annotation
            test_code = f"x: {suggestion}"
            ast.parse(test_code)
            return True
        except SyntaxError:
            return False

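    # Illustrative example: _validate_suggestion("int | str") parses as the
    # annotation "x: int | str" and returns True, while a malformed fragment
    # such as "dict[str," fails to parse and returns False, so no bogus
    # suggestion is ever emitted.
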
    def _suggest_import_fix(self, line: str, old_imports: list[str]) -> str:
        """Suggest how to fix old typing imports."""
        # Suggest modern alternatives for each old import found on the line
        suggestions = []
        if "Union" in old_imports:
            suggestions.append("Use 'X | Y' syntax instead of Union")
        if "Optional" in old_imports:
            suggestions.append("Use 'X | None' instead of Optional")
        if any(imp in old_imports for imp in ["Dict", "List", "Set", "Tuple"]):
            suggestions.append("Use built-in generics (dict, list, set, tuple)")

        return "; ".join(suggestions)

    def print_results(self) -> None:
        """Print the results of the check."""
        if not self.issues:
            print("✅ No typing modernization issues found!")
            return

        # Group issues by type
        issues_by_type: dict[str, list[Issue]] = {}
        for issue in self.issues:
            issues_by_type.setdefault(issue.issue_type, []).append(issue)

        print(f"\n❌ Found {len(self.issues)} typing modernization issues:")
        print("=" * 60)

        for issue_type, type_issues in issues_by_type.items():
            print(
                f"\n🔸 {issue_type.replace('_', ' ').title()} ({len(type_issues)} issues)"
            )
            print("-" * 40)

            for issue in type_issues:
                rel_path = issue.file_path.relative_to(PROJECT_ROOT)
                print(f"  📁 {rel_path}:{issue.line_number}")
                print(f"     {issue.description}")
                if issue.suggestion and self.verbose:
                    print(f"     💡 Suggestion: {issue.suggestion}")
                print()

        # Summary
        print("=" * 60)
        print(
            f"Summary: {len(self.issues)} issues across {len({i.file_path for i in self.issues})} files"
        )

        # Recommendations
        print("\n📝 Quick fixes:")
        print("1. Replace Union[X, Y] with X | Y")
        print("2. Replace Optional[X] with X | None")
        print("3. Replace Dict/List/Set/Tuple with dict/list/set/tuple")
        print("4. Update Pydantic v1 patterns to v2")
        print("5. Use direct imports from typing instead of typing_extensions")


def main() -> int:
    """Run the main entry point for the script."""
    parser = argparse.ArgumentParser(
        description="Check for modern typing patterns and Pydantic v2 usage",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scripts/checks/typing_modernization_check.py
  python scripts/checks/typing_modernization_check.py --tests --verbose
  python scripts/checks/typing_modernization_check.py --fix
        """,
    )

    parser.add_argument(
        "--tests", action="store_true", help="Include tests/ directory in checks"
    )

    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show detailed output including suggestions",
    )

    parser.add_argument(
        "--fix",
        action="store_true",
        help="Attempt to auto-fix simple issues (not implemented yet)",
    )

    parser.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="Only show summary, no detailed issues",
    )

    args = parser.parse_args()

    if args.fix:
        print("⚠️ Auto-fix functionality not implemented yet")
        return 1

    # Run the checker
    checker = TypingChecker(
        include_tests=args.tests, verbose=args.verbose and not args.quiet, fix=args.fix
    )

    issues = checker.check_all()

    if not args.quiet:
        checker.print_results()
    elif issues:
        print(f"❌ Found {len(issues)} typing modernization issues")
    else:
        print("✅ No typing modernization issues found!")

    # Return exit code: non-zero when issues were found
    return 1 if issues else 0


if __name__ == "__main__":
    sys.exit(main())