From 6a164be2e35792347a236c0a848e6bc02ee23c21 Mon Sep 17 00:00:00 2001 From: Travis Vasceannie Date: Tue, 21 Oct 2025 04:59:02 +0000 Subject: [PATCH] fix: switch to file-based locks for inter-process subprocess synchronization - Replace threading locks with fcntl file-based locks for proper inter-process synchronization - Hooks run as separate processes, so threading locks don't work across invocations - Implement non-blocking lock acquisition to prevent hook deadlocks - Use fcntl.flock on a shared lock file in /tmp/.claude_hooks/subprocess.lock - Simplify lock usage with context manager pattern in both hooks - Ensure graceful fallback if lock can't be acquired (e.g., due to concurrent hooks) This properly fixes the API Error 400 concurrency issues by serializing subprocess operations across all hook invocations, not just within a single process. --- hooks/bash_command_guard.py | 54 ++++++++++--- hooks/code_quality_guard.py | 151 +++++++++++++++++++++--------------- 2 files changed, 130 insertions(+), 75 deletions(-) diff --git a/hooks/bash_command_guard.py b/hooks/bash_command_guard.py index 850cc57..bd67e98 100644 --- a/hooks/bash_command_guard.py +++ b/hooks/bash_command_guard.py @@ -4,12 +4,14 @@ Prevents circumvention of type safety rules via shell commands that could inject 'Any' types or type ignore comments into Python files. """ +import fcntl import json +import os import re import subprocess import sys -import threading -import time +import tempfile +from contextlib import contextmanager from pathlib import Path from shutil import which from typing import TypedDict @@ -40,9 +42,40 @@ class JsonObject(TypedDict, total=False): hookSpecificOutput: dict[str, object] -# Global lock for subprocess operations to prevent concurrency issues -_subprocess_lock = threading.Lock() -_lock_timeout = 5.0 # Timeout for acquiring lock +# File-based lock for inter-process synchronization +def _get_lock_file() -> Path: + """Get path to lock file for subprocess serialization.""" + lock_dir = Path(tempfile.gettempdir()) / ".claude_hooks" + lock_dir.mkdir(exist_ok=True, mode=0o700) + return lock_dir / "subprocess.lock" + + +@contextmanager +def _subprocess_lock(timeout: float = 5.0): + """Context manager for file-based subprocess locking. + + Args: + timeout: Timeout in seconds for acquiring lock. + + Yields: + True if lock was acquired, False if timeout occurred. + """ + lock_file = _get_lock_file() + + # Open or create lock file + with open(lock_file, "a") as f: + try: + # Try to acquire exclusive lock (non-blocking) + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + yield True + except (IOError, OSError): + # Lock is held by another process, skip to avoid blocking + yield False + finally: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + except (IOError, OSError): + pass def _contains_forbidden_pattern(text: str) -> tuple[bool, str | None]: @@ -328,12 +361,11 @@ def _get_staged_python_files() -> list[str]: return [] try: - # Acquire lock with timeout to prevent concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if not acquired: - return [] + # Acquire file-based lock to prevent subprocess concurrency issues + with _subprocess_lock(timeout=5.0) as acquired: + if not acquired: + return [] - try: # Safe: invokes git with fixed arguments, no user input interpolation. result = subprocess.run( # noqa: S603 [git_path, "diff", "--name-only", "--cached"], @@ -351,8 +383,6 @@ def _get_staged_python_files() -> list[str]: for file_name in result.stdout.split("\n") if file_name.strip() and file_name.strip().endswith((".py", ".pyi")) ] - finally: - _subprocess_lock.release() except (OSError, subprocess.SubprocessError, TimeoutError): return [] diff --git a/hooks/code_quality_guard.py b/hooks/code_quality_guard.py index 834b393..6a99811 100644 --- a/hooks/code_quality_guard.py +++ b/hooks/code_quality_guard.py @@ -5,6 +5,7 @@ after writes. """ import ast +import fcntl import hashlib import json import logging @@ -14,10 +15,9 @@ import shutil import subprocess import sys import textwrap -import threading import tokenize from collections.abc import Callable -from contextlib import suppress +from contextlib import contextmanager, suppress from dataclasses import dataclass from datetime import UTC, datetime from importlib import import_module @@ -57,9 +57,42 @@ else: DuplicateResults = module.DuplicateResults detect_internal_duplicates = module.detect_internal_duplicates -# Global lock for subprocess operations to prevent concurrency issues -_subprocess_lock = threading.Lock() -_lock_timeout = 10.0 # Timeout for acquiring lock in seconds + +# File-based lock for inter-process synchronization +def _get_lock_file() -> Path: + """Get path to lock file for subprocess serialization.""" + lock_dir = Path(gettempdir()) / ".claude_hooks" + lock_dir.mkdir(exist_ok=True, mode=0o700) + return lock_dir / "subprocess.lock" + + +@contextmanager +def _subprocess_lock(timeout: float = 10.0): + """Context manager for file-based subprocess locking. + + Args: + timeout: Timeout in seconds for acquiring lock (not used, non-blocking). + + Yields: + True if lock was acquired, False if timeout occurred. + """ + lock_file = _get_lock_file() + + # Open or create lock file + with open(lock_file, "a") as f: + try: + # Try to acquire exclusive lock (non-blocking) + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + yield True + except (IOError, OSError): + # Lock is held by another process, skip to avoid blocking + yield False + finally: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + except (IOError, OSError): + pass + SUSPICIOUS_SUFFIXES: tuple[str, ...] = ( "enhanced", @@ -667,61 +700,59 @@ def _run_type_checker( if not tool_config: return True, f"Warning: Unknown tool {tool_name}" - # Acquire lock with timeout to prevent concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if not acquired: - return True, f"Warning: {tool_name} lock timeout" + # Acquire file-based lock to prevent subprocess concurrency issues + with _subprocess_lock(timeout=10.0) as acquired: + if not acquired: + return True, f"Warning: {tool_name} lock timeout" - try: - cmd = [str(tool_path)] + tool_config["args"] + try: + cmd = [str(tool_path)] + tool_config["args"] - # Activate virtual environment for the subprocess - env = os.environ.copy() - env["VIRTUAL_ENV"] = str(venv_bin.parent) - env["PATH"] = f"{venv_bin}:{env.get('PATH', '')}" - # Remove any PYTHONHOME that might interfere - env.pop("PYTHONHOME", None) + # Activate virtual environment for the subprocess + env = os.environ.copy() + env["VIRTUAL_ENV"] = str(venv_bin.parent) + env["PATH"] = f"{venv_bin}:{env.get('PATH', '')}" + # Remove any PYTHONHOME that might interfere + env.pop("PYTHONHOME", None) - # Add PYTHONPATH=src if src directory exists in project root - # This allows type checkers to resolve imports from src/ - project_root = venv_bin.parent.parent # Go from .venv/bin to project root - src_dir = project_root / "src" - if src_dir.exists() and src_dir.is_dir(): - existing_pythonpath = env.get("PYTHONPATH", "") - if existing_pythonpath: - env["PYTHONPATH"] = f"{src_dir}:{existing_pythonpath}" - else: - env["PYTHONPATH"] = str(src_dir) + # Add PYTHONPATH=src if src directory exists in project root + # This allows type checkers to resolve imports from src/ + project_root = venv_bin.parent.parent # Go from .venv/bin to project root + src_dir = project_root / "src" + if src_dir.exists() and src_dir.is_dir(): + existing_pythonpath = env.get("PYTHONPATH", "") + if existing_pythonpath: + env["PYTHONPATH"] = f"{src_dir}:{existing_pythonpath}" + else: + env["PYTHONPATH"] = str(src_dir) - # Run type checker from project root to pick up project configuration files - result = subprocess.run( # noqa: S603 - cmd, - check=False, - capture_output=True, - text=True, - timeout=30, - env=env, - cwd=str(project_root), - ) + # Run type checker from project root to pick up project configuration files + result = subprocess.run( # noqa: S603 + cmd, + check=False, + capture_output=True, + text=True, + timeout=30, + env=env, + cwd=str(project_root), + ) - # Check for tool-specific errors - error_check = tool_config["error_check"] - if error_check(result): - error_msg = tool_config["error_message"] - message = str(error_msg(result) if callable(error_msg) else error_msg) - return False, message + # Check for tool-specific errors + error_check = tool_config["error_check"] + if error_check(result): + error_msg = tool_config["error_message"] + message = str(error_msg(result) if callable(error_msg) else error_msg) + return False, message - # Return success or warning - if result.returncode == 0: - return True, "" - return True, f"Warning: {tool_name} error (exit {result.returncode})" + # Return success or warning + if result.returncode == 0: + return True, "" + return True, f"Warning: {tool_name} error (exit {result.returncode})" - except subprocess.TimeoutExpired: - return True, f"Warning: {tool_name} timeout" - except OSError: - return True, f"Warning: {tool_name} execution error" - finally: - _subprocess_lock.release() + except subprocess.TimeoutExpired: + return True, f"Warning: {tool_name} timeout" + except OSError: + return True, f"Warning: {tool_name} execution error" def _initialize_analysis() -> tuple[AnalysisResults, list[str]]: @@ -817,9 +848,8 @@ def _run_quality_analyses( env.pop("PYTHONHOME", None) # Acquire lock to prevent subprocess concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if acquired: - try: + with _subprocess_lock(timeout=10.0) as acquired: + if acquired: with suppress(subprocess.TimeoutExpired): result = subprocess.run( # noqa: S603 cmd, @@ -832,8 +862,6 @@ def _run_quality_analyses( if result.returncode == 0: with suppress(json.JSONDecodeError): results["complexity"] = json.loads(result.stdout) - finally: - _subprocess_lock.release() # Run type checking if any tool is enabled if enable_type_checks and any( @@ -873,9 +901,8 @@ def _run_quality_analyses( env.pop("PYTHONHOME", None) # Acquire lock to prevent subprocess concurrency issues - acquired = _subprocess_lock.acquire(timeout=_lock_timeout) - if acquired: - try: + with _subprocess_lock(timeout=10.0) as acquired: + if acquired: with suppress(subprocess.TimeoutExpired): result = subprocess.run( # noqa: S603 cmd, @@ -888,8 +915,6 @@ def _run_quality_analyses( if result.returncode == 0: with suppress(json.JSONDecodeError): results["modernization"] = json.loads(result.stdout) - finally: - _subprocess_lock.release() return results