Add workspace filtering to Qdrant legacy migration

* Detect workspace field in legacy schema
* Filter scroll by active workspace
* Recount legacy records with the workspace filter applied so migration tracking is accurate
* Prevent cross-workspace data leakage
* Assign the active workspace directly to migrated payloads and point IDs (no default-workspace fallback)
This commit is contained in:
yangdx
2025-12-19 16:45:15 +08:00
parent 1c083c6699
commit e77a506467

View File

@@ -261,8 +261,37 @@ class QdrantVectorDBStorage(BaseVectorStorage):
return
# Case 3: Only legacy exists - migrate data from legacy collection to new collection
# Check if legacy collection has workspace_id index to determine migration strategy
legacy_info = client.get_collection(legacy_collection)
has_workspace_index = WORKSPACE_ID_FIELD in (
legacy_info.payload_schema or {}
)
# Build workspace filter if legacy collection has workspace support
# This prevents cross-workspace data leakage during migration
legacy_scroll_filter = None
if has_workspace_index:
legacy_scroll_filter = models.Filter(
should=[
workspace_filter_condition(workspace),
models.IsNullCondition(
is_null=models.PayloadField(key=WORKSPACE_ID_FIELD)
),
]
)
# Recount with workspace filter for accurate migration tracking
legacy_count = client.count(
collection_name=legacy_collection,
count_filter=legacy_scroll_filter,
exact=True,
).count
logger.info(
f"Qdrant: Legacy collection has workspace support, "
f"filtering to {legacy_count} records for workspace '{workspace}'"
)
logger.info(
f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records."
f"Qdrant: Found legacy collection '{legacy_collection}' with {legacy_count} records to migrate."
)
logger.info(
f"Qdrant: Migrating data from legacy collection '{legacy_collection}' to new collection '{collection_name}'"
@@ -275,9 +304,10 @@ class QdrantVectorDBStorage(BaseVectorStorage):
batch_size = 500
while True:
# Scroll through legacy data
# Scroll through legacy data with optional workspace filter
result = client.scroll(
collection_name=legacy_collection,
scroll_filter=legacy_scroll_filter,
limit=batch_size,
offset=offset,
with_vectors=True,
@@ -291,15 +321,15 @@ class QdrantVectorDBStorage(BaseVectorStorage):
# Transform points for new collection
new_points = []
for point in points:
# Add workspace_id to payload
# Set workspace_id in payload
new_payload = dict(point.payload or {})
new_payload[WORKSPACE_ID_FIELD] = workspace or DEFAULT_WORKSPACE
new_payload[WORKSPACE_ID_FIELD] = workspace
# Create new point with workspace-prefixed ID
original_id = new_payload.get(ID_FIELD)
if original_id:
new_point_id = compute_mdhash_id_for_qdrant(
original_id, prefix=workspace or DEFAULT_WORKSPACE
original_id, prefix=workspace
)
else:
# Fallback: use original point ID