refactor: Qdrant Multi-tenancy (Include staged)
Signed-off-by: Anush008 <anushshetty90@gmail.com>
This commit is contained in:
@@ -933,7 +933,8 @@ maxclients 500
|
||||
The `workspace` parameter ensures data isolation between different LightRAG instances. Once initialized, the `workspace` is immutable and cannot be changed.Here is how workspaces are implemented for different types of storage:
|
||||
|
||||
- **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
|
||||
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
|
||||
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
|
||||
- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability.
|
||||
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
|
||||
- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
|
||||
|
||||
|
||||
@@ -165,7 +165,8 @@ Configuring an independent working directory and a dedicated `.env` configuratio
|
||||
The command-line `workspace` argument and the `WORKSPACE` environment variable in the `.env` file can both be used to specify the workspace name for the current instance, with the command-line argument having higher priority. Here is how workspaces are implemented for different types of storage:
|
||||
|
||||
- **For local file-based databases, data isolation is achieved through workspace subdirectories:** `JsonKVStorage`, `JsonDocStatusStorage`, `NetworkXStorage`, `NanoVectorDBStorage`, `FaissVectorDBStorage`.
|
||||
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `QdrantVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
|
||||
- **For databases that store data in collections, it's done by adding a workspace prefix to the collection name:** `RedisKVStorage`, `RedisDocStatusStorage`, `MilvusVectorDBStorage`, `MongoKVStorage`, `MongoDocStatusStorage`, `MongoVectorDBStorage`, `MongoGraphStorage`, `PGGraphStorage`.
|
||||
- **For Qdrant vector database, data isolation is achieved through payload-based partitioning (Qdrant's recommended multitenancy approach):** `QdrantVectorDBStorage` uses shared collections with payload filtering for unlimited workspace scalability.
|
||||
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
|
||||
- **For graph databases, logical data isolation is achieved through labels:** `Neo4JStorage`, `MemgraphStorage`
|
||||
|
||||
|
||||
@@ -1,21 +1,29 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Any, final, List
|
||||
from dataclasses import dataclass
|
||||
import numpy as np
|
||||
import configparser
|
||||
import hashlib
|
||||
import os
|
||||
import uuid
|
||||
from ..utils import logger
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, List, final
|
||||
|
||||
import numpy as np
|
||||
import pipmaster as pm
|
||||
|
||||
from ..base import BaseVectorStorage
|
||||
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||
import configparser
|
||||
import pipmaster as pm
|
||||
from ..utils import compute_mdhash_id, logger
|
||||
|
||||
if not pm.is_installed("qdrant-client"):
|
||||
pm.install("qdrant-client")
|
||||
|
||||
from qdrant_client import QdrantClient, models # type: ignore
|
||||
|
||||
DEFAULT_WORKSPACE = "_"
|
||||
WORKSPACE_ID_FIELD = "workspace_id"
|
||||
ENTITY_PREFIX = "ent-"
|
||||
CREATED_AT_FIELD = "created_at"
|
||||
ID_FIELD = "id"
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config.ini", "utf-8")
|
||||
|
||||
@@ -48,6 +56,15 @@ def compute_mdhash_id_for_qdrant(
|
||||
raise ValueError("Invalid style. Choose from 'simple', 'hyphenated', or 'urn'.")
|
||||
|
||||
|
||||
def workspace_filter_condition(workspace: str) -> models.FieldCondition:
|
||||
"""
|
||||
Create a workspace filter condition for Qdrant queries.
|
||||
"""
|
||||
return models.FieldCondition(
|
||||
key=WORKSPACE_ID_FIELD, match=models.MatchValue(value=workspace)
|
||||
)
|
||||
|
||||
|
||||
@final
|
||||
@dataclass
|
||||
class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
@@ -64,24 +81,19 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
self.__post_init__()
|
||||
|
||||
@staticmethod
|
||||
def create_collection_if_not_exist(
|
||||
client: QdrantClient, collection_name: str, **kwargs
|
||||
):
|
||||
exists = False
|
||||
if hasattr(client, "collection_exists"):
|
||||
try:
|
||||
exists = client.collection_exists(collection_name)
|
||||
except Exception:
|
||||
exists = False
|
||||
else:
|
||||
try:
|
||||
client.get_collection(collection_name)
|
||||
exists = True
|
||||
except Exception:
|
||||
exists = False
|
||||
def setup_collection(client: QdrantClient, collection_name: str, **kwargs):
|
||||
exists = client.collection_exists(collection_name)
|
||||
|
||||
if not exists:
|
||||
client.create_collection(collection_name, **kwargs)
|
||||
client.create_payload_index(
|
||||
collection_name=collection_name,
|
||||
field_name=WORKSPACE_ID_FIELD,
|
||||
field_schema=models.KeywordIndexParams(
|
||||
type=models.KeywordIndexType.KEYWORD,
|
||||
is_tenant=True, # Optimize storage structure for tenant co-location
|
||||
),
|
||||
)
|
||||
|
||||
def __post_init__(self):
|
||||
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
|
||||
@@ -101,18 +113,14 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
f"Using passed workspace parameter: '{effective_workspace}'"
|
||||
)
|
||||
|
||||
# Build final_namespace with workspace prefix for data isolation
|
||||
# Keep original namespace unchanged for type detection logic
|
||||
if effective_workspace:
|
||||
self.final_namespace = f"{effective_workspace}_{self.namespace}"
|
||||
logger.debug(
|
||||
f"Final namespace with workspace prefix: '{self.final_namespace}'"
|
||||
)
|
||||
else:
|
||||
# When workspace is empty, final_namespace equals original namespace
|
||||
self.final_namespace = self.namespace
|
||||
self.workspace = "_"
|
||||
logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
|
||||
self.effective_workspace = effective_workspace or DEFAULT_WORKSPACE
|
||||
|
||||
# Use a shared collection with payload-based partitioning (Qdrant's recommended approach)
|
||||
# Ref: https://qdrant.tech/documentation/guides/multiple-partitions/
|
||||
self.final_namespace = self.namespace
|
||||
logger.debug(
|
||||
f"Using shared collection '{self.final_namespace}' with workspace '{self.effective_workspace}' for payload-based partitioning"
|
||||
)
|
||||
|
||||
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
|
||||
cosine_threshold = kwargs.get("cosine_better_than_threshold")
|
||||
@@ -149,8 +157,8 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
f"[{self.workspace}] QdrantClient created successfully"
|
||||
)
|
||||
|
||||
# Create collection if not exists
|
||||
QdrantVectorDBStorage.create_collection_if_not_exist(
|
||||
# Setup collection (create if not exists and configure indexes)
|
||||
QdrantVectorDBStorage.setup_collection(
|
||||
self._client,
|
||||
self.final_namespace,
|
||||
vectors_config=models.VectorParams(
|
||||
@@ -158,6 +166,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
distance=models.Distance.COSINE,
|
||||
),
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
logger.info(
|
||||
f"[{self.workspace}] Qdrant collection '{self.namespace}' initialized successfully"
|
||||
@@ -179,8 +188,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
|
||||
list_data = [
|
||||
{
|
||||
"id": k,
|
||||
"created_at": current_time,
|
||||
ID_FIELD: k,
|
||||
WORKSPACE_ID_FIELD: self.effective_workspace,
|
||||
CREATED_AT_FIELD: current_time,
|
||||
**{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
|
||||
}
|
||||
for k, v in data.items()
|
||||
@@ -200,7 +210,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
for i, d in enumerate(list_data):
|
||||
list_points.append(
|
||||
models.PointStruct(
|
||||
id=compute_mdhash_id_for_qdrant(d["id"]),
|
||||
id=compute_mdhash_id_for_qdrant(
|
||||
d[ID_FIELD], prefix=self.effective_workspace
|
||||
),
|
||||
vector=embeddings[i],
|
||||
payload=d,
|
||||
)
|
||||
@@ -222,21 +234,22 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
) # higher priority for query
|
||||
embedding = embedding_result[0]
|
||||
|
||||
results = self._client.search(
|
||||
results = self._client.query_points(
|
||||
collection_name=self.final_namespace,
|
||||
query_vector=embedding,
|
||||
query=embedding,
|
||||
limit=top_k,
|
||||
with_payload=True,
|
||||
score_threshold=self.cosine_better_than_threshold,
|
||||
)
|
||||
|
||||
# logger.debug(f"[{self.workspace}] query result: {results}")
|
||||
query_filter=models.Filter(
|
||||
must=[workspace_filter_condition(self.effective_workspace)]
|
||||
),
|
||||
).points
|
||||
|
||||
return [
|
||||
{
|
||||
**dp.payload,
|
||||
"distance": dp.score,
|
||||
"created_at": dp.payload.get("created_at"),
|
||||
CREATED_AT_FIELD: dp.payload.get(CREATED_AT_FIELD),
|
||||
}
|
||||
for dp in results
|
||||
]
|
||||
@@ -252,14 +265,18 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
ids: List of vector IDs to be deleted
|
||||
"""
|
||||
try:
|
||||
if not ids:
|
||||
return
|
||||
|
||||
# Convert regular ids to Qdrant compatible ids
|
||||
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
|
||||
# Delete points from the collection
|
||||
qdrant_ids = [
|
||||
compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
|
||||
for id in ids
|
||||
]
|
||||
# Delete points from the collection with workspace filtering
|
||||
self._client.delete(
|
||||
collection_name=self.final_namespace,
|
||||
points_selector=models.PointIdsList(
|
||||
points=qdrant_ids,
|
||||
),
|
||||
points_selector=models.PointIdsList(points=qdrant_ids),
|
||||
wait=True,
|
||||
)
|
||||
logger.debug(
|
||||
@@ -277,18 +294,16 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
entity_name: Name of the entity to delete
|
||||
"""
|
||||
try:
|
||||
# Generate the entity ID
|
||||
entity_id = compute_mdhash_id_for_qdrant(entity_name, prefix="ent-")
|
||||
# logger.debug(
|
||||
# f"[{self.workspace}] Attempting to delete entity {entity_name} with ID {entity_id}"
|
||||
# )
|
||||
# Generate the entity ID using the same function as used for storage
|
||||
entity_id = compute_mdhash_id(entity_name, prefix=ENTITY_PREFIX)
|
||||
qdrant_entity_id = compute_mdhash_id_for_qdrant(
|
||||
entity_id, prefix=self.effective_workspace
|
||||
)
|
||||
|
||||
# Delete the entity point from the collection
|
||||
# Delete the entity point by its Qdrant ID directly
|
||||
self._client.delete(
|
||||
collection_name=self.final_namespace,
|
||||
points_selector=models.PointIdsList(
|
||||
points=[entity_id],
|
||||
),
|
||||
points_selector=models.PointIdsList(points=[qdrant_entity_id]),
|
||||
wait=True,
|
||||
)
|
||||
logger.debug(
|
||||
@@ -304,10 +319,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
entity_name: Name of the entity whose relations should be deleted
|
||||
"""
|
||||
try:
|
||||
# Find relations where the entity is either source or target
|
||||
# Find relations where the entity is either source or target, with workspace filtering
|
||||
results = self._client.scroll(
|
||||
collection_name=self.final_namespace,
|
||||
scroll_filter=models.Filter(
|
||||
must=[workspace_filter_condition(self.effective_workspace)],
|
||||
should=[
|
||||
models.FieldCondition(
|
||||
key="src_id", match=models.MatchValue(value=entity_name)
|
||||
@@ -315,7 +331,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
models.FieldCondition(
|
||||
key="tgt_id", match=models.MatchValue(value=entity_name)
|
||||
),
|
||||
]
|
||||
],
|
||||
),
|
||||
with_payload=True,
|
||||
limit=1000, # Adjust as needed for your use case
|
||||
@@ -326,12 +342,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
ids_to_delete = [point.id for point in relation_points]
|
||||
|
||||
if ids_to_delete:
|
||||
# Delete the relations
|
||||
# Delete the relations with workspace filtering
|
||||
assert isinstance(self._client, QdrantClient)
|
||||
self._client.delete(
|
||||
collection_name=self.final_namespace,
|
||||
points_selector=models.PointIdsList(
|
||||
points=ids_to_delete,
|
||||
),
|
||||
points_selector=models.PointIdsList(points=ids_to_delete),
|
||||
wait=True,
|
||||
)
|
||||
logger.debug(
|
||||
@@ -357,9 +372,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
"""
|
||||
try:
|
||||
# Convert to Qdrant compatible ID
|
||||
qdrant_id = compute_mdhash_id_for_qdrant(id)
|
||||
qdrant_id = compute_mdhash_id_for_qdrant(
|
||||
id, prefix=self.effective_workspace
|
||||
)
|
||||
|
||||
# Retrieve the point by ID
|
||||
# Retrieve the point by ID with workspace filtering
|
||||
result = self._client.retrieve(
|
||||
collection_name=self.final_namespace,
|
||||
ids=[qdrant_id],
|
||||
@@ -369,10 +386,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
if not result:
|
||||
return None
|
||||
|
||||
# Ensure the result contains created_at field
|
||||
payload = result[0].payload
|
||||
if "created_at" not in payload:
|
||||
payload["created_at"] = None
|
||||
if CREATED_AT_FIELD not in payload:
|
||||
payload[CREATED_AT_FIELD] = None
|
||||
|
||||
return payload
|
||||
except Exception as e:
|
||||
@@ -395,7 +411,10 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
|
||||
try:
|
||||
# Convert to Qdrant compatible IDs
|
||||
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
|
||||
qdrant_ids = [
|
||||
compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
|
||||
for id in ids
|
||||
]
|
||||
|
||||
# Retrieve the points by IDs
|
||||
results = self._client.retrieve(
|
||||
@@ -410,14 +429,14 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
|
||||
for point in results:
|
||||
payload = dict(point.payload or {})
|
||||
if "created_at" not in payload:
|
||||
payload["created_at"] = None
|
||||
if CREATED_AT_FIELD not in payload:
|
||||
payload[CREATED_AT_FIELD] = None
|
||||
|
||||
qdrant_point_id = str(point.id) if point.id is not None else ""
|
||||
if qdrant_point_id:
|
||||
payload_by_qdrant_id[qdrant_point_id] = payload
|
||||
|
||||
original_id = payload.get("id")
|
||||
original_id = payload.get(ID_FIELD)
|
||||
if original_id is not None:
|
||||
payload_by_original_id[str(original_id)] = payload
|
||||
|
||||
@@ -450,7 +469,10 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
|
||||
try:
|
||||
# Convert to Qdrant compatible IDs
|
||||
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
|
||||
qdrant_ids = [
|
||||
compute_mdhash_id_for_qdrant(id, prefix=self.effective_workspace)
|
||||
for id in ids
|
||||
]
|
||||
|
||||
# Retrieve the points by IDs with vectors
|
||||
results = self._client.retrieve(
|
||||
@@ -464,7 +486,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
for point in results:
|
||||
if point and point.vector is not None and point.payload:
|
||||
# Get original ID from payload
|
||||
original_id = point.payload.get("id")
|
||||
original_id = point.payload.get(ID_FIELD)
|
||||
if original_id:
|
||||
# Convert numpy array to list if needed
|
||||
vector_data = point.vector
|
||||
@@ -482,7 +504,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop all vector data from storage and clean up resources
|
||||
|
||||
This method will delete all data from the Qdrant collection.
|
||||
This method will delete all data for the current workspace from the Qdrant collection.
|
||||
|
||||
Returns:
|
||||
dict[str, str]: Operation status and message
|
||||
@@ -491,39 +513,23 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
"""
|
||||
async with get_storage_lock():
|
||||
try:
|
||||
# Delete the collection and recreate it
|
||||
exists = False
|
||||
if hasattr(self._client, "collection_exists"):
|
||||
try:
|
||||
exists = self._client.collection_exists(self.final_namespace)
|
||||
except Exception:
|
||||
exists = False
|
||||
else:
|
||||
try:
|
||||
self._client.get_collection(self.final_namespace)
|
||||
exists = True
|
||||
except Exception:
|
||||
exists = False
|
||||
|
||||
if exists:
|
||||
self._client.delete_collection(self.final_namespace)
|
||||
|
||||
# Recreate the collection
|
||||
QdrantVectorDBStorage.create_collection_if_not_exist(
|
||||
self._client,
|
||||
self.final_namespace,
|
||||
vectors_config=models.VectorParams(
|
||||
size=self.embedding_func.embedding_dim,
|
||||
distance=models.Distance.COSINE,
|
||||
# Delete all points for the current workspace
|
||||
self._client.delete(
|
||||
collection_name=self.final_namespace,
|
||||
points_selector=models.FilterSelector(
|
||||
filter=models.Filter(
|
||||
must=[workspace_filter_condition(self.effective_workspace)]
|
||||
)
|
||||
),
|
||||
wait=True,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[{self.workspace}] Process {os.getpid()} drop Qdrant collection {self.namespace}"
|
||||
f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
|
||||
)
|
||||
return {"status": "success", "message": "data dropped"}
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{self.workspace}] Error dropping Qdrant collection {self.namespace}: {e}"
|
||||
f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
|
||||
)
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
10
uv.lock
generated
10
uv.lock
generated
@@ -1,5 +1,5 @@
|
||||
version = 1
|
||||
revision = 2
|
||||
revision = 3
|
||||
requires-python = ">=3.10"
|
||||
resolution-markers = [
|
||||
"python_full_version >= '3.13' and sys_platform == 'darwin'",
|
||||
@@ -1590,11 +1590,11 @@ requires-dist = [
|
||||
{ name = "numpy" },
|
||||
{ name = "numpy", marker = "extra == 'api'" },
|
||||
{ name = "ollama", marker = "extra == 'offline-llm'", specifier = ">=0.1.0,<1.0.0" },
|
||||
{ name = "openai", marker = "extra == 'api'", specifier = ">=1.0.0,<2.0.0" },
|
||||
{ name = "openai", marker = "extra == 'offline-llm'", specifier = ">=1.0.0,<2.0.0" },
|
||||
{ name = "openai", marker = "extra == 'api'", specifier = ">=1.0.0,<3.0.0" },
|
||||
{ name = "openai", marker = "extra == 'offline-llm'", specifier = ">=1.0.0,<3.0.0" },
|
||||
{ name = "openpyxl", marker = "extra == 'offline-docs'", specifier = ">=3.0.0,<4.0.0" },
|
||||
{ name = "pandas", specifier = ">=2.0.0,<2.3.0" },
|
||||
{ name = "pandas", marker = "extra == 'api'", specifier = ">=2.0.0,<2.3.0" },
|
||||
{ name = "pandas", specifier = ">=2.0.0,<2.4.0" },
|
||||
{ name = "pandas", marker = "extra == 'api'", specifier = ">=2.0.0,<2.4.0" },
|
||||
{ name = "passlib", extras = ["bcrypt"], marker = "extra == 'api'" },
|
||||
{ name = "pipmaster" },
|
||||
{ name = "pipmaster", marker = "extra == 'api'" },
|
||||
|
||||
Reference in New Issue
Block a user