Use keyset pagination for PostgreSQL migration

- Switch to keyset pagination for migration
- Ensure stable ordering via ID column
- Prevent row skipping or duplication
- Update tests for new query pattern
- Minor doc comment fix in Qdrant
yangdx
2025-12-20 00:32:40 +08:00
parent 93ea50c4aa
commit dfe628ad0b
3 changed files with 123 additions and 37 deletions
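The heart of the change is the shape of the batch query. A minimal before/after sketch (the queries mirror the diff below; the table name is illustrative):

-- Before: OFFSET/LIMIT without ORDER BY. PostgreSQL gives no stable row
-- order here, so consecutive batches can skip rows or read them twice.
SELECT * FROM legacy_table OFFSET $1 LIMIT $2;

-- After: keyset (cursor) pagination. Each batch resumes strictly after the
-- last id already migrated, so every row is visited exactly once, in order.
SELECT * FROM legacy_table WHERE id > $1 ORDER BY id LIMIT $2;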


@@ -2215,24 +2215,45 @@ async def _pg_migrate_workspace_data(
     This function uses asyncpg's executemany for efficient batch insertion,
     reducing database round-trips from N to 1 per batch.
+    Uses keyset pagination (cursor-based) with ORDER BY id for stable ordering.
+    This ensures every legacy row is migrated exactly once, avoiding the
+    non-deterministic row ordering issues with OFFSET/LIMIT without ORDER BY.
     """
     migrated_count = 0
-    offset = 0
+    last_id: str | None = None
     batch_size = 500
     while True:
+        # Use keyset pagination with ORDER BY id for deterministic ordering
+        # This avoids OFFSET/LIMIT without ORDER BY which can skip or duplicate rows
         if workspace:
-            select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 OFFSET $2 LIMIT $3"
-            rows = await db.query(
-                select_query, [workspace, offset, batch_size], multirows=True
-            )
+            if last_id is not None:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3"
+                rows = await db.query(
+                    select_query, [workspace, last_id, batch_size], multirows=True
+                )
+            else:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 ORDER BY id LIMIT $2"
+                rows = await db.query(
+                    select_query, [workspace, batch_size], multirows=True
+                )
         else:
-            select_query = f"SELECT * FROM {legacy_table_name} OFFSET $1 LIMIT $2"
-            rows = await db.query(select_query, [offset, batch_size], multirows=True)
+            if last_id is not None:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE id > $1 ORDER BY id LIMIT $2"
+                rows = await db.query(
+                    select_query, [last_id, batch_size], multirows=True
+                )
+            else:
+                select_query = f"SELECT * FROM {legacy_table_name} ORDER BY id LIMIT $1"
+                rows = await db.query(select_query, [batch_size], multirows=True)
         if not rows:
             break
+        # Track the last ID for keyset pagination cursor
+        last_id = rows[-1]["id"]
         # Batch insert optimization: use executemany instead of individual inserts
         # Get column names from the first row
         first_row = dict(rows[0])
@@ -2268,8 +2289,6 @@ async def _pg_migrate_workspace_data(
                 f"PostgreSQL: {migrated_count}/{expected_count} records migrated{workspace_info}"
             )
-        offset += batch_size
     return migrated_count
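Why the cursor loop migrates every legacy row exactly once can be checked with a small self-contained simulation of the loop above; fetch_batch below is a hypothetical in-memory stand-in for db.query, not part of the codebase:

# Keyset pagination over an in-memory "table", mirroring _pg_migrate_workspace_data
rows_in_table = [{"id": f"id-{i:03d}", "content": f"row {i}"} for i in range(12)]

def fetch_batch(last_id, batch_size):
    # Stand-in for: SELECT * FROM t WHERE id > $1 ORDER BY id LIMIT $2
    ordered = sorted(rows_in_table, key=lambda r: r["id"])
    if last_id is not None:
        ordered = [r for r in ordered if r["id"] > last_id]
    return ordered[:batch_size]

migrated, last_id, batch_size = [], None, 5
while True:
    batch = fetch_batch(last_id, batch_size)
    if not batch:
        break
    migrated.extend(batch)
    last_id = batch[-1]["id"]  # advance the cursor to the last row seen

# Every row appears exactly once, in id order: no skips, no duplicates
assert [r["id"] for r in migrated] == sorted(r["id"] for r in rows_in_table)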


@@ -274,6 +274,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         has_workspace_field = has_workspace_index
         if not has_workspace_index:
             # Sample a small batch of points to check for workspace_id in payloads
+            # All points must have workspace_id if any point has it
             sample_result = client.scroll(
                 collection_name=legacy_collection,
                 limit=10,  # Small sample is sufficient for detection
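For reference, the sampling this comment documents looks roughly like the sketch below, written against qdrant-client's scroll API; the URL and collection name are placeholders, not values from this repo:

from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")  # placeholder connection
legacy_collection = "legacy_chunks"  # placeholder collection name

# scroll() returns (points, next_page_offset); a sample of 10 suffices because
# all points must have workspace_id if any point has it
points, _ = client.scroll(
    collection_name=legacy_collection,
    limit=10,
    with_payload=True,
)
has_workspace_field = any("workspace_id" in (p.payload or {}) for p in points)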


@@ -137,19 +137,39 @@ async def test_postgres_migration_trigger(
                 return {"count": 100}
             return {"count": 0}
         elif multirows and "SELECT *" in sql:
-            # Mock batch fetch for migration
-            # Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
+            # Mock batch fetch for migration using keyset pagination
+            # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
+            # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
             if "WHERE workspace" in sql:
-                # With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
+                if "id >" in sql:
+                    # Keyset pagination: params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    # Find rows after last_id
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch (no last_id): params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
             else:
-                # No workspace filter: params[0]=offset, params[1]=limit
-                offset = params[0] if params else 0
-                limit = params[1] if len(params) > 1 else 500
-            start = offset
-            end = min(offset + limit, len(mock_rows))
-            return mock_rows[start:end]
+                # No workspace filter with keyset
+                if "id >" in sql:
+                    last_id = params[0] if params else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[1] if len(params) > 1 else 500
+                else:
+                    start_idx = 0
+                    limit = params[0] if params else 500
+            end = min(start_idx + limit, len(mock_rows))
+            return mock_rows[start_idx:end]
         return {}
     mock_pg_db.query = AsyncMock(side_effect=mock_query)
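The mock's cursor handling, repeated in the remaining hunks below, boils down to one cursor-to-slice translation. A distilled runnable version (keyset_slice is an illustrative name, not from the test file):

def keyset_slice(mock_rows, last_id, limit):
    # First batch starts at 0; later batches start just past the row whose
    # id equals the cursor, exactly as the mocked db.query computes start_idx
    start_idx = 0
    if last_id is not None:
        for i, row in enumerate(mock_rows):
            if row["id"] == last_id:
                start_idx = i + 1
                break
    return mock_rows[start_idx : min(start_idx + limit, len(mock_rows))]

rows = [{"id": f"id-{i}"} for i in range(7)]
assert keyset_slice(rows, None, 3) == rows[0:3]    # first batch
assert keyset_slice(rows, "id-2", 3) == rows[3:6]  # batch after the cursor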
@@ -336,19 +356,39 @@ async def test_scenario_2_legacy_upgrade_migration(
             # New table count (before/after migration)
             return {"count": migration_state["new_table_count"]}
         elif multirows and "SELECT *" in sql:
-            # Mock batch fetch for migration
-            # Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
+            # Mock batch fetch for migration using keyset pagination
+            # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
+            # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
             if "WHERE workspace" in sql:
-                # With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
+                if "id >" in sql:
+                    # Keyset pagination: params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    # Find rows after last_id
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch (no last_id): params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
             else:
-                # No workspace filter: params[0]=offset, params[1]=limit
-                offset = params[0] if params else 0
-                limit = params[1] if len(params) > 1 else 500
-            start = offset
-            end = min(offset + limit, len(mock_rows))
-            return mock_rows[start:end]
+                # No workspace filter with keyset
+                if "id >" in sql:
+                    last_id = params[0] if params else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[1] if len(params) > 1 else 500
+                else:
+                    start_idx = 0
+                    limit = params[0] if params else 500
+            end = min(start_idx + limit, len(mock_rows))
+            return mock_rows[start_idx:end]
         return {}
     mock_pg_db.query = AsyncMock(side_effect=mock_query)
@@ -677,9 +717,22 @@ async def test_case1_sequential_workspace_migration(
         if "WHERE workspace" in sql:
             workspace = params[0] if params and len(params) > 0 else None
             if workspace == "workspace_a":
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
-                return mock_rows_a[offset : offset + limit]
+                # Handle keyset pagination
+                if "id >" in sql:
+                    # params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows_a):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch: params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
+                end = min(start_idx + limit, len(mock_rows_a))
+                return mock_rows_a[start_idx:end]
         return {}
     mock_pg_db.query = AsyncMock(side_effect=mock_query_a)
@@ -762,9 +815,22 @@ async def test_case1_sequential_workspace_migration(
         if "WHERE workspace" in sql:
            workspace = params[0] if params and len(params) > 0 else None
             if workspace == "workspace_b":
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
-                return mock_rows_b[offset : offset + limit]
+                # Handle keyset pagination
+                if "id >" in sql:
+                    # params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows_b):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch: params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
+                end = min(start_idx + limit, len(mock_rows_b))
+                return mock_rows_b[start_idx:end]
         return {}
     mock_pg_db.query = AsyncMock(side_effect=mock_query_b)