Refactor Qdrant setup and migration logic

- Validate dimensions before migration
- Require namespace and workspace args
- Raise error on vector size mismatch
- Simplify collection initialization flow
- Update tests for strict checks
This commit is contained in:
yangdx
2025-12-19 10:45:18 +08:00
parent 6a9e368382
commit bf618fc976
4 changed files with 223 additions and 203 deletions

View File

@@ -127,45 +127,41 @@ class QdrantVectorDBStorage(BaseVectorStorage):
def setup_collection(
client: QdrantClient,
collection_name: str,
namespace: str = None,
workspace: str = None,
**kwargs,
namespace: str,
workspace: str,
vectors_config: models.VectorParams,
hnsw_config: models.HnswConfigDiff,
):
"""
Setup Qdrant collection with migration support from legacy collections.
Ensure final collection is created with workspace isolation index.
This method now supports backward compatibility by automatically detecting
legacy collections created by older versions of LightRAG using multiple
naming patterns.
Behavior:
- Case 1: New collection is the same as legacy collection - show debug message and continue
- Case 2: Only new collection exists - - show debug message and continue
- Case 3: Both new and legacy collections exist with different names - show warning and continue
- Case 4: Only legacy exists - migrate data from legacy collection to new collection
Raise DataMigrationError if legacy collection has different dimension than new collection
Ensure final collection has workspace isolation index.
Check vector dimension compatibility before new collection creation.
Drop legacy collection if it exists and is empty.
Only migrate data from legacy collection to new collection when new collection first created and legacy collection is not empty.
Args:
client: QdrantClient instance
collection_name: Name of the final collection
namespace: Base namespace (e.g., "chunks", "entities")
workspace: Workspace identifier for data isolation
**kwargs: Additional arguments for collection creation (vectors_config, hnsw_config, etc.)
Raises:
DataMigrationError: If migration fails or index creation fails
vectors_config: Vector configuration parameters for the collection
hnsw_config: HNSW index configuration diff for the collection
"""
new_collection_exists = client.collection_exists(collection_name)
if not namespace or not workspace:
raise ValueError("namespace and workspace must be provided")
# Try to find legacy collection with backward compatibility
new_collection_exists = client.collection_exists(collection_name)
legacy_collection = (
_find_legacy_collection(client, namespace, workspace) if namespace else None
)
if not new_collection_exists:
logger.info(f"Qdrant: Creating new collection '{collection_name}'")
client.create_collection(collection_name, **kwargs)
# Case 1: Only new collection exists or new collection is the same as legacy collection
# No data migration needed, and ensuring index is created then return
if (new_collection_exists and not legacy_collection) or (
collection_name == legacy_collection
):
# create_payload_index return without error if index already exists
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
@@ -174,163 +170,154 @@ class QdrantVectorDBStorage(BaseVectorStorage):
is_tenant=True,
),
)
logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
# Case 1: New collection is the same as legacy collection - show debug message and continue
if collection_name == legacy_collection:
logger.debug(
"Qdrant: legacy collection '%s' is the same as new collection '%s'.",
legacy_collection,
collection_name,
)
return
# Case 2: Only new collection exists - silently return
if new_collection_exists and not legacy_collection:
logger.debug(
"Qdrant: Only new collection '%s' exists. No migration needed.",
collection_name,
)
return
# Case 3: Both new and legacy collections exist with different names - show warning and continue
# Only delete legacy if it's empty (safe cleanup) and it's not the same as new collection
if new_collection_exists and legacy_collection:
try:
# Check if legacy collection is empty
legacy_count = None
if not new_collection_exists:
# Check vector dimension compatibility before creating new collection
if legacy_collection:
legacy_count = client.count(
collection_name=legacy_collection, exact=True
).count
if legacy_count > 0:
legacy_info = client.get_collection(legacy_collection)
legacy_dim = legacy_info.config.params.vectors.size
if legacy_count == 0:
# Legacy collection is empty, safe to delete without data loss
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' is empty. Deleting..."
)
client.delete_collection(collection_name=legacy_collection)
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' deleted successfully"
)
else:
# Legacy collection still has data - don't risk deleting it
logger.warning(
f"Qdrant: Legacy collection '{legacy_collection}' still contains {legacy_count} records. "
f"Manual deletion is required after data migration verification."
)
except Exception as e:
logger.warning(
f"Qdrant: Could not check or cleanup legacy collection '{legacy_collection}': {e}. "
"You may need to delete it manually."
)
return
if vectors_config.size and legacy_dim != vectors_config.size:
logger.error(
f"Qdrant: Dimension mismatch detected! "
f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
f"but new embedding model expects {vectors_config.size}d."
)
# Case 4: Only legacy exists - migrate data from legacy collection to new collection
logger.info(
f"Qdrant: Migrating data from legacy collection '{legacy_collection}'"
raise DataMigrationError(
f"Dimension mismatch between legacy collection '{legacy_collection}' "
f"and new collection. Expected {vectors_config.size}d but got {legacy_dim}d."
)
client.create_collection(
collection_name, vectors_config=vectors_config, hnsw_config=hnsw_config
)
logger.info(f"Qdrant: Collection '{collection_name}' created successfully")
# create_payload_index return without error if index already exists
client.create_payload_index(
collection_name=collection_name,
field_name=WORKSPACE_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
try:
# Get legacy collection count
legacy_count = client.count(
collection_name=legacy_collection, exact=True
).count
# Case 2: Legacy collection exist
if legacy_collection:
# Only drop legacy collection if it's empty
if legacy_count is None:
legacy_count = client.count(
collection_name=legacy_collection, exact=True
).count
if legacy_count == 0:
client.delete_collection(collection_name=legacy_collection)
logger.info(
f"Qdrant: Legacy collection '{legacy_collection}' is empty. No migration needed."
f"Qdrant: Empty legacy collection '{legacy_collection}' deleted successfully"
)
return
logger.info(f"Qdrant: Found {legacy_count} records in legacy collection")
# If both new and legacy collections exist with different names - skip data migration
if new_collection_exists:
logger.warning(
f"Qdrant: Both new collection '{collection_name}' and legacy collection '{legacy_collection}' exist. "
f"Data migration skipped. You may need to delete the legacy collection manually."
)
return
# Check vector dimension compatibility before migration
legacy_info = client.get_collection(legacy_collection)
legacy_dim = legacy_info.config.params.vectors.size
# Get expected dimension from kwargs
new_dim = (
kwargs.get("vectors_config").size
if "vectors_config" in kwargs
else None
# Case 3: Only legacy exists - migrate data from legacy collection to new collection
logger.info(
f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records."
)
logger.info(
f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
)
if new_dim and legacy_dim != new_dim:
logger.error(
f"Qdrant: Dimension mismatch detected! "
f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
f"but new embedding model expects {new_dim}d. "
)
try:
# Batch migration (500 records per batch)
migrated_count = 0
offset = None
batch_size = 500
raise DataMigrationError(
f"Qdrant: Dimension mismatch! "
f"Legacy collection '{legacy_collection}' has {legacy_dim}d vectors, "
f"but new embedding model expects {new_dim}d. "
)
while True:
# Scroll through legacy data
result = client.scroll(
collection_name=legacy_collection,
limit=batch_size,
offset=offset,
with_vectors=True,
with_payload=True,
)
points, next_offset = result
# Batch migration (500 records per batch)
migrated_count = 0
offset = None
batch_size = 500
if not points:
break
while True:
# Scroll through legacy data
result = client.scroll(
collection_name=legacy_collection,
limit=batch_size,
offset=offset,
with_vectors=True,
with_payload=True,
)
points, next_offset = result
# Transform points for new collection
new_points = []
for point in points:
# Add workspace_id to payload
new_payload = dict(point.payload or {})
new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
if not points:
break
# Create new point with workspace-prefixed ID
original_id = new_payload.get(ID_FIELD)
if original_id:
new_point_id = compute_mdhash_id_for_qdrant(
original_id, prefix=workspace or DEFAULT_WORKSPACE
)
else:
# Fallback: use original point ID
new_point_id = str(point.id)
# Transform points for new collection
new_points = []
for point in points:
# Add workspace_id to payload
new_payload = dict(point.payload or {})
new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
# Create new point with workspace-prefixed ID
original_id = new_payload.get(ID_FIELD)
if original_id:
new_point_id = compute_mdhash_id_for_qdrant(
original_id, prefix=workspace or DEFAULT_WORKSPACE
new_points.append(
models.PointStruct(
id=new_point_id,
vector=point.vector,
payload=new_payload,
)
)
else:
# Fallback: use original point ID
new_point_id = str(point.id)
new_points.append(
models.PointStruct(
id=new_point_id,
vector=point.vector,
payload=new_payload,
)
# Upsert to new collection
client.upsert(
collection_name=collection_name, points=new_points, wait=True
)
# Upsert to new collection
client.upsert(
collection_name=collection_name, points=new_points, wait=True
migrated_count += len(points)
logger.info(
f"Qdrant: {migrated_count}/{legacy_count} records migrated"
)
# Check if we've reached the end
if next_offset is None:
break
offset = next_offset
new_count = client.count(
collection_name=collection_name, exact=True
).count
if new_count != legacy_count:
error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
logger.error(error_msg)
raise DataMigrationError(error_msg)
except DataMigrationError:
# Re-raise DataMigrationError as-is to preserve specific error messages
raise
except Exception as e:
logger.error(
f"Qdrant: Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}': {e}"
)
migrated_count += len(points)
logger.info(f"Qdrant: {migrated_count}/{legacy_count} records migrated")
# Check if we've reached the end
if next_offset is None:
break
offset = next_offset
# Verify migration by comparing counts
logger.info("Verifying migration...")
new_count = client.count(collection_name=collection_name, exact=True).count
if new_count != legacy_count:
error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
logger.error(error_msg)
raise DataMigrationError(error_msg)
raise DataMigrationError(
f"Failed to migrate data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
) from e
logger.info(
f"Qdrant: Migration from '{legacy_collection}' to '{collection_name}' completed successfully"
@@ -339,14 +326,6 @@ class QdrantVectorDBStorage(BaseVectorStorage):
"Qdrant: Manual deletion is required after data migration verification."
)
except DataMigrationError:
# Re-raise migration errors without wrapping
raise
except Exception as e:
error_msg = f"Qdrant: Collection initialization failed with error: {e}"
logger.error(error_msg)
raise DataMigrationError(error_msg) from e
def __post_init__(self):
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all Qdrant storage instances

View File

@@ -11,6 +11,7 @@ from unittest.mock import MagicMock, AsyncMock, patch
from lightrag.kg.qdrant_impl import QdrantVectorDBStorage
from lightrag.kg.postgres_impl import PGVectorStorage
from lightrag.exceptions import DataMigrationError
# Note: Tests should use proper table names that have DDL templates
@@ -21,12 +22,12 @@ from lightrag.kg.postgres_impl import PGVectorStorage
class TestQdrantDimensionMismatch:
"""Test suite for Qdrant dimension mismatch handling."""
def test_qdrant_dimension_mismatch_skip_migration(self):
def test_qdrant_dimension_mismatch_raises_error(self):
"""
Test that Qdrant skips migration when dimensions don't match.
Test that Qdrant raises DataMigrationError when dimensions don't match.
Scenario: Legacy collection has 1536d vectors, new model expects 3072d.
Expected: Migration skipped, new empty collection created, legacy preserved.
Expected: DataMigrationError is raised to prevent data corruption.
"""
from qdrant_client import models
@@ -39,7 +40,9 @@ class TestQdrantDimensionMismatch:
# Setup collection existence checks
def collection_exists_side_effect(name):
if name == "lightrag_chunks": # legacy
if (
name == "lightrag_vdb_chunks"
): # legacy (matches _find_legacy_collection pattern)
return True
elif name == "lightrag_chunks_model_3072d": # new
return False
@@ -49,21 +52,35 @@ class TestQdrantDimensionMismatch:
client.get_collection.return_value = legacy_collection_info
client.count.return_value.count = 100 # Legacy has data
# Call setup_collection with 3072d (different from legacy 1536d)
QdrantVectorDBStorage.setup_collection(
client,
"lightrag_chunks_model_3072d",
namespace="chunks",
workspace="test",
vectors_config=models.VectorParams(
size=3072, distance=models.Distance.COSINE
),
)
# Patch _find_legacy_collection to return the legacy collection name
with patch(
"lightrag.kg.qdrant_impl._find_legacy_collection",
return_value="lightrag_vdb_chunks",
):
# Call setup_collection with 3072d (different from legacy 1536d)
# Should raise DataMigrationError due to dimension mismatch
with pytest.raises(DataMigrationError) as exc_info:
QdrantVectorDBStorage.setup_collection(
client,
"lightrag_chunks_model_3072d",
namespace="chunks",
workspace="test",
vectors_config=models.VectorParams(
size=3072, distance=models.Distance.COSINE
),
hnsw_config=models.HnswConfigDiff(
payload_m=16,
m=0,
),
)
# Verify new collection was created
client.create_collection.assert_called_once()
# Verify error message contains dimension information
assert "3072" in str(exc_info.value) or "1536" in str(exc_info.value)
# Verify migration was NOT attempted (no scroll/upsert calls)
# Verify new collection was NOT created (error raised before creation)
client.create_collection.assert_not_called()
# Verify migration was NOT attempted
client.scroll.assert_not_called()
client.upsert.assert_not_called()
@@ -114,6 +131,10 @@ class TestQdrantDimensionMismatch:
vectors_config=models.VectorParams(
size=1536, distance=models.Distance.COSINE
),
hnsw_config=models.HnswConfigDiff(
payload_m=16,
m=0,
),
)
# Verify migration WAS attempted

View File

@@ -45,17 +45,27 @@ class TestNoModelSuffixSafety:
# Collection is empty
client.count.return_value.count = 0
# Call setup_collection
# This should detect that new == legacy and skip deletion
QdrantVectorDBStorage.setup_collection(
client,
collection_name,
namespace="chunks",
workspace=None,
vectors_config=models.VectorParams(
size=1536, distance=models.Distance.COSINE
),
)
# Patch _find_legacy_collection to return the SAME collection name
# This simulates the scenario where new collection == legacy collection
with patch(
"lightrag.kg.qdrant_impl._find_legacy_collection",
return_value="lightrag_vdb_chunks", # Same as collection_name
):
# Call setup_collection
# This should detect that new == legacy and skip deletion
QdrantVectorDBStorage.setup_collection(
client,
collection_name,
namespace="chunks",
workspace="_",
vectors_config=models.VectorParams(
size=1536, distance=models.Distance.COSINE
),
hnsw_config=models.HnswConfigDiff(
payload_m=16,
m=0,
),
)
# CRITICAL: Collection should NOT be deleted
client.delete_collection.assert_not_called()
@@ -152,10 +162,14 @@ class TestNoModelSuffixSafety:
client,
collection_name,
namespace="chunks",
workspace=None,
workspace="_",
vectors_config=models.VectorParams(
size=1536, distance=models.Distance.COSINE
),
hnsw_config=models.HnswConfigDiff(
payload_m=16,
m=0,
),
)
# SHOULD delete legacy (normal Case 1 behavior)

View File

@@ -1,6 +1,7 @@
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import numpy as np
from qdrant_client import models
from lightrag.utils import EmbeddingFunc
from lightrag.kg.qdrant_impl import QdrantVectorDBStorage
@@ -124,7 +125,13 @@ async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func)
@pytest.mark.asyncio
async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_func):
"""Test scenario where new collection already exists"""
"""Test scenario where new collection already exists (Case 1 in setup_collection)
When only the new collection exists and no legacy collection is found,
the implementation should:
1. Create payload index on the new collection (ensure index exists)
2. NOT attempt any data migration (no scroll calls)
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
@@ -137,15 +144,7 @@ async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_fun
workspace="test_ws",
)
# New collection exists and Legacy exists (warning case)
# or New collection exists and Legacy does not exist (normal case)
# Mocking case where both exist to test logic flow but without migration
# Logic in code:
# Case 1: Both exist -> Warning only
# Case 2: Only new exists -> Ensure index
# Let's test Case 2: Only new collection exists
# Only new collection exists (no legacy collection found)
mock_qdrant_client.collection_exists.side_effect = (
lambda name: name == storage.final_namespace
)
@@ -153,9 +152,16 @@ async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_fun
# Initialize
await storage.initialize()
# Should check index but NOT migrate
# In Qdrant implementation, Case 2 calls get_collection
mock_qdrant_client.get_collection.assert_called_with(storage.final_namespace)
# Should create payload index on the new collection (ensure index)
mock_qdrant_client.create_payload_index.assert_called_with(
collection_name=storage.final_namespace,
field_name="workspace_id",
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
),
)
# Should NOT migrate (no scroll calls since no legacy collection exists)
mock_qdrant_client.scroll.assert_not_called()