Files
lightrag/tests/test_postgres_migration.py
yangdx be744a28a7 Update Postgres tests for keyset pagination and API changes
- Use check_table_exists DB method
- Update mocks for keyset pagination
- Enforce error on dimension mismatch
- Remove deprecated module patches
- Verify workspace migration isolation
2025-12-21 18:37:28 +08:00

857 lines
31 KiB
Python

import pytest
from unittest.mock import patch, AsyncMock
import numpy as np
from lightrag.utils import EmbeddingFunc
from lightrag.kg.postgres_impl import (
PGVectorStorage,
)
from lightrag.namespace import NameSpace
# Mock PostgreSQLDB
@pytest.fixture
def mock_pg_db():
    """Provide an ``AsyncMock`` that stands in for the PostgreSQL client.

    The mock carries a ``workspace`` attribute plus ``query`` and ``execute``
    coroutines whose defaults describe an empty database. Individual tests
    overwrite these attributes when they need richer behaviour.
    """

    async def _default_query(sql, params=None, multirows=False, **kwargs):
        # Multi-row lookups yield an empty list; single-row lookups report
        # that nothing exists and nothing is stored.
        return [] if multirows else {"exists": False, "count": 0}

    async def _default_execute(sql, data=None, **kwargs):
        """Accept ``data`` only as a dict or ``None``.

        Any other type raises ``TypeError`` so that a caller passing a
        list or tuple is caught by the test instead of silently accepted.
        """
        if not (data is None or isinstance(data, dict)):
            raise TypeError(
                f"PostgreSQLDB.execute() expects data as dict, got {type(data).__name__}"
            )
        return None

    fake_db = AsyncMock()
    fake_db.workspace = "test_workspace"
    fake_db.query = AsyncMock(side_effect=_default_query)
    fake_db.execute = AsyncMock(side_effect=_default_execute)
    return fake_db
# Mock get_data_init_lock to avoid async lock issues in tests
@pytest.fixture(autouse=True)
def mock_data_init_lock():
    """Patch ``get_data_init_lock`` in ``postgres_impl`` for every test.

    The patched factory returns an ``AsyncMock``, so code under test receives
    a mock object in place of the real lock. Yields the patched factory.
    """
    target = "lightrag.kg.postgres_impl.get_data_init_lock"
    with patch(target) as mock_lock:
        mock_lock.return_value = AsyncMock()
        yield mock_lock
# Mock ClientManager
@pytest.fixture
def mock_client_manager(mock_pg_db):
    """Patch ``ClientManager`` so that ``get_client`` resolves to ``mock_pg_db``.

    ``release_client`` is replaced by a bare ``AsyncMock``. Yields the patched
    manager object.
    """
    with patch("lightrag.kg.postgres_impl.ClientManager") as mock_manager:
        mock_manager.configure_mock(
            get_client=AsyncMock(return_value=mock_pg_db),
            release_client=AsyncMock(),
        )
        yield mock_manager
# Mock Embedding function
@pytest.fixture
def mock_embedding_func():
    """Build an ``EmbeddingFunc`` (768 dims, model ``test_model``) with constant output."""

    async def _constant_embed(texts, **kwargs):
        # One 768-wide row filled with 0.1 for every input text.
        rows = [[0.1] * 768 for _ in texts]
        return np.array(rows)

    return EmbeddingFunc(
        embedding_dim=768, func=_constant_embed, model_name="test_model"
    )
async def test_postgres_table_naming(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """The storage table name carries the model suffix; the legacy name does not."""
    expected_suffix = "test_model_768d"
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # Suffixed name: base table name followed by model name and dimension.
    assert expected_suffix in storage.table_name
    assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{expected_suffix}"

    # Legacy name: the bare base table name.
    assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS"
async def test_postgres_migration_trigger(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """Initializing with only a populated legacy table runs the batch migration.

    The database mock reports that only the legacy table exists and serves its
    100 rows through keyset-paginated ``SELECT *`` queries. The test passes
    when ``_run_with_retry`` has been awaited at least once.
    """
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    # 1. Only the legacy table exists.
    async def mock_check_table_exists(table_name):
        return table_name == storage.legacy_table_name

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    # 2. The legacy table holds 100 records.
    mock_rows = [
        {"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
        for i in range(100)
    ]
    migration_state = {"new_table_count": 0}

    def _index_after(last_id):
        """Return the index following the row whose id is ``last_id`` (0 if absent)."""
        for position, row in enumerate(mock_rows):
            if row["id"] == last_id:
                return position + 1
        return 0

    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            upper_sql = sql.upper()
            # Test the suffixed name first: the legacy name is contained in it.
            if storage.table_name.upper() in upper_sql:
                return {"count": migration_state["new_table_count"]}
            if storage.legacy_table_name.upper() in upper_sql:
                return {"count": 100}
            return {"count": 0}

        if multirows and "SELECT *" in sql:
            # Keyset pagination. Expected parameter layouts:
            #   workspace + cursor: [workspace, last_id, limit]
            #   workspace only:     [workspace, limit]
            #   cursor only:        [last_id, limit]
            #   neither:            [limit]
            uses_cursor = "id >" in sql
            if "WHERE workspace" in sql:
                if uses_cursor:
                    first = _index_after(params[1] if len(params) > 1 else None)
                    limit = params[2] if len(params) > 2 else 500
                else:
                    first = 0
                    limit = params[1] if len(params) > 1 else 500
            elif uses_cursor:
                first = _index_after(params[0] if params else None)
                limit = params[1] if len(params) > 1 else 500
            else:
                first = 0
                limit = params[0] if params else 500
            # Slicing clamps to the end of the list.
            return mock_rows[first : first + limit]

        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    # Each awaited _run_with_retry call is recorded as one migration batch.
    migration_executed = []

    async def mock_run_with_retry(operation, **kwargs):
        migration_executed.append(True)
        # After a batch runs, the new table reports the migrated rows.
        migration_state["new_table_count"] = 100
        return None

    mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry)

    with patch(
        "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
    ):
        # Initialization is expected to trigger the migration.
        await storage.initialize()

    assert migration_executed, "Migration should have been executed"
async def test_postgres_no_migration_needed(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """When only the new table exists, initialization must not create a table."""
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=mock_embedding_func,
        workspace="test_ws",
    )

    async def mock_check_table_exists(table_name):
        # Only the suffixed (new) table is reported as present.
        return table_name == storage.table_name

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    create_table_patch = patch(
        "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
    )
    with create_table_patch as mock_create:
        await storage.initialize()
        # The existing table must be reused as-is.
        mock_create.assert_not_called()
async def test_scenario_1_new_workspace_creation(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 1: a brand-new workspace.

    Expected behavior:
    - Neither the legacy nor the suffixed table exists
    - The suffixed table is created exactly once
    - The created table is the one named by ``storage.table_name``
    """
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
        },
        embedding_func=EmbeddingFunc(
            embedding_dim=3072,
            func=mock_embedding_func.func,
            model_name="text-embedding-3-large",
        ),
        workspace="new_workspace",
    )

    async def mock_check_table_exists(table_name):
        # No table of any name exists yet.
        return False

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    with patch(
        "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
    ) as mock_create:
        await storage.initialize()

    # Model name and dimension are embedded in the table name.
    assert "text_embedding_3_large_3072d" in storage.table_name

    # Exactly one creation call, with the table name as second positional arg.
    mock_create.assert_called_once()
    positional_args = mock_create.call_args.args
    assert positional_args[1] == storage.table_name
async def test_scenario_2_legacy_upgrade_migration(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 2: Upgrade from legacy version

    Expected behavior:
    - Legacy table exists (without model suffix)
    - New table doesn't exist
    - The new table is created once and data is migrated into it in batches
      (observed through ``_run_with_retry`` being awaited)
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }
    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="text-embedding-ada-002",
    )
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="legacy_workspace",
    )

    # Mock: only legacy table exists
    async def mock_check_table_exists(table_name):
        return table_name == storage.legacy_table_name

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    # Mock: legacy table has 50 records
    mock_rows = [
        {
            "id": f"legacy_id_{i}",
            "content": f"legacy_content_{i}",
            "workspace": "legacy_workspace",
        }
        for i in range(50)
    ]
    migration_state = {"new_table_count": 0}

    def _index_after(last_id):
        """Return the index following the row whose id is ``last_id`` (0 if absent)."""
        for position, row in enumerate(mock_rows):
            if row["id"] == last_id:
                return position + 1
        return 0

    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            sql_upper = sql.upper()
            # The suffixed name contains the legacy name, so a query targets
            # the legacy table only when the suffixed name is absent.
            has_model_suffix = storage.table_name.upper() in sql_upper
            is_legacy_table = (
                not has_model_suffix
                and storage.legacy_table_name.upper() in sql_upper
            )
            if is_legacy_table:
                # Same answer with or without a workspace filter: every
                # legacy row belongs to the one workspace under test.
                return {"count": 50}
            # New table count (before/after migration)
            return {"count": migration_state["new_table_count"]}

        if multirows and "SELECT *" in sql:
            # Keyset pagination. Expected parameter layouts:
            #   workspace + cursor: [workspace, last_id, limit]
            #   workspace only:     [workspace, limit]
            #   cursor only:        [last_id, limit]
            #   neither:            [limit]
            uses_cursor = "id >" in sql
            if "WHERE workspace" in sql:
                if uses_cursor:
                    first = _index_after(params[1] if len(params) > 1 else None)
                    limit = params[2] if len(params) > 2 else 500
                else:
                    first = 0
                    limit = params[1] if len(params) > 1 else 500
            elif uses_cursor:
                first = _index_after(params[0] if params else None)
                limit = params[1] if len(params) > 1 else 500
            else:
                first = 0
                limit = params[0] if params else 500
            # Slicing clamps to the end of the list.
            return mock_rows[first : first + limit]

        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    # Track migration through _run_with_retry calls
    migration_executed = []

    async def mock_run_with_retry(operation, **kwargs):
        # Each awaited call stands for one migrated batch; afterwards the
        # new table reports the migrated rows.
        migration_executed.append(True)
        migration_state["new_table_count"] = 50
        return None

    mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry)

    with patch(
        "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
    ) as mock_create:
        await storage.initialize()

    # Verify table name contains ada-002
    assert "text_embedding_ada_002_1536d" in storage.table_name
    # Verify migration was executed (batch migration uses _run_with_retry)
    assert migration_executed, "Migration should have been executed"
    mock_create.assert_called_once()
async def test_scenario_3_multi_model_coexistence(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Scenario 3: two embedding models side by side.

    Expected behavior:
    - Each model gets its own table, distinguished by the model suffix
    - Initializing both storages creates two distinct tables
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }

    # Workspace A: bge-small, 768 dimensions.
    storage_a = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=EmbeddingFunc(
            embedding_dim=768, func=mock_embedding_func.func, model_name="bge-small"
        ),
        workspace="workspace_a",
    )

    # Workspace B: bge-large, 1024 dimensions.
    async def embed_func_b(texts, **kwargs):
        rows = [[0.1] * 1024 for _ in texts]
        return np.array(rows)

    storage_b = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=EmbeddingFunc(
            embedding_dim=1024, func=embed_func_b, model_name="bge-large"
        ),
        workspace="workspace_b",
    )

    # The suffix alone must make the two table names differ.
    assert storage_a.table_name != storage_b.table_name
    assert "bge_small_768d" in storage_a.table_name
    assert "bge_large_1024d" in storage_b.table_name

    async def mock_check_table_exists(table_name):
        # Neither table exists yet.
        return False

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    with patch(
        "lightrag.kg.postgres_impl.PGVectorStorage._pg_create_table", AsyncMock()
    ) as mock_create:
        await storage_a.initialize()
        await storage_b.initialize()

    # One creation call per storage.
    assert mock_create.call_count == 2

    # The table name is the second positional argument of each call.
    table_names = [recorded.args[1] for recorded in mock_create.call_args_list]
    assert len(set(table_names)) == 2
    for storage in (storage_a, storage_b):
        assert storage.table_name in table_names
async def test_case1_empty_legacy_auto_cleanup(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Case 1a: Both new and legacy tables exist, but legacy is EMPTY

    Expected: the empty legacy table is dropped automatically, which is safe
    because it holds no data.
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }
    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="test-model",
    )
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="test_ws",
    )

    # Mock: Both tables exist
    async def mock_check_table_exists(table_name):
        return True

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    # Mock: Legacy table is empty (0 records), new table has data
    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            sql_upper = sql.upper()
            # The suffixed table name contains the legacy name, so the new
            # table must be ruled out before matching on the legacy name;
            # otherwise new-table counts would be reported as empty too.
            if storage.table_name.upper() in sql_upper:
                return {"count": 100}  # New table has data
            if storage.legacy_table_name.upper() in sql_upper:
                return {"count": 0}  # Empty legacy table
            return {"count": 100}
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with patch("lightrag.kg.postgres_impl.logger"):
        await storage.initialize()

    def _sql_of(call):
        """Return the SQL text of an ``execute`` call, positional or keyword."""
        if call.args:
            return call.args[0]
        return call.kwargs.get("sql")

    # Verify: Empty legacy table should be automatically cleaned up
    delete_calls = [
        call
        for call in mock_pg_db.execute.call_args_list
        if "DROP TABLE" in (_sql_of(call) or "")
    ]
    assert delete_calls, "Empty legacy table should be auto-deleted"

    # The dropped table must be the legacy one. A statement naming the
    # suffixed table would also contain the legacy name, so exclude it.
    dropped_table = storage.legacy_table_name
    new_table_upper = storage.table_name.upper()
    assert any(
        dropped_table in str(call) and new_table_upper not in str(call).upper()
        for call in delete_calls
    ), f"Expected to drop empty legacy table '{dropped_table}'"
    print(
        f"✅ Case 1a: Empty legacy table '{dropped_table}' auto-deleted successfully"
    )
async def test_case1_nonempty_legacy_warning(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Case 1b: Both new and legacy tables exist, and legacy HAS DATA

    Expected: the legacy table is left in place; a table that still holds
    data must never be dropped automatically.
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }
    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="test-model",
    )
    storage = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="test_ws",
    )

    # Mock: Both tables exist
    async def mock_check_table_exists(table_name):
        return True

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists)

    # Mock: Legacy table has data (50 records), new table has data (100)
    async def mock_query(sql, params=None, multirows=False, **kwargs):
        if "COUNT(*)" in sql:
            sql_upper = sql.upper()
            # The suffixed table name contains the legacy name, so the new
            # table must be ruled out before matching on the legacy name;
            # otherwise new-table counts would get the legacy answer.
            if storage.table_name.upper() in sql_upper:
                return {"count": 100}  # New table has data
            if storage.legacy_table_name.upper() in sql_upper:
                return {"count": 50}  # Legacy has data
            return {"count": 100}
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query)

    with patch("lightrag.kg.postgres_impl.logger"):
        await storage.initialize()

    def _sql_of(call):
        """Return the SQL text of an ``execute`` call, positional or keyword."""
        if call.args:
            return call.args[0]
        return call.kwargs.get("sql")

    # Verify: Legacy table with data should be preserved
    delete_calls = [
        call
        for call in mock_pg_db.execute.call_args_list
        if "DROP TABLE" in (_sql_of(call) or "")
    ]

    # Any DROP that mentions the legacy name counts as a deletion. This is
    # deliberately broad: a false alarm is preferable to missed data loss.
    dropped_table = storage.legacy_table_name
    legacy_deleted = any(dropped_table in str(call) for call in delete_calls)
    assert not legacy_deleted, "Legacy table with data should NOT be auto-deleted"
    print(
        f"✅ Case 1b: Legacy table '{dropped_table}' with data preserved (warning only)"
    )
async def test_case1_sequential_workspace_migration(
    mock_client_manager, mock_pg_db, mock_embedding_func
):
    """
    Case 1c: Sequential workspace migration (multi-tenant scenario)

    Timeline exercised by this test:
    1. The legacy table holds 3 rows for workspace_a and 3 rows for workspace_b
    2. Workspace A initializes first, while only the legacy table exists,
       and migrates its rows
    3. Workspace B initializes afterwards, when both tables exist and the
       legacy table still holds B's rows
    4. A migration batch must also run for workspace B

    The migration is observed through awaited ``_run_with_retry`` calls.
    """
    config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
    }
    embedding_func = EmbeddingFunc(
        embedding_dim=1536,
        func=mock_embedding_func.func,
        model_name="test-model",
    )

    # Legacy contents: 6 rows in total, 3 per workspace.
    mock_rows_a = [
        {"id": f"a_{i}", "content": f"A content {i}", "workspace": "workspace_a"}
        for i in range(3)
    ]
    mock_rows_b = [
        {"id": f"b_{i}", "content": f"B content {i}", "workspace": "workspace_b"}
        for i in range(3)
    ]

    # Shared bookkeeping updated as the two migrations progress.
    migration_state = {
        "new_table_exists": False,
        "workspace_a_migrated": False,
        "workspace_a_migration_count": 0,
        "workspace_b_migration_count": 0,
    }

    def _workspace_of(params):
        """First query parameter, or ``None`` when no parameters were given."""
        return params[0] if params and len(params) > 0 else None

    def _page(rows, sql, params):
        """Serve one keyset page of ``rows`` for a workspace-filtered query.

        With a cursor the layout is ``[workspace, last_id, limit]``;
        without one it is ``[workspace, limit]``.
        """
        first = 0
        if "id >" in sql:
            last_id = params[1] if len(params) > 1 else None
            for position, row in enumerate(rows):
                if row["id"] == last_id:
                    first = position + 1
                    break
            limit = params[2] if len(params) > 2 else 500
        else:
            limit = params[1] if len(params) > 1 else 500
        # Slicing clamps to the end of the list.
        return rows[first : first + limit]

    # ------------------------------------------------------------------
    # Step 1: workspace_a initializes while only the legacy table exists.
    # ------------------------------------------------------------------
    mock_pg_db.workspace = "workspace_a"
    storage_a = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="workspace_a",
    )

    async def mock_check_table_exists_a(table_name):
        if table_name == storage_a.legacy_table_name:
            return True
        if table_name == storage_a.table_name:
            return migration_state["new_table_exists"]
        return False

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists_a)

    async def mock_query_a(sql, params=None, multirows=False, **kwargs):
        sql_upper = sql.upper()
        filtered = "WHERE workspace" in sql
        if "COUNT(*)" in sql:
            targets_new = "TEST_MODEL_1536D" in sql_upper
            targets_legacy = (
                not targets_new and storage_a.legacy_table_name.upper() in sql_upper
            )
            if targets_legacy:
                if not filtered:
                    # Global count in the legacy table.
                    return {"count": 6}
                if _workspace_of(params) in ("workspace_a", "workspace_b"):
                    return {"count": 3}
            elif targets_new:
                if filtered:
                    workspace = _workspace_of(params)
                    if workspace == "workspace_a":
                        return {"count": migration_state["workspace_a_migration_count"]}
                    if workspace == "workspace_b":
                        return {"count": migration_state["workspace_b_migration_count"]}
                return {
                    "count": migration_state["workspace_a_migration_count"]
                    + migration_state["workspace_b_migration_count"]
                }
        elif multirows and "SELECT *" in sql and filtered:
            if _workspace_of(params) == "workspace_a":
                return _page(mock_rows_a, sql, params)
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query_a)

    # Each awaited _run_with_retry call is recorded as one migration batch.
    migration_a_executed = []

    async def mock_run_with_retry_a(operation, **kwargs):
        migration_a_executed.append(True)
        migration_state["workspace_a_migration_count"] = len(mock_rows_a)
        return None

    mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry_a)

    with patch("lightrag.kg.postgres_impl.logger"):
        await storage_a.initialize()
    migration_state["new_table_exists"] = True
    migration_state["workspace_a_migrated"] = True
    print("✅ Step 1: Workspace A initialized")

    assert migration_a_executed, "Migration should have been executed for workspace_a"
    print(f"✅ Step 1: Migration executed {len(migration_a_executed)} batch(es)")

    # ------------------------------------------------------------------
    # Step 2: workspace_b initializes; both tables exist and the legacy
    # table still holds workspace_b's rows.
    # ------------------------------------------------------------------
    mock_pg_db.workspace = "workspace_b"
    storage_b = PGVectorStorage(
        namespace=NameSpace.VECTOR_STORE_CHUNKS,
        global_config=config,
        embedding_func=embedding_func,
        workspace="workspace_b",
    )
    # Forget the calls recorded during step 1.
    mock_pg_db.reset_mock()

    async def mock_check_table_exists_b(table_name):
        # Both the legacy and the suffixed table exist now.
        return True

    mock_pg_db.check_table_exists = AsyncMock(side_effect=mock_check_table_exists_b)

    async def mock_query_b(sql, params=None, multirows=False, **kwargs):
        sql_upper = sql.upper()
        filtered = "WHERE workspace" in sql
        if "COUNT(*)" in sql:
            targets_new = "TEST_MODEL_1536D" in sql_upper
            targets_legacy = (
                not targets_new and storage_b.legacy_table_name.upper() in sql_upper
            )
            if targets_legacy:
                if not filtered:
                    # Global count: only workspace_b's rows remain.
                    return {"count": 3}
                workspace = _workspace_of(params)
                if workspace == "workspace_b":
                    return {"count": 3}  # workspace_b still has data in legacy
                if workspace == "workspace_a":
                    return {"count": 0}  # workspace_a already migrated
            elif targets_new:
                if not filtered:
                    return {"count": 3 + migration_state["workspace_b_migration_count"]}
                workspace = _workspace_of(params)
                if workspace == "workspace_b":
                    return {"count": migration_state["workspace_b_migration_count"]}
                if workspace == "workspace_a":
                    return {"count": 3}
        elif multirows and "SELECT *" in sql and filtered:
            if _workspace_of(params) == "workspace_b":
                return _page(mock_rows_b, sql, params)
        return {}

    mock_pg_db.query = AsyncMock(side_effect=mock_query_b)

    migration_b_executed = []

    async def mock_run_with_retry_b(operation, **kwargs):
        migration_b_executed.append(True)
        migration_state["workspace_b_migration_count"] = len(mock_rows_b)
        return None

    mock_pg_db._run_with_retry = AsyncMock(side_effect=mock_run_with_retry_b)

    with patch("lightrag.kg.postgres_impl.logger"):
        await storage_b.initialize()
    print("✅ Step 2: Workspace B initialized")

    # The new table has no workspace_b rows while the legacy table still
    # does, so a migration batch must have run for workspace_b.
    assert migration_b_executed, "Migration should have been executed for workspace_b"
    print("✅ Step 2: Migration executed for workspace_b")
    print("\n🎉 Case 1c: Sequential workspace migration verification complete!")
    print(" - Workspace A: Migrated successfully (only legacy existed)")
    print(" - Workspace B: Migrated successfully (new table empty for workspace_b)")