From dfe628ad0bf83a3562714fb242057e3bb5382e7d Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 20 Dec 2025 00:32:40 +0800
Subject: [PATCH] Use keyset pagination for PostgreSQL migration

- Switch to keyset pagination for migration
- Ensure stable ordering via ID column
- Prevent row skipping or duplication
- Update tests for new query pattern
- Minor doc comment fix in Qdrant
---
 lightrag/kg/postgres_impl.py     |  37 +++++++---
 lightrag/kg/qdrant_impl.py       |   1 +
 tests/test_postgres_migration.py | 122 ++++++++++++++++++++++++-------
 3 files changed, 123 insertions(+), 37 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 7a465c26..c427d54c 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -2215,24 +2215,45 @@ async def _pg_migrate_workspace_data(

     This function uses asyncpg's executemany for efficient batch insertion,
     reducing database round-trips from N to 1 per batch.
+
+    Uses keyset pagination (cursor-based) with ORDER BY id for stable ordering.
+    This ensures every legacy row is migrated exactly once, avoiding the
+    non-deterministic row ordering of OFFSET/LIMIT without an ORDER BY clause.
     """
     migrated_count = 0
-    offset = 0
+    last_id: str | None = None
     batch_size = 500

     while True:
+        # Use keyset pagination with ORDER BY id for deterministic ordering.
+        # This avoids OFFSET/LIMIT without ORDER BY, which can skip or duplicate rows.
         if workspace:
-            select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 OFFSET $2 LIMIT $3"
-            rows = await db.query(
-                select_query, [workspace, offset, batch_size], multirows=True
-            )
+            if last_id is not None:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3"
+                rows = await db.query(
+                    select_query, [workspace, last_id, batch_size], multirows=True
+                )
+            else:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE workspace = $1 ORDER BY id LIMIT $2"
+                rows = await db.query(
+                    select_query, [workspace, batch_size], multirows=True
+                )
         else:
-            select_query = f"SELECT * FROM {legacy_table_name} OFFSET $1 LIMIT $2"
-            rows = await db.query(select_query, [offset, batch_size], multirows=True)
+            if last_id is not None:
+                select_query = f"SELECT * FROM {legacy_table_name} WHERE id > $1 ORDER BY id LIMIT $2"
+                rows = await db.query(
+                    select_query, [last_id, batch_size], multirows=True
+                )
+            else:
+                select_query = f"SELECT * FROM {legacy_table_name} ORDER BY id LIMIT $1"
+                rows = await db.query(select_query, [batch_size], multirows=True)

         if not rows:
             break

+        # Track the last ID as the keyset pagination cursor
+        last_id = rows[-1]["id"]
+
         # Batch insert optimization: use executemany instead of individual inserts
         # Get column names from the first row
         first_row = dict(rows[0])
@@ -2268,8 +2289,6 @@ async def _pg_migrate_workspace_data(
             f"PostgreSQL: {migrated_count}/{expected_count} records migrated{workspace_info}"
         )

-        offset += batch_size
-
     return migrated_count


diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index 71d45d56..4601e05c 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -274,6 +274,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
             has_workspace_field = has_workspace_index
             if not has_workspace_index:
                 # Sample a small batch of points to check for workspace_id in payloads
+                # If any sampled point has workspace_id, assume all points do
                 sample_result = client.scroll(
                     collection_name=legacy_collection,
                     limit=10,  # Small sample is sufficient for detection
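Stripped of the workspace branching, the pattern the migration loop now follows is easiest to see in isolation. The sketch below is illustrative only, not code from this patch: it assumes a bare asyncpg connection (`conn.fetch`), a table whose `id` column is unique and orderable, and a hypothetical `process_batch` helper standing in for the executemany insert.

    # Minimal keyset-pagination sketch (assumptions: unique, orderable id;
    # asyncpg-style conn.fetch; process_batch is a hypothetical stand-in).
    async def migrate_all(conn, legacy_table: str, batch_size: int = 500) -> int:
        migrated = 0
        last_id = None
        while True:
            if last_id is None:
                rows = await conn.fetch(
                    f"SELECT * FROM {legacy_table} ORDER BY id LIMIT $1", batch_size
                )
            else:
                rows = await conn.fetch(
                    f"SELECT * FROM {legacy_table} WHERE id > $1 ORDER BY id LIMIT $2",
                    last_id,
                    batch_size,
                )
            if not rows:
                break
            last_id = rows[-1]["id"]  # cursor: only strictly greater ids next round
            await process_batch(rows)
            migrated += len(rows)
        return migrated

Because each batch is fetched with `id > last_id` under a total order, a row can neither reappear in a later batch nor fall into the gap between two batches, which is exactly the failure mode of OFFSET/LIMIT over an unspecified order.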
diff --git a/tests/test_postgres_migration.py b/tests/test_postgres_migration.py
index 020326f5..ba40675a 100644
--- a/tests/test_postgres_migration.py
+++ b/tests/test_postgres_migration.py
@@ -137,19 +137,39 @@ async def test_postgres_migration_trigger(
                 return {"count": 100}
             return {"count": 0}
         elif multirows and "SELECT *" in sql:
-            # Mock batch fetch for migration
-            # Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
+            # Mock batch fetch for migration using keyset pagination
+            # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
+            # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
             if "WHERE workspace" in sql:
-                # With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
+                if "id >" in sql:
+                    # Keyset pagination: params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    # Find rows after last_id
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch (no last_id): params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
             else:
-                # No workspace filter: params[0]=offset, params[1]=limit
-                offset = params[0] if params else 0
-                limit = params[1] if len(params) > 1 else 500
-            start = offset
-            end = min(offset + limit, len(mock_rows))
-            return mock_rows[start:end]
+                # No workspace filter with keyset
+                if "id >" in sql:
+                    last_id = params[0] if params else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[1] if len(params) > 1 else 500
+                else:
+                    start_idx = 0
+                    limit = params[0] if params else 500
+            end = min(start_idx + limit, len(mock_rows))
+            return mock_rows[start_idx:end]
         return {}

     mock_pg_db.query = AsyncMock(side_effect=mock_query)
@@ -336,19 +356,39 @@ async def test_scenario_2_legacy_upgrade_migration(
                 # New table count (before/after migration)
                 return {"count": migration_state["new_table_count"]}
         elif multirows and "SELECT *" in sql:
-            # Mock batch fetch for migration
-            # Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
+            # Mock batch fetch for migration using keyset pagination
+            # New pattern: WHERE workspace = $1 AND id > $2 ORDER BY id LIMIT $3
+            # or first batch: WHERE workspace = $1 ORDER BY id LIMIT $2
             if "WHERE workspace" in sql:
-                # With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
+                if "id >" in sql:
+                    # Keyset pagination: params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    # Find rows after last_id
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch (no last_id): params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
             else:
-                # No workspace filter: params[0]=offset, params[1]=limit
-                offset = params[0] if params else 0
-                limit = params[1] if len(params) > 1 else 500
-            start = offset
-            end = min(offset + limit, len(mock_rows))
-            return mock_rows[start:end]
+                # No workspace filter with keyset
+                if "id >" in sql:
+                    last_id = params[0] if params else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[1] if len(params) > 1 else 500
+                else:
+                    start_idx = 0
+                    limit = params[0] if params else 500
+            end = min(start_idx + limit, len(mock_rows))
+            return mock_rows[start_idx:end]
         return {}

     mock_pg_db.query = AsyncMock(side_effect=mock_query)
@@ -677,9 +717,22 @@ async def test_case1_sequential_workspace_migration(
         if "WHERE workspace" in sql:
             workspace = params[0] if params and len(params) > 0 else None
             if workspace == "workspace_a":
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
-                return mock_rows_a[offset : offset + limit]
+                # Handle keyset pagination
+                if "id >" in sql:
+                    # params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows_a):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch: params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
+                end = min(start_idx + limit, len(mock_rows_a))
+                return mock_rows_a[start_idx:end]
         return {}

     mock_pg_db.query = AsyncMock(side_effect=mock_query_a)
@@ -762,9 +815,22 @@ async def test_case1_sequential_workspace_migration(
         if "WHERE workspace" in sql:
             workspace = params[0] if params and len(params) > 0 else None
             if workspace == "workspace_b":
-                offset = params[1] if len(params) > 1 else 0
-                limit = params[2] if len(params) > 2 else 500
-                return mock_rows_b[offset : offset + limit]
+                # Handle keyset pagination
+                if "id >" in sql:
+                    # params = [workspace, last_id, limit]
+                    last_id = params[1] if len(params) > 1 else None
+                    start_idx = 0
+                    for i, row in enumerate(mock_rows_b):
+                        if row["id"] == last_id:
+                            start_idx = i + 1
+                            break
+                    limit = params[2] if len(params) > 2 else 500
+                else:
+                    # First batch: params = [workspace, limit]
+                    start_idx = 0
+                    limit = params[1] if len(params) > 1 else 500
+                end = min(start_idx + limit, len(mock_rows_b))
+                return mock_rows_b[start_idx:end]
         return {}

     mock_pg_db.query = AsyncMock(side_effect=mock_query_b)
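As a sanity check on the mock logic above, the keyset walk can be exercised end to end against an in-memory row list. The snippet below is an illustrative aside, not part of the test suite; `fetch_after` is a hypothetical helper that mirrors the mock's `id > last_id` branch, and the rows are assumed to carry unique, ascending string ids as in the tests.

    # Illustrative check (not in the test suite): keyset paging visits each
    # row exactly once and terminates on the first empty batch.
    def fetch_after(mock_rows, last_id, limit=500):
        # Mirror of the mock's keyset branch: rows strictly after last_id
        start_idx = 0
        if last_id is not None:
            for i, row in enumerate(mock_rows):
                if row["id"] == last_id:
                    start_idx = i + 1
                    break
        return mock_rows[start_idx : start_idx + limit]

    mock_rows = [{"id": f"doc-{i:03d}"} for i in range(7)]
    seen, last_id = [], None
    while True:
        batch = fetch_after(mock_rows, last_id, limit=3)
        if not batch:
            break
        seen.extend(row["id"] for row in batch)
        last_id = batch[-1]["id"]
    assert seen == [row["id"] for row in mock_rows]  # every row, exactly once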