- Detect unindexed workspace_id via sample - Prevent cross-workspace data leakage - Fix empty workspace warning logic - Update migration tests for sampling
563 lines
19 KiB
Python
563 lines
19 KiB
Python
import pytest
|
||
from unittest.mock import MagicMock, patch, AsyncMock
|
||
import numpy as np
|
||
from qdrant_client import models
|
||
from lightrag.utils import EmbeddingFunc
|
||
from lightrag.kg.qdrant_impl import QdrantVectorDBStorage
|
||
|
||
|
||
# Mock QdrantClient
|
||
@pytest.fixture
|
||
def mock_qdrant_client():
|
||
with patch("lightrag.kg.qdrant_impl.QdrantClient") as mock_client_cls:
|
||
client = mock_client_cls.return_value
|
||
client.collection_exists.return_value = False
|
||
client.count.return_value.count = 0
|
||
# Mock payload schema and vector config for get_collection
|
||
collection_info = MagicMock()
|
||
collection_info.payload_schema = {}
|
||
# Mock vector dimension to match mock_embedding_func (768d)
|
||
collection_info.config.params.vectors.size = 768
|
||
client.get_collection.return_value = collection_info
|
||
yield client
|
||
|
||
|
||
# Mock get_data_init_lock to avoid async lock issues in tests
|
||
@pytest.fixture(autouse=True)
|
||
def mock_data_init_lock():
|
||
with patch("lightrag.kg.qdrant_impl.get_data_init_lock") as mock_lock:
|
||
mock_lock_ctx = AsyncMock()
|
||
mock_lock.return_value = mock_lock_ctx
|
||
yield mock_lock
|
||
|
||
|
||
# Mock Embedding function
|
||
@pytest.fixture
|
||
def mock_embedding_func():
|
||
async def embed_func(texts, **kwargs):
|
||
return np.array([[0.1] * 768 for _ in texts])
|
||
|
||
func = EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test-model")
|
||
return func
|
||
|
||
|
||
async def test_qdrant_collection_naming(mock_qdrant_client, mock_embedding_func):
|
||
"""Test if collection name is correctly generated with model suffix"""
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=mock_embedding_func,
|
||
workspace="test_ws",
|
||
)
|
||
|
||
# Verify collection name contains model suffix
|
||
expected_suffix = "test_model_768d"
|
||
assert expected_suffix in storage.final_namespace
|
||
assert storage.final_namespace == f"lightrag_vdb_chunks_{expected_suffix}"
|
||
|
||
|
||
async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func):
|
||
"""Test if migration logic is triggered correctly"""
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=mock_embedding_func,
|
||
workspace="test_ws",
|
||
)
|
||
|
||
# Legacy collection name (without model suffix)
|
||
legacy_collection = "lightrag_vdb_chunks"
|
||
|
||
# Setup mocks for migration scenario
|
||
# 1. New collection does not exist, only legacy exists
|
||
mock_qdrant_client.collection_exists.side_effect = (
|
||
lambda name: name == legacy_collection
|
||
)
|
||
|
||
# 2. Legacy collection exists and has data
|
||
migration_state = {"new_workspace_count": 0}
|
||
|
||
def count_mock(collection_name, exact=True, count_filter=None):
|
||
mock_result = MagicMock()
|
||
if collection_name == legacy_collection:
|
||
mock_result.count = 100
|
||
elif collection_name == storage.final_namespace:
|
||
mock_result.count = migration_state["new_workspace_count"]
|
||
else:
|
||
mock_result.count = 0
|
||
return mock_result
|
||
|
||
mock_qdrant_client.count.side_effect = count_mock
|
||
|
||
# 3. Mock scroll for data migration
|
||
mock_point = MagicMock()
|
||
mock_point.id = "old_id"
|
||
mock_point.vector = [0.1] * 768
|
||
mock_point.payload = {"content": "test"} # No workspace_id in payload
|
||
|
||
# When payload_schema is empty, the code first samples payloads to detect workspace_id
|
||
# Then proceeds with migration batches
|
||
# Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
|
||
mock_qdrant_client.scroll.side_effect = [
|
||
([mock_point], "_"), # Sampling scroll - no workspace_id found
|
||
([mock_point], "next_offset"), # Migration batch
|
||
([], None), # End of migration
|
||
]
|
||
|
||
def upsert_mock(*args, **kwargs):
|
||
migration_state["new_workspace_count"] = 100
|
||
return None
|
||
|
||
mock_qdrant_client.upsert.side_effect = upsert_mock
|
||
|
||
# Initialize storage (triggers migration)
|
||
await storage.initialize()
|
||
|
||
# Verify migration steps
|
||
# 1. Legacy count checked
|
||
mock_qdrant_client.count.assert_any_call(
|
||
collection_name=legacy_collection, exact=True
|
||
)
|
||
|
||
# 2. New collection created
|
||
mock_qdrant_client.create_collection.assert_called()
|
||
|
||
# 3. Data scrolled from legacy
|
||
# First call (index 0) is sampling scroll with limit=10
|
||
# Second call (index 1) is migration batch with limit=500
|
||
assert mock_qdrant_client.scroll.call_count >= 2
|
||
# Check sampling scroll
|
||
sampling_call = mock_qdrant_client.scroll.call_args_list[0]
|
||
assert sampling_call.kwargs["collection_name"] == legacy_collection
|
||
assert sampling_call.kwargs["limit"] == 10
|
||
# Check migration batch scroll
|
||
migration_call = mock_qdrant_client.scroll.call_args_list[1]
|
||
assert migration_call.kwargs["collection_name"] == legacy_collection
|
||
assert migration_call.kwargs["limit"] == 500
|
||
|
||
# 4. Data upserted to new
|
||
mock_qdrant_client.upsert.assert_called()
|
||
|
||
# 5. Payload index created
|
||
mock_qdrant_client.create_payload_index.assert_called()
|
||
|
||
|
||
async def test_qdrant_no_migration_needed(mock_qdrant_client, mock_embedding_func):
|
||
"""Test scenario where new collection already exists (Case 1 in setup_collection)
|
||
|
||
When only the new collection exists and no legacy collection is found,
|
||
the implementation should:
|
||
1. Create payload index on the new collection (ensure index exists)
|
||
2. NOT attempt any data migration (no scroll calls)
|
||
"""
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=mock_embedding_func,
|
||
workspace="test_ws",
|
||
)
|
||
|
||
# Only new collection exists (no legacy collection found)
|
||
mock_qdrant_client.collection_exists.side_effect = (
|
||
lambda name: name == storage.final_namespace
|
||
)
|
||
|
||
# Initialize
|
||
await storage.initialize()
|
||
|
||
# Should create payload index on the new collection (ensure index)
|
||
mock_qdrant_client.create_payload_index.assert_called_with(
|
||
collection_name=storage.final_namespace,
|
||
field_name="workspace_id",
|
||
field_schema=models.KeywordIndexParams(
|
||
type=models.KeywordIndexType.KEYWORD,
|
||
is_tenant=True,
|
||
),
|
||
)
|
||
# Should NOT migrate (no scroll calls since no legacy collection exists)
|
||
mock_qdrant_client.scroll.assert_not_called()
|
||
|
||
|
||
# ============================================================================
|
||
# Tests for scenarios described in design document (Lines 606-649)
|
||
# ============================================================================
|
||
|
||
|
||
async def test_scenario_1_new_workspace_creation(
|
||
mock_qdrant_client, mock_embedding_func
|
||
):
|
||
"""
|
||
场景1:新建workspace
|
||
预期:直接创建lightrag_vdb_chunks_text_embedding_3_large_3072d
|
||
"""
|
||
# Use a large embedding model
|
||
large_model_func = EmbeddingFunc(
|
||
embedding_dim=3072,
|
||
func=mock_embedding_func.func,
|
||
model_name="text-embedding-3-large",
|
||
)
|
||
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=large_model_func,
|
||
workspace="test_new",
|
||
)
|
||
|
||
# Case 3: Neither legacy nor new collection exists
|
||
mock_qdrant_client.collection_exists.return_value = False
|
||
|
||
# Initialize storage
|
||
await storage.initialize()
|
||
|
||
# Verify: Should create new collection with model suffix
|
||
expected_collection = "lightrag_vdb_chunks_text_embedding_3_large_3072d"
|
||
assert storage.final_namespace == expected_collection
|
||
|
||
# Verify create_collection was called with correct name
|
||
create_calls = [
|
||
call for call in mock_qdrant_client.create_collection.call_args_list
|
||
]
|
||
assert len(create_calls) > 0
|
||
assert (
|
||
create_calls[0][0][0] == expected_collection
|
||
or create_calls[0].kwargs.get("collection_name") == expected_collection
|
||
)
|
||
|
||
# Verify no migration was attempted
|
||
mock_qdrant_client.scroll.assert_not_called()
|
||
|
||
print(
|
||
f"✅ Scenario 1: New workspace created with collection '{expected_collection}'"
|
||
)
|
||
|
||
|
||
async def test_scenario_2_legacy_upgrade_migration(
|
||
mock_qdrant_client, mock_embedding_func
|
||
):
|
||
"""
|
||
场景2:从旧版本升级
|
||
已存在lightrag_vdb_chunks(无后缀)
|
||
预期:自动迁移数据到lightrag_vdb_chunks_text_embedding_ada_002_1536d
|
||
注意:迁移后不再自动删除遗留集合,需要手动删除
|
||
"""
|
||
# Use ada-002 model
|
||
ada_func = EmbeddingFunc(
|
||
embedding_dim=1536,
|
||
func=mock_embedding_func.func,
|
||
model_name="text-embedding-ada-002",
|
||
)
|
||
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=ada_func,
|
||
workspace="test_legacy",
|
||
)
|
||
|
||
# Legacy collection name (without model suffix)
|
||
legacy_collection = "lightrag_vdb_chunks"
|
||
new_collection = storage.final_namespace
|
||
|
||
# Case 4: Only legacy collection exists
|
||
mock_qdrant_client.collection_exists.side_effect = (
|
||
lambda name: name == legacy_collection
|
||
)
|
||
|
||
# Mock legacy collection info with 1536d vectors
|
||
legacy_collection_info = MagicMock()
|
||
legacy_collection_info.payload_schema = {}
|
||
legacy_collection_info.config.params.vectors.size = 1536
|
||
mock_qdrant_client.get_collection.return_value = legacy_collection_info
|
||
|
||
migration_state = {"new_workspace_count": 0}
|
||
|
||
def count_mock(collection_name, exact=True, count_filter=None):
|
||
mock_result = MagicMock()
|
||
if collection_name == legacy_collection:
|
||
mock_result.count = 150
|
||
elif collection_name == new_collection:
|
||
mock_result.count = migration_state["new_workspace_count"]
|
||
else:
|
||
mock_result.count = 0
|
||
return mock_result
|
||
|
||
mock_qdrant_client.count.side_effect = count_mock
|
||
|
||
# Mock scroll results (simulate migration in batches)
|
||
mock_points = []
|
||
for i in range(10):
|
||
point = MagicMock()
|
||
point.id = f"legacy-{i}"
|
||
point.vector = [0.1] * 1536
|
||
# No workspace_id in payload - simulates legacy data
|
||
point.payload = {"content": f"Legacy document {i}", "id": f"doc-{i}"}
|
||
mock_points.append(point)
|
||
|
||
# When payload_schema is empty, the code first samples payloads to detect workspace_id
|
||
# Then proceeds with migration batches
|
||
# Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
|
||
mock_qdrant_client.scroll.side_effect = [
|
||
(mock_points, "_"), # Sampling scroll - no workspace_id found in payloads
|
||
(mock_points, "offset1"), # Migration batch
|
||
([], None), # End of migration
|
||
]
|
||
|
||
def upsert_mock(*args, **kwargs):
|
||
migration_state["new_workspace_count"] = 150
|
||
return None
|
||
|
||
mock_qdrant_client.upsert.side_effect = upsert_mock
|
||
|
||
# Initialize (triggers migration)
|
||
await storage.initialize()
|
||
|
||
# Verify: New collection should be created
|
||
expected_new_collection = "lightrag_vdb_chunks_text_embedding_ada_002_1536d"
|
||
assert storage.final_namespace == expected_new_collection
|
||
|
||
# Verify migration steps
|
||
# 1. Check legacy count
|
||
mock_qdrant_client.count.assert_any_call(
|
||
collection_name=legacy_collection, exact=True
|
||
)
|
||
|
||
# 2. Create new collection
|
||
mock_qdrant_client.create_collection.assert_called()
|
||
|
||
# 3. Scroll legacy data
|
||
scroll_calls = [call for call in mock_qdrant_client.scroll.call_args_list]
|
||
assert len(scroll_calls) >= 1
|
||
assert scroll_calls[0].kwargs["collection_name"] == legacy_collection
|
||
|
||
# 4. Upsert to new collection
|
||
upsert_calls = [call for call in mock_qdrant_client.upsert.call_args_list]
|
||
assert len(upsert_calls) >= 1
|
||
assert upsert_calls[0].kwargs["collection_name"] == new_collection
|
||
|
||
# Note: Legacy collection is NOT automatically deleted after migration
|
||
# Manual deletion is required after data migration verification
|
||
|
||
print(
|
||
f"✅ Scenario 2: Legacy data migrated from '{legacy_collection}' to '{expected_new_collection}'"
|
||
)
|
||
|
||
|
||
async def test_scenario_3_multi_model_coexistence(mock_qdrant_client):
|
||
"""
|
||
场景3:多模型并存
|
||
预期:两个独立的collection,互不干扰
|
||
"""
|
||
|
||
# Model A: bge-small with 768d
|
||
async def embed_func_a(texts, **kwargs):
|
||
return np.array([[0.1] * 768 for _ in texts])
|
||
|
||
model_a_func = EmbeddingFunc(
|
||
embedding_dim=768, func=embed_func_a, model_name="bge-small"
|
||
)
|
||
|
||
# Model B: bge-large with 1024d
|
||
async def embed_func_b(texts, **kwargs):
|
||
return np.array([[0.2] * 1024 for _ in texts])
|
||
|
||
model_b_func = EmbeddingFunc(
|
||
embedding_dim=1024, func=embed_func_b, model_name="bge-large"
|
||
)
|
||
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
# Create storage for workspace A with model A
|
||
storage_a = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=model_a_func,
|
||
workspace="workspace_a",
|
||
)
|
||
|
||
# Create storage for workspace B with model B
|
||
storage_b = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=model_b_func,
|
||
workspace="workspace_b",
|
||
)
|
||
|
||
# Verify: Collection names are different
|
||
assert storage_a.final_namespace != storage_b.final_namespace
|
||
|
||
# Verify: Model A collection
|
||
expected_collection_a = "lightrag_vdb_chunks_bge_small_768d"
|
||
assert storage_a.final_namespace == expected_collection_a
|
||
|
||
# Verify: Model B collection
|
||
expected_collection_b = "lightrag_vdb_chunks_bge_large_1024d"
|
||
assert storage_b.final_namespace == expected_collection_b
|
||
|
||
# Verify: Different embedding dimensions are preserved
|
||
assert storage_a.embedding_func.embedding_dim == 768
|
||
assert storage_b.embedding_func.embedding_dim == 1024
|
||
|
||
print("✅ Scenario 3: Multi-model coexistence verified")
|
||
print(f" - Workspace A: {expected_collection_a} (768d)")
|
||
print(f" - Workspace B: {expected_collection_b} (1024d)")
|
||
print(" - Collections are independent")
|
||
|
||
|
||
async def test_case1_empty_legacy_auto_cleanup(mock_qdrant_client, mock_embedding_func):
|
||
"""
|
||
Case 1a: 新旧collection都存在,且旧库为空
|
||
预期:自动删除旧库
|
||
"""
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=mock_embedding_func,
|
||
workspace="test_ws",
|
||
)
|
||
|
||
# Legacy collection name (without model suffix)
|
||
legacy_collection = "lightrag_vdb_chunks"
|
||
new_collection = storage.final_namespace
|
||
|
||
# Mock: Both collections exist
|
||
mock_qdrant_client.collection_exists.side_effect = lambda name: name in [
|
||
legacy_collection,
|
||
new_collection,
|
||
]
|
||
|
||
# Mock: Legacy collection is empty (0 records)
|
||
def count_mock(collection_name, exact=True, count_filter=None):
|
||
mock_result = MagicMock()
|
||
if collection_name == legacy_collection:
|
||
mock_result.count = 0 # Empty legacy collection
|
||
else:
|
||
mock_result.count = 100 # New collection has data
|
||
return mock_result
|
||
|
||
mock_qdrant_client.count.side_effect = count_mock
|
||
|
||
# Mock get_collection for Case 2 check
|
||
collection_info = MagicMock()
|
||
collection_info.payload_schema = {"workspace_id": True}
|
||
mock_qdrant_client.get_collection.return_value = collection_info
|
||
|
||
# Initialize storage
|
||
await storage.initialize()
|
||
|
||
# Verify: Empty legacy collection should be automatically cleaned up
|
||
# Empty collections are safe to delete without data loss risk
|
||
delete_calls = [
|
||
call for call in mock_qdrant_client.delete_collection.call_args_list
|
||
]
|
||
assert len(delete_calls) >= 1, "Empty legacy collection should be auto-deleted"
|
||
deleted_collection = (
|
||
delete_calls[0][0][0]
|
||
if delete_calls[0][0]
|
||
else delete_calls[0].kwargs.get("collection_name")
|
||
)
|
||
assert (
|
||
deleted_collection == legacy_collection
|
||
), f"Expected to delete '{legacy_collection}', but deleted '{deleted_collection}'"
|
||
|
||
print(
|
||
f"✅ Case 1a: Empty legacy collection '{legacy_collection}' auto-deleted successfully"
|
||
)
|
||
|
||
|
||
async def test_case1_nonempty_legacy_warning(mock_qdrant_client, mock_embedding_func):
|
||
"""
|
||
Case 1b: 新旧collection都存在,且旧库有数据
|
||
预期:警告但不删除
|
||
"""
|
||
config = {
|
||
"embedding_batch_num": 10,
|
||
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
|
||
}
|
||
|
||
storage = QdrantVectorDBStorage(
|
||
namespace="chunks",
|
||
global_config=config,
|
||
embedding_func=mock_embedding_func,
|
||
workspace="test_ws",
|
||
)
|
||
|
||
# Legacy collection name (without model suffix)
|
||
legacy_collection = "lightrag_vdb_chunks"
|
||
new_collection = storage.final_namespace
|
||
|
||
# Mock: Both collections exist
|
||
mock_qdrant_client.collection_exists.side_effect = lambda name: name in [
|
||
legacy_collection,
|
||
new_collection,
|
||
]
|
||
|
||
# Mock: Legacy collection has data (50 records)
|
||
def count_mock(collection_name, exact=True, count_filter=None):
|
||
mock_result = MagicMock()
|
||
if collection_name == legacy_collection:
|
||
mock_result.count = 50 # Legacy has data
|
||
else:
|
||
mock_result.count = 100 # New collection has data
|
||
return mock_result
|
||
|
||
mock_qdrant_client.count.side_effect = count_mock
|
||
|
||
# Mock get_collection for Case 2 check
|
||
collection_info = MagicMock()
|
||
collection_info.payload_schema = {"workspace_id": True}
|
||
mock_qdrant_client.get_collection.return_value = collection_info
|
||
|
||
# Initialize storage
|
||
await storage.initialize()
|
||
|
||
# Verify: Legacy collection with data should be preserved
|
||
# We never auto-delete collections that contain data to prevent accidental data loss
|
||
delete_calls = [
|
||
call for call in mock_qdrant_client.delete_collection.call_args_list
|
||
]
|
||
# Check if legacy collection was deleted (it should not be)
|
||
legacy_deleted = any(
|
||
(call[0][0] if call[0] else call.kwargs.get("collection_name"))
|
||
== legacy_collection
|
||
for call in delete_calls
|
||
)
|
||
assert not legacy_deleted, "Legacy collection with data should NOT be auto-deleted"
|
||
|
||
print(
|
||
f"✅ Case 1b: Legacy collection '{legacy_collection}' with data preserved (warning only)"
|
||
)
|