diff --git a/README.md b/README.md index 9ba35c4b..52b9008c 100644 --- a/README.md +++ b/README.md @@ -933,7 +933,8 @@ maxclients 500 The `workspace` parameter ensures data isolation between different LightRAG instances. Once initialized, the `workspace` is immutable and cannot be changed.Here is how workspaces are implemented for different types of storage: - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`. -- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. +- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. +- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability. - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`. 
- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage` diff --git a/lightrag/api/README.md b/lightrag/api/README.md index aa24576e..a16d9023 100644 --- a/lightrag/api/README.md +++ b/lightrag/api/README.md @@ -165,7 +165,8 @@ Configuring an independent working directory and a dedicated `.env` configuratio The command-line `workspace` argument and the `WORKSPACE` environment variable in the `.env` file can both be used to specify the workspace name for the current instance, with the command-line argument having higher priority. Here is how workspaces are implemented for different types of storage: - **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`. -- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. +- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`. +- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability. - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`. 
- **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage` diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py index 0adfd279..60e8e66d 100644 --- a/lightrag/kg/qdrant_impl.py +++ b/lightrag/kg/qdrant_impl.py @@ -1,21 +1,29 @@ import asyncio -import os -from typing import Any, final, List -from dataclasses import dataclass -import numpy as np +import configparser import hashlib +import os import uuid -from ..utils import logger +from dataclasses import dataclass +from typing import Any, List, final + +import numpy as np +import pipmaster as pm + from ..base import BaseVectorStorage from ..kg.shared_storage import get_data_init_lock, get_storage_lock -import configparser -import pipmaster as pm +from ..utils import compute_mdhash_id, logger if not pm.is_installed("qdrant-client"): pm.install("qdrant-client") from qdrant_client import QdrantClient, models # type: ignore +DEFAULT_WORKSPACE = "_" +WORKSPACE_ID_FIELD = "workspace_id" +ENTITY_PREFIX = "ent-" +CREATED_AT_FIELD = "created_at" +ID_FIELD = "id" + config = configparser.ConfigParser() config.read("config.ini", "utf-8") @@ -48,6 +56,15 @@ def compute_mdhash_id_for_qdrant( raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.") +def workspace_filter_condition(workspace: str) -> models.FieldCondition: + """ + Create a workspace filter condition for Qdrant queries. 
+ """ + return models.FieldCondition( + key=WORKSPACE_ID_FIELD, match=models.MatchValue(value=workspace) + ) + + @final @dataclass class QdrantVectorDBStorage(BaseVectorStorage): @@ -64,24 +81,19 @@ class QdrantVectorDBStorage(BaseVectorStorage): self.__post_init__() @staticmethod - def create_collection_if_not_exist( - client: QdrantClient, collection_name: str, **kwargs - ): - exists = False - if hasattr(client, "collection_exists"): - try: - exists = client.collection_exists(collection_name) - except Exception: - exists = False - else: - try: - client.get_collection(collection_name) - exists = True - except Exception: - exists = False + def setup_collection(client: QdrantClient, collection_name: str, **kwargs): + exists = client.collection_exists(collection_name) if not exists: client.create_collection(collection_name, **kwargs) + client.create_payload_index( + collection_name=collection_name, + field_name=WORKSPACE_ID_FIELD, + field_schema=models.KeywordIndexParams( + type=models.KeywordIndexType.KEYWORD, + is_tenant=True, # Optimize storage structure for tenant co-location + ), + ) def __post_init__(self): # Check for QDRANT_WORKSPACE environment variable first (higher priority) @@ -101,18 +113,14 @@ class QdrantVectorDBStorage(BaseVectorStorage): f"Using passed workspace parameter: '{effective_workspace}'" ) - # Build final_namespace with workspace prefix for data isolation - # Keep original namespace unchanged for type detection logic - if effective_workspace: - self.final_namespace = f"{effective_workspace}_{self.namespace}" - logger.debug( - f"Final namespace with workspace prefix: '{self.final_namespace}'" - ) - else: - # When workspace is empty, final_namespace equals original namespace - self.final_namespace = self.namespace - self.workspace = "_" - logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'") + self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE + + # Use a shared collection with payload-based partitioning 
(Qdrant's recommended approach) + # Ref: https://qdrant.tech/documentation/guides/multiple-partitions/ + self.final_namespace = self.namespace + logger.debug( + f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning" + ) kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {}) cosine_threshold = kwargs.get("cosine_better_than_threshold") @@ -149,8 +157,8 @@ class QdrantVectorDBStorage(BaseVectorStorage): f"[{self.workspace}] QdrantClient created successfully" ) - # Create collection if not exists - QdrantVectorDBStorage.create_collection_if_not_exist( + # Setup collection (create if not exists and configure indexes) + QdrantVectorDBStorage.setup_collection( self._client, self.final_namespace, vectors_config=models.VectorParams( @@ -158,6 +166,7 @@ class QdrantVectorDBStorage(BaseVectorStorage): distance=models.Distance.COSINE, ), ) + self._initialized = True logger.info( f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully" @@ -179,8 +188,9 @@ class QdrantVectorDBStorage(BaseVectorStorage): list_data = [ { - "id": k, - "created_at": current_time, + ID_FIELD: k, + WORKSPACE_ID_FIELD: self.effective_workspace, + CREATED_AT_FIELD: current_time, **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields}, } for k, v in data.items() @@ -200,7 +210,9 @@ class QdrantVectorDBStorage(BaseVectorStorage): for i, d in enumerate(list_data): list_points.append( models.PointStruct( - id=compute_mdhash_id_for_qdrant(d["id"]), + id=compute_mdhash_id_for_qdrant( + d[ID_FIELD], prefix=self.effective_workspace + ), vector=embeddings[i], payload=d, ) @@ -222,21 +234,22 @@ class QdrantVectorDBStorage(BaseVectorStorage): ) # higher priority for query embedding = embedding_result[0] - results = self._client.search( + results = self._client.query_points( collection_name=self.final_namespace, - query_vector=embedding, + query=embedding, limit=top_k, with_payload=True, 
score_threshold=self.cosine_better_than_threshold, - ) - - # logger.debug(f"[{self.workspace}] query result: {results}") + query_filter=models.Filter( + must=[workspace_filter_condition(self.effective_workspace)] + ), + ).points return [ { **dp.payload, "distance": dp.score, - "created_at": dp.payload.get("created_at"), + CREATED_AT_FIELD: dp.payload.get(CREATED_AT_FIELD), } for dp in results ] @@ -252,14 +265,18 @@ class QdrantVectorDBStorage(BaseVectorStorage): ids: List of vector IDs to be deleted """ try: + if not ids: + return + # Convert regular ids to Qdrant compatible ids - qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] - # Delete points from the collection + qdrant_ids = [ + compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace) + for id in ids + ] + # Delete points from the collection (isolation via workspace-prefixed point IDs) self._client.delete( collection_name=self.final_namespace, - points_selector=models.PointIdsList( - points=qdrant_ids, - ), + points_selector=models.PointIdsList(points=qdrant_ids), wait=True, ) logger.debug( @@ -277,18 +294,16 @@ class QdrantVectorDBStorage(BaseVectorStorage): entity_name: Name of the entity to delete """ try: - # Generate the entity ID - entity_id = compute_mdhash_id_for_qdrant(entity_name, prefix="ent-") - # logger.debug( - # f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}" - # ) + # Generate the entity ID using the same function as used for storage + entity_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX) + qdrant_entity_id = compute_mdhash_id_for_qdrant( + entity_id, prefix=self.effective_workspace + ) - # Delete the entity point from the collection + # Delete the entity point by its Qdrant ID directly self._client.delete( collection_name=self.final_namespace, - points_selector=models.PointIdsList( - points=[entity_id], - ), + points_selector=models.PointIdsList(points=[qdrant_entity_id]), wait=True, ) logger.debug( @@ -304,10 +319,11 @@ class 
QdrantVectorDBStorage(BaseVectorStorage): entity_name: Name of the entity whose relations should be deleted """ try: - # Find relations where the entity is either source or target + # Find relations where the entity is either source or target, with workspace filtering results = self._client.scroll( collection_name=self.final_namespace, scroll_filter=models.Filter( + must=[workspace_filter_condition(self.effective_workspace)], should=[ models.FieldCondition( key="src_id", match=models.MatchValue(value=entity_name) @@ -315,7 +331,7 @@ class QdrantVectorDBStorage(BaseVectorStorage): models.FieldCondition( key="tgt_id", match=models.MatchValue(value=entity_name) ), - ] + ], ), with_payload=True, limit=1000, # Adjust as needed for your use case @@ -326,12 +342,11 @@ class QdrantVectorDBStorage(BaseVectorStorage): ids_to_delete = [point.id for point in relation_points] if ids_to_delete: - # Delete the relations + # Delete the relations found by the workspace-filtered scroll above + assert isinstance(self._client, QdrantClient) self._client.delete( collection_name=self.final_namespace, - points_selector=models.PointIdsList( - points=ids_to_delete, - ), + points_selector=models.PointIdsList(points=ids_to_delete), wait=True, ) logger.debug( @@ -357,9 +372,11 @@ class QdrantVectorDBStorage(BaseVectorStorage): """ try: # Convert to Qdrant compatible ID - qdrant_id = compute_mdhash_id_for_qdrant(id) + qdrant_id = compute_mdhash_id_for_qdrant( + id, prefix=self.effective_workspace + ) - # Retrieve the point by ID + # Retrieve the point by its workspace-prefixed Qdrant ID (retrieve takes no filter) result = self._client.retrieve( collection_name=self.final_namespace, ids=[qdrant_id], @@ -369,10 +386,9 @@ class QdrantVectorDBStorage(BaseVectorStorage): if not result: return None - # Ensure the result contains created_at field payload = result[0].payload - if "created_at" not in payload: - payload["created_at"] = None + if CREATED_AT_FIELD not in payload: + payload[CREATED_AT_FIELD] = None return payload except Exception as e: @@ 
-395,7 +411,10 @@ class QdrantVectorDBStorage(BaseVectorStorage): try: # Convert to Qdrant compatible IDs - qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] + qdrant_ids = [ + compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace) + for id in ids + ] # Retrieve the points by IDs results = self._client.retrieve( @@ -410,14 +429,14 @@ class QdrantVectorDBStorage(BaseVectorStorage): for point in results: payload = dict(point.payload or {}) - if "created_at" not in payload: - payload["created_at"] = None + if CREATED_AT_FIELD not in payload: + payload[CREATED_AT_FIELD] = None qdrant_point_id = str(point.id) if point.id is not None else "" if qdrant_point_id: payload_by_qdrant_id[qdrant_point_id] = payload - original_id = payload.get("id") + original_id = payload.get(ID_FIELD) if original_id is not None: payload_by_original_id[str(original_id)] = payload @@ -450,7 +469,10 @@ class QdrantVectorDBStorage(BaseVectorStorage): try: # Convert to Qdrant compatible IDs - qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] + qdrant_ids = [ + compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace) + for id in ids + ] # Retrieve the points by IDs with vectors results = self._client.retrieve( @@ -464,7 +486,7 @@ class QdrantVectorDBStorage(BaseVectorStorage): for point in results: if point and point.vector is not None and point.payload: # Get original ID from payload - original_id = point.payload.get("id") + original_id = point.payload.get(ID_FIELD) if original_id: # Convert numpy array to list if needed vector_data = point.vector @@ -482,7 +504,7 @@ class QdrantVectorDBStorage(BaseVectorStorage): async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - This method will delete all data from the Qdrant collection. + This method will delete all data for the current workspace from the Qdrant collection. 
Returns: dict[str, str]: Operation status and message @@ -491,39 +513,23 @@ class QdrantVectorDBStorage(BaseVectorStorage): """ async with get_storage_lock(): try: - # Delete the collection and recreate it - exists = False - if hasattr(self._client, "collection_exists"): - try: - exists = self._client.collection_exists(self.final_namespace) - except Exception: - exists = False - else: - try: - self._client.get_collection(self.final_namespace) - exists = True - except Exception: - exists = False - - if exists: - self._client.delete_collection(self.final_namespace) - - # Recreate the collection - QdrantVectorDBStorage.create_collection_if_not_exist( - self._client, - self.final_namespace, - vectors_config=models.VectorParams( - size=self.embedding_func.embedding_dim, - distance=models.Distance.COSINE, + # Delete all points for the current workspace + self._client.delete( + collection_name=self.final_namespace, + points_selector=models.FilterSelector( + filter=models.Filter( + must=[workspace_filter_condition(self.effective_workspace)] + ) ), + wait=True, ) logger.info( - f"[{self.workspace}] Process {os.getpid()} drop Qdrant collection {self.namespace}" + f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}" ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error( - f"[{self.workspace}] Error dropping Qdrant collection {self.namespace}: {e}" + f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}" ) return {"status": "error", "message": str(e)} diff --git a/uv.lock b/uv.lock index ebc66f5e..b99e8a05 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.13' and sys_platform == 'darwin'", @@ -1590,11 +1590,11 @@ requires-dist = [ { name = "numpy" }, { name = "numpy", marker = "extra == 'api'" }, { name = "ollama", marker = 
"extra == 'offline-llm'", specifier = ">=0.1.0,<1.0.0" }, - { name = "openai", marker = "extra == 'api'", specifier = ">=1.0.0,<2.0.0" }, - { name = "openai", marker = "extra == 'offline-llm'", specifier = ">=1.0.0,<2.0.0" }, + { name = "openai", marker = "extra == 'api'", specifier = ">=1.0.0,<3.0.0" }, + { name = "openai", marker = "extra == 'offline-llm'", specifier = ">=1.0.0,<3.0.0" }, { name = "openpyxl", marker = "extra == 'offline-docs'", specifier = ">=3.0.0,<4.0.0" }, - { name = "pandas", specifier = ">=2.0.0,<2.3.0" }, - { name = "pandas", marker = "extra == 'api'", specifier = ">=2.0.0,<2.3.0" }, + { name = "pandas", specifier = ">=2.0.0,<2.4.0" }, + { name = "pandas", marker = "extra == 'api'", specifier = ">=2.0.0,<2.4.0" }, { name = "passlib", extras = ["bcrypt"], marker = "extra == 'api'" }, { name = "pipmaster" }, { name = "pipmaster", marker = "extra == 'api'" },