From eb52ec94d7c776ca38d6519ac2bc5a357f4bd39a Mon Sep 17 00:00:00 2001
From: BukeLy
Date: Thu, 13 Nov 2025 22:31:14 +0800
Subject: [PATCH] feat: Add workspace isolation support for pipeline status
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Problem:
In multi-tenant scenarios, different workspaces share a single global
pipeline_status namespace, causing pipelines from different tenants to
block each other, severely impacting concurrent processing performance.

Solution:
- Extended get_namespace_data() to recognize workspace-specific pipeline
  namespaces with pattern "{workspace}:pipeline" (following GraphDB pattern)
- Added workspace parameter to initialize_pipeline_status() for per-tenant
  isolated pipeline namespaces
- Updated all 7 call sites to use workspace-aware locks:
  * lightrag.py: process_document_queue(), aremove_document()
  * document_routes.py: background_delete_documents(), clear_documents(),
    cancel_pipeline(), get_pipeline_status(), delete_documents()

Impact:
- Different workspaces can process documents concurrently without blocking
- Backward compatible: empty workspace defaults to "pipeline_status"
- Maintains fail-fast: uninitialized pipeline raises clear error
- Expected N× performance improvement for N concurrent tenants

Bug fixes:
- Fixed AttributeError by using self.workspace instead of self.global_config
- Fixed pipeline status endpoint to show workspace-specific status
- Fixed delete endpoint to check workspace-specific busy flag

Code changes: 4 files, 141 insertions(+), 28 deletions(-)

Testing: All syntax checks passed, comprehensive workspace isolation tests
completed
---
 .gitignore                              |   2 +
 lightrag/api/routers/document_routes.py | 102 +++++++++++++++++++-----
 lightrag/kg/shared_storage.py           |  26 +++++-
 lightrag/lightrag.py                    |  39 +++++++--
 4 files changed, 141 insertions(+), 28 deletions(-)
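
A minimal sketch of the five-step pattern the patch repeats at each call
site, pulled out into one hypothetical helper (the helper itself is not
part of the patch; it assumes initialize_share_data() has already run in
the process, as it does during server startup):

    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_storage_keyed_lock,
        initialize_pipeline_status,
    )

    async def acquire_pipeline_status(workspace: str = ""):
        """Return (status_dict, lock) for a workspace's pipeline."""
        # Steps 1-2: derive the namespace from the workspace
        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
        # Step 3: ensure the namespace is initialized
        await initialize_pipeline_status(workspace)
        # Step 4: workspace-aware keyed lock instead of the global lock
        lock = get_storage_keyed_lock(
            keys="status", namespace=namespace, enable_logging=False
        )
        # Step 5: fetch the shared status dict
        status = await get_namespace_data(namespace)
        return status, lock
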
diff --git a/.gitignore b/.gitignore
index 8a5059c8..3c676aaf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -72,3 +72,5 @@ download_models_hf.py
 
 # Cline files
 memory-bank
+.claude/CLAUDE.md
+.claude/
diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 3e479a53..d5268779 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1581,11 +1581,26 @@ async def background_delete_documents(
     """Background task to delete multiple documents"""
     from lightrag.kg.shared_storage import (
         get_namespace_data,
-        get_pipeline_status_lock,
+        get_storage_keyed_lock,
+        initialize_pipeline_status,
     )
 
-    pipeline_status = await get_namespace_data("pipeline_status")
-    pipeline_status_lock = get_pipeline_status_lock()
+    # Step 1: Get workspace
+    workspace = rag.workspace
+
+    # Step 2: Construct namespace
+    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+    # Step 3: Ensure initialization
+    await initialize_pipeline_status(workspace)
+
+    # Step 4: Get lock
+    pipeline_status_lock = get_storage_keyed_lock(
+        keys="status", namespace=namespace, enable_logging=False
+    )
+
+    # Step 5: Get data
+    pipeline_status = await get_namespace_data(namespace)
 
     total_docs = len(doc_ids)
     successful_deletions = []
@@ -2074,12 +2089,27 @@ def create_document_routes(
         """
         from lightrag.kg.shared_storage import (
             get_namespace_data,
-            get_pipeline_status_lock,
+            get_storage_keyed_lock,
+            initialize_pipeline_status,
         )
 
         # Get pipeline status and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = rag.workspace
+
+        # Step 2: Construct namespace
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)
 
         # Check and set status with lock
         async with pipeline_status_lock:
@@ -2271,9 +2301,14 @@
         from lightrag.kg.shared_storage import (
             get_namespace_data,
             get_all_update_flags_status,
+            initialize_pipeline_status,
         )
 
-        pipeline_status = await get_namespace_data("pipeline_status")
+        # Get workspace-specific pipeline status
+        workspace = rag.workspace
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+        await initialize_pipeline_status(workspace)
+        pipeline_status = await get_namespace_data(namespace)
 
         # Get update flags status for all namespaces
         update_status = await get_all_update_flags_status()
@@ -2478,17 +2513,31 @@
 
         doc_ids = delete_request.doc_ids
         try:
-            from lightrag.kg.shared_storage import get_namespace_data
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_storage_keyed_lock,
+                initialize_pipeline_status,
+            )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
+            # Get workspace-specific pipeline status
+            workspace = rag.workspace
+            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+            await initialize_pipeline_status(workspace)
 
-            # Check if pipeline is busy
-            if pipeline_status.get("busy", False):
-                return DeleteDocByIdResponse(
-                    status="busy",
-                    message="Cannot delete documents while pipeline is busy",
-                    doc_id=", ".join(doc_ids),
-                )
+            # Use workspace-aware lock to check busy flag
+            pipeline_status_lock = get_storage_keyed_lock(
+                keys="status", namespace=namespace, enable_logging=False
+            )
+            pipeline_status = await get_namespace_data(namespace)
+
+            # Check if pipeline is busy with proper lock
+            async with pipeline_status_lock:
+                if pipeline_status.get("busy", False):
+                    return DeleteDocByIdResponse(
+                        status="busy",
+                        message="Cannot delete documents while pipeline is busy",
+                        doc_id=", ".join(doc_ids),
+                    )
 
             # Add deletion task to background tasks
             background_tasks.add_task(
@@ -2884,11 +2933,26 @@
         try:
             from lightrag.kg.shared_storage import (
                 get_namespace_data,
-                get_pipeline_status_lock,
+                get_storage_keyed_lock,
+                initialize_pipeline_status,
             )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_pipeline_status_lock()
+            # Step 1: Get workspace
+            workspace = rag.workspace
+
+            # Step 2: Construct namespace
+            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+            # Step 3: Ensure initialization
+            await initialize_pipeline_status(workspace)
+
+            # Step 4: Get lock
+            pipeline_status_lock = get_storage_keyed_lock(
+                keys="status", namespace=namespace, enable_logging=False
+            )
+
+            # Step 5: Get data
+            pipeline_status = await get_namespace_data(namespace)
 
             async with pipeline_status_lock:
                 if not pipeline_status.get("busy", False):
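
Illustrative only: with the routes above, a busy flag set for one tenant
no longer blocks another tenant's endpoints. Using the hypothetical
acquire_pipeline_status() helper sketched earlier, the delete endpoint's
busy check reduces to:

    async def can_delete(workspace: str) -> bool:
        # Each workspace reads its own "{workspace}:pipeline" status,
        # so tenant_a being busy returns False only for tenant_a.
        status, lock = await acquire_pipeline_status(workspace)
        async with lock:
            return not status.get("busy", False)
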
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 0abcf719..8f34e64c 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -1270,12 +1270,23 @@ def initialize_share_data(workers: int = 1):
     _initialized = True
 
 
-async def initialize_pipeline_status():
+async def initialize_pipeline_status(workspace: str = ""):
     """
     Initialize pipeline namespace with default values.
+
+    Args:
+        workspace: Optional workspace identifier for multi-tenant isolation.
+                   Empty string (default) uses global "pipeline_status" namespace.
+
     This function is called during FastAPI lifespan for each worker.
     """
-    pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True)
+    # Construct namespace (following GraphDB pattern)
+    if workspace:
+        namespace = f"{workspace}:pipeline"
+    else:
+        namespace = "pipeline_status"  # Backward compatibility
+
+    pipeline_namespace = await get_namespace_data(namespace, first_init=True)
 
     async with get_internal_lock():
         # Check if already initialized by checking for required fields
@@ -1298,7 +1309,9 @@ async def initialize_pipeline_status():
                 "history_messages": history_messages,  # Use shared list object
             }
         )
-        direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
+        direct_log(
+            f"Process {os.getpid()} Pipeline namespace '{namespace}' initialized"
+        )
 
 
 async def get_update_flag(namespace: str):
@@ -1430,7 +1443,12 @@
     async with get_internal_lock():
         if namespace not in _shared_dicts:
             # Special handling for pipeline_status namespace
-            if namespace == "pipeline_status" and not first_init:
+            # Supports both global "pipeline_status" and workspace-specific "{workspace}:pipeline"
+            is_pipeline = namespace == "pipeline_status" or namespace.endswith(
+                ":pipeline"
+            )
+
+            if is_pipeline and not first_init:
                 # Check if pipeline_status should have been initialized but wasn't
                 # This helps users understand they need to call initialize_pipeline_status()
                 raise PipelineNotInitializedError(namespace)
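
The namespace convention in isolation (mirrors the hunks above; these
helper names are illustrative and do not exist in shared_storage.py):

    def pipeline_namespace(workspace: str = "") -> str:
        # Workspace-specific name, or the legacy global one
        return f"{workspace}:pipeline" if workspace else "pipeline_status"

    def is_pipeline_namespace(namespace: str) -> bool:
        # Matches both forms, as get_namespace_data() now does
        return namespace == "pipeline_status" or namespace.endswith(":pipeline")

    assert pipeline_namespace() == "pipeline_status"
    assert pipeline_namespace("tenant_a") == "tenant_a:pipeline"
    assert is_pipeline_namespace("tenant_a:pipeline")
    assert is_pipeline_namespace("pipeline_status")
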
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index acf157da..211914ab 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -61,9 +61,10 @@ from lightrag.kg import (
 
 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    get_pipeline_status_lock,
     get_graph_db_lock,
     get_data_init_lock,
+    get_storage_keyed_lock,
+    initialize_pipeline_status,
 )
 
 from lightrag.base import (
@@ -1573,8 +1574,22 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2912,8 +2927,22 @@ class LightRAG:
         doc_llm_cache_ids: list[str] = []
 
         # Get pipeline status shared data and lock for status updates
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)
 
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
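
A rough sketch (not from the patch) of the concurrency claim: two
workspaces can hold their pipeline locks simultaneously, where the old
global lock serialized them. Builds on the hypothetical
acquire_pipeline_status() helper above; the sleep stands in for real
document processing:

    import asyncio

    async def hold_pipeline(workspace: str, seconds: float) -> None:
        status, lock = await acquire_pipeline_status(workspace)
        async with lock:
            status["busy"] = True
            await asyncio.sleep(seconds)  # placeholder workload
            status["busy"] = False

    async def main() -> None:
        # With isolation these overlap: ~1s wall time instead of ~2s
        await asyncio.gather(
            hold_pipeline("tenant_a", 1.0),
            hold_pipeline("tenant_b", 1.0),
        )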