UN-2897 [FIX] Google Drive connector SIGSEGV crashes in Celery ForkPoolWorker processes (#1597)
UN-2897 [FIX] Google Drive connector SIGSEGV crashes in Celery ForkPoolWorker Implements lazy initialization for Google Drive API client to prevent segmentation faults when Celery forks worker processes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -48,14 +49,13 @@ class GoogleDriveFS(UnstractFileSystem):
|
||||
"refresh_token": settings["refresh_token"],
|
||||
GDriveConstants.TOKEN_EXPIRY: settings[GDriveConstants.TOKEN_EXPIRY],
|
||||
}
|
||||
gauth = GoogleAuth(
|
||||
settings_file=f"{os.path.dirname(__file__)}/static/settings.yaml",
|
||||
settings={"client_config": self.client_secrets["web"]},
|
||||
)
|
||||
gauth.credentials = OAuth2Credentials.from_json(
|
||||
json_data=json.dumps(self.oauth2_credentials)
|
||||
)
|
||||
self.drive = GDriveFileSystem(path="root", google_auth=gauth)
|
||||
# Store settings file path for lazy initialization
|
||||
self._settings_file = f"{os.path.dirname(__file__)}/static/settings.yaml"
|
||||
|
||||
# Lazy initialization - create client only when needed (after fork)
|
||||
# This prevents SIGSEGV crashes in Celery ForkPoolWorker processes
|
||||
self._drive = None
|
||||
self._drive_lock = threading.Lock()
|
||||
|
||||
@staticmethod
|
||||
def get_id() -> str:
|
||||
@@ -97,7 +97,34 @@ class GoogleDriveFS(UnstractFileSystem):
|
||||
return True
|
||||
|
||||
def get_fsspec_fs(self) -> GDriveFileSystem:
|
||||
return self.drive
|
||||
"""Get GDrive filesystem with lazy initialization (fork-safe).
|
||||
|
||||
This method creates the Google Drive API client on first access,
|
||||
ensuring it's created AFTER Celery fork to avoid SIGSEGV crashes.
|
||||
|
||||
The lazy initialization pattern ensures that gRPC-based Google API
|
||||
clients are created in the child process after fork(), not in the
|
||||
parent process before fork.
|
||||
|
||||
Returns:
|
||||
GDriveFileSystem: The initialized Google Drive filesystem client
|
||||
"""
|
||||
if self._drive is None:
|
||||
with self._drive_lock:
|
||||
# Double-check pattern for thread safety
|
||||
if self._drive is None:
|
||||
logger.info("Initializing Google Drive client (lazy init after fork)")
|
||||
gauth = GoogleAuth(
|
||||
settings_file=self._settings_file,
|
||||
settings={"client_config": self.client_secrets["web"]},
|
||||
)
|
||||
gauth.credentials = OAuth2Credentials.from_json(
|
||||
json_data=json.dumps(self.oauth2_credentials)
|
||||
)
|
||||
self._drive = GDriveFileSystem(path="root", google_auth=gauth)
|
||||
logger.info("Google Drive client initialized successfully")
|
||||
|
||||
return self._drive
|
||||
|
||||
def extract_metadata_file_hash(self, metadata: dict[str, Any]) -> str | None:
|
||||
"""Extracts a unique file hash from metadata.
|
||||
|
||||
Reference in New Issue
Block a user