diff --git a/hooks/bash_command_guard.py b/hooks/bash_command_guard.py index 850cc57..bd67e98 100644 --- a/hooks/bash_command_guard.py +++ b/hooks/bash_command_guard.py @@ -4,12 +4,14 @@ Prevents circumvention of type safety rules via shell commands that could inject 'Any' types or type ignore comments into Python files. """ +import fcntl import json +import os import re import subprocess import sys -import threading -import time +import tempfile +from contextlib import contextmanager from pathlib import Path from shutil import which from typing import TypedDict @@ -40,9 +42,40 @@ class JsonObject(TypedDict, total=False): hookSpecificOutput: dict[str, object] -# Global lock for subprocess operations to prevent concurrency issues -_subprocess_lock = threading.Lock() -_lock_timeout = 5.0 # Timeout for acquiring lock +# File-based lock for inter-process synchronization +def _get_lock_file() -> Path: + """Get path to lock file for subprocess serialization.""" + lock_dir = Path(tempfile.gettempdir()) / ".claude_hooks" + lock_dir.mkdir(exist_ok=True, mode=0o700) + return lock_dir / "subprocess.lock" + + +@contextmanager +def _subprocess_lock(timeout: float = 5.0): + """Context manager for file-based subprocess locking. + + Args: + timeout: Timeout in seconds for acquiring lock. + + Yields: + True if lock was acquired, False if timeout occurred. + """ + lock_file = _get_lock_file() + + # Open or create lock file + with open(lock_file, "a") as f: + try: + # Try to acquire exclusive lock (non-blocking) + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + yield True + except (IOError, OSError): + # Lock is held by another process, skip to avoid blocking + yield False + finally: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + except (IOError, OSError): + pass def _contains_forbidden_pattern(text: str) -> tuple[bool, str | None]: @@ -328,12 +361,11 @@ def _get_staged_python_files() -> list[str]: return [] try: - # Acquire lock with timeout to prevent concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if not acquired: - return [] + # Acquire file-based lock to prevent subprocess concurrency issues + with _subprocess_lock(timeout=5.0) as acquired: + if not acquired: + return [] - try: # Safe: invokes git with fixed arguments, no user input interpolation. result = subprocess.run( # noqa: S603 [git_path, "diff", "--name-only", "--cached"], @@ -351,8 +383,6 @@ def _get_staged_python_files() -> list[str]: for file_name in result.stdout.split("\n") if file_name.strip() and file_name.strip().endswith((".py", ".pyi")) ] - finally: - _subprocess_lock.release() except (OSError, subprocess.SubprocessError, TimeoutError): return [] diff --git a/hooks/code_quality_guard.py b/hooks/code_quality_guard.py index 834b393..6a99811 100644 --- a/hooks/code_quality_guard.py +++ b/hooks/code_quality_guard.py @@ -5,6 +5,7 @@ after writes. """ import ast +import fcntl import hashlib import json import logging @@ -14,10 +15,9 @@ import shutil import subprocess import sys import textwrap -import threading import tokenize from collections.abc import Callable -from contextlib import suppress +from contextlib import contextmanager, suppress from dataclasses import dataclass from datetime import UTC, datetime from importlib import import_module @@ -57,9 +57,42 @@ else: DuplicateResults = module.DuplicateResults detect_internal_duplicates = module.detect_internal_duplicates -# Global lock for subprocess operations to prevent concurrency issues -_subprocess_lock = threading.Lock() -_lock_timeout = 10.0 # Timeout for acquiring lock in seconds + +# File-based lock for inter-process synchronization +def _get_lock_file() -> Path: + """Get path to lock file for subprocess serialization.""" + lock_dir = Path(gettempdir()) / ".claude_hooks" + lock_dir.mkdir(exist_ok=True, mode=0o700) + return lock_dir / "subprocess.lock" + + +@contextmanager +def _subprocess_lock(timeout: float = 10.0): + """Context manager for file-based subprocess locking. + + Args: + timeout: Timeout in seconds for acquiring lock (not used, non-blocking). + + Yields: + True if lock was acquired, False if timeout occurred. + """ + lock_file = _get_lock_file() + + # Open or create lock file + with open(lock_file, "a") as f: + try: + # Try to acquire exclusive lock (non-blocking) + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + yield True + except (IOError, OSError): + # Lock is held by another process, skip to avoid blocking + yield False + finally: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + except (IOError, OSError): + pass + SUSPICIOUS_SUFFIXES: tuple[str, ...] = ( "enhanced", @@ -667,61 +700,59 @@ def _run_type_checker( if not tool_config: return True, f"Warning: Unknown tool {tool_name}" - # Acquire lock with timeout to prevent concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if not acquired: - return True, f"Warning: {tool_name} lock timeout" + # Acquire file-based lock to prevent subprocess concurrency issues + with _subprocess_lock(timeout=10.0) as acquired: + if not acquired: + return True, f"Warning: {tool_name} lock timeout" - try: - cmd = [str(tool_path)] + tool_config["args"] + try: + cmd = [str(tool_path)] + tool_config["args"] - # Activate virtual environment for the subprocess - env = os.environ.copy() - env["VIRTUAL_ENV"] = str(venv_bin.parent) - env["PATH"] = f"{venv_bin}:{env.get('PATH', '')}" - # Remove any PYTHONHOME that might interfere - env.pop("PYTHONHOME", None) + # Activate virtual environment for the subprocess + env = os.environ.copy() + env["VIRTUAL_ENV"] = str(venv_bin.parent) + env["PATH"] = f"{venv_bin}:{env.get('PATH', '')}" + # Remove any PYTHONHOME that might interfere + env.pop("PYTHONHOME", None) - # Add PYTHONPATH=src if src directory exists in project root - # This allows type checkers to resolve imports from src/ - project_root = venv_bin.parent.parent # Go from .venv/bin to project root - src_dir = project_root / "src" - if src_dir.exists() and src_dir.is_dir(): - existing_pythonpath = env.get("PYTHONPATH", "") - if existing_pythonpath: - env["PYTHONPATH"] = f"{src_dir}:{existing_pythonpath}" - else: - env["PYTHONPATH"] = str(src_dir) + # Add PYTHONPATH=src if src directory exists in project root + # This allows type checkers to resolve imports from src/ + project_root = venv_bin.parent.parent # Go from .venv/bin to project root + src_dir = project_root / "src" + if src_dir.exists() and src_dir.is_dir(): + existing_pythonpath = env.get("PYTHONPATH", "") + if existing_pythonpath: + env["PYTHONPATH"] = f"{src_dir}:{existing_pythonpath}" + else: + env["PYTHONPATH"] = str(src_dir) - # Run type checker from project root to pick up project configuration files - result = subprocess.run( # noqa: S603 - cmd, - check=False, - capture_output=True, - text=True, - timeout=30, - env=env, - cwd=str(project_root), - ) + # Run type checker from project root to pick up project configuration files + result = subprocess.run( # noqa: S603 + cmd, + check=False, + capture_output=True, + text=True, + timeout=30, + env=env, + cwd=str(project_root), + ) - # Check for tool-specific errors - error_check = tool_config["error_check"] - if error_check(result): - error_msg = tool_config["error_message"] - message = str(error_msg(result) if callable(error_msg) else error_msg) - return False, message + # Check for tool-specific errors + error_check = tool_config["error_check"] + if error_check(result): + error_msg = tool_config["error_message"] + message = str(error_msg(result) if callable(error_msg) else error_msg) + return False, message - # Return success or warning - if result.returncode == 0: - return True, "" - return True, f"Warning: {tool_name} error (exit {result.returncode})" + # Return success or warning + if result.returncode == 0: + return True, "" + return True, f"Warning: {tool_name} error (exit {result.returncode})" - except subprocess.TimeoutExpired: - return True, f"Warning: {tool_name} timeout" - except OSError: - return True, f"Warning: {tool_name} execution error" - finally: - _subprocess_lock.release() + except subprocess.TimeoutExpired: + return True, f"Warning: {tool_name} timeout" + except OSError: + return True, f"Warning: {tool_name} execution error" def _initialize_analysis() -> tuple[AnalysisResults, list[str]]: @@ -817,9 +848,8 @@ def _run_quality_analyses( env.pop("PYTHONHOME", None) # Acquire lock to prevent subprocess concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if acquired: - try: + with _subprocess_lock(timeout=10.0) as acquired: + if acquired: with suppress(subprocess.TimeoutExpired): result = subprocess.run( # noqa: S603 cmd, @@ -832,8 +862,6 @@ def _run_quality_analyses( if result.returncode == 0: with suppress(json.JSONDecodeError): results["complexity"] = json.loads(result.stdout) - finally: - _subprocess_lock.release() # Run type checking if any tool is enabled if enable_type_checks and any( @@ -873,9 +901,8 @@ def _run_quality_analyses( env.pop("PYTHONHOME", None) # Acquire lock to prevent subprocess concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if acquired: - try: + with _subprocess_lock(timeout=10.0) as acquired: + if acquired: with suppress(subprocess.TimeoutExpired): result = subprocess.run( # noqa: S603 cmd, @@ -888,8 +915,6 @@ def _run_quality_analyses( if result.returncode == 0: with suppress(json.JSONDecodeError): results["modernization"] = json.loads(result.stdout) - finally: - _subprocess_lock.release() return results