Improve Qdrant migration checks and verification logic

- Check workspace data before migrating
- Update Qdrant migration tests
This commit is contained in:
yangdx
2025-12-19 12:03:38 +08:00
parent bf618fc976
commit 0ae60d36bc
2 changed files with 66 additions and 13 deletions

View File

@@ -212,6 +212,11 @@ class QdrantVectorDBStorage(BaseVectorStorage):
# Case 2: Legacy collection exists
if legacy_collection:
workspace_value = workspace or DEFAULT_WORKSPACE
workspace_count_filter = models.Filter(
must=[workspace_filter_condition(workspace_value)]
)
# Only drop legacy collection if it's empty
if legacy_count is None:
legacy_count = client.count(
@@ -224,11 +229,18 @@ class QdrantVectorDBStorage(BaseVectorStorage):
)
return
# If both new and legacy collections exist with different names - skip data migration
if new_collection_exists:
new_workspace_count = client.count(
collection_name=collection_name,
count_filter=workspace_count_filter,
exact=True,
).count
# Skip data migration if new collection already has workspace data
if new_workspace_count > 0:
logger.warning(
f"Qdrant: Both new collection '{collection_name}' and legacy collection '{legacy_collection}' exist. "
f"Data migration skipped. You may need to delete the legacy collection manually."
f"Qdrant: New collection '{collection_name}' already has "
f"{new_workspace_count} records for workspace '{workspace_value}'. "
"Data migration skipped to avoid duplicates."
)
return
@@ -300,11 +312,17 @@ class QdrantVectorDBStorage(BaseVectorStorage):
break
offset = next_offset
new_count = client.count(
collection_name=collection_name, exact=True
new_count_after = client.count(
collection_name=collection_name,
count_filter=workspace_count_filter,
exact=True,
).count
if new_count != legacy_count:
error_msg = f"Qdrant: Migration verification failed, expected {legacy_count} records, got {new_count} in new collection"
inserted_count = new_count_after - new_workspace_count
if inserted_count != legacy_count:
error_msg = (
"Qdrant: Migration verification failed, expected "
f"{legacy_count} inserted records, got {inserted_count}."
)
logger.error(error_msg)
raise DataMigrationError(error_msg)

View File

@@ -87,7 +87,19 @@ async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func)
)
# 2. Legacy collection exists and has data
mock_qdrant_client.count.return_value.count = 100
migration_state = {"new_workspace_count": 0}
def count_mock(collection_name, exact=True, count_filter=None):
    """Fake for the mocked client's ``count`` call.

    Returns a mock whose ``count`` attribute is 100 for the legacy
    collection, the value currently held in
    ``migration_state["new_workspace_count"]`` for the storage's final
    namespace, and 0 for any other collection name. ``exact`` and
    ``count_filter`` are accepted but not consulted.
    """
    # Resolve the number first; the comparisons stay in the same order so
    # the legacy collection wins if both names happen to match.
    if collection_name == legacy_collection:
        total = 100
    elif collection_name == storage.final_namespace:
        total = migration_state["new_workspace_count"]
    else:
        total = 0
    response = MagicMock()
    response.count = total
    return response
mock_qdrant_client.count.side_effect = count_mock
# 3. Mock scroll for data migration
mock_point = MagicMock()
@@ -98,6 +110,12 @@ async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func)
# First call returns points, second call returns empty (end of scroll)
mock_qdrant_client.scroll.side_effect = [([mock_point], "next_offset"), ([], None)]
def upsert_mock(*args, **kwargs):
    """Fake for the mocked client's ``upsert`` call.

    Ignores its arguments and sets
    ``migration_state["new_workspace_count"]`` to 100. Returns ``None``.
    """
    migration_state.update(new_workspace_count=100)
mock_qdrant_client.upsert.side_effect = upsert_mock
# Initialize storage (triggers migration)
await storage.initialize()
@@ -269,8 +287,19 @@ async def test_scenario_2_legacy_upgrade_migration(
legacy_collection_info.config.params.vectors.size = 1536
mock_qdrant_client.get_collection.return_value = legacy_collection_info
# Mock legacy data
mock_qdrant_client.count.return_value.count = 150
migration_state = {"new_workspace_count": 0}
def count_mock(collection_name, exact=True, count_filter=None):
    """Fake for the mocked client's ``count`` call.

    Returns a mock whose ``count`` attribute is 150 for the legacy
    collection, the value currently held in
    ``migration_state["new_workspace_count"]`` for the new collection,
    and 0 for any other collection name. ``exact`` and ``count_filter``
    are accepted but not consulted.
    """
    # Resolve the number first; the comparisons stay in the same order so
    # the legacy collection wins if both names happen to match.
    if collection_name == legacy_collection:
        total = 150
    elif collection_name == new_collection:
        total = migration_state["new_workspace_count"]
    else:
        total = 0
    response = MagicMock()
    response.count = total
    return response
mock_qdrant_client.count.side_effect = count_mock
# Mock scroll results (simulate migration in batches)
mock_points = []
@@ -284,6 +313,12 @@ async def test_scenario_2_legacy_upgrade_migration(
# First batch returns points, second batch returns empty
mock_qdrant_client.scroll.side_effect = [(mock_points, "offset1"), ([], None)]
def upsert_mock(*args, **kwargs):
    """Fake for the mocked client's ``upsert`` call.

    Ignores its arguments and sets
    ``migration_state["new_workspace_count"]`` to 150. Returns ``None``.
    """
    migration_state.update(new_workspace_count=150)
mock_qdrant_client.upsert.side_effect = upsert_mock
# Initialize (triggers migration)
await storage.initialize()
@@ -412,7 +447,7 @@ async def test_case1_empty_legacy_auto_cleanup(mock_qdrant_client, mock_embeddin
]
# Mock: Legacy collection is empty (0 records)
def count_mock(collection_name, exact=True):
def count_mock(collection_name, exact=True, count_filter=None):
mock_result = MagicMock()
if collection_name == legacy_collection:
mock_result.count = 0 # Empty legacy collection
@@ -479,7 +514,7 @@ async def test_case1_nonempty_legacy_warning(mock_qdrant_client, mock_embedding_
]
# Mock: Legacy collection has data (50 records)
def count_mock(collection_name, exact=True):
def count_mock(collection_name, exact=True, count_filter=None):
mock_result = MagicMock()
if collection_name == legacy_collection:
mock_result.count = 50 # Legacy has data