diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml new file mode 100644 index 00000000..7fd969e5 --- /dev/null +++ b/.github/workflows/e2e-tests.yml @@ -0,0 +1,181 @@ +name: E2E Tests (Real Databases) + +on: + workflow_dispatch: # Manual trigger only for E2E tests + pull_request: + branches: [ main, dev ] + paths: + - 'lightrag/kg/postgres_impl.py' + - 'lightrag/kg/qdrant_impl.py' + - 'tests/test_e2e_*.py' + +jobs: + e2e-postgres: + name: E2E PostgreSQL Tests + runs-on: ubuntu-latest + + services: + postgres: + image: ankane/pgvector:latest + env: + POSTGRES_USER: lightrag + POSTGRES_PASSWORD: lightrag_test_password + POSTGRES_DB: lightrag_test + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U lightrag" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + strategy: + matrix: + python-version: ['3.10', '3.12'] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip packages + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-e2e-${{ hashFiles('**/pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-pip-e2e- + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[api]" + pip install pytest pytest-asyncio asyncpg numpy + + - name: Wait for PostgreSQL + run: | + timeout 30 bash -c 'until pg_isready -h localhost -p 5432 -U lightrag; do sleep 1; done' + + - name: Setup pgvector extension + env: + PGPASSWORD: lightrag_test_password + run: | + psql -h localhost -U lightrag -d lightrag_test -c "CREATE EXTENSION IF NOT EXISTS vector;" + psql -h localhost -U lightrag -d lightrag_test -c "SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';" + + - name: Run PostgreSQL E2E tests + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_USER: 
lightrag + POSTGRES_PASSWORD: lightrag_test_password + POSTGRES_DB: lightrag_test + POSTGRES_WORKSPACE: e2e_test + run: | + pytest tests/test_e2e_postgres_migration.py -v --tb=short -s + timeout-minutes: 10 + + - name: Upload PostgreSQL test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: e2e-postgres-results-py${{ matrix.python-version }} + path: | + .pytest_cache/ + test-results.xml + retention-days: 7 + + e2e-qdrant: + name: E2E Qdrant Tests + runs-on: ubuntu-latest + + services: + qdrant: + image: qdrant/qdrant:latest + ports: + - 6333:6333 + - 6334:6334 + options: >- + --health-cmd "bash -c ':> /dev/tcp/localhost/6333' || exit 1" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + strategy: + matrix: + python-version: ['3.10', '3.12'] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip packages + uses: actions/cache@v4 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-e2e-${{ hashFiles('**/pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-pip-e2e- + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[api]" + pip install pytest pytest-asyncio qdrant-client numpy + + - name: Wait for Qdrant + run: | + timeout 30 bash -c 'until curl -f http://localhost:6333/healthz > /dev/null 2>&1; do sleep 1; done' + echo "Qdrant is ready" + + - name: Verify Qdrant connection + run: | + curl -X GET "http://localhost:6333/collections" -H "Content-Type: application/json" + + - name: Run Qdrant E2E tests + env: + QDRANT_URL: http://localhost:6333 + QDRANT_API_KEY: "" + run: | + pytest tests/test_e2e_qdrant_migration.py -v --tb=short -s + timeout-minutes: 10 + + - name: Upload Qdrant test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: e2e-qdrant-results-py${{
matrix.python-version }} + path: | + .pytest_cache/ + test-results.xml + retention-days: 7 + + e2e-summary: + name: E2E Test Summary + runs-on: ubuntu-latest + needs: [e2e-postgres, e2e-qdrant] + if: always() + + steps: + - name: Check test results + run: | + echo "## E2E Test Summary" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### PostgreSQL E2E Tests" >> $GITHUB_STEP_SUMMARY + echo "Status: ${{ needs.e2e-postgres.result }}" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "### Qdrant E2E Tests" >> $GITHUB_STEP_SUMMARY + echo "Status: ${{ needs.e2e-qdrant.result }}" >> $GITHUB_STEP_SUMMARY + + - name: Fail if any test failed + if: needs.e2e-postgres.result != 'success' || needs.e2e-qdrant.result != 'success' + run: exit 1 diff --git a/tests/test_e2e_postgres_migration.py b/tests/test_e2e_postgres_migration.py new file mode 100644 index 00000000..2950a8cb --- /dev/null +++ b/tests/test_e2e_postgres_migration.py @@ -0,0 +1,350 @@ +""" +E2E Tests for PostgreSQL Vector Storage Model Isolation + +These tests use a REAL PostgreSQL database with pgvector extension. +Unlike unit tests, these verify actual database operations, data migration, +and multi-model isolation scenarios. 
+ +Prerequisites: +- PostgreSQL with pgvector extension +- Environment variables: POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB +""" + +import os +import pytest +import asyncio +import numpy as np +from lightrag.utils import EmbeddingFunc +from lightrag.kg.postgres_impl import PGVectorStorage, PostgreSQLDB, ClientManager +from lightrag.namespace import NameSpace + + +# E2E test configuration from environment +@pytest.fixture(scope="module") +def pg_config(): + """Real PostgreSQL configuration from environment variables""" + return { + "host": os.getenv("POSTGRES_HOST", "localhost"), + "port": int(os.getenv("POSTGRES_PORT", "5432")), + "user": os.getenv("POSTGRES_USER", "lightrag"), + "password": os.getenv("POSTGRES_PASSWORD", "lightrag_test_password"), + "database": os.getenv("POSTGRES_DB", "lightrag_test"), + "workspace": os.getenv("POSTGRES_WORKSPACE", "e2e_test"), + "max_connections": 10, + } + + +@pytest.fixture(scope="module") +async def real_db(pg_config): + """Create a real PostgreSQL database connection""" + db = PostgreSQLDB(pg_config) + await db.initdb() + yield db + # Cleanup: close connection pool + if db.pool: + await db.pool.close() + + +@pytest.fixture +async def cleanup_tables(real_db): + """Cleanup test tables before and after each test""" + # Cleanup before test + tables_to_drop = [ + "LIGHTRAG_VDB_CHUNKS", + "LIGHTRAG_VDB_CHUNKS_test_model_768d", + "LIGHTRAG_VDB_CHUNKS_text_embedding_ada_002_1536d", + "LIGHTRAG_VDB_CHUNKS_bge_small_768d", + "LIGHTRAG_VDB_CHUNKS_bge_large_1024d", + ] + + for table in tables_to_drop: + try: + await real_db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None) + except Exception: + pass + + yield + + # Cleanup after test + for table in tables_to_drop: + try: + await real_db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None) + except Exception: + pass + + +@pytest.fixture +def mock_embedding_func(): + """Create a mock embedding function for testing""" + async def embed_func(texts, 
**kwargs): + # Generate fake embeddings with consistent dimension + return np.array([[0.1] * 768 for _ in texts]) + + return EmbeddingFunc( + embedding_dim=768, + func=embed_func, + model_name="test_model" + ) + + +@pytest.mark.asyncio +async def test_e2e_fresh_installation(real_db, cleanup_tables, mock_embedding_func, pg_config): + """ + E2E Test: Fresh installation with model_name specified + + Scenario: New workspace, no legacy data + Expected: Create new table with model suffix, no migration needed + """ + print("\n[E2E Test] Fresh installation with model_name") + + # Reset ClientManager to use our test config + ClientManager._instance = None + ClientManager._client_config = pg_config + + # Create storage with model_name + storage = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "cosine_better_than_threshold": 0.8 + } + }, + embedding_func=mock_embedding_func, + workspace="e2e_test" + ) + + # Initialize storage (should create new table) + await storage.initialize() + + # Verify table name + assert "test_model_768d" in storage.table_name + expected_table = "LIGHTRAG_VDB_CHUNKS_test_model_768d" + assert storage.table_name == expected_table + + # Verify table exists + check_query = """ + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = $1 + ) + """ + result = await real_db.query(check_query, [expected_table.lower()]) + assert result.get("exists") == True, f"Table {expected_table} should exist" + + # Verify legacy table does NOT exist + legacy_result = await real_db.query(check_query, ["LIGHTRAG_VDB_CHUNKS".lower()]) + assert legacy_result.get("exists") == False, "Legacy table should not exist" + + print(f"✅ Fresh installation successful: {expected_table} created") + + await storage.finalize() + + +@pytest.mark.asyncio +async def test_e2e_legacy_migration(real_db, cleanup_tables, pg_config): + """ + E2E Test: Upgrade from legacy format with 
automatic migration + + Scenario: + 1. Create legacy table (without model suffix) + 2. Insert test data + 3. Initialize with model_name (triggers migration) + 4. Verify data migrated to new table + """ + print("\n[E2E Test] Legacy data migration") + + # Step 1: Create legacy table and insert data + legacy_table = "LIGHTRAG_VDB_CHUNKS" + + create_legacy_sql = f""" + CREATE TABLE IF NOT EXISTS {legacy_table} ( + workspace VARCHAR(255), + id VARCHAR(255) PRIMARY KEY, + content TEXT, + content_vector vector(1536), + tokens INTEGER, + chunk_order_index INTEGER, + full_doc_id VARCHAR(255), + file_path TEXT, + create_time TIMESTAMP, + update_time TIMESTAMP + ) + """ + await real_db.execute(create_legacy_sql, None) + + # Insert test data into legacy table + test_data = [ + ("e2e_test", f"legacy_doc_{i}", f"Legacy content {i}", + [0.1] * 1536, 100, i, "legacy_doc", "/test/path", "NOW()", "NOW()") + for i in range(10) + ] + + for data in test_data: + insert_sql = f""" + INSERT INTO {legacy_table} + (workspace, id, content, content_vector, tokens, chunk_order_index, full_doc_id, file_path, create_time, update_time) + VALUES ($1, $2, $3, $4::vector, $5, $6, $7, $8, {data[8]}, {data[9]}) + """ + await real_db.execute(insert_sql, { + "workspace": data[0], + "id": data[1], + "content": data[2], + "content_vector": data[3], + "tokens": data[4], + "chunk_order_index": data[5], + "full_doc_id": data[6], + "file_path": data[7] + }) + + # Verify legacy data exists + count_result = await real_db.query(f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1", ["e2e_test"]) + legacy_count = count_result.get("count", 0) + assert legacy_count == 10, f"Expected 10 records in legacy table, got {legacy_count}" + print(f"✅ Legacy table created with {legacy_count} records") + + # Step 2: Initialize storage with model_name (triggers migration) + ClientManager._instance = None + ClientManager._client_config = pg_config + + async def embed_func(texts, **kwargs): + return np.array([[0.1] 
* 1536 for _ in texts]) + + embedding_func = EmbeddingFunc( + embedding_dim=1536, + func=embed_func, + model_name="text-embedding-ada-002" + ) + + storage = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "cosine_better_than_threshold": 0.8 + } + }, + embedding_func=embedding_func, + workspace="e2e_test" + ) + + # Initialize (should trigger migration) + print("🔄 Starting migration...") + await storage.initialize() + print("✅ Migration completed") + + # Step 3: Verify migration + new_table = storage.table_name + assert "text_embedding_ada_002_1536d" in new_table + + # Count records in new table + new_count_result = await real_db.query(f"SELECT COUNT(*) as count FROM {new_table} WHERE workspace=$1", ["e2e_test"]) + new_count = new_count_result.get("count", 0) + + assert new_count == legacy_count, f"Expected {legacy_count} records in new table, got {new_count}" + print(f"✅ Data migration verified: {new_count}/{legacy_count} records migrated") + + # Verify data content + sample_result = await real_db.query(f"SELECT id, content FROM {new_table} WHERE workspace=$1 LIMIT 1", ["e2e_test"]) + assert sample_result is not None + assert "Legacy content" in sample_result.get("content", "") + print(f"✅ Data integrity verified: {sample_result.get('id')}") + + await storage.finalize() + + +@pytest.mark.asyncio +async def test_e2e_multi_model_coexistence(real_db, cleanup_tables, pg_config): + """ + E2E Test: Multiple embedding models coexisting + + Scenario: + 1. Create storage with model A (768d) + 2. Create storage with model B (1024d) + 3. Verify separate tables created + 4. 
Verify data isolation + """ + print("\n[E2E Test] Multi-model coexistence") + + ClientManager._instance = None + ClientManager._client_config = pg_config + + # Model A: 768 dimensions + async def embed_func_a(texts, **kwargs): + return np.array([[0.1] * 768 for _ in texts]) + + embedding_func_a = EmbeddingFunc( + embedding_dim=768, + func=embed_func_a, + model_name="bge-small" + ) + + storage_a = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "cosine_better_than_threshold": 0.8 + } + }, + embedding_func=embedding_func_a, + workspace="e2e_test" + ) + + await storage_a.initialize() + table_a = storage_a.table_name + assert "bge_small_768d" in table_a + print(f"✅ Model A table created: {table_a}") + + # Model B: 1024 dimensions + async def embed_func_b(texts, **kwargs): + return np.array([[0.1] * 1024 for _ in texts]) + + embedding_func_b = EmbeddingFunc( + embedding_dim=1024, + func=embed_func_b, + model_name="bge-large" + ) + + storage_b = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "cosine_better_than_threshold": 0.8 + } + }, + embedding_func=embedding_func_b, + workspace="e2e_test" + ) + + await storage_b.initialize() + table_b = storage_b.table_name + assert "bge_large_1024d" in table_b + print(f"✅ Model B table created: {table_b}") + + # Verify tables are different + assert table_a != table_b, "Tables should have different names" + print(f"✅ Table isolation verified: {table_a} != {table_b}") + + # Verify both tables exist + check_query = """ + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = $1 + ) + """ + result_a = await real_db.query(check_query, [table_a.lower()]) + result_b = await real_db.query(check_query, [table_b.lower()]) + + assert result_a.get("exists") == True + assert result_b.get("exists") == True + print("✅ Both tables 
exist in database") + + await storage_a.finalize() + await storage_b.finalize() + + +if __name__ == "__main__": + # Run tests with pytest + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/test_e2e_qdrant_migration.py b/tests/test_e2e_qdrant_migration.py new file mode 100644 index 00000000..c6980221 --- /dev/null +++ b/tests/test_e2e_qdrant_migration.py @@ -0,0 +1,346 @@ +""" +E2E Tests for Qdrant Vector Storage Model Isolation + +These tests use a REAL Qdrant server. +Unlike unit tests, these verify actual collection operations, data migration, +and multi-model isolation scenarios. + +Prerequisites: +- Qdrant server running +- Environment variables: QDRANT_URL (optional QDRANT_API_KEY) +""" + +import os +import pytest +import asyncio +import numpy as np +from lightrag.utils import EmbeddingFunc +from lightrag.kg.qdrant_impl import QdrantVectorDBStorage +from lightrag.namespace import NameSpace +from qdrant_client import QdrantClient +from qdrant_client.models import Distance, VectorParams + + +# E2E test configuration from environment +@pytest.fixture(scope="module") +def qdrant_config(): + """Real Qdrant configuration from environment variables""" + return { + "url": os.getenv("QDRANT_URL", "http://localhost:6333"), + "api_key": os.getenv("QDRANT_API_KEY", None), + } + + +@pytest.fixture(scope="module") +def qdrant_client(qdrant_config): + """Create a real Qdrant client""" + client = QdrantClient( + url=qdrant_config["url"], + api_key=qdrant_config["api_key"], + timeout=60, + ) + yield client + # Client auto-closes + + +@pytest.fixture +async def cleanup_collections(qdrant_client): + """Cleanup test collections before and after each test""" + collections_to_delete = [ + "lightrag_vdb_chunks", # legacy + "e2e_test_chunks", # legacy with workspace + "lightrag_vdb_chunks_test_model_768d", + "lightrag_vdb_chunks_text_embedding_ada_002_1536d", + "lightrag_vdb_chunks_bge_small_768d", + "lightrag_vdb_chunks_bge_large_1024d", + ] + + # Cleanup before test + for 
collection in collections_to_delete: + try: + if qdrant_client.collection_exists(collection): + qdrant_client.delete_collection(collection) + except Exception: + pass + + yield + + # Cleanup after test + for collection in collections_to_delete: + try: + if qdrant_client.collection_exists(collection): + qdrant_client.delete_collection(collection) + except Exception: + pass + + +@pytest.fixture +def mock_embedding_func(): + """Create a mock embedding function for testing""" + async def embed_func(texts, **kwargs): + return np.array([[0.1] * 768 for _ in texts]) + + return EmbeddingFunc( + embedding_dim=768, + func=embed_func, + model_name="test_model" + ) + + +@pytest.mark.asyncio +async def test_e2e_qdrant_fresh_installation(qdrant_client, cleanup_collections, mock_embedding_func, qdrant_config): + """ + E2E Test: Fresh Qdrant installation with model_name specified + + Scenario: New workspace, no legacy collection + Expected: Create new collection with model suffix, no migration needed + """ + print("\n[E2E Test] Fresh Qdrant installation with model_name") + + # Create storage with model_name + storage = QdrantVectorDBStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "url": qdrant_config["url"], + "api_key": qdrant_config["api_key"], + "cosine_better_than_threshold": 0.8, + } + }, + embedding_func=mock_embedding_func, + workspace="e2e_test" + ) + + # Initialize storage (should create new collection) + await storage.initialize() + + # Verify collection name + assert "test_model_768d" in storage.final_namespace + expected_collection = "lightrag_vdb_chunks_test_model_768d" + assert storage.final_namespace == expected_collection + + # Verify collection exists + assert qdrant_client.collection_exists(expected_collection), \ + f"Collection {expected_collection} should exist" + + # Verify collection properties + collection_info = qdrant_client.get_collection(expected_collection) + assert 
collection_info.points_count == 0, "New collection should be empty" + print(f"✅ Fresh installation successful: {expected_collection} created") + + # Verify legacy collection does NOT exist + assert not qdrant_client.collection_exists("lightrag_vdb_chunks"), \ + "Legacy collection should not exist" + assert not qdrant_client.collection_exists("e2e_test_chunks"), \ + "Legacy workspace collection should not exist" + + await storage.finalize() + + +@pytest.mark.asyncio +async def test_e2e_qdrant_legacy_migration(qdrant_client, cleanup_collections, qdrant_config): + """ + E2E Test: Upgrade from legacy Qdrant collection with automatic migration + + Scenario: + 1. Create legacy collection (without model suffix) + 2. Insert test data + 3. Initialize with model_name (triggers migration) + 4. Verify data migrated to new collection + """ + print("\n[E2E Test] Legacy Qdrant collection migration") + + # Step 1: Create legacy collection and insert data + legacy_collection = "e2e_test_chunks" # workspace-prefixed legacy name + + qdrant_client.create_collection( + collection_name=legacy_collection, + vectors_config=VectorParams(size=1536, distance=Distance.COSINE), + ) + + # Insert test data into legacy collection + from qdrant_client.models import PointStruct + + test_points = [ + PointStruct( + id=i, + vector=[0.1] * 1536, + payload={ + "workspace_id": "e2e_test", + "content": f"Legacy content {i}", + "id": f"legacy_doc_{i}", + } + ) + for i in range(10) + ] + + qdrant_client.upsert( + collection_name=legacy_collection, + points=test_points, + wait=True, + ) + + # Verify legacy data exists + legacy_info = qdrant_client.get_collection(legacy_collection) + legacy_count = legacy_info.points_count + assert legacy_count == 10, f"Expected 10 vectors in legacy collection, got {legacy_count}" + print(f"✅ Legacy collection created with {legacy_count} vectors") + + # Step 2: Initialize storage with model_name (triggers migration) + async def embed_func(texts, **kwargs): + return
np.array([[0.1] * 1536 for _ in texts]) + + embedding_func = EmbeddingFunc( + embedding_dim=1536, + func=embed_func, + model_name="text-embedding-ada-002" + ) + + storage = QdrantVectorDBStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "url": qdrant_config["url"], + "api_key": qdrant_config["api_key"], + "cosine_better_than_threshold": 0.8, + } + }, + embedding_func=embedding_func, + workspace="e2e_test" + ) + + # Initialize (should trigger migration) + print("🔄 Starting migration...") + await storage.initialize() + print("✅ Migration completed") + + # Step 3: Verify migration + new_collection = storage.final_namespace + assert "text_embedding_ada_002_1536d" in new_collection + + # Verify new collection exists and has data + assert qdrant_client.collection_exists(new_collection), \ + f"New collection {new_collection} should exist" + + new_info = qdrant_client.get_collection(new_collection) + new_count = new_info.points_count + + assert new_count == legacy_count, \ + f"Expected {legacy_count} vectors in new collection, got {new_count}" + print(f"✅ Data migration verified: {new_count}/{legacy_count} vectors migrated") + + # Verify data content + sample_points = qdrant_client.scroll( + collection_name=new_collection, + limit=1, + with_payload=True, + )[0] + + assert len(sample_points) > 0, "Should have at least one point" + sample = sample_points[0] + assert "Legacy content" in sample.payload.get("content", "") + print(f"✅ Data integrity verified: {sample.payload.get('id')}") + + await storage.finalize() + + +@pytest.mark.asyncio +async def test_e2e_qdrant_multi_model_coexistence(qdrant_client, cleanup_collections, qdrant_config): + """ + E2E Test: Multiple embedding models coexisting in Qdrant + + Scenario: + 1. Create storage with model A (768d) + 2. Create storage with model B (1024d) + 3. Verify separate collections created + 4.
Verify data isolation + """ + print("\n[E2E Test] Multi-model coexistence in Qdrant") + + # Model A: 768 dimensions + async def embed_func_a(texts, **kwargs): + return np.array([[0.1] * 768 for _ in texts]) + + embedding_func_a = EmbeddingFunc( + embedding_dim=768, + func=embed_func_a, + model_name="bge-small" + ) + + storage_a = QdrantVectorDBStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "url": qdrant_config["url"], + "api_key": qdrant_config["api_key"], + "cosine_better_than_threshold": 0.8, + } + }, + embedding_func=embedding_func_a, + workspace="e2e_test" + ) + + await storage_a.initialize() + collection_a = storage_a.final_namespace + assert "bge_small_768d" in collection_a + print(f"✅ Model A collection created: {collection_a}") + + # Model B: 1024 dimensions + async def embed_func_b(texts, **kwargs): + return np.array([[0.1] * 1024 for _ in texts]) + + embedding_func_b = EmbeddingFunc( + embedding_dim=1024, + func=embed_func_b, + model_name="bge-large" + ) + + storage_b = QdrantVectorDBStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config={ + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": { + "url": qdrant_config["url"], + "api_key": qdrant_config["api_key"], + "cosine_better_than_threshold": 0.8, + } + }, + embedding_func=embedding_func_b, + workspace="e2e_test" + ) + + await storage_b.initialize() + collection_b = storage_b.final_namespace + assert "bge_large_1024d" in collection_b + print(f"✅ Model B collection created: {collection_b}") + + # Verify collections are different + assert collection_a != collection_b, "Collections should have different names" + print(f"✅ Collection isolation verified: {collection_a} != {collection_b}") + + # Verify both collections exist + assert qdrant_client.collection_exists(collection_a), \ + f"Collection {collection_a} should exist" + assert qdrant_client.collection_exists(collection_b), \ + f"Collection 
{collection_b} should exist" + print("✅ Both collections exist in Qdrant") + + # Verify vector dimensions + info_a = qdrant_client.get_collection(collection_a) + info_b = qdrant_client.get_collection(collection_b) + + # Qdrant stores vector config in config.params.vectors + assert info_a.config.params.vectors.size == 768, "Model A should use 768 dimensions" + assert info_b.config.params.vectors.size == 1024, "Model B should use 1024 dimensions" + print(f"✅ Vector dimensions verified: {info_a.config.params.vectors.size}d vs {info_b.config.params.vectors.size}d") + + await storage_a.finalize() + await storage_b.finalize() + + +if __name__ == "__main__": + # Run tests with pytest + pytest.main([__file__, "-v", "-s"])