Merge branch 'main' into Anush008/main

This commit is contained in:
yangdx
2025-10-30 11:07:39 +08:00
52 changed files with 3127 additions and 1005 deletions

View File

@@ -1,3 +1,4 @@
include requirements.txt
include lightrag/api/requirements.txt
recursive-include lightrag/api/webui *
recursive-include lightrag/api/static *

View File

@@ -104,18 +104,26 @@ lightrag-server
git clone https://github.com/HKUDS/LightRAG.git
cd LightRAG
# Create a Python virtual environment if necessary
# Install in editable mode with API support
# Install the LightRAG server in development (editable) mode
pip install -e ".[api]"
cp env.example .env
cp env.example .env # Update the .env file with your LLM and Embedding model access settings
# Build the front-end artifacts
cd lightrag_webui
bun install --frozen-lockfile
bun run build
cd ..
lightrag-server
```
* Launching the LightRAG Server with Docker Compose
```
```bash
git clone https://github.com/HKUDS/LightRAG.git
cd LightRAG
cp env.example .env
cp env.example .env # Update the .env file with your LLM and Embedding model access settings
# modify LLM and Embedding settings in .env
docker compose up
```

View File

@@ -103,19 +103,27 @@ lightrag-server
```bash
git clone https://github.com/HKUDS/LightRAG.git
cd LightRAG
# create a Python virtual environment if necessary
# Create a Python virtual environment if necessary
# Install in editable mode with API support
pip install -e ".[api]"
cp env.example .env
cp env.example .env # Update the .env with your LLM and embedding configurations
# Build front-end artifacts
cd lightrag_webui
bun install --frozen-lockfile
bun run build
cd ..
lightrag-server
```
* Launching the LightRAG Server with Docker Compose
```
```bash
git clone https://github.com/HKUDS/LightRAG.git
cd LightRAG
cp env.example .env
cp env.example .env # Update the .env with your LLM and embedding configurations
# modify LLM and Embedding settings in .env
docker compose up
```
@@ -1539,7 +1547,7 @@ The dataset used in LightRAG can be downloaded from [TommyChien/UltraDomain](htt
### Generate Query
LightRAG uses the following prompt to generate high-level queries, with the corresponding code in `example/generate_query.py`.
LightRAG uses the following prompt to generate high-level queries, with the corresponding code in `examples/generate_query.py`.
<details>
<summary> Prompt </summary>

View File

@@ -208,6 +208,7 @@ OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
# OPENAI_LLM_EXTRA_BODY='{"chat_template_kwargs": {"enable_thinking": false}}'
### use the following command to see all support options for Ollama LLM
### If LightRAG is deployed in Docker, use host.docker.internal instead of localhost in LLM_BINDING_HOST
### lightrag-server --llm-binding ollama --help
### Ollama Server Specific Parameters
### OLLAMA_LLM_NUM_CTX must be provided, and should be at least MAX_TOTAL_TOKENS + 2000
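As a quick sanity check of the constraint above, a minimal sketch in Python (the environment variable names are taken from the comments above; the default values are illustrative only):
```python
import os

# Illustrative check of the rule above: OLLAMA_LLM_NUM_CTX should be at least
# MAX_TOTAL_TOKENS + 2000. Defaults here are placeholders, not recommendations.
num_ctx = int(os.environ.get("OLLAMA_LLM_NUM_CTX", "32768"))
max_total_tokens = int(os.environ.get("MAX_TOTAL_TOKENS", "30000"))

if num_ctx < max_total_tokens + 2000:
    raise ValueError(
        f"OLLAMA_LLM_NUM_CTX ({num_ctx}) is too small; "
        f"need at least MAX_TOTAL_TOKENS + 2000 = {max_total_tokens + 2000}"
    )
```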
@@ -229,7 +230,7 @@ EMBEDDING_BINDING=ollama
EMBEDDING_MODEL=bge-m3:latest
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY=your_api_key
# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
# If LightRAG is deployed in Docker, use host.docker.internal instead of localhost
EMBEDDING_BINDING_HOST=http://localhost:11434
### OpenAI compatible (VoyageAI embedding openai compatible)

View File

@@ -0,0 +1,55 @@
from openai import OpenAI
# os.environ["OPENAI_API_KEY"] = ""
def openai_complete_if_cache(
model="gpt-4o-mini", prompt=None, system_prompt=None, history_messages=[], **kwargs
) -> str:
openai_client = OpenAI()
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.extend(history_messages)
messages.append({"role": "user", "content": prompt})
response = openai_client.chat.completions.create(
model=model, messages=messages, **kwargs
)
return response.choices[0].message.content
if __name__ == "__main__":
description = ""
prompt = f"""
Given the following description of a dataset:
{description}
Please identify 5 potential users who would engage with this dataset. For each user, list 5 tasks they would perform with this dataset. Then, for each (user, task) combination, generate 5 questions that require a high-level understanding of the entire dataset.
Output the results in the following structure:
- User 1: [user description]
- Task 1: [task description]
- Question 1:
- Question 2:
- Question 3:
- Question 4:
- Question 5:
- Task 2: [task description]
...
- Task 5: [task description]
- User 2: [user description]
...
- User 5: [user description]
...
"""
result = openai_complete_if_cache(model="gpt-4o-mini", prompt=prompt)
file_path = "./queries.txt"
with open(file_path, "w") as file:
file.write(result)
print(f"Queries written to {file_path}")

View File

@@ -1,4 +0,0 @@
#!/bin/bash
source /home/netman/lightrag-xyj/venv/bin/activate
lightrag-server

View File

@@ -1,5 +1,5 @@
[Unit]
Description=LightRAG XYJ Ollama Service
Description=LightRAG XYJ Service
After=network.target
[Service]
@@ -8,10 +8,23 @@ User=netman
# Memory settings
MemoryHigh=8G
MemoryMax=12G
WorkingDirectory=/home/netman/lightrag-xyj
ExecStart=/home/netman/lightrag-xyj/lightrag-api
# Set the LightRAG installation directory (change this to match your installation path)
Environment="LIGHTRAG_HOME=/home/netman/lightrag-xyj"
# Set Environment to your Python virtual environment
Environment="PATH=${LIGHTRAG_HOME}/.venv/bin"
WorkingDirectory=${LIGHTRAG_HOME}
ExecStart=${LIGHTRAG_HOME}/.venv/bin/lightrag-server
# ExecStart=${LIGHTRAG_HOME}/.venv/bin/lightrag-gunicorn
# KillMode requires that ExecStart be the gunicorn or uvicorn main process
KillMode=process
ExecStop=/bin/kill -s TERM $MAINPID
TimeoutStopSec=60
Restart=always
RestartSec=10
RestartSec=30
[Install]
WantedBy=multi-user.target

View File

@@ -1,5 +1,5 @@
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
__version__ = "1.4.9.4"
__version__ = "1.4.9.6"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/LightRAG"

View File

@@ -184,24 +184,16 @@ MAX_ASYNC=4
### Install LightRAG as a Linux Service
Create your service file `lightrag.service` from the sample file `lightrag.service.example`. Modify the WorkingDirectory and ExecStart in the service file:
Create your service file `lightrag.service` from the sample file `lightrag.service.example`. Modify the service startup definition in the service file:
```text
Description=LightRAG Ollama Service
WorkingDirectory=<lightrag installation directory>
ExecStart=<lightrag installation directory>/lightrag/api/lightrag-api
```
Modify your service startup script `lightrag-api`. Change the Python virtual environment activation command as needed:
```shell
#!/bin/bash
# Your Python virtual environment activation command
source /home/netman/lightrag-xyj/venv/bin/activate
# Start the lightrag api server
lightrag-server
# Set Environment to your Python virtual environment
Environment="PATH=/home/netman/lightrag-xyj/venv/bin"
WorkingDirectory=/home/netman/lightrag-xyj
# ExecStart=/home/netman/lightrag-xyj/venv/bin/lightrag-server
ExecStart=/home/netman/lightrag-xyj/venv/bin/lightrag-gunicorn
```
> The ExecStart command must be either lightrag-gunicorn or lightrag-server; do not wrap them in another script, because stopping the service requires the main process to be one of these two executables.
Install the LightRAG service. If your system is Ubuntu, the following commands will work:

View File

@@ -189,24 +189,18 @@ MAX_ASYNC=4
### Install LightRAG as a Linux Service
Create your service file `lightrag.service` from the sample file: `lightrag.service.example`. Modify the `WorkingDirectory` and `ExecStart` in the service file:
Create your service file `lightrag.service` from the sample file: `lightrag.service.example`. Modify the start options in the service file:
```text
Description=LightRAG Ollama Service
WorkingDirectory=<lightrag installed directory>
ExecStart=<lightrag installed directory>/lightrag/api/lightrag-api
# Set Environment to your Python virtual environment
Environment="PATH=/home/netman/lightrag-xyj/venv/bin"
WorkingDirectory=/home/netman/lightrag-xyj
# ExecStart=/home/netman/lightrag-xyj/venv/bin/lightrag-server
ExecStart=/home/netman/lightrag-xyj/venv/bin/lightrag-gunicorn
```
Modify your service startup script: `lightrag-api`. Change your Python virtual environment activation command as needed:
```shell
#!/bin/bash
# your python virtual environment activation
source /home/netman/lightrag-xyj/venv/bin/activate
# start lightrag api server
lightrag-server
```
> The ExecStart command must be either `lightrag-gunicorn` or `lightrag-server`; no wrapper scripts are allowed. This is because service termination requires the main process to be one of these two executables.
Install LightRAG service. If your system is Ubuntu, the following commands will work:

View File

@@ -1 +1 @@
__api_version__ = "0243"
__api_version__ = "0248"

View File

@@ -129,11 +129,13 @@ def on_exit(server):
print("=" * 80)
print("GUNICORN MASTER PROCESS: Shutting down")
print(f"Process ID: {os.getpid()}")
print("=" * 80)
# Release shared resources
print("Finalizing shared storage...")
finalize_share_data()
print("Gunicorn shutdown complete")
print("=" * 80)
print("=" * 80)
print("Gunicorn shutdown complete")
print("=" * 80)

View File

@@ -5,10 +5,13 @@ LightRAG FastAPI Server
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.openapi.docs import (
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
import os
import logging
import logging.config
import signal
import sys
import uvicorn
import pipmaster as pm
@@ -78,24 +81,6 @@ config.read("config.ini")
auth_configured = bool(auth_handler.accounts)
def setup_signal_handlers():
"""Setup signal handlers for graceful shutdown"""
def signal_handler(sig, frame):
print(f"\n\nReceived signal {sig}, shutting down gracefully...")
print(f"Process ID: {os.getpid()}")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
class LLMConfigCache:
"""Smart LLM and Embedding configuration cache class"""
@@ -341,8 +326,15 @@ def create_app(args):
# Clean up database connections
await rag.finalize_storages()
# Clean up shared data
finalize_share_data()
if "LIGHTRAG_GUNICORN_MODE" not in os.environ:
# Only perform cleanup in Uvicorn single-process mode
logger.debug("Uvicorn Mode: finalizing shared storage...")
finalize_share_data()
else:
# In Gunicorn mode with preload_app=True, cleanup is handled by on_exit hooks
logger.debug(
"Gunicorn Mode: postpone shared storage finalization to master process"
)
# Initialize FastAPI
base_description = (
@@ -358,7 +350,7 @@ def create_app(args):
"description": swagger_description,
"version": __api_version__,
"openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL
"docs_url": "/docs", # Explicitly set docs URL
"docs_url": None, # Disable default docs, we'll create custom endpoint
"redoc_url": "/redoc", # Explicitly set redoc URL
"lifespan": lifespan,
}
@@ -769,6 +761,25 @@ def create_app(args):
ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key)
app.include_router(ollama_api.router, prefix="/api")
# Custom Swagger UI endpoint for offline support
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
"""Custom Swagger UI HTML with local static files"""
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title + " - Swagger UI",
oauth2_redirect_url="/docs/oauth2-redirect",
swagger_js_url="/static/swagger-ui/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui/swagger-ui.css",
swagger_favicon_url="/static/swagger-ui/favicon-32x32.png",
swagger_ui_parameters=app.swagger_ui_parameters,
)
@app.get("/docs/oauth2-redirect", include_in_schema=False)
async def swagger_ui_redirect():
"""OAuth2 redirect for Swagger UI"""
return get_swagger_ui_oauth2_redirect_html()
@app.get("/")
async def redirect_to_webui():
"""Redirect root path to /webui"""
@@ -935,6 +946,15 @@ def create_app(args):
return response
# Mount Swagger UI static files for offline support
swagger_static_dir = Path(__file__).parent / "static" / "swagger-ui"
if swagger_static_dir.exists():
app.mount(
"/static/swagger-ui",
StaticFiles(directory=swagger_static_dir),
name="swagger-ui-static",
)
# Webui mount webui/index.html
static_dir = Path(__file__).parent / "webui"
static_dir.mkdir(exist_ok=True)
@@ -1076,8 +1096,10 @@ def main():
update_uvicorn_mode_config()
display_splash_screen(global_args)
# Setup signal handlers for graceful shutdown
setup_signal_handlers()
# Note: Signal handlers are NOT registered here because:
# - Uvicorn has built-in signal handling that properly calls lifespan shutdown
# - Custom signal handlers can interfere with uvicorn's graceful shutdown
# - Cleanup is handled by the lifespan context manager's finally block
# Create application instance directly instead of using factory function
app = create_app(global_args)

View File

@@ -161,6 +161,28 @@ class ReprocessResponse(BaseModel):
}
class CancelPipelineResponse(BaseModel):
"""Response model for pipeline cancellation operation
Attributes:
status: Status of the cancellation request
message: Message describing the operation result
"""
status: Literal["cancellation_requested", "not_busy"] = Field(
description="Status of the cancellation request"
)
message: str = Field(description="Human-readable message describing the operation")
class Config:
json_schema_extra = {
"example": {
"status": "cancellation_requested",
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
}
}
class InsertTextRequest(BaseModel):
"""Request model for inserting a single text document
@@ -458,7 +480,7 @@ class DocsStatusesResponse(BaseModel):
"id": "doc_789",
"content_summary": "Document pending final indexing",
"content_length": 7200,
"status": "multimodal_processed",
"status": "preprocessed",
"created_at": "2025-03-31T09:30:00",
"updated_at": "2025-03-31T09:35:00",
"track_id": "upload_20250331_093000_xyz789",
@@ -1534,7 +1556,19 @@ async def background_delete_documents(
try:
# Loop through each document ID and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
# Check for cancellation at the start of each document deletion
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
# Add remaining documents to failed list with cancellation reason
failed_deletions.extend(
doc_ids[i - 1 :]
) # i-1 because enumerate starts at 1
break # Exit the loop, remaining documents unchanged
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
logger.info(start_msg)
pipeline_status["cur_batch"] = i
@@ -1697,6 +1731,10 @@ async def background_delete_documents(
# Final summary and check for pending requests
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["pending_requests"] = False # Reset pending requests flag
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
@@ -2230,7 +2268,7 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
# TODO: Deprecated
# TODO: Deprecated, use /documents/paginated instead
@router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
@@ -2754,4 +2792,63 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/cancel_pipeline",
response_model=CancelPipelineResponse,
dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
"""
Request cancellation of the currently running pipeline.
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
1. Check this flag at key processing points
2. Stop processing new documents
3. Cancel all running document processing tasks
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
The cancellation is graceful and ensures data consistency. Documents that have
completed processing will remain in PROCESSED status.
Returns:
CancelPipelineResponse: Response with status and message
- status="cancellation_requested": Cancellation flag has been set
- status="not_busy": Pipeline is not currently running
Raises:
HTTPException: If an error occurs while setting cancellation flag (500).
"""
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
return CancelPipelineResponse(
status="not_busy",
message="Pipeline is not currently running. No cancellation needed.",
)
# Set cancellation flag
pipeline_status["cancellation_requested"] = True
cancel_msg = "Pipeline cancellation requested by user"
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
return CancelPipelineResponse(
status="cancellation_requested",
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
)
except Exception as e:
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
return router
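A minimal client-side sketch of calling this cancellation endpoint. The base URL and the `/documents` route prefix are assumptions about a typical deployment, not guaranteed by this diff:
```python
import requests

# Assumed deployment details: adjust the base URL and route prefix to match
# your server; add an API key header if your deployment requires one.
BASE_URL = "http://localhost:9621"

resp = requests.post(f"{BASE_URL}/documents/cancel_pipeline", timeout=30)
resp.raise_for_status()
payload = resp.json()

if payload["status"] == "cancellation_requested":
    print("Cancellation flag set:", payload["message"])
else:  # "not_busy"
    print("Pipeline is idle:", payload["message"])
```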

View File

@@ -17,6 +17,7 @@ class EntityUpdateRequest(BaseModel):
entity_name: str
updated_data: Dict[str, Any]
allow_rename: bool = False
allow_merge: bool = False
class RelationUpdateRequest(BaseModel):
@@ -221,22 +222,178 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"""
Update an entity's properties in the knowledge graph
This endpoint allows updating entity properties, including renaming entities.
When renaming to an existing entity name, the behavior depends on allow_merge:
Args:
request (EntityUpdateRequest): Request containing entity name, updated data, and rename flag
request (EntityUpdateRequest): Request containing:
- entity_name (str): Name of the entity to update
- updated_data (Dict[str, Any]): Dictionary of properties to update
- allow_rename (bool): Whether to allow entity renaming (default: False)
- allow_merge (bool): Whether to merge into existing entity when renaming
causes name conflict (default: False)
Returns:
Dict: Updated entity information
Dict with the following structure:
{
"status": "success",
"message": "Entity updated successfully" | "Entity merged successfully into 'target_name'",
"data": {
"entity_name": str, # Final entity name
"description": str, # Entity description
"entity_type": str, # Entity type
"source_id": str, # Source chunk IDs
... # Other entity properties
},
"operation_summary": {
"merged": bool, # Whether entity was merged into another
"merge_status": str, # "success" | "failed" | "not_attempted"
"merge_error": str | None, # Error message if merge failed
"operation_status": str, # "success" | "partial_success" | "failure"
"target_entity": str | None, # Target entity name if renaming/merging
"final_entity": str, # Final entity name after operation
"renamed": bool # Whether entity was renamed
}
}
operation_status values explained:
- "success": All operations completed successfully
* For simple updates: entity properties updated
* For renames: entity renamed successfully
* For merges: non-name updates applied AND merge completed
- "partial_success": Update succeeded but merge failed
* Non-name property updates were applied successfully
* Merge operation failed (entity not merged)
* Original entity still exists with updated properties
* Use merge_error for failure details
- "failure": Operation failed completely
* If merge_status == "failed": Merge attempted but both update and merge failed
* If merge_status == "not_attempted": Regular update failed
* No changes were applied to the entity
merge_status values explained:
- "success": Entity successfully merged into target entity
- "failed": Merge operation was attempted but failed
- "not_attempted": No merge was attempted (normal update/rename)
Behavior when renaming to an existing entity:
- If allow_merge=False: Raises ValueError with 400 status (default behavior)
- If allow_merge=True: Automatically merges the source entity into the existing target entity,
preserving all relationships and applying non-name updates first
Example Request (simple update):
POST /graph/entity/edit
{
"entity_name": "Tesla",
"updated_data": {"description": "Updated description"},
"allow_rename": false,
"allow_merge": false
}
Example Response (simple update success):
{
"status": "success",
"message": "Entity updated successfully",
"data": { ... },
"operation_summary": {
"merged": false,
"merge_status": "not_attempted",
"merge_error": null,
"operation_status": "success",
"target_entity": null,
"final_entity": "Tesla",
"renamed": false
}
}
Example Request (rename with auto-merge):
POST /graph/entity/edit
{
"entity_name": "Elon Msk",
"updated_data": {
"entity_name": "Elon Musk",
"description": "Corrected description"
},
"allow_rename": true,
"allow_merge": true
}
Example Response (merge success):
{
"status": "success",
"message": "Entity merged successfully into 'Elon Musk'",
"data": { ... },
"operation_summary": {
"merged": true,
"merge_status": "success",
"merge_error": null,
"operation_status": "success",
"target_entity": "Elon Musk",
"final_entity": "Elon Musk",
"renamed": true
}
}
Example Response (partial success - update succeeded but merge failed):
{
"status": "success",
"message": "Entity updated successfully",
"data": { ... }, # Data reflects updated "Elon Msk" entity
"operation_summary": {
"merged": false,
"merge_status": "failed",
"merge_error": "Target entity locked by another operation",
"operation_status": "partial_success",
"target_entity": "Elon Musk",
"final_entity": "Elon Msk", # Original entity still exists
"renamed": true
}
}
"""
try:
result = await rag.aedit_entity(
entity_name=request.entity_name,
updated_data=request.updated_data,
allow_rename=request.allow_rename,
allow_merge=request.allow_merge,
)
# Extract operation_summary from result, with fallback for backward compatibility
operation_summary = result.get(
"operation_summary",
{
"merged": False,
"merge_status": "not_attempted",
"merge_error": None,
"operation_status": "success",
"target_entity": None,
"final_entity": request.updated_data.get(
"entity_name", request.entity_name
),
"renamed": request.updated_data.get(
"entity_name", request.entity_name
)
!= request.entity_name,
},
)
# Separate entity data from operation_summary for clean response
entity_data = dict(result)
entity_data.pop("operation_summary", None)
# Generate appropriate response message based on merge status
response_message = (
f"Entity merged successfully into '{operation_summary['final_entity']}'"
if operation_summary.get("merged")
else "Entity updated successfully"
)
return {
"status": "success",
"message": "Entity updated successfully",
"data": result,
"message": response_message,
"data": entity_data,
"operation_summary": operation_summary,
}
except ValueError as ve:
logger.error(
@@ -299,6 +456,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
entity_data (dict): Entity properties including:
- description (str): Textual description of the entity
- entity_type (str): Category/type of the entity (e.g., PERSON, ORGANIZATION, LOCATION)
- source_id (str): Related chunk_id from which the description originates
- Additional custom properties as needed
Response Schema:
@@ -309,6 +467,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"entity_name": "Tesla",
"description": "Electric vehicle manufacturer",
"entity_type": "ORGANIZATION",
"source_id": "chunk-123<SEP>chunk-456"
... (other entity properties)
}
}
@@ -361,10 +520,11 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"""
Create a new relationship between two entities in the knowledge graph
This endpoint establishes a directed relationship between two existing entities.
Both the source and target entities must already exist in the knowledge graph.
The system automatically generates vector embeddings for the relationship to
enable semantic search and graph traversal.
This endpoint establishes an undirected relationship between two existing entities.
The provided source/target order is accepted for convenience, but the edge stored
in the backend is undirected and may be returned with the entities swapped.
Both entities must already exist in the knowledge graph. The system automatically
generates vector embeddings for the relationship to enable semantic search and graph traversal.
Prerequisites:
- Both source_entity and target_entity must exist in the knowledge graph
@@ -376,6 +536,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
relation_data (dict): Relationship properties including:
- description (str): Textual description of the relationship
- keywords (str): Comma-separated keywords describing the relationship type
- source_id (str): Related chunk_id from which the description originates
- weight (float): Relationship strength/importance (default: 1.0)
- Additional custom properties as needed
@@ -388,6 +549,7 @@ def create_graph_routes(rag, api_key: Optional[str] = None):
"tgt_id": "Tesla",
"description": "Elon Musk is the CEO of Tesla",
"keywords": "CEO, founder",
"source_id": "chunk-123<SEP>chunk-456"
"weight": 1.0,
... (other relationship properties)
}

View File

@@ -73,6 +73,16 @@ class QueryRequest(BaseModel):
ge=1,
)
hl_keywords: list[str] = Field(
default_factory=list,
description="List of high-level keywords to prioritize in retrieval. Leave empty to use the LLM to generate the keywords.",
)
ll_keywords: list[str] = Field(
default_factory=list,
description="List of low-level keywords to refine retrieval focus. Leave empty to use the LLM to generate the keywords.",
)
conversation_history: Optional[List[Dict[str, Any]]] = Field(
default=None,
description="Stores past conversation history to maintain context. Format: [{'role': 'user/assistant', 'content': 'message'}].",
@@ -294,6 +304,16 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
}
```
Bypass initial LLM call by providing high-level and low-level keywords:
```json
{
"query": "What is Retrieval-Augmented-Generation?",
"hl_keywords": ["machine learning", "information retrieval", "natural language processing"],
"ll_keywords": ["retrieval augmented generation", "RAG", "knowledge base"],
"mode": "mix"
}
```
Advanced query with references:
```json
{
@@ -482,6 +502,16 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
}
```
Bypass initial LLM call by providing high-level and low-level keywords:
```json
{
"query": "What is Retrieval-Augmented-Generation?",
"hl_keywords": ["machine learning", "information retrieval", "natural language processing"],
"ll_keywords": ["retrieval augmented generation", "RAG", "knowledge base"],
"mode": "mix"
}
```
Complete response query:
```json
{
@@ -968,6 +998,16 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
}
```
Bypass initial LLM call by providing high-level and low-level keywords:
```json
{
"query": "What is Retrieval-Augmented-Generation?",
"hl_keywords": ["machine learning", "information retrieval", "natural language processing"],
"ll_keywords": ["retrieval augmented generation", "RAG", "knowledge base"],
"mode": "mix"
}
```
**Response Analysis:**
- **Empty arrays**: Normal for certain modes (e.g., naive mode has no entities/relationships)
- **Processing info**: Shows retrieval statistics and token usage
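A minimal client sketch of the keyword-bypass request shown in the JSON examples above; the base URL and the `/query` path are assumptions about a typical deployment:
```python
import requests

# Supplying hl_keywords / ll_keywords skips the initial keyword-extraction LLM
# call, as described above. Base URL and path are assumed; adjust as needed.
resp = requests.post(
    "http://localhost:9621/query",
    json={
        "query": "What is Retrieval-Augmented-Generation?",
        "hl_keywords": ["machine learning", "information retrieval"],
        "ll_keywords": ["retrieval augmented generation", "RAG"],
        "mode": "mix",
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json())
```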

View File

@@ -5,12 +5,11 @@ Start LightRAG server with Gunicorn
import os
import sys
import signal
import pipmaster as pm
from lightrag.api.utils_api import display_splash_screen, check_env_file
from lightrag.api.config import global_args
from lightrag.utils import get_env_value
from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
from lightrag.kg.shared_storage import initialize_share_data
from lightrag.constants import (
DEFAULT_WOKERS,
@@ -34,21 +33,10 @@ def check_and_install_dependencies():
print(f"{package} installed successfully")
# Signal handler for graceful shutdown
def signal_handler(sig, frame):
print("\n\n" + "=" * 80)
print("RECEIVED TERMINATION SIGNAL")
print(f"Process ID: {os.getpid()}")
print("=" * 80 + "\n")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
def main():
# Set Gunicorn mode flag for lifespan cleanup detection
os.environ["LIGHTRAG_GUNICORN_MODE"] = "1"
# Check .env file
if not check_env_file():
sys.exit(1)
@@ -56,9 +44,8 @@ def main():
# Check and install dependencies
check_and_install_dependencies()
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
# Note: Signal handlers are NOT registered here because:
# - Master cleanup already handled by gunicorn_config.on_exit()
# Display startup information
display_splash_screen(global_args)

Binary file not shown.


File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -720,7 +720,7 @@ class DocStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
PREPROCESSED = "multimodal_processed"
PREPROCESSED = "preprocessed"
PROCESSED = "processed"
FAILED = "failed"
@@ -751,6 +751,25 @@ class DocProcessingStatus:
"""Error message if failed"""
metadata: dict[str, Any] = field(default_factory=dict)
"""Additional metadata"""
multimodal_processed: bool | None = field(default=None, repr=False)
"""Internal field: indicates if multimodal processing is complete. Not shown in repr() but accessible for debugging."""
def __post_init__(self):
"""
Handle status conversion based on multimodal_processed field.
Business rules:
- If multimodal_processed is False and status is PROCESSED,
then change status to PREPROCESSED
- The multimodal_processed field is kept (with repr=False) for internal use and debugging
"""
# Apply status conversion logic
if self.multimodal_processed is not None:
if (
self.multimodal_processed is False
and self.status == DocStatus.PROCESSED
):
self.status = DocStatus.PREPROCESSED
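A minimal sketch of the status-normalization rule implemented in `__post_init__` above; `normalize_status` is a hypothetical helper used only for illustration and is not part of the codebase:
```python
# Hypothetical helper illustrating the rule above: a document whose multimodal
# stage has not completed is reported as "preprocessed" instead of "processed".
def normalize_status(status: str, multimodal_processed: bool | None) -> str:
    if multimodal_processed is False and status == "processed":
        return "preprocessed"
    return status

assert normalize_status("processed", False) == "preprocessed"
assert normalize_status("processed", None) == "processed"
assert normalize_status("failed", False) == "failed"
```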
@dataclass

View File

@@ -38,7 +38,7 @@ DEFAULT_ENTITY_TYPES = [
"NaturalObject",
]
# Separator for graph fields
# Separator for description, source_id and relation-key fields (cannot be changed after data has been inserted)
GRAPH_FIELD_SEP = "<SEP>"
# Query and retrieval configuration defaults

View File

@@ -96,3 +96,11 @@ class PipelineNotInitializedError(KeyError):
f" await initialize_pipeline_status()"
)
super().__init__(msg)
class PipelineCancelledException(Exception):
"""Raised when pipeline processing is cancelled by user request."""
def __init__(self, message: str = "User cancelled"):
super().__init__(message)
self.message = message
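A minimal, self-contained sketch of the check-and-raise pattern this exception supports elsewhere in this changeset; the `pipeline_status` dict and its lock are simplified stand-ins for the shared-storage versions:
```python
import asyncio

class PipelineCancelledException(Exception):
    """Raised when pipeline processing is cancelled by user request."""
    def __init__(self, message: str = "User cancelled"):
        super().__init__(message)
        self.message = message

# Simplified stand-ins for the shared pipeline status and its lock.
pipeline_status = {"cancellation_requested": False}
pipeline_status_lock = asyncio.Lock()

async def process_document(doc_id: str) -> None:
    # Check the flag at a safe point before starting expensive work,
    # mirroring the checks added to lightrag.py in this changeset.
    async with pipeline_status_lock:
        if pipeline_status.get("cancellation_requested", False):
            raise PipelineCancelledException("User cancelled")
    print(f"processing {doc_id}")  # extraction / merge work would go here

async def main() -> None:
    pipeline_status["cancellation_requested"] = True
    try:
        await process_document("doc-1")
    except PipelineCancelledException as exc:
        print(f"Document skipped: {exc.message}")

asyncio.run(main())
```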

View File

@@ -462,14 +462,37 @@ class MilvusVectorDBStorage(BaseVectorStorage):
if type_name in ["FloatVector", "FLOAT_VECTOR"]:
existing_dimension = field.get("params", {}).get("dim")
if existing_dimension != current_dimension:
# Convert both to int for comparison to handle type mismatches
# (Milvus API may return string "1024" vs int 1024)
try:
existing_dim_int = (
int(existing_dimension)
if existing_dimension is not None
else None
)
current_dim_int = (
int(current_dimension)
if current_dimension is not None
else None
)
except (TypeError, ValueError) as e:
logger.error(
f"[{self.workspace}] Failed to parse dimensions: existing={existing_dimension} (type={type(existing_dimension)}), "
f"current={current_dimension} (type={type(current_dimension)}), error={e}"
)
raise ValueError(
f"Invalid dimension values for collection '{self.final_namespace}': "
f"existing={existing_dimension}, current={current_dimension}"
) from e
if existing_dim_int != current_dim_int:
raise ValueError(
f"Vector dimension mismatch for collection '{self.final_namespace}': "
f"existing={existing_dimension}, current={current_dimension}"
f"existing={existing_dim_int}, current={current_dim_int}"
)
logger.debug(
f"[{self.workspace}] Vector dimension check passed: {current_dimension}"
f"[{self.workspace}] Vector dimension check passed: {current_dim_int}"
)
return
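The type mismatch guarded against above can be shown in isolation; a small standalone sketch (the helper name is illustrative, not part of `MilvusVectorDBStorage`):
```python
# Illustrative helper: normalize dimensions to int before comparing, since the
# Milvus API may report "1024" (str) while the configured dimension is 1024 (int).
def dims_match(existing, current) -> bool:
    try:
        existing_int = int(existing) if existing is not None else None
        current_int = int(current) if current is not None else None
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"Invalid dimension values: existing={existing}, current={current}"
        ) from exc
    return existing_int == current_int

assert dims_match("1024", 1024)     # string vs int: equal after normalization
assert not dims_match("768", 1024)  # genuine mismatch is still detected
```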
@@ -960,7 +983,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
async def initialize(self):
"""Initialize Milvus collection"""
async with get_data_init_lock(enable_logging=True):
async with get_data_init_lock():
if self._initialized:
return

View File

@@ -184,9 +184,17 @@ class NanoVectorDBStorage(BaseVectorStorage):
"""
try:
client = await self._get_client()
# Record count before deletion
before_count = len(client)
client.delete(ids)
# Calculate actual deleted count
after_count = len(client)
deleted_count = before_count - after_count
logger.debug(
f"[{self.workspace}] Successfully deleted {len(ids)} vectors from {self.namespace}"
f"[{self.workspace}] Successfully deleted {deleted_count} vectors from {self.namespace}"
)
except Exception as e:
logger.error(

View File

@@ -4613,16 +4613,19 @@ class PGGraphStorage(BaseGraphStorage):
Returns:
A list of all nodes, where each node is a dictionary of its properties
"""
query = f"""SELECT * FROM cypher('{self.graph_name}', $$
MATCH (n:base)
RETURN n
$$) AS (n agtype)"""
# Use native SQL to avoid Cypher wrapper overhead
# Original: SELECT * FROM cypher(...) with MATCH (n:base)
# Optimized: Direct table access for better performance
query = f"""
SELECT properties
FROM {self.graph_name}.base
"""
results = await self._query(query)
nodes = []
for result in results:
if result["n"]:
node_dict = result["n"]["properties"]
if result.get("properties"):
node_dict = result["properties"]
# Process string result, parse it to JSON dictionary
if isinstance(node_dict, str):
@@ -4632,6 +4635,7 @@ class PGGraphStorage(BaseGraphStorage):
logger.warning(
f"[{self.workspace}] Failed to parse node string: {node_dict}"
)
continue
# Add node id (entity_id) to the dictionary for easier access
node_dict["id"] = node_dict.get("entity_id")
@@ -4643,12 +4647,21 @@ class PGGraphStorage(BaseGraphStorage):
Returns:
A list of all edges, where each edge is a dictionary of its properties
(The edge is bidirectional; deduplication must be handled by the caller)
(If two directed edges exist between the same pair of nodes, deduplication must be handled by the caller)
"""
# Use native SQL to avoid Cartesian product (N×N) in Cypher MATCH
# Original Cypher: MATCH (a:base)-[r]-(b:base) creates ~50 billion row combinations
# Optimized: Start from edges table, join to nodes only to get entity_id
# Performance: O(E) instead of O(N²), ~50,000x faster for large graphs
query = f"""
SELECT DISTINCT
(ag_catalog.agtype_access_operator(VARIADIC ARRAY[a.properties, '"entity_id"'::agtype]))::text AS source,
(ag_catalog.agtype_access_operator(VARIADIC ARRAY[b.properties, '"entity_id"'::agtype]))::text AS target,
r.properties
FROM {self.graph_name}."DIRECTED" r
JOIN {self.graph_name}.base a ON r.start_id = a.id
JOIN {self.graph_name}.base b ON r.end_id = b.id
"""
query = f"""SELECT * FROM cypher('{self.graph_name}', $$
MATCH (a:base)-[r]-(b:base)
RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
$$) AS (source text, target text, properties agtype)"""
results = await self._query(query)
edges = []

View File

@@ -10,17 +10,19 @@ from typing import Any, Dict, List, Optional, Union, TypeVar, Generic
from lightrag.exceptions import PipelineNotInitializedError
DEBUG_LOCKS = False
# Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
def direct_log(message, enable_output: bool = True, level: str = "DEBUG"):
"""
Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process.
Args:
message: The message to log
level: Log level (default: "DEBUG")
enable_output: Whether to actually output the log (default: True)
level: Log level of the message (visibility is controlled by comparing it with the current logger level)
enable_output: Enable or disable this log message (set to False to force-suppress it)
"""
if not enable_output:
return
@@ -44,7 +46,6 @@ def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
}
message_level = level_mapping.get(level.upper(), logging.DEBUG)
# print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True)
if message_level >= current_level:
print(f"{level}: {message}", file=sys.stderr, flush=True)
@@ -91,7 +92,6 @@ _storage_keyed_lock: Optional["KeyedUnifiedLock"] = None
# async locks for coroutine synchronization in multiprocess mode
_async_locks: Optional[Dict[str, asyncio.Lock]] = None
DEBUG_LOCKS = False
_debug_n_locks_acquired: int = 0
@@ -141,7 +141,8 @@ class UnifiedLock(Generic[T]):
if not self._is_async and self._async_lock is not None:
await self._async_lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
f"== Lock == Process {self._pid}: Acquired async lock '{self._name}'",
level="DEBUG",
enable_output=self._enable_logging,
)
@@ -152,7 +153,8 @@ class UnifiedLock(Generic[T]):
self._lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
f"== Lock == Process {self._pid}: Acquired lock {self._name} (async={self._is_async})",
level="INFO",
enable_output=self._enable_logging,
)
return self
@@ -168,7 +170,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@@ -183,7 +185,8 @@ class UnifiedLock(Generic[T]):
main_lock_released = True
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
f"== Lock == Process {self._pid}: Released lock {self._name} (async={self._is_async})",
level="INFO",
enable_output=self._enable_logging,
)
@@ -191,7 +194,8 @@ class UnifiedLock(Generic[T]):
if not self._is_async and self._async_lock is not None:
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Async lock '{self._name}' released",
f"== Lock == Process {self._pid}: Released async lock {self._name}",
level="DEBUG",
enable_output=self._enable_logging,
)
@@ -199,7 +203,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
# If main lock release failed but async lock hasn't been released, try to release it
@@ -211,19 +215,20 @@ class UnifiedLock(Generic[T]):
try:
direct_log(
f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
level="WARNING",
level="DEBUG",
enable_output=self._enable_logging,
)
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
level="INFO",
enable_output=self._enable_logging,
)
except Exception as inner_e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@@ -234,12 +239,14 @@ class UnifiedLock(Generic[T]):
if self._is_async:
raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(
f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)",
f"== Lock == Process {self._pid}: Acquiring lock {self._name} (sync)",
level="DEBUG",
enable_output=self._enable_logging,
)
self._lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)",
f"== Lock == Process {self._pid}: Acquired lock {self._name} (sync)",
level="INFO",
enable_output=self._enable_logging,
)
return self
@@ -247,7 +254,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@@ -258,18 +265,20 @@ class UnifiedLock(Generic[T]):
raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(
f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)",
level="DEBUG",
enable_output=self._enable_logging,
)
self._lock.release()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)",
f"== Lock == Process {self._pid}: Released lock {self._name} (sync)",
level="INFO",
enable_output=self._enable_logging,
)
except Exception as e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@@ -401,7 +410,7 @@ def _perform_lock_cleanup(
direct_log(
f"== {lock_type} Lock == Cleanup failed: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
return 0, earliest_cleanup_time, last_cleanup_time
@@ -689,7 +698,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error during multiprocess lock cleanup: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
# 2. Cleanup async locks using generic function
@@ -718,7 +727,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error during async lock cleanup: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
# 3. Get current status after cleanup
@@ -772,7 +781,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error getting keyed lock status: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
return status
@@ -797,32 +806,239 @@ class _KeyedLockContext:
if enable_logging is not None
else parent._default_enable_logging
)
self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__
self._ul: Optional[List[Dict[str, Any]]] = None # set in __aenter__
# ----- enter -----
async def __aenter__(self):
if self._ul is not None:
raise RuntimeError("KeyedUnifiedLock already acquired in current context")
# acquire locks for all keys in the namespace
self._ul = []
for key in self._keys:
lock = self._parent._get_lock_for_key(
self._namespace, key, enable_logging=self._enable_logging
)
await lock.__aenter__()
inc_debug_n_locks_acquired()
self._ul.append(lock)
return self
try:
# Acquire locks for all keys in the namespace
for key in self._keys:
lock = None
entry = None
try:
# 1. Get lock object (reference count is incremented here)
lock = self._parent._get_lock_for_key(
self._namespace, key, enable_logging=self._enable_logging
)
# 2. Immediately create and add entry to list (critical for rollback to work)
entry = {
"key": key,
"lock": lock,
"entered": False,
"debug_inc": False,
"ref_incremented": True, # Mark that reference count has been incremented
}
self._ul.append(
entry
) # Add immediately after _get_lock_for_key for rollback to work
# 3. Try to acquire the lock
# Use try-finally to ensure state is updated atomically
lock_acquired = False
try:
await lock.__aenter__()
lock_acquired = True # Lock successfully acquired
finally:
if lock_acquired:
entry["entered"] = True
inc_debug_n_locks_acquired()
entry["debug_inc"] = True
except asyncio.CancelledError:
# Lock acquisition was cancelled
# The finally block above ensures entry["entered"] is correct
direct_log(
f"Lock acquisition cancelled for key {key}",
level="WARNING",
enable_output=self._enable_logging,
)
raise
except Exception as e:
# Other exceptions, log and re-raise
direct_log(
f"Lock acquisition failed for key {key}: {e}",
level="ERROR",
enable_output=True,
)
raise
return self
except BaseException:
# Critical: if any exception occurs (including CancelledError) during lock acquisition,
# we must rollback all already acquired locks to prevent lock leaks
# Use shield to ensure rollback completes
await asyncio.shield(self._rollback_acquired_locks())
raise
async def _rollback_acquired_locks(self):
"""Rollback all acquired locks in case of exception during __aenter__"""
if not self._ul:
return
async def rollback_single_entry(entry):
"""Rollback a single lock acquisition"""
key = entry["key"]
lock = entry["lock"]
debug_inc = entry["debug_inc"]
entered = entry["entered"]
ref_incremented = entry.get(
"ref_incremented", True
) # Default to True for safety
errors = []
# 1. If lock was acquired, release it
if entered:
try:
await lock.__aexit__(None, None, None)
except Exception as e:
errors.append(("lock_exit", e))
direct_log(
f"Lock rollback error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 2. Release reference count (if it was incremented)
if ref_incremented:
try:
self._parent._release_lock_for_key(self._namespace, key)
except Exception as e:
errors.append(("ref_release", e))
direct_log(
f"Lock rollback reference release error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 3. Decrement debug counter
if debug_inc:
try:
dec_debug_n_locks_acquired()
except Exception as e:
errors.append(("debug_dec", e))
direct_log(
f"Lock rollback counter decrementing error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
return errors
# Release already acquired locks in reverse order
for entry in reversed(self._ul):
# Use shield to protect each lock's rollback
try:
await asyncio.shield(rollback_single_entry(entry))
except Exception as e:
# Log but continue rolling back other locks
direct_log(
f"Lock rollback unexpected error for {entry['key']}: {e}",
level="ERROR",
enable_output=True,
)
self._ul = None
# ----- exit -----
async def __aexit__(self, exc_type, exc, tb):
# The UnifiedLock takes care of proper release order
for ul, key in zip(reversed(self._ul), reversed(self._keys)):
await ul.__aexit__(exc_type, exc, tb)
self._parent._release_lock_for_key(self._namespace, key)
dec_debug_n_locks_acquired()
self._ul = None
if self._ul is None:
return
async def release_all_locks():
"""Release all locks with comprehensive error handling, protected from cancellation"""
async def release_single_entry(entry, exc_type, exc, tb):
"""Release a single lock with full protection"""
key = entry["key"]
lock = entry["lock"]
debug_inc = entry["debug_inc"]
entered = entry["entered"]
errors = []
# 1. Release the lock
if entered:
try:
await lock.__aexit__(exc_type, exc, tb)
except Exception as e:
errors.append(("lock_exit", e))
direct_log(
f"Lock release error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 2. Release reference count
try:
self._parent._release_lock_for_key(self._namespace, key)
except Exception as e:
errors.append(("ref_release", e))
direct_log(
f"Lock release reference error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 3. Decrement debug counter
if debug_inc:
try:
dec_debug_n_locks_acquired()
except Exception as e:
errors.append(("debug_dec", e))
direct_log(
f"Lock release counter decrementing error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
return errors
all_errors = []
# Release locks in reverse order
# This entire loop is protected by the outer shield
for entry in reversed(self._ul):
try:
errors = await release_single_entry(entry, exc_type, exc, tb)
for error_type, error in errors:
all_errors.append((entry["key"], error_type, error))
except Exception as e:
all_errors.append((entry["key"], "unexpected", e))
direct_log(
f"Lock release unexpected error for {entry['key']}: {e}",
level="ERROR",
enable_output=True,
)
return all_errors
# CRITICAL: Protect the entire release process with shield
# This ensures that even if cancellation occurs, all locks are released
try:
all_errors = await asyncio.shield(release_all_locks())
except Exception as e:
direct_log(
f"Critical error during __aexit__ cleanup: {e}",
level="ERROR",
enable_output=True,
)
all_errors = []
finally:
# Always clear the lock list, even if shield was cancelled
self._ul = None
# If there were release errors and no other exception, raise the first release error
if all_errors and exc_type is None:
raise all_errors[0][2] # (key, error_type, error)
def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
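The shield-protected release in `__aexit__` above follows a general asyncio pattern; a minimal standalone sketch with a plain `asyncio.Lock` (class and function names here are illustrative, not this module's API):
```python
import asyncio

class ShieldedLock:
    """Illustrative async context manager: release is protected by asyncio.shield
    so that a cancellation arriving during cleanup cannot leave the lock held."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "ShieldedLock":
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        async def _release() -> None:
            await asyncio.sleep(0)  # stand-in for async cleanup work
            self._lock.release()

        # Even if the surrounding task is cancelled here, the inner task
        # created by shield still runs to completion and releases the lock.
        await asyncio.shield(_release())

async def main() -> None:
    lock = ShieldedLock()
    async with lock:
        print("critical section")

asyncio.run(main())
```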

View File

@@ -22,6 +22,7 @@ from typing import (
Dict,
)
from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException
from lightrag.constants import (
DEFAULT_MAX_GLEANING,
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@@ -86,7 +87,7 @@ from lightrag.operate import (
merge_nodes_and_edges,
kg_query,
naive_query,
_rebuild_knowledge_from_chunks,
rebuild_knowledge_from_chunks,
)
from lightrag.constants import GRAPH_FIELD_SEP
from lightrag.utils import (
@@ -709,7 +710,7 @@ class LightRAG:
async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary"""
async with get_data_init_lock(enable_logging=True):
async with get_data_init_lock():
try:
# Check if migration is needed:
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
@@ -1603,6 +1604,7 @@ class LightRAG:
"batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed
"request_pending": False, # Clear any previous request
"cancellation_requested": False, # Initialize cancellation flag
"latest_message": "",
}
)
@@ -1619,6 +1621,22 @@ class LightRAG:
try:
# Process documents until no more documents or requests
while True:
# Check for cancellation request at the start of main loop
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
# Clear pending request
pipeline_status["request_pending"] = False
# Clear cancellation flag
pipeline_status["cancellation_requested"] = False
log_message = "Pipeline cancelled by user"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Exit directly, skipping request_pending check
return
if not to_process_docs:
log_message = "All enqueued documents have been processed"
logger.info(log_message)
@@ -1681,14 +1699,25 @@ class LightRAG:
semaphore: asyncio.Semaphore,
) -> None:
"""Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore:
nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None
try:
# Check for cancellation before starting document processing
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Get file path from status document
file_path = getattr(
status_doc, "file_path", "unknown_source"
@@ -1751,6 +1780,11 @@ class LightRAG:
# Record processing start time
processing_start_time = int(time.time())
# Check for cancellation before entity extraction
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@@ -1805,16 +1839,29 @@ class LightRAG:
file_extraction_stage_ok = True
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + (
@@ -1824,9 +1871,14 @@ class LightRAG:
if task and not task.done():
task.cancel()
# Persistent llm cache
# Persistent llm cache with error handling
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case
processing_end_time = int(time.time())
@@ -1856,6 +1908,15 @@ class LightRAG:
# Concurrency is controlled by keyed lock for individual entities and relationships
if file_extraction_stage_ok:
try:
# Check for cancellation before merge
async with pipeline_status_lock:
if pipeline_status.get(
"cancellation_requested", False
):
raise PipelineCancelledException(
"User cancelled"
)
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
@@ -1914,22 +1975,38 @@ class LightRAG:
)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache
# Persistent llm cache with error handling
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case
processing_end_time = int(time.time())
@@ -1970,7 +2047,19 @@ class LightRAG:
)
# Wait for all document processing to complete
await asyncio.gather(*doc_tasks)
try:
await asyncio.gather(*doc_tasks)
except PipelineCancelledException:
# Cancel all remaining tasks
for task in doc_tasks:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
# Exit directly (document statuses already updated in process_document)
return
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
@@ -2001,11 +2090,14 @@ class LightRAG:
to_process_docs.update(pending_docs)
finally:
log_message = "Enqueued document processing pipeline stoped"
log_message = "Enqueued document processing pipeline stopped"
logger.info(log_message)
# Always reset busy status when done or if an exception occurs (with lock)
# Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
@@ -3055,6 +3147,9 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@@ -3076,6 +3171,7 @@ class LightRAG:
# Process relationships
for edge_data in affected_edges:
# source and target are not stored in normalized order in the graph db property
src = edge_data.get("source")
tgt = edge_data.get("target")
@@ -3112,6 +3208,9 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@@ -3195,32 +3294,7 @@ class LightRAG:
logger.error(f"Failed to delete chunks: {e}")
raise Exception(f"Failed to delete document chunks: {e}") from e
# 6. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# 7. Delete relationships that have no remaining sources
# 6. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from vector database
@@ -3239,6 +3313,14 @@ class LightRAG:
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
@@ -3249,13 +3331,73 @@ class LightRAG:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Debug: Check and log all edges before deleting nodes
edges_still_exist = 0
for entity in entities_to_delete:
edges = (
await self.chunk_entity_relation_graph.get_node_edges(
entity
)
)
if edges:
for src, tgt in edges:
if (
src in entities_to_delete
and tgt in entities_to_delete
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {tgt} --> {src}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from entity_chunks storage
if self.entity_chunks:
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
try:
await _rebuild_knowledge_from_chunks(
await rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -3473,16 +3615,22 @@ class LightRAG:
)
async def aedit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name
Returns:
Dictionary containing updated entity information
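A hedged usage sketch of the extended signature (the `rag` instance and entity names are hypothetical; semantics follow the docstring above):

```python
# Sketch only: `rag` is assumed to be an initialized LightRAG instance.
async def rename_with_merge(rag):
    return await rag.aedit_entity(
        entity_name="Acme Corp",
        updated_data={"entity_name": "Acme Corporation"},
        allow_rename=True,
        allow_merge=True,  # merge into "Acme Corporation" if that entity already exists
    )
```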
@@ -3496,14 +3644,21 @@ class LightRAG:
entity_name,
updated_data,
allow_rename,
allow_merge,
self.entity_chunks,
self.relation_chunks,
)
def edit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]:
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.aedit_entity(entity_name, updated_data, allow_rename)
self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
)
async def aedit_relation(
@@ -3512,6 +3667,7 @@ class LightRAG:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
source_entity: Name of the source entity
@@ -3530,6 +3686,7 @@ class LightRAG:
source_entity,
target_entity,
updated_data,
self.relation_chunks,
)
def edit_relation(
@@ -3641,6 +3798,8 @@ class LightRAG:
target_entity,
merge_strategy,
target_entity_data,
self.entity_chunks,
self.relation_chunks,
)
def merge_entities(

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from functools import partial
from pathlib import Path
import asyncio
import json
@@ -7,6 +8,7 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict
from lightrag.exceptions import PipelineCancelledException
from lightrag.utils import (
logger,
compute_mdhash_id,
@@ -67,7 +69,7 @@ from dotenv import load_dotenv
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
def _truncate_entity_identifier(
@@ -500,7 +502,7 @@ async def _handle_single_relationship_extraction(
return None
async def _rebuild_knowledge_from_chunks(
async def rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, list[str]],
relationships_to_rebuild: dict[tuple[str, str], list[str]],
knowledge_graph_inst: BaseGraphStorage,
@@ -675,14 +677,6 @@ async def _rebuild_knowledge_from_chunks(
entity_chunks_storage=entity_chunks_storage,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild `{entity_name}`: {e}"
@@ -708,6 +702,7 @@ async def _rebuild_knowledge_from_chunks(
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
entities_vdb=entities_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
@@ -715,13 +710,14 @@ async def _rebuild_knowledge_from_chunks(
llm_response_cache=llm_response_cache,
global_config=global_config,
relation_chunks_storage=relation_chunks_storage,
entity_chunks_storage=entity_chunks_storage,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
rebuilt_relationships_count += 1
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
status_message = f"Failed to rebuild `{src}`~`{tgt}`: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
@@ -1290,6 +1286,7 @@ async def _rebuild_single_entity(
async def _rebuild_single_relationship(
knowledge_graph_inst: BaseGraphStorage,
relationships_vdb: BaseVectorStorage,
entities_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: list[str],
@@ -1297,6 +1294,7 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None:
@@ -1440,9 +1438,69 @@ async def _rebuild_single_relationship(
else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
}
# Ensure both endpoint nodes exist before writing the edge back
# (certain storage backends require pre-existing nodes).
node_description = (
updated_relationship_data["description"]
if updated_relationship_data.get("description")
else current_relationship.get("description", "")
)
node_source_id = updated_relationship_data.get("source_id", "")
node_file_path = updated_relationship_data.get("file_path", "unknown_source")
for node_id in {src, tgt}:
if not (await knowledge_graph_inst.has_node(node_id)):
node_created_at = int(time.time())
node_data = {
"entity_id": node_id,
"source_id": node_source_id,
"description": node_description,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
"created_at": node_created_at,
"truncate": "",
}
await knowledge_graph_inst.upsert_node(node_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None and limited_chunk_ids:
await entity_chunks_storage.upsert(
{
node_id: {
"chunk_ids": limited_chunk_ids,
"count": len(limited_chunk_ids),
}
}
)
# Update entity_vdb for the newly created entity
if entities_vdb is not None:
entity_vdb_id = compute_mdhash_id(node_id, prefix="ent-")
entity_content = f"{node_id}\n{node_description}"
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": node_id,
"source_id": node_source_id,
"entity_type": "UNKNOWN",
"file_path": node_file_path,
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entities_vdb.upsert(payload),
operation_name="rebuild_added_entity_upsert",
entity_name=node_id,
max_retries=3,
retry_delay=0.1,
)
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
# Update relationship in vector database
# Sort src and tgt to ensure consistent ordering (smaller string first)
if src > tgt:
src, tgt = tgt, src
try:
rel_vdb_id = compute_mdhash_id(src + tgt, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt + src, prefix="rel-")
@@ -1485,7 +1543,7 @@ async def _rebuild_single_relationship(
raise # Re-raise exception
# Log rebuild completion with truncation info
status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks"
status_message = f"Rebuild `{src}`~`{tgt}` from {len(chunk_ids)} chunks"
if truncation_info:
status_message += f" ({truncation_info})"
# Add truncation info from apply_source_ids_limit if truncation occurred
@@ -1637,6 +1695,12 @@ async def _merge_nodes_then_upsert(
logger.error(f"Entity {entity_name} has no description")
raise ValueError(f"Entity {entity_name} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during entity summary")
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
@@ -1789,6 +1853,7 @@ async def _merge_edges_then_upsert(
llm_response_cache: BaseKVStorage | None = None,
added_entities: list = None, # New parameter to track entities added during edge processing
relation_chunks_storage: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
):
if src_id == tgt_id:
return None
@@ -1957,6 +2022,14 @@ async def _merge_edges_then_upsert(
logger.error(f"Relation {src_id}~{tgt_id} has no description")
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation summary"
)
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
@@ -2065,7 +2138,11 @@ async def _merge_edges_then_upsert(
# 11. Update both graph and vector db
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
# Optimization: Use get_node instead of has_node + get_node
existing_node = await knowledge_graph_inst.get_node(need_insert_id)
if existing_node is None:
# Node doesn't exist - create new node
node_created_at = int(time.time())
node_data = {
"entity_id": need_insert_id,
@@ -2078,6 +2155,19 @@ async def _merge_edges_then_upsert(
}
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
# Update entity_chunks_storage for the newly created entity
if entity_chunks_storage is not None:
chunk_ids = [chunk_id for chunk_id in full_source_ids if chunk_id]
if chunk_ids:
await entity_chunks_storage.upsert(
{
need_insert_id: {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
}
)
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = f"{need_insert_id}\n{description}"
@@ -2109,6 +2199,109 @@ async def _merge_edges_then_upsert(
"created_at": node_created_at,
}
added_entities.append(entity_data)
else:
# Node exists - update its source_ids by merging with new source_ids
updated = False # Track if any update occurred
# 1. Get existing full source_ids from entity_chunks_storage
existing_full_source_ids = []
if entity_chunks_storage is not None:
stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
# If not in entity_chunks_storage, get from graph database
if not existing_full_source_ids:
if existing_node.get("source_id"):
existing_full_source_ids = existing_node["source_id"].split(
GRAPH_FIELD_SEP
)
# 2. Merge with new source_ids from this relationship
new_source_ids_from_relation = [
chunk_id for chunk_id in source_ids if chunk_id
]
merged_full_source_ids = merge_source_ids(
existing_full_source_ids, new_source_ids_from_relation
)
# 3. Save merged full list to entity_chunks_storage (conditional)
if (
entity_chunks_storage is not None
and merged_full_source_ids != existing_full_source_ids
):
updated = True
await entity_chunks_storage.upsert(
{
need_insert_id: {
"chunk_ids": merged_full_source_ids,
"count": len(merged_full_source_ids),
}
}
)
# 4. Apply source_ids limit for graph and vector db
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
max_source_limit = global_config.get("max_source_ids_per_entity")
limited_source_ids = apply_source_ids_limit(
merged_full_source_ids,
max_source_limit,
limit_method,
identifier=f"`{need_insert_id}`",
)
# 5. Update graph database and vector database with limited source_ids (conditional)
limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)
if limited_source_id_str != existing_node.get("source_id", ""):
updated = True
updated_node_data = {
**existing_node,
"source_id": limited_source_id_str,
}
await knowledge_graph_inst.upsert_node(
need_insert_id, node_data=updated_node_data
)
# Update vector database
if entity_vdb is not None:
entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
entity_content = (
f"{need_insert_id}\n{existing_node.get('description', '')}"
)
vdb_data = {
entity_vdb_id: {
"content": entity_content,
"entity_name": need_insert_id,
"source_id": limited_source_id_str,
"entity_type": existing_node.get("entity_type", "UNKNOWN"),
"file_path": existing_node.get(
"file_path", "unknown_source"
),
}
}
await safe_vdb_operation_with_exception(
operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
operation_name="existing_entity_update",
entity_name=need_insert_id,
max_retries=3,
retry_delay=0.1,
)
# 6. Log once at the end if any update occurred
if updated:
status_message = f"Chunks appended from relation: `{need_insert_id}`"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
edge_created_at = int(time.time())
await knowledge_graph_inst.upsert_edge(
@@ -2137,6 +2330,10 @@ async def _merge_edges_then_upsert(
weight=weight,
)
# Sort src_id and tgt_id to ensure consistent ordering (smaller string first)
if src_id > tgt_id:
src_id, tgt_id = tgt_id, src_id
if relationships_vdb is not None:
rel_vdb_id = compute_mdhash_id(src_id + tgt_id, prefix="rel-")
rel_vdb_id_reverse = compute_mdhash_id(tgt_id + src_id, prefix="rel-")
@@ -2214,6 +2411,12 @@ async def merge_nodes_and_edges(
file_path: File path for logging
"""
# Check for cancellation at the start of merge
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during merge phase")
# Collect all nodes and edges from all chunks
all_nodes = defaultdict(list)
all_edges = defaultdict(list)
@@ -2250,6 +2453,14 @@ async def merge_nodes_and_edges(
async def _locked_process_entity_name(entity_name, entities):
async with semaphore:
# Check for cancellation before processing entity
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
@@ -2272,9 +2483,7 @@ async def merge_nodes_and_edges(
return entity_data
except Exception as e:
error_msg = (
f"Critical error in entity processing for `{entity_name}`: {e}"
)
error_msg = f"Error processing entity `{entity_name}`: {e}"
logger.error(error_msg)
# Try to update pipeline status, but don't let status update failure affect main exception
@@ -2310,36 +2519,32 @@ async def merge_nodes_and_edges(
entity_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
processed_entities = []
for task in done:
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
result = task.result()
except BaseException as e:
if first_exception is None:
first_exception = e
else:
processed_entities.append(result)
if pending:
for task in pending:
task.cancel()
pending_results = await asyncio.gather(*pending, return_exceptions=True)
for result in pending_results:
if isinstance(result, BaseException):
if first_exception is None:
first_exception = result
else:
processed_entities.append(result)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
processed_entities = [task.result() for task in entity_tasks]
# ===== Phase 2: Process all relationships concurrently =====
log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})"
logger.info(log_message)
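For reference, the reworked result collection above boils down to the following standalone pattern: gather outcomes from the `done` set by calling `task.result()` (which re-raises a task's exception), drain the `pending` set with `return_exceptions=True`, and only then re-raise the first failure. A sketch with hypothetical job names:

```python
import asyncio


async def job(i: int) -> int:
    await asyncio.sleep(0.1 * i)
    if i == 3:
        raise ValueError("job 3 failed")
    return i


async def collect() -> list:
    tasks = [asyncio.create_task(job(i)) for i in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    first_exception = None
    results = []
    for task in done:
        try:
            results.append(task.result())  # re-raises if the task failed
        except BaseException as e:
            if first_exception is None:
                first_exception = e

    if pending:
        for task in pending:
            task.cancel()
        # return_exceptions=True keeps CancelledError from propagating here
        for outcome in await asyncio.gather(*pending, return_exceptions=True):
            if isinstance(outcome, BaseException):
                if first_exception is None:
                    first_exception = outcome
            else:
                results.append(outcome)

    if first_exception is not None:
        raise first_exception
    return results


try:
    print(asyncio.run(collect()))
except ValueError as e:
    print(f"aborted: {e}")
```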
@@ -2349,6 +2554,14 @@ async def merge_nodes_and_edges(
async def _locked_process_edges(edge_key, edges):
async with semaphore:
# Check for cancellation before processing edges
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([edge_key[0], edge_key[1]])
@@ -2375,6 +2588,7 @@ async def merge_nodes_and_edges(
llm_response_cache,
added_entities, # Pass list to collect added entities
relation_chunks_storage,
entity_chunks_storage, # Add entity_chunks_storage parameter
)
if edge_data is None:
@@ -2383,7 +2597,7 @@ async def merge_nodes_and_edges(
return edge_data, added_entities
except Exception as e:
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
error_msg = f"Error processing relation `{sorted_edge_key}`: {e}"
logger.error(error_msg)
# Try to update pipeline status, but don't let status update failure affect main exception
@@ -2421,40 +2635,36 @@ async def merge_nodes_and_edges(
edge_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
for task in done:
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
edge_data, added_entities = task.result()
except BaseException as e:
if first_exception is None:
first_exception = e
else:
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
if pending:
for task in pending:
task.cancel()
pending_results = await asyncio.gather(*pending, return_exceptions=True)
for result in pending_results:
if isinstance(result, BaseException):
if first_exception is None:
first_exception = result
else:
edge_data, added_entities = result
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
for task in edge_tasks:
edge_data, added_entities = task.result()
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
# ===== Phase 3: Update full_entities and full_relations storage =====
if full_entities_storage and full_relations_storage and doc_id:
try:
@@ -2535,6 +2745,14 @@ async def extract_entities(
llm_response_cache: BaseKVStorage | None = None,
text_chunks_storage: BaseKVStorage | None = None,
) -> list:
# Check for cancellation at the start of entity extraction
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity extraction"
)
use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
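The same cooperative cancellation check is repeated at every stage (extraction start, per-chunk processing, entity merge, relation merge). Expressed once as a helper, purely as an illustrative sketch and not part of this change:

```python
from lightrag.exceptions import PipelineCancelledException


async def raise_if_cancelled(pipeline_status, pipeline_status_lock, stage: str) -> None:
    """Mirror of the inline checks above: abort if the user requested cancellation."""
    if pipeline_status is None or pipeline_status_lock is None:
        return
    async with pipeline_status_lock:
        if pipeline_status.get("cancellation_requested", False):
            raise PipelineCancelledException(f"User cancelled during {stage}")
```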
@@ -2702,6 +2920,14 @@ async def extract_entities(
async def _process_with_semaphore(chunk):
async with semaphore:
# Check for cancellation before processing chunk
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during chunk processing"
)
try:
return await _process_single_content(chunk)
except Exception as e:

View File

@@ -2551,6 +2551,52 @@ def apply_source_ids_limit(
return truncated
def compute_incremental_chunk_ids(
existing_full_chunk_ids: list[str],
old_chunk_ids: list[str],
new_chunk_ids: list[str],
) -> list[str]:
"""
Compute incrementally updated chunk IDs based on changes.
This function applies delta changes (additions and removals) to an existing
list of chunk IDs while maintaining order and ensuring deduplication.
Delta additions from new_chunk_ids are placed at the end.
Args:
existing_full_chunk_ids: Complete list of existing chunk IDs from storage
old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
Returns:
Updated list of chunk IDs with deduplication
Example:
>>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
>>> old = ['chunk-1', 'chunk-2']
>>> new = ['chunk-2', 'chunk-4']
>>> compute_incremental_chunk_ids(existing, old, new)
['chunk-2', 'chunk-3', 'chunk-4']
"""
# Calculate changes
chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
# Apply changes to full chunk_ids
# Step 1: Remove chunks that are no longer needed
updated_chunk_ids = [
cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
]
# Step 2: Add new chunks (preserving order from new_chunk_ids)
# Note: 'cid not in updated_chunk_ids' check ensures deduplication
for cid in new_chunk_ids:
if cid in chunks_to_add and cid not in updated_chunk_ids:
updated_chunk_ids.append(cid)
return updated_chunk_ids
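A quick sanity check of the documented behaviour, tracing the implementation above (the import path follows the surrounding diff, which places this helper in `lightrag.utils`):

```python
from lightrag.utils import compute_incremental_chunk_ids

# Pure addition: nothing is removed, new chunks are appended at the end.
assert compute_incremental_chunk_ids(
    ["chunk-1", "chunk-2", "chunk-3"], ["chunk-2"], ["chunk-2", "chunk-4", "chunk-5"]
) == ["chunk-1", "chunk-2", "chunk-3", "chunk-4", "chunk-5"]

# Pure removal: chunks referenced only by the old source_id are dropped.
assert compute_incremental_chunk_ids(
    ["chunk-1", "chunk-2", "chunk-3"], ["chunk-1", "chunk-3"], []
) == ["chunk-2"]
```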
def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],

File diff suppressed because it is too large

View File

@@ -1,4 +1,4 @@
# Development environment configuration
VITE_BACKEND_URL=http://localhost:9621
VITE_API_PROXY=true
VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status
VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status,/static

View File

@@ -1,4 +1,4 @@
# Development environment configuration
VITE_BACKEND_URL=http://localhost:9621
VITE_API_PROXY=true
VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status
VITE_API_ENDPOINTS=/api,/documents,/graphs,/graph,/health,/query,/docs,/redoc,/openapi.json,/login,/auth-status,/static

View File

@@ -143,6 +143,21 @@ export type QueryResponse = {
response: string
}
export type EntityUpdateResponse = {
status: string
message: string
data: Record<string, any>
operation_summary?: {
merged: boolean
merge_status: 'success' | 'failed' | 'not_attempted'
merge_error: string | null
operation_status: 'success' | 'partial_success' | 'failure'
target_entity: string | null
final_entity?: string | null
renamed?: boolean
}
}
export type DocActionResponse = {
status: 'success' | 'partial_success' | 'failure' | 'duplicated'
message: string
@@ -167,7 +182,7 @@ export type DeleteDocResponse = {
doc_id: string
}
export type DocStatus = 'pending' | 'processing' | 'multimodal_processed' | 'processed' | 'failed'
export type DocStatus = 'pending' | 'processing' | 'preprocessed' | 'processed' | 'failed'
export type DocStatusResponse = {
id: string
@@ -242,6 +257,7 @@ export type PipelineStatusResponse = {
batchs: number
cur_batch: number
request_pending: boolean
cancellation_requested?: boolean
latest_message: string
history_messages?: string[]
update_status?: Record<string, any>
@@ -691,6 +707,14 @@ export const getPipelineStatus = async (): Promise<PipelineStatusResponse> => {
return response.data
}
export const cancelPipeline = async (): Promise<{
status: 'cancellation_requested' | 'not_busy'
message: string
}> => {
const response = await axiosInstance.post('/documents/cancel_pipeline')
return response.data
}
export const loginToServer = async (username: string, password: string): Promise<LoginResponse> => {
const formData = new FormData();
formData.append('username', username);
@@ -710,17 +734,20 @@ export const loginToServer = async (username: string, password: string): Promise
* @param entityName The name of the entity to update
* @param updatedData Dictionary containing updated attributes
* @param allowRename Whether to allow renaming the entity (default: false)
* @param allowMerge Whether to merge into an existing entity when renaming to a duplicate name
* @returns Promise with the updated entity information
*/
export const updateEntity = async (
entityName: string,
updatedData: Record<string, any>,
allowRename: boolean = false
): Promise<DocActionResponse> => {
allowRename: boolean = false,
allowMerge: boolean = false
): Promise<EntityUpdateResponse> => {
const response = await axiosInstance.post('/graph/entity/edit', {
entity_name: entityName,
updated_data: updatedData,
allow_rename: allowRename
allow_rename: allowRename,
allow_merge: allowMerge
})
return response.data
}

View File

@@ -11,7 +11,7 @@ import {
DialogDescription
} from '@/components/ui/Dialog'
import Button from '@/components/ui/Button'
import { getPipelineStatus, PipelineStatusResponse } from '@/api/lightrag'
import { getPipelineStatus, cancelPipeline, PipelineStatusResponse } from '@/api/lightrag'
import { errorMessage } from '@/lib/utils'
import { cn } from '@/lib/utils'
@@ -30,6 +30,7 @@ export default function PipelineStatusDialog({
const [status, setStatus] = useState<PipelineStatusResponse | null>(null)
const [position, setPosition] = useState<DialogPosition>('center')
const [isUserScrolled, setIsUserScrolled] = useState(false)
const [showCancelConfirm, setShowCancelConfirm] = useState(false)
const historyRef = useRef<HTMLDivElement>(null)
// Reset position when dialog opens
@@ -37,6 +38,9 @@ export default function PipelineStatusDialog({
if (open) {
setPosition('center')
setIsUserScrolled(false)
} else {
// Reset confirmation dialog state when main dialog closes
setShowCancelConfirm(false)
}
}, [open])
@@ -81,6 +85,24 @@ export default function PipelineStatusDialog({
return () => clearInterval(interval)
}, [open, t])
// Handle cancel pipeline confirmation
const handleConfirmCancel = async () => {
setShowCancelConfirm(false)
try {
const result = await cancelPipeline()
if (result.status === 'cancellation_requested') {
toast.success(t('documentPanel.pipelineStatus.cancelSuccess'))
} else if (result.status === 'not_busy') {
toast.info(t('documentPanel.pipelineStatus.cancelNotBusy'))
}
} catch (err) {
toast.error(t('documentPanel.pipelineStatus.cancelFailed', { error: errorMessage(err) }))
}
}
// Determine if cancel button should be enabled
const canCancel = status?.busy === true && !status?.cancellation_requested
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent
@@ -142,16 +164,43 @@ export default function PipelineStatusDialog({
{/* Status Content */}
<div className="space-y-4 pt-4">
{/* Pipeline Status */}
<div className="flex items-center gap-4">
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.busy')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.busy ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.requestPending')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.request_pending ? 'bg-green-500' : 'bg-gray-300'}`} />
{/* Pipeline Status - with cancel button */}
<div className="flex flex-wrap items-center justify-between gap-4">
{/* Left side: Status indicators */}
<div className="flex items-center gap-4">
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.busy')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.busy ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.requestPending')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.request_pending ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
{/* Only show cancellation status when it's requested */}
{status?.cancellation_requested && (
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.cancellationRequested')}:</div>
<div className="h-2 w-2 rounded-full bg-red-500" />
</div>
)}
</div>
{/* Right side: Cancel button - only show when pipeline is busy */}
{status?.busy && (
<Button
variant="destructive"
size="sm"
disabled={!canCancel}
onClick={() => setShowCancelConfirm(true)}
title={
status?.cancellation_requested
? t('documentPanel.pipelineStatus.cancelInProgress')
: t('documentPanel.pipelineStatus.cancelTooltip')
}
>
{t('documentPanel.pipelineStatus.cancelButton')}
</Button>
)}
</div>
{/* Job Information */}
@@ -172,31 +221,49 @@ export default function PipelineStatusDialog({
</div>
</div>
{/* Latest Message */}
<div className="space-y-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.latestMessage')}:</div>
<div className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 whitespace-pre-wrap break-words">
{status?.latest_message || '-'}
</div>
</div>
{/* History Messages */}
<div className="space-y-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.historyMessages')}:</div>
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.pipelineMessages')}:</div>
<div
ref={historyRef}
onScroll={handleScroll}
className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto min-h-[7.5em] max-h-[40vh]"
className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto overflow-x-hidden min-h-[7.5em] max-h-[40vh]"
>
{status?.history_messages?.length ? (
status.history_messages.map((msg, idx) => (
<div key={idx} className="whitespace-pre-wrap break-words">{msg}</div>
<div key={idx} className="whitespace-pre-wrap break-all">{msg}</div>
))
) : '-'}
</div>
</div>
</div>
</DialogContent>
{/* Cancel Confirmation Dialog */}
<Dialog open={showCancelConfirm} onOpenChange={setShowCancelConfirm}>
<DialogContent className="sm:max-w-[425px]">
<DialogHeader>
<DialogTitle>{t('documentPanel.pipelineStatus.cancelConfirmTitle')}</DialogTitle>
<DialogDescription>
{t('documentPanel.pipelineStatus.cancelConfirmDescription')}
</DialogDescription>
</DialogHeader>
<div className="flex justify-end gap-3 mt-4">
<Button
variant="outline"
onClick={() => setShowCancelConfirm(false)}
>
{t('common.cancel')}
</Button>
<Button
variant="destructive"
onClick={handleConfirmCancel}
>
{t('documentPanel.pipelineStatus.cancelConfirmButton')}
</Button>
</div>
</DialogContent>
</Dialog>
</Dialog>
)
}

View File

@@ -3,8 +3,11 @@ import { useTranslation } from 'react-i18next'
import { toast } from 'sonner'
import { updateEntity, updateRelation, checkEntityNameExists } from '@/api/lightrag'
import { useGraphStore } from '@/stores/graph'
import { useSettingsStore } from '@/stores/settings'
import { SearchHistoryManager } from '@/utils/SearchHistoryManager'
import { PropertyName, EditIcon, PropertyValue } from './PropertyRowComponents'
import PropertyEditDialog from './PropertyEditDialog'
import MergeDialog from './MergeDialog'
/**
* Interface for the EditablePropertyRow component props
@@ -48,6 +51,12 @@ const EditablePropertyRow = ({
const [isEditing, setIsEditing] = useState(false)
const [isSubmitting, setIsSubmitting] = useState(false)
const [currentValue, setCurrentValue] = useState(initialValue)
const [errorMessage, setErrorMessage] = useState<string | null>(null)
const [mergeDialogOpen, setMergeDialogOpen] = useState(false)
const [mergeDialogInfo, setMergeDialogInfo] = useState<{
targetEntity: string
sourceEntity: string
} | null>(null)
useEffect(() => {
setCurrentValue(initialValue)
@@ -56,42 +65,134 @@ const EditablePropertyRow = ({
const handleEditClick = () => {
if (isEditable && !isEditing) {
setIsEditing(true)
setErrorMessage(null)
}
}
const handleCancel = () => {
setIsEditing(false)
setErrorMessage(null)
}
const handleSave = async (value: string) => {
const handleSave = async (value: string, options?: { allowMerge?: boolean }) => {
if (isSubmitting || value === String(currentValue)) {
setIsEditing(false)
setErrorMessage(null)
return
}
setIsSubmitting(true)
setErrorMessage(null)
try {
if (entityType === 'node' && entityId && nodeId) {
let updatedData = { [name]: value }
const allowMerge = options?.allowMerge ?? false
if (name === 'entity_id') {
const exists = await checkEntityNameExists(value)
if (exists) {
toast.error(t('graphPanel.propertiesView.errors.duplicateName'))
return
if (!allowMerge) {
const exists = await checkEntityNameExists(value)
if (exists) {
const errorMsg = t('graphPanel.propertiesView.errors.duplicateName')
setErrorMessage(errorMsg)
toast.error(errorMsg)
return
}
}
updatedData = { 'entity_name': value }
}
await updateEntity(entityId, updatedData, true)
try {
await useGraphStore.getState().updateNodeAndSelect(nodeId, entityId, name, value)
} catch (error) {
console.error('Error updating node in graph:', error)
throw new Error('Failed to update node in graph')
const response = await updateEntity(entityId, updatedData, true, allowMerge)
const operationSummary = response.operation_summary
const operationStatus = operationSummary?.operation_status || 'complete_success'
const finalValue = operationSummary?.final_entity ?? value
// Handle different operation statuses
if (operationStatus === 'success') {
if (operationSummary?.merged) {
// Node was successfully merged into an existing entity
setMergeDialogInfo({
targetEntity: finalValue,
sourceEntity: entityId,
})
setMergeDialogOpen(true)
// Remove old entity name from search history
SearchHistoryManager.removeLabel(entityId)
// Note: Search Label update is deferred until user clicks refresh button in merge dialog
toast.success(t('graphPanel.propertiesView.success.entityMerged'))
} else {
// Node was updated/renamed normally
try {
await useGraphStore
.getState()
.updateNodeAndSelect(nodeId, entityId, name, finalValue)
} catch (error) {
console.error('Error updating node in graph:', error)
throw new Error('Failed to update node in graph')
}
// Update search history: remove old name, add new name
if (name === 'entity_id') {
const currentLabel = useSettingsStore.getState().queryLabel
SearchHistoryManager.removeLabel(entityId)
SearchHistoryManager.addToHistory(finalValue)
// Trigger dropdown refresh to show updated search history
useSettingsStore.getState().triggerSearchLabelDropdownRefresh()
// If current queryLabel is the old entity name, update to new name
if (currentLabel === entityId) {
useSettingsStore.getState().setQueryLabel(finalValue)
}
}
toast.success(t('graphPanel.propertiesView.success.entityUpdated'))
}
// Update local state and notify parent component
// For entity_id updates, use finalValue (which may be different due to merging)
// For other properties, use the original value the user entered
const valueToSet = name === 'entity_id' ? finalValue : value
setCurrentValue(valueToSet)
onValueChange?.(valueToSet)
} else if (operationStatus === 'partial_success') {
// Partial success: update succeeded but merge failed
// Do NOT update graph data to keep frontend in sync with backend
const mergeError = operationSummary?.merge_error || 'Unknown error'
const errorMsg = t('graphPanel.propertiesView.errors.updateSuccessButMergeFailed', {
error: mergeError
})
setErrorMessage(errorMsg)
toast.error(errorMsg)
// Do not update currentValue or call onValueChange
return
} else {
// Complete failure or unknown status
// Check if this was a merge attempt or just a regular update
if (operationSummary?.merge_status === 'failed') {
// Merge operation was attempted but failed
const mergeError = operationSummary?.merge_error || 'Unknown error'
const errorMsg = t('graphPanel.propertiesView.errors.mergeFailed', {
error: mergeError
})
setErrorMessage(errorMsg)
toast.error(errorMsg)
} else {
// Regular update failed (no merge involved)
const errorMsg = t('graphPanel.propertiesView.errors.updateFailed')
setErrorMessage(errorMsg)
toast.error(errorMsg)
}
// Do not update currentValue or call onValueChange
return
}
toast.success(t('graphPanel.propertiesView.success.entityUpdated'))
} else if (entityType === 'edge' && sourceId && targetId && edgeId && dynamicId) {
const updatedData = { [name]: value }
await updateRelation(sourceId, targetId, updatedData)
@@ -102,19 +203,53 @@ const EditablePropertyRow = ({
throw new Error('Failed to update edge in graph')
}
toast.success(t('graphPanel.propertiesView.success.relationUpdated'))
setCurrentValue(value)
onValueChange?.(value)
}
setIsEditing(false)
setCurrentValue(value)
onValueChange?.(value)
} catch (error) {
console.error('Error updating property:', error)
toast.error(t('graphPanel.propertiesView.errors.updateFailed'))
const errorMsg = error instanceof Error ? error.message : t('graphPanel.propertiesView.errors.updateFailed')
setErrorMessage(errorMsg)
toast.error(errorMsg)
return
} finally {
setIsSubmitting(false)
}
}
const handleMergeRefresh = (useMergedStart: boolean) => {
const info = mergeDialogInfo
const graphState = useGraphStore.getState()
const settingsState = useSettingsStore.getState()
const currentLabel = settingsState.queryLabel
// Clear graph state
graphState.clearSelection()
graphState.setGraphDataFetchAttempted(false)
graphState.setLastSuccessfulQueryLabel('')
if (useMergedStart && info?.targetEntity) {
// Use merged entity as new start point (might already be set in handleSave)
settingsState.setQueryLabel(info.targetEntity)
} else {
// Keep current start point - refresh by resetting and restoring label
// This handles the case where user wants to stay with current label
settingsState.setQueryLabel('')
setTimeout(() => {
settingsState.setQueryLabel(currentLabel)
}, 50)
}
// Force graph re-render and reset zoom/scale (same as refresh button behavior)
graphState.incrementGraphDataVersion()
setMergeDialogOpen(false)
setMergeDialogInfo(null)
toast.info(t('graphPanel.propertiesView.mergeDialog.refreshing'))
}
return (
<div className="flex items-center gap-1 overflow-hidden">
<PropertyName name={name} />
@@ -131,6 +266,19 @@ const EditablePropertyRow = ({
propertyName={name}
initialValue={String(currentValue)}
isSubmitting={isSubmitting}
errorMessage={errorMessage}
/>
<MergeDialog
mergeDialogOpen={mergeDialogOpen}
mergeDialogInfo={mergeDialogInfo}
onOpenChange={(open) => {
setMergeDialogOpen(open)
if (!open) {
setMergeDialogInfo(null)
}
}}
onRefresh={handleMergeRefresh}
/>
</div>
)

View File

@@ -17,6 +17,7 @@ import { getPopularLabels, searchLabels } from '@/api/lightrag'
const GraphLabels = () => {
const { t } = useTranslation()
const label = useSettingsStore.use.queryLabel()
const dropdownRefreshTrigger = useSettingsStore.use.searchLabelDropdownRefreshTrigger()
const [isRefreshing, setIsRefreshing] = useState(false)
const [refreshTrigger, setRefreshTrigger] = useState(0)
const [selectKey, setSelectKey] = useState(0)
@@ -54,6 +55,18 @@ const GraphLabels = () => {
initializeHistory()
}, [])
// Force AsyncSelect to re-render when label changes externally (e.g., from entity rename/merge)
useEffect(() => {
setSelectKey(prev => prev + 1)
}, [label])
// Force AsyncSelect to re-render when dropdown refresh is triggered (e.g., after entity rename)
useEffect(() => {
if (dropdownRefreshTrigger > 0) {
setSelectKey(prev => prev + 1)
}
}, [dropdownRefreshTrigger])
const fetchData = useCallback(
async (query?: string): Promise<string[]> => {
let results: string[] = [];
@@ -223,6 +236,9 @@ const GraphLabels = () => {
// Update the label to trigger data loading
useSettingsStore.getState().setQueryLabel(newLabel);
// Force graph re-render and reset zoom/scale (must be AFTER setQueryLabel)
useGraphStore.getState().incrementGraphDataVersion();
}}
clearable={false} // Prevent clearing value on reselect
debounceTime={500}

View File

@@ -0,0 +1,70 @@
import { useTranslation } from 'react-i18next'
import { useSettingsStore } from '@/stores/settings'
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle
} from '@/components/ui/Dialog'
import Button from '@/components/ui/Button'
interface MergeDialogProps {
mergeDialogOpen: boolean
mergeDialogInfo: {
targetEntity: string
sourceEntity: string
} | null
onOpenChange: (open: boolean) => void
onRefresh: (useMergedStart: boolean) => void
}
/**
* MergeDialog component that appears after a successful entity merge
* Allows user to choose whether to use the merged entity or keep current start point
*/
const MergeDialog = ({
mergeDialogOpen,
mergeDialogInfo,
onOpenChange,
onRefresh
}: MergeDialogProps) => {
const { t } = useTranslation()
const currentQueryLabel = useSettingsStore.use.queryLabel()
return (
<Dialog open={mergeDialogOpen} onOpenChange={onOpenChange}>
<DialogContent>
<DialogHeader>
<DialogTitle>{t('graphPanel.propertiesView.mergeDialog.title')}</DialogTitle>
<DialogDescription>
{t('graphPanel.propertiesView.mergeDialog.description', {
source: mergeDialogInfo?.sourceEntity ?? '',
target: mergeDialogInfo?.targetEntity ?? '',
})}
</DialogDescription>
</DialogHeader>
<p className="text-sm text-muted-foreground">
{t('graphPanel.propertiesView.mergeDialog.refreshHint')}
</p>
<DialogFooter className="mt-4 flex-col gap-2 sm:flex-row sm:justify-end">
{currentQueryLabel !== mergeDialogInfo?.sourceEntity && (
<Button
type="button"
variant="outline"
onClick={() => onRefresh(false)}
>
{t('graphPanel.propertiesView.mergeDialog.keepCurrentStart')}
</Button>
)}
<Button type="button" onClick={() => onRefresh(true)}>
{t('graphPanel.propertiesView.mergeDialog.useMergedStart')}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
)
}
export default MergeDialog

View File

@@ -225,8 +225,8 @@ const PropertyRow = ({
formattedTooltip += `\n(Truncated: ${truncate})`
}
// Use EditablePropertyRow for editable fields (description, entity_id and keywords)
if (isEditable && (name === 'description' || name === 'entity_id' || name === 'keywords')) {
// Use EditablePropertyRow for editable fields (description, entity_id, entity_type and keywords)
if (isEditable && (name === 'description' || name === 'entity_id' || name === 'entity_type' || name === 'keywords')) {
return (
<EditablePropertyRow
name={name}
@@ -325,7 +325,7 @@ const NodePropertiesView = ({ node }: { node: NodeType }) => {
nodeId={String(node.id)}
entityId={node.properties['entity_id']}
entityType="node"
isEditable={name === 'description' || name === 'entity_id'}
isEditable={name === 'description' || name === 'entity_id' || name === 'entity_type'}
truncate={node.properties['truncate']}
/>
)

View File

@@ -9,14 +9,16 @@ import {
DialogDescription
} from '@/components/ui/Dialog'
import Button from '@/components/ui/Button'
import Checkbox from '@/components/ui/Checkbox'
interface PropertyEditDialogProps {
isOpen: boolean
onClose: () => void
onSave: (value: string) => void
onSave: (value: string, options?: { allowMerge?: boolean }) => void
propertyName: string
initialValue: string
isSubmitting?: boolean
errorMessage?: string | null
}
/**
@@ -29,17 +31,18 @@ const PropertyEditDialog = ({
onSave,
propertyName,
initialValue,
isSubmitting = false
isSubmitting = false,
errorMessage = null
}: PropertyEditDialogProps) => {
const { t } = useTranslation()
const [value, setValue] = useState('')
// Add error state to display save failure messages
const [error, setError] = useState<string | null>(null)
const [allowMerge, setAllowMerge] = useState(false)
// Initialize value when dialog opens
useEffect(() => {
if (isOpen) {
setValue(initialValue)
setAllowMerge(false)
}
}, [isOpen, initialValue])
@@ -86,18 +89,8 @@ const PropertyEditDialog = ({
const handleSave = async () => {
if (value.trim() !== '') {
// Clear previous error messages
setError(null)
try {
await onSave(value)
onClose()
} catch (error) {
console.error('Save error:', error)
// Set error message to state for UI display
setError(typeof error === 'object' && error !== null
? (error as Error).message || t('common.saveFailed')
: t('common.saveFailed'))
}
const options = propertyName === 'entity_id' ? { allowMerge } : undefined
await onSave(value, options)
}
}
@@ -116,9 +109,9 @@ const PropertyEditDialog = ({
</DialogHeader>
{/* Display error message if save fails */}
{error && (
<div className="bg-destructive/15 text-destructive px-4 py-2 rounded-md text-sm mt-2">
{error}
{errorMessage && (
<div className="bg-destructive/15 text-destructive px-4 py-2 rounded-md text-sm">
{errorMessage}
</div>
)}
@@ -146,6 +139,25 @@ const PropertyEditDialog = ({
})()}
</div>
{propertyName === 'entity_id' && (
<div className="rounded-md border border-border bg-muted/20 p-3">
<label className="flex items-start gap-2 text-sm font-medium">
<Checkbox
id="allow-merge"
checked={allowMerge}
disabled={isSubmitting}
onCheckedChange={(checked) => setAllowMerge(checked === true)}
/>
<div>
<span>{t('graphPanel.propertiesView.mergeOptionLabel')}</span>
<p className="text-xs font-normal text-muted-foreground">
{t('graphPanel.propertiesView.mergeOptionDescription')}
</p>
</div>
</label>
</div>
)}
<DialogFooter>
<Button
type="button"

View File

@@ -21,7 +21,6 @@ import PaginationControls from '@/components/ui/PaginationControls'
import {
scanNewDocuments,
reprocessFailedDocuments,
getDocumentsPaginated,
DocsStatusesResponse,
DocStatus,
@@ -52,7 +51,7 @@ const getCountValue = (counts: Record<string, number>, ...keys: string[]): numbe
const hasActiveDocumentsStatus = (counts: Record<string, number>): boolean =>
getCountValue(counts, 'PROCESSING', 'processing') > 0 ||
getCountValue(counts, 'PENDING', 'pending') > 0 ||
getCountValue(counts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') > 0
getCountValue(counts, 'PREPROCESSED', 'preprocessed') > 0
const getDisplayFileName = (doc: DocStatusResponse, maxLength: number = 20): string => {
// Check if file_path exists and is a non-empty string
@@ -257,7 +256,7 @@ export default function DocumentManager() {
const [pageByStatus, setPageByStatus] = useState<Record<StatusFilter, number>>({
all: 1,
processed: 1,
multimodal_processed: 1,
preprocessed: 1,
processing: 1,
pending: 1,
failed: 1,
@@ -324,7 +323,7 @@ export default function DocumentManager() {
setPageByStatus({
all: 1,
processed: 1,
'multimodal_processed': 1,
preprocessed: 1,
processing: 1,
pending: 1,
failed: 1,
@@ -471,8 +470,8 @@ export default function DocumentManager() {
const processedCount = getCountValue(statusCounts, 'PROCESSED', 'processed') || documentCounts.processed || 0;
const preprocessedCount =
getCountValue(statusCounts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') ||
documentCounts.multimodal_processed ||
getCountValue(statusCounts, 'PREPROCESSED', 'preprocessed') ||
documentCounts.preprocessed ||
0;
const processingCount = getCountValue(statusCounts, 'PROCESSING', 'processing') || documentCounts.processing || 0;
const pendingCount = getCountValue(statusCounts, 'PENDING', 'pending') || documentCounts.pending || 0;
@@ -481,7 +480,7 @@ export default function DocumentManager() {
// Store previous status counts
const prevStatusCounts = useRef({
processed: 0,
multimodal_processed: 0,
preprocessed: 0,
processing: 0,
pending: 0,
failed: 0
@@ -572,7 +571,7 @@ export default function DocumentManager() {
const legacyDocs: DocsStatusesResponse = {
statuses: {
processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processed'),
multimodal_processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'multimodal_processed'),
preprocessed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'preprocessed'),
processing: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processing'),
pending: response.documents.filter((doc: DocStatusResponse) => doc.status === 'pending'),
failed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'failed')
@@ -868,42 +867,6 @@ export default function DocumentManager() {
}
}, [t, startPollingInterval, currentTab, health, statusCounts])
const retryFailedDocuments = useCallback(async () => {
try {
// Check if component is still mounted before starting the request
if (!isMountedRef.current) return;
const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
// Check again if component is still mounted after the request completes
if (!isMountedRef.current) return;
// Note: _track_id is available for future use (e.g., progress tracking)
toast.message(message || status);
// Reset health check timer with 1 second delay to avoid race condition
useBackendState.getState().resetHealthCheckTimerDelayed(1000);
// Start fast refresh with 2-second interval immediately after retry
startPollingInterval(2000);
// Set recovery timer to restore normal polling interval after 15 seconds
setTimeout(() => {
if (isMountedRef.current && currentTab === 'documents' && health) {
// Restore intelligent polling interval based on document status
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const normalInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(normalInterval);
}
}, 15000); // Restore after 15 seconds
} catch (err) {
// Only show error if component is still mounted
if (isMountedRef.current) {
toast.error(errorMessage(err));
}
}
}, [startPollingInterval, currentTab, health, statusCounts])
// Handle page size change - update state and save to store
const handlePageSizeChange = useCallback((newPageSize: number) => {
if (newPageSize === pagination.page_size) return;
@@ -915,7 +878,7 @@ export default function DocumentManager() {
setPageByStatus({
all: 1,
processed: 1,
multimodal_processed: 1,
preprocessed: 1,
processing: 1,
pending: 1,
failed: 1,
@@ -956,7 +919,7 @@ export default function DocumentManager() {
const legacyDocs: DocsStatusesResponse = {
statuses: {
processed: response.documents.filter(doc => doc.status === 'processed'),
multimodal_processed: response.documents.filter(doc => doc.status === 'multimodal_processed'),
preprocessed: response.documents.filter(doc => doc.status === 'preprocessed'),
processing: response.documents.filter(doc => doc.status === 'processing'),
pending: response.documents.filter(doc => doc.status === 'pending'),
failed: response.documents.filter(doc => doc.status === 'failed')
@@ -1032,7 +995,7 @@ export default function DocumentManager() {
// Get new status counts
const newStatusCounts = {
processed: docs?.statuses?.processed?.length || 0,
multimodal_processed: docs?.statuses?.multimodal_processed?.length || 0,
preprocessed: docs?.statuses?.preprocessed?.length || 0,
processing: docs?.statuses?.processing?.length || 0,
pending: docs?.statuses?.pending?.length || 0,
failed: docs?.statuses?.failed?.length || 0
@@ -1166,16 +1129,6 @@ export default function DocumentManager() {
>
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
</Button>
<Button
variant="outline"
onClick={retryFailedDocuments}
side="bottom"
tooltip={t('documentPanel.documentManager.retryFailedTooltip')}
size="sm"
disabled={pipelineBusy}
>
<RotateCcwIcon /> {t('documentPanel.documentManager.retryFailedButton')}
</Button>
<Button
variant="outline"
onClick={() => setShowPipelineStatus(true)}
@@ -1270,12 +1223,12 @@ export default function DocumentManager() {
</Button>
<Button
size="sm"
variant={statusFilter === 'multimodal_processed' ? 'secondary' : 'outline'}
onClick={() => handleStatusFilterChange('multimodal_processed')}
variant={statusFilter === 'preprocessed' ? 'secondary' : 'outline'}
onClick={() => handleStatusFilterChange('preprocessed')}
disabled={isRefreshing}
className={cn(
preprocessedCount > 0 ? 'text-purple-600' : 'text-gray-500',
statusFilter === 'multimodal_processed' && 'bg-purple-100 dark:bg-purple-900/30 font-medium border border-purple-400 dark:border-purple-600 shadow-sm'
statusFilter === 'preprocessed' && 'bg-purple-100 dark:bg-purple-900/30 font-medium border border-purple-400 dark:border-purple-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.preprocessed')} ({preprocessedCount})
@@ -1460,7 +1413,7 @@ export default function DocumentManager() {
{doc.status === 'processed' && (
<span className="text-green-600">{t('documentPanel.documentManager.status.completed')}</span>
)}
{doc.status === 'multimodal_processed' && (
{doc.status === 'preprocessed' && (
<span className="text-purple-600">{t('documentPanel.documentManager.status.preprocessed')}</span>
)}
{doc.status === 'processing' && (
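The remaining hunks in this file rename the `multimodal_processed` document status to `preprocessed` everywhere it is surfaced: the legacy per-status buckets, the status counters, the filter button, and the row badge. A hedged sketch of the bucket-building step, with simplified stand-in types (the real ones come from the API client):

```typescript
// Simplified stand-ins for the API types used by DocumentManager.
type DocStatus = 'processed' | 'preprocessed' | 'processing' | 'pending' | 'failed'

interface DocStatusResponse {
  id: string
  status: DocStatus
}

// Group a flat document list into the per-status buckets the table expects,
// using the renamed 'preprocessed' key.
const groupByStatus = (documents: DocStatusResponse[]) => {
  const statuses: Record<DocStatus, DocStatusResponse[]> = {
    processed: [],
    preprocessed: [], // formerly multimodal_processed
    processing: [],
    pending: [],
    failed: []
  }
  for (const doc of documents) {
    statuses[doc.status].push(doc)
  }
  return { statuses }
}
```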

View File

@@ -226,13 +226,13 @@ const GraphViewer = () => {
</div>
{showPropertyPanel && (
<div className="absolute top-2 right-2">
<div className="absolute top-2 right-2 z-10">
<PropertiesView />
</div>
)}
{showLegend && (
<div className="absolute bottom-10 right-2">
<div className="absolute bottom-10 right-2 z-0">
<Legend className="bg-background/60 backdrop-blur-lg" />
</div>
)}
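The change above pins explicit stacking levels on the two floating overlays so the properties panel (`z-10`) always renders above the legend (`z-0`) when they overlap near the right edge of the canvas. A tiny TSX sketch of the layering in isolation, where `PropertiesPanel` and `LegendPanel` are hypothetical placeholders for the real `PropertiesView` and `Legend` components:

```tsx
// Sketch only — the panel components below are placeholders.
const PropertiesPanel = () => <div>properties</div>
const LegendPanel = () => <div>legend</div>

const Overlays = ({ showPanel, showLegend }: { showPanel: boolean; showLegend: boolean }) => (
  <>
    {showPanel && (
      <div className="absolute top-2 right-2 z-10">
        <PropertiesPanel /> {/* higher stacking level: drawn on top */}
      </div>
    )}
    {showLegend && (
      <div className="absolute bottom-10 right-2 z-0">
        <LegendPanel />
      </div>
    )}
  </>
)
```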

View File

@@ -114,10 +114,8 @@
},
"documentManager": {
"title": "إدارة المستندات",
"scanButton": "مسح ضوئي",
"scanButton": "مسح/إعادة محاولة",
"scanTooltip": "مسح ومعالجة المستندات في مجلد الإدخال، وإعادة معالجة جميع المستندات الفاشلة أيضًا",
"retryFailedButton": "إعادة المحاولة",
"retryFailedTooltip": "إعادة معالجة جميع المستندات الفاشلة",
"refreshTooltip": "إعادة تعيين قائمة المستندات",
"pipelineStatusButton": "خط المعالجة",
"pipelineStatusTooltip": "عرض حالة خط معالجة المستندات",
@@ -157,17 +155,27 @@
"hideFileNameTooltip": "إخفاء اسم الملف"
},
"pipelineStatus": {
"title": "حالة خط المعالجة",
"busy": "خط المعالجة مشغول",
"requestPending": "الطلب معلق",
"title": "حالة خط الأنابيب",
"busy": "خط الأنابيب مشغول",
"requestPending": "طلب معلق",
"cancellationRequested": "طلب الإلغاء",
"jobName": "اسم المهمة",
"startTime": "وقت البدء",
"progress": "التقدم",
"unit": "دفعة",
"latestMessage": "آخر رسالة",
"historyMessages": "سجل الرسائل",
"pipelineMessages": "رسائل خط الأنابيب",
"cancelButton": "إلغاء",
"cancelTooltip": "إلغاء معالجة خط الأنابيب",
"cancelConfirmTitle": "تأكيد إلغاء خط الأنابيب",
"cancelConfirmDescription": "سيؤدي هذا الإجراء إلى إيقاف معالجة خط الأنابيب الجارية. هل أنت متأكد من أنك تريد المتابعة؟",
"cancelConfirmButton": "تأكيد الإلغاء",
"cancelInProgress": "الإلغاء قيد التقدم...",
"pipelineNotRunning": "خط الأنابيب غير قيد التشغيل",
"cancelSuccess": "تم طلب إلغاء خط الأنابيب",
"cancelFailed": "فشل إلغاء خط الأنابيب\n{{error}}",
"cancelNotBusy": "خط الأنابيب غير قيد التشغيل، لا حاجة للإلغاء",
"errors": {
"fetchFailed": "فشل في جلب حالة خط المعالجة\n{{error}}"
"fetchFailed": "فشل في جلب حالة خط الأنابيب\n{{error}}"
}
}
},
@@ -297,11 +305,24 @@
"errors": {
"duplicateName": "اسم العقدة موجود بالفعل",
"updateFailed": "فشل تحديث العقدة",
"tryAgainLater": "يرجى المحاولة مرة أخرى لاحقًا"
"tryAgainLater": "يرجى المحاولة مرة أخرى لاحقًا",
"updateSuccessButMergeFailed": "تم تحديث الخصائص، لكن الدمج فشل: {{error}}",
"mergeFailed": "فشل الدمج: {{error}}"
},
"success": {
"entityUpdated": "تم تحديث العقدة بنجاح",
"relationUpdated": "تم تحديث العلاقة بنجاح"
"relationUpdated": "تم تحديث العلاقة بنجاح",
"entityMerged": "تم دمج العقد بنجاح"
},
"mergeOptionLabel": "دمج تلقائي عند العثور على اسم مكرر",
"mergeOptionDescription": "عند التفعيل، سيتم دمج هذه العقدة تلقائيًا في العقدة الموجودة بدلاً من ظهور خطأ عند إعادة التسمية بنفس الاسم.",
"mergeDialog": {
"title": "تم دمج العقدة",
"description": "\"{{source}}\" تم دمجها في \"{{target}}\".",
"refreshHint": "يجب تحديث الرسم البياني لتحميل البنية الأحدث.",
"keepCurrentStart": "تحديث مع الحفاظ على عقدة البدء الحالية",
"useMergedStart": "تحديث واستخدام العقدة المدمجة كنقطة بدء",
"refreshing": "جارٍ تحديث الرسم البياني..."
},
"node": {
"title": "عقدة",

View File

@@ -114,10 +114,8 @@
},
"documentManager": {
"title": "Document Management",
"scanButton": "Scan",
"scanButton": "Scan/Retry",
"scanTooltip": "Scan and process documents in input folder, and also reprocess all failed documents",
"retryFailedButton": "Retry",
"retryFailedTooltip": "Retry processing all failed documents",
"refreshTooltip": "Reset document list",
"pipelineStatusButton": "Pipeline",
"pipelineStatusTooltip": "View document processing pipeline status",
@@ -160,14 +158,24 @@
"title": "Pipeline Status",
"busy": "Pipeline Busy",
"requestPending": "Request Pending",
"cancellationRequested": "Cancellation Requested",
"jobName": "Job Name",
"startTime": "Start Time",
"progress": "Progress",
"unit": "batch",
"latestMessage": "Latest Message",
"historyMessages": "History Messages",
"unit": "Batch",
"pipelineMessages": "Pipeline Messages",
"cancelButton": "Cancel",
"cancelTooltip": "Cancel pipeline processing",
"cancelConfirmTitle": "Confirm Pipeline Cancellation",
"cancelConfirmDescription": "This will interrupt the ongoing pipeline processing. Are you sure you want to continue?",
"cancelConfirmButton": "Confirm Cancellation",
"cancelInProgress": "Cancellation in progress...",
"pipelineNotRunning": "Pipeline not running",
"cancelSuccess": "Pipeline cancellation requested",
"cancelFailed": "Failed to cancel pipeline\n{{error}}",
"cancelNotBusy": "Pipeline is not running, no need to cancel",
"errors": {
"fetchFailed": "Failed to get pipeline status\n{{error}}"
"fetchFailed": "Failed to fetch pipeline status\n{{error}}"
}
}
},
@@ -297,11 +305,24 @@
"errors": {
"duplicateName": "Node name already exists",
"updateFailed": "Failed to update node",
"tryAgainLater": "Please try again later"
"tryAgainLater": "Please try again later",
"updateSuccessButMergeFailed": "Properties updated, but merge failed: {{error}}",
"mergeFailed": "Merge failed: {{error}}"
},
"success": {
"entityUpdated": "Node updated successfully",
"relationUpdated": "Relation updated successfully"
"relationUpdated": "Relation updated successfully",
"entityMerged": "Nodes merged successfully"
},
"mergeOptionLabel": "Automatically merge when a duplicate name is found",
"mergeOptionDescription": "If enabled, renaming to an existing name will merge this node into the existing one instead of failing.",
"mergeDialog": {
"title": "Node merged",
"description": "\"{{source}}\" has been merged into \"{{target}}\".",
"refreshHint": "Refresh the graph to load the latest structure.",
"keepCurrentStart": "Refresh and keep current start node",
"useMergedStart": "Refresh and use merged node",
"refreshing": "Refreshing graph..."
},
"node": {
"title": "Node",

View File

@@ -114,10 +114,8 @@
},
"documentManager": {
"title": "Gestion des documents",
"scanButton": "Scanner",
"scanButton": "Scanner/Retraiter",
"scanTooltip": "Scanner et traiter les documents dans le dossier d'entrée, et retraiter également tous les documents échoués",
"retryFailedButton": "Réessayer",
"retryFailedTooltip": "Réessayer le traitement de tous les documents échoués",
"refreshTooltip": "Réinitialiser la liste des documents",
"pipelineStatusButton": "Pipeline",
"pipelineStatusTooltip": "Voir l'état du pipeline de traitement des documents",
@@ -158,14 +156,24 @@
},
"pipelineStatus": {
"title": "État du Pipeline",
"busy": "Pipeline occupé",
"requestPending": "Requête en attente",
"jobName": "Nom du travail",
"startTime": "Heure de début",
"progress": "Progression",
"unit": "lot",
"latestMessage": "Dernier message",
"historyMessages": "Historique des messages",
"busy": "Pipeline Occupé",
"requestPending": "Demande en Attente",
"cancellationRequested": "Annulation Demandée",
"jobName": "Nom du Travail",
"startTime": "Heure de Début",
"progress": "Progrès",
"unit": "Lot",
"pipelineMessages": "Messages de Pipeline",
"cancelButton": "Annuler",
"cancelTooltip": "Annuler le traitement du pipeline",
"cancelConfirmTitle": "Confirmer l'Annulation du Pipeline",
"cancelConfirmDescription": "Cette action interrompra le traitement du pipeline en cours. Êtes-vous sûr de vouloir continuer ?",
"cancelConfirmButton": "Confirmer l'Annulation",
"cancelInProgress": "Annulation en cours...",
"pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution",
"cancelSuccess": "Annulation du pipeline demandée",
"cancelFailed": "Échec de l'annulation du pipeline\n{{error}}",
"cancelNotBusy": "Le pipeline n'est pas en cours d'exécution, pas besoin d'annuler",
"errors": {
"fetchFailed": "Échec de la récupération de l'état du pipeline\n{{error}}"
}
@@ -297,11 +305,24 @@
"errors": {
"duplicateName": "Le nom du nœud existe déjà",
"updateFailed": "Échec de la mise à jour du nœud",
"tryAgainLater": "Veuillez réessayer plus tard"
"tryAgainLater": "Veuillez réessayer plus tard",
"updateSuccessButMergeFailed": "Propriétés mises à jour, mais la fusion a échoué : {{error}}",
"mergeFailed": "Échec de la fusion : {{error}}"
},
"success": {
"entityUpdated": "Nœud mis à jour avec succès",
"relationUpdated": "Relation mise à jour avec succès"
"relationUpdated": "Relation mise à jour avec succès",
"entityMerged": "Fusion des nœuds réussie"
},
"mergeOptionLabel": "Fusionner automatiquement en cas de nom dupliqué",
"mergeOptionDescription": "Si activé, renommer vers un nom existant fusionnera automatiquement ce nœud avec celui-ci au lieu d'échouer.",
"mergeDialog": {
"title": "Nœud fusionné",
"description": "\"{{source}}\" a été fusionné dans \"{{target}}\".",
"refreshHint": "Actualisez le graphe pour charger la structure la plus récente.",
"keepCurrentStart": "Actualiser en conservant le nœud de départ actuel",
"useMergedStart": "Actualiser en utilisant le nœud fusionné",
"refreshing": "Actualisation du graphe..."
},
"node": {
"title": "Nœud",

View File

@@ -114,10 +114,8 @@
},
"documentManager": {
"title": "文档管理",
"scanButton": "扫描",
"scanButton": "扫描/重试",
"scanTooltip": "扫描处理输入目录中的文档,同时重新处理所有失败的文档",
"retryFailedButton": "重试",
"retryFailedTooltip": "重新处理所有失败的文档",
"refreshTooltip": "复位文档清单",
"pipelineStatusButton": "流水线",
"pipelineStatusTooltip": "查看文档处理流水线状态",
@@ -160,12 +158,22 @@
"title": "流水线状态",
"busy": "流水线忙碌",
"requestPending": "待处理请求",
"cancellationRequested": "取消请求",
"jobName": "作业名称",
"startTime": "开始时间",
"progress": "进度",
"unit": "批",
"latestMessage": "最新消息",
"historyMessages": "历史消息",
"pipelineMessages": "流水线消息",
"cancelButton": "中断",
"cancelTooltip": "中断流水线处理",
"cancelConfirmTitle": "确认中断流水线",
"cancelConfirmDescription": "此操作将中断正在进行的流水线处理。确定要继续吗?",
"cancelConfirmButton": "确认中断",
"cancelInProgress": "取消请求进行中...",
"pipelineNotRunning": "流水线未运行",
"cancelSuccess": "流水线中断请求已发送",
"cancelFailed": "中断流水线失败\n{{error}}",
"cancelNotBusy": "流水线未运行,无需中断",
"errors": {
"fetchFailed": "获取流水线状态失败\n{{error}}"
}
@@ -297,11 +305,24 @@
"errors": {
"duplicateName": "节点名称已存在",
"updateFailed": "更新节点失败",
"tryAgainLater": "请稍后重试"
"tryAgainLater": "请稍后重试",
"updateSuccessButMergeFailed": "属性已更新,但合并失败:{{error}}",
"mergeFailed": "合并失败:{{error}}"
},
"success": {
"entityUpdated": "节点更新成功",
"relationUpdated": "关系更新成功"
"relationUpdated": "关系更新成功",
"entityMerged": "节点合并成功"
},
"mergeOptionLabel": "重名时自动合并",
"mergeOptionDescription": "勾选后,重命名为已存在的名称会将当前节点自动合并过去,而不会报错。",
"mergeDialog": {
"title": "节点已合并",
"description": "\"{{source}}\" 已合并到 \"{{target}}\"。",
"refreshHint": "请刷新图谱以获取最新结构。",
"keepCurrentStart": "刷新并保持当前起始节点",
"useMergedStart": "刷新并以合并后的节点为起始节点",
"refreshing": "正在刷新图谱..."
},
"node": {
"title": "节点",

View File

@@ -114,10 +114,8 @@
},
"documentManager": {
"title": "文件管理",
"scanButton": "掃描",
"scanButton": "掃描/重試",
"scanTooltip": "掃描處理輸入目錄中的文件,同時重新處理所有失敗的文件",
"retryFailedButton": "重試",
"retryFailedTooltip": "重新處理所有失敗的文件",
"refreshTooltip": "重設文件清單",
"pipelineStatusButton": "管線狀態",
"pipelineStatusTooltip": "查看文件處理管線狀態",
@@ -157,17 +155,27 @@
"hideFileNameTooltip": "隱藏檔案名稱"
},
"pipelineStatus": {
"title": "pipeline 狀態",
"busy": "pipeline 忙碌",
"title": "流水線狀態",
"busy": "流水線忙碌",
"requestPending": "待處理請求",
"jobName": "工作名稱",
"cancellationRequested": "取消請求",
"jobName": "作業名稱",
"startTime": "開始時間",
"progress": "進度",
"unit": "梯次",
"latestMessage": "最新訊息",
"historyMessages": "歷史訊息",
"unit": "",
"pipelineMessages": "流水線消息",
"cancelButton": "中斷",
"cancelTooltip": "中斷流水線處理",
"cancelConfirmTitle": "確認中斷流水線",
"cancelConfirmDescription": "此操作將中斷正在進行的流水線處理。確定要繼續嗎?",
"cancelConfirmButton": "確認中斷",
"cancelInProgress": "取消請求進行中...",
"pipelineNotRunning": "流水線未運行",
"cancelSuccess": "流水線中斷請求已發送",
"cancelFailed": "中斷流水線失敗\n{{error}}",
"cancelNotBusy": "流水線未運行,無需中斷",
"errors": {
"fetchFailed": "取得pipeline 狀態失敗\n{{error}}"
"fetchFailed": "獲取流水線狀態失敗\n{{error}}"
}
}
},
@@ -297,11 +305,24 @@
"errors": {
"duplicateName": "節點名稱已存在",
"updateFailed": "更新節點失敗",
"tryAgainLater": "請稍後重試"
"tryAgainLater": "請稍後重試",
"updateSuccessButMergeFailed": "屬性已更新,但合併失敗:{{error}}",
"mergeFailed": "合併失敗:{{error}}"
},
"success": {
"entityUpdated": "節點更新成功",
"relationUpdated": "關係更新成功"
"relationUpdated": "關係更新成功",
"entityMerged": "節點合併成功"
},
"mergeOptionLabel": "遇到重名時自動合併",
"mergeOptionDescription": "勾選後,重新命名為既有名稱時會自動將當前節點合併過去,不再報錯。",
"mergeDialog": {
"title": "節點已合併",
"description": "\"{{source}}\" 已合併到 \"{{target}}\"。",
"refreshHint": "請重新整理圖譜以取得最新結構。",
"keepCurrentStart": "重新整理並保留目前的起始節點",
"useMergedStart": "重新整理並以合併後的節點為起始節點",
"refreshing": "正在重新整理圖譜..."
},
"node": {
"title": "節點",

View File

@@ -78,6 +78,10 @@ interface SettingsState {
currentTab: Tab
setCurrentTab: (tab: Tab) => void
// Search label dropdown refresh trigger (non-persistent, runtime only)
searchLabelDropdownRefreshTrigger: number
triggerSearchLabelDropdownRefresh: () => void
}
const useSettingsStoreBase = create<SettingsState>()(
@@ -229,7 +233,14 @@ const useSettingsStoreBase = create<SettingsState>()(
})
},
setUserPromptHistory: (history: string[]) => set({ userPromptHistory: history })
setUserPromptHistory: (history: string[]) => set({ userPromptHistory: history }),
// Search label dropdown refresh trigger (not persisted)
searchLabelDropdownRefreshTrigger: 0,
triggerSearchLabelDropdownRefresh: () =>
set((state) => ({
searchLabelDropdownRefreshTrigger: state.searchLabelDropdownRefreshTrigger + 1
}))
}),
{
name: 'settings-storage',

View File

@@ -40,7 +40,7 @@ export default defineConfig({
changeOrigin: true,
rewrite: endpoint === '/api' ?
(path) => path.replace(/^\/api/, '') :
endpoint === '/docs' || endpoint === '/redoc' || endpoint === '/openapi.json' ?
endpoint === '/docs' || endpoint === '/redoc' || endpoint === '/openapi.json' || endpoint === '/static' ?
(path) => path : undefined
}
])
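The proxy change adds `/static` to the endpoints forwarded to the backend without path rewriting; only `/api` has its prefix stripped. A simplified sketch of the resulting proxy table, with the backend origin as a placeholder (the real vite.config.ts derives the endpoint list and target from environment settings, and leaves the rewrite undefined for endpoints outside this list):

```typescript
// Sketch only — BACKEND_URL is a placeholder value.
const BACKEND_URL = 'http://localhost:9621'

const endpoints = ['/api', '/docs', '/redoc', '/openapi.json', '/static'] as const

// /api is stripped so backend routes keep their original paths; the docs,
// OpenAPI schema and static assets are passed through unchanged.
export const proxy = Object.fromEntries(
  endpoints.map((endpoint) => [
    endpoint,
    {
      target: BACKEND_URL,
      changeOrigin: true,
      rewrite:
        endpoint === '/api'
          ? (path: string) => path.replace(/^\/api/, '')
          : (path: string) => path
    }
  ])
)
```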

View File

@@ -88,7 +88,7 @@ offline-docs = [
offline-storage = [
# Storage backend dependencies
"redis>=5.0.0,<7.0.0",
"redis>=5.0.0,<8.0.0",
"neo4j>=5.0.0,<7.0.0",
"pymilvus>=2.6.2,<3.0.0",
"pymongo>=4.0.0,<5.0.0",
@@ -134,7 +134,7 @@ include-package-data = true
version = {attr = "lightrag.__version__"}
[tool.setuptools.package-data]
lightrag = ["api/webui/**/*"]
lightrag = ["api/webui/**/*", "api/static/**/*"]
[tool.ruff]
target-version = "py310"

View File

@@ -13,4 +13,4 @@ neo4j>=5.0.0,<7.0.0
pymilvus>=2.6.2,<3.0.0
pymongo>=4.0.0,<5.0.0
qdrant-client>=1.7.0,<2.0.0
redis>=5.0.0,<7.0.0
redis>=5.0.0,<8.0.0

View File

@@ -26,6 +26,6 @@ pypdf2>=3.0.0
python-docx>=0.8.11,<2.0.0
python-pptx>=0.6.21,<2.0.0
qdrant-client>=1.7.0,<2.0.0
redis>=5.0.0,<7.0.0
redis>=5.0.0,<8.0.0
voyageai>=0.2.0,<1.0.0
zhipuai>=2.0.0,<3.0.0