Improve Qdrant workspace detection via payload sampling
- Detect unindexed workspace_id via payload sampling
- Prevent cross-workspace data leakage
- Fix empty workspace warning logic
- Update migration tests for sampling
This commit is contained in:
@@ -181,7 +181,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
).count
|
||||
|
||||
# Skip data migration if new collection already has workspace data
|
||||
if new_workspace_count == 0:
|
||||
if new_workspace_count == 0 and not (collection_name == legacy_collection):
|
||||
logger.warning(
|
||||
f"Qdrant: workspace data in collection '{collection_name}' is empty. "
|
||||
f"Ensure it is caused by new workspace setup and not an unexpected embedding model change."
|
||||
@@ -261,16 +261,39 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||
return
|
||||
|
||||
# Case 3: Only legacy exists - migrate data from legacy collection to new collection
|
||||
# Check if legacy collection has workspace_id index to determine migration strategy
|
||||
# Check if legacy collection has workspace_id to determine migration strategy
|
||||
# Note: payload_schema only reflects INDEXED fields, so we also sample
|
||||
# actual payloads to detect unindexed workspace_id fields
|
||||
legacy_info = client.get_collection(legacy_collection)
|
||||
has_workspace_index = WORKSPACE_ID_FIELD in (
|
||||
legacy_info.payload_schema or {}
|
||||
)
|
||||
|
||||
# Detect workspace_id field presence by sampling payloads if not indexed
|
||||
# This prevents cross-workspace data leakage when workspace_id exists but isn't indexed
|
||||
has_workspace_field = has_workspace_index
|
||||
if not has_workspace_index:
|
||||
# Sample a small batch of points to check for workspace_id in payloads
|
||||
sample_result = client.scroll(
|
||||
collection_name=legacy_collection,
|
||||
limit=10, # Small sample is sufficient for detection
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
sample_points, _ = sample_result
|
||||
for point in sample_points:
|
||||
if point.payload and WORKSPACE_ID_FIELD in point.payload:
|
||||
has_workspace_field = True
|
||||
logger.info(
|
||||
f"Qdrant: Detected unindexed {WORKSPACE_ID_FIELD} field "
|
||||
f"in legacy collection '{legacy_collection}' via payload sampling"
|
||||
)
|
||||
break
|
||||
|
||||
# Build workspace filter if legacy collection has workspace support
|
||||
# This prevents cross-workspace data leakage during migration
|
||||
legacy_scroll_filter = None
|
||||
if has_workspace_index:
|
||||
if has_workspace_field:
|
||||
legacy_scroll_filter = models.Filter(
|
||||
should=[
|
||||
workspace_filter_condition(workspace),
|
||||
|
||||
@@ -103,10 +103,16 @@ async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func)
|
||||
mock_point = MagicMock()
|
||||
mock_point.id = "old_id"
|
||||
mock_point.vector = [0.1] * 768
|
||||
mock_point.payload = {"content": "test"}
|
||||
mock_point.payload = {"content": "test"} # No workspace_id in payload
|
||||
|
||||
# First call returns points, second call returns empty (end of scroll)
|
||||
mock_qdrant_client.scroll.side_effect = [([mock_point], "next_offset"), ([], None)]
|
||||
# When payload_schema is empty, the code first samples payloads to detect workspace_id
|
||||
# Then proceeds with migration batches
|
||||
# Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
|
||||
mock_qdrant_client.scroll.side_effect = [
|
||||
([mock_point], "_"), # Sampling scroll - no workspace_id found
|
||||
([mock_point], "next_offset"), # Migration batch
|
||||
([], None), # End of migration
|
||||
]
|
||||
|
||||
def upsert_mock(*args, **kwargs):
|
||||
migration_state["new_workspace_count"] = 100
|
||||
@@ -127,10 +133,17 @@ async def test_qdrant_migration_trigger(mock_qdrant_client, mock_embedding_func)
|
||||
mock_qdrant_client.create_collection.assert_called()
|
||||
|
||||
# 3. Data scrolled from legacy
|
||||
assert mock_qdrant_client.scroll.call_count >= 1
|
||||
call_args = mock_qdrant_client.scroll.call_args_list[0]
|
||||
assert call_args.kwargs["collection_name"] == legacy_collection
|
||||
assert call_args.kwargs["limit"] == 500
|
||||
# First call (index 0) is sampling scroll with limit=10
|
||||
# Second call (index 1) is migration batch with limit=500
|
||||
assert mock_qdrant_client.scroll.call_count >= 2
|
||||
# Check sampling scroll
|
||||
sampling_call = mock_qdrant_client.scroll.call_args_list[0]
|
||||
assert sampling_call.kwargs["collection_name"] == legacy_collection
|
||||
assert sampling_call.kwargs["limit"] == 10
|
||||
# Check migration batch scroll
|
||||
migration_call = mock_qdrant_client.scroll.call_args_list[1]
|
||||
assert migration_call.kwargs["collection_name"] == legacy_collection
|
||||
assert migration_call.kwargs["limit"] == 500
|
||||
|
||||
# 4. Data upserted to new
|
||||
mock_qdrant_client.upsert.assert_called()
|
||||
@@ -302,11 +315,18 @@ async def test_scenario_2_legacy_upgrade_migration(
|
||||
point = MagicMock()
|
||||
point.id = f"legacy-{i}"
|
||||
point.vector = [0.1] * 1536
|
||||
# No workspace_id in payload - simulates legacy data
|
||||
point.payload = {"content": f"Legacy document {i}", "id": f"doc-{i}"}
|
||||
mock_points.append(point)
|
||||
|
||||
# First batch returns points, second batch returns empty
|
||||
mock_qdrant_client.scroll.side_effect = [(mock_points, "offset1"), ([], None)]
|
||||
# When payload_schema is empty, the code first samples payloads to detect workspace_id
|
||||
# Then proceeds with migration batches
|
||||
# Scroll calls: 1) Sampling (limit=10), 2) Migration batch, 3) End of migration
|
||||
mock_qdrant_client.scroll.side_effect = [
|
||||
(mock_points, "_"), # Sampling scroll - no workspace_id found in payloads
|
||||
(mock_points, "offset1"), # Migration batch
|
||||
([], None), # End of migration
|
||||
]
|
||||
|
||||
def upsert_mock(*args, **kwargs):
|
||||
migration_state["new_workspace_count"] = 150
|
||||
|
||||
Reference in New Issue
Block a user