diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py
index 38320643..6dbe3286 100644
--- a/lightrag/kg/neo4j_impl.py
+++ b/lightrag/kg/neo4j_impl.py
@@ -86,10 +86,46 @@ class Neo4JStorage(BaseGraphStorage):
         """Return workspace label (guaranteed non-empty during initialization)"""
         return self.workspace
 
+    def _normalize_index_suffix(self, workspace_label: str) -> str:
+        """Normalize workspace label for safe use in index names.
+
+        Runs of characters outside ``[A-Za-z0-9_]`` collapse to ``_`` and outer
+        underscores are stripped, which is lossy: ``a-b`` and ``a.b`` both become
+        ``a_b``, and a label with no ASCII letters or digits becomes ``base``.
+        When the result differs from the input, a short digest of the original
+        label is appended to keep index names of distinct workspaces apart.
+        """
+        normalized = re.sub(r"[^A-Za-z0-9_]+", "_", workspace_label).strip("_")
+        if not normalized:
+            normalized = "base"
+        if not re.match(r"[A-Za-z_]", normalized[0]):
+            normalized = f"ws_{normalized}"
+        if normalized != workspace_label:
+            # Disambiguate lossy normalization with a digest of the raw label.
+            import hashlib
+
+            raw = workspace_label.encode("utf-8")
+            normalized = f"{normalized}_{hashlib.sha256(raw).hexdigest()[:8]}"
+        return normalized
+
+    def _get_fulltext_index_name(self, workspace_label: str) -> str:
+        """Return a full-text index name derived from the normalized workspace label."""
+        suffix = self._normalize_index_suffix(workspace_label)
+        return f"entity_id_fulltext_idx_{suffix}"
+
     def _is_chinese_text(self, text: str) -> bool:
-        """Check if text contains Chinese characters."""
-        chinese_pattern = re.compile(r"[\u4e00-\u9fff]+")
-        return bool(chinese_pattern.search(text))
+        """Check if text contains Chinese/CJK characters.
+
+        Covers:
+        - CJK Unified Ideographs (U+4E00-U+9FFF)
+        - CJK Extension A (U+3400-U+4DBF)
+        - CJK Compatibility Ideographs (U+F900-U+FAFF)
+        - CJK Extension B-F (U+20000-U+2FA1F) - supplementary planes
+        """
+        cjk_pattern = re.compile(
+            r"[\u3400-\u4dbf\u4e00-\u9fff\uf900-\ufaff]|[\U00020000-\U0002fa1f]"
+        )
+        return bool(cjk_pattern.search(text))
 
     async def initialize(self):
         async with get_data_init_lock():
@@ -247,7 +283,8 @@ class Neo4JStorage(BaseGraphStorage):
         self, driver: AsyncDriver, database: str, workspace_label: str
     ):
         """Create a full-text index on the entity_id property with Chinese tokenizer support."""
-        index_name = "entity_id_fulltext_idx"
+        index_name = self._get_fulltext_index_name(workspace_label)
+        legacy_index_name = "entity_id_fulltext_idx"
         try:
             async with driver.session(database=database) as session:
                 # Check if the full-text index exists and get its configuration
@@ -257,11 +294,34 @@ class Neo4JStorage(BaseGraphStorage):
                 await result.consume()
 
                 existing_index = None
+                legacy_index = None
                 for idx in indexes:
                     if idx["name"] == index_name:
                         existing_index = idx
+                    elif idx["name"] == legacy_index_name:
+                        legacy_index = idx
+                    # Break early if we found both indexes
+                    if existing_index and legacy_index:
                         break
 
+                # Handle legacy index migration
+                if legacy_index and not existing_index:
+                    logger.info(
+                        f"[{self.workspace}] Found legacy index '{legacy_index_name}'. Migrating to '{index_name}'."
+                    )
+                    try:
+                        # Drop the legacy index (use IF EXISTS for safety)
+                        drop_query = f"DROP INDEX {legacy_index_name} IF EXISTS"
+                        result = await session.run(drop_query)
+                        await result.consume()
+                        logger.info(
+                            f"[{self.workspace}] Dropped legacy index '{legacy_index_name}'"
+                        )
+                    except Exception as drop_error:
+                        logger.warning(
+                            f"[{self.workspace}] Failed to drop legacy index: {str(drop_error)}"
+                        )
+
                 # Check if index exists and is online
                 if existing_index:
                     index_state = existing_index.get("state", "UNKNOWN")
@@ -291,10 +351,10 @@ class Neo4JStorage(BaseGraphStorage):
                 needs_creation = existing_index is None
 
                 if needs_recreation or needs_creation:
-                    # Drop existing index if it needs recreation
+                    # Drop existing index if it needs recreation (use IF EXISTS for safety)
                     if needs_recreation:
                         try:
-                            drop_query = f"DROP INDEX {index_name}"
+                            drop_query = f"DROP INDEX {index_name} IF EXISTS"
                             result = await session.run(drop_query)
                             await result.consume()
                             logger.info(
@@ -1669,7 +1729,7 @@ class Neo4JStorage(BaseGraphStorage):
 
         query_lower = query_strip.lower()
         is_chinese = self._is_chinese_text(query_strip)
-        index_name = "entity_id_fulltext_idx"
+        index_name = self._get_fulltext_index_name(workspace_label)
 
         # Attempt to use the full-text index first
         try:
diff --git a/tests/test_neo4j_fulltext_index.py b/tests/test_neo4j_fulltext_index.py
new file mode 100644
index 00000000..da26a1c3
--- /dev/null
+++ b/tests/test_neo4j_fulltext_index.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+"""
+Test Neo4j full-text index functionality, specifically:
+1. Workspace-specific index naming
+2. Legacy index migration
+3. search_labels functionality with workspace-specific indexes
+"""
+
+import asyncio
+import os
+import sys
+import pytest
+import numpy as np
+
+# Add the project root directory to the Python path
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from lightrag.kg.shared_storage import initialize_share_data
+
+
+# Mock embedding function that returns random vectors
+async def mock_embedding_func(texts):
+    return np.random.rand(len(texts), 10)  # Return 10-dimensional random vectors
+
+
+@pytest.fixture
+async def neo4j_storage():
+    """
+    Initialize Neo4j storage for testing.
+    Requires Neo4j to be running and configured via environment variables.
+    """
+    # Check if Neo4j is configured
+    if not os.getenv("NEO4J_URI"):
+        pytest.skip("Neo4j not configured (NEO4J_URI not set)")
+
+    from lightrag.kg.neo4j_impl import Neo4JStorage
+
+    # Initialize shared_storage for locks
+    initialize_share_data()
+
+    global_config = {
+        "embedding_batch_num": 10,
+        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
+        "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
+    }
+
+    storage = Neo4JStorage(
+        namespace="test_fulltext_index",
+        workspace="test_workspace",
+        global_config=global_config,
+        embedding_func=mock_embedding_func,
+    )
+
+    # Initialize the connection
+    await storage.initialize()
+
+    # Clean up any existing data
+    await storage.drop()
+
+    yield storage
+
+    # Cleanup
+    await storage.drop()
+    await storage.finalize()
+
+
+@pytest.mark.integration
+@pytest.mark.requires_db
+async def test_fulltext_index_creation(neo4j_storage):
+    """
+    Test that the full-text index is created with the workspace-specific name.
+    """
+    storage = neo4j_storage
+    workspace_label = storage._get_workspace_label()
+    expected_index_name = storage._get_fulltext_index_name(workspace_label)
+
+    # Query Neo4j to check if the index exists
+    async with storage._driver.session(database=storage._DATABASE) as session:
+        result = await session.run("SHOW FULLTEXT INDEXES")
+        indexes = await result.data()
+        await result.consume()
+
+        # Check if the workspace-specific index exists
+        index_names = [idx["name"] for idx in indexes]
+        assert (
+            expected_index_name in index_names
+        ), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}"
+
+        # Check if the legacy index doesn't exist (should be migrated if it was there)
+        legacy_index_name = "entity_id_fulltext_idx"
+        if legacy_index_name in index_names:
+            # If legacy index exists, it should be for a different workspace
+            # or it means migration didn't happen
+            print(
+                f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
+            )
+
+
+@pytest.mark.integration
+@pytest.mark.requires_db
+async def test_search_labels_with_workspace_index(neo4j_storage):
+    """
+    Test that search_labels uses the workspace-specific index and returns results.
+    """
+    storage = neo4j_storage
+
+    # Insert test nodes
+    test_nodes = [
+        {
+            "entity_id": "Artificial Intelligence",
+            "description": "AI field",
+            "keywords": "AI,ML,DL",
+            "entity_type": "Technology",
+        },
+        {
+            "entity_id": "Machine Learning",
+            "description": "ML subfield",
+            "keywords": "supervised,unsupervised",
+            "entity_type": "Technology",
+        },
+        {
+            "entity_id": "Deep Learning",
+            "description": "DL subfield",
+            "keywords": "neural networks",
+            "entity_type": "Technology",
+        },
+        {
+            "entity_id": "Natural Language Processing",
+            "description": "NLP field",
+            "keywords": "text,language",
+            "entity_type": "Technology",
+        },
+    ]
+
+    for node_data in test_nodes:
+        await storage.upsert_node(node_data["entity_id"], node_data)
+
+    # Give the index time to become consistent (eventually consistent index)
+    await asyncio.sleep(2)
+
+    # Test search_labels
+    results = await storage.search_labels("Learning", limit=10)
+
+    # Should find nodes with "Learning" in them
+    assert len(results) > 0, "search_labels should return results for 'Learning'"
+    assert any(
+        "Learning" in result for result in results
+    ), "Results should contain 'Learning'"
+
+    # Test case-insensitive search
+    results_lower = await storage.search_labels("learning", limit=10)
+    assert len(results_lower) > 0, "search_labels should be case-insensitive"
+
+    # Test partial match
+    results_partial = await storage.search_labels("Intelli", limit=10)
+    assert (
+        len(results_partial) > 0
+    ), "search_labels should support partial matching with wildcard"
+    assert any(
+        "Intelligence" in result for result in results_partial
+    ), "Should find 'Artificial Intelligence'"
+
+
+@pytest.mark.integration
+@pytest.mark.requires_db
+async def test_search_labels_chinese_text(neo4j_storage):
+    """
+    Test that search_labels works with Chinese text using the CJK analyzer.
+    """
+    storage = neo4j_storage
+
+    # Insert Chinese test nodes
+    chinese_nodes = [
+        {
+            "entity_id": "人工智能",
+            "description": "人工智能领域",
+            "keywords": "AI,机器学习",
+            "entity_type": "技术",
+        },
+        {
+            "entity_id": "机器学习",
+            "description": "机器学习子领域",
+            "keywords": "监督学习,无监督学习",
+            "entity_type": "技术",
+        },
+        {
+            "entity_id": "深度学习",
+            "description": "深度学习子领域",
+            "keywords": "神经网络",
+            "entity_type": "技术",
+        },
+    ]
+
+    for node_data in chinese_nodes:
+        await storage.upsert_node(node_data["entity_id"], node_data)
+
+    # Give the index time to become consistent
+    await asyncio.sleep(2)
+
+    # Test Chinese text search
+    results = await storage.search_labels("学习", limit=10)
+
+    # Should find nodes with "学习" in them
+    assert len(results) > 0, "search_labels should return results for Chinese text"
+    assert any(
+        "学习" in result for result in results
+    ), "Results should contain Chinese characters '学习'"
+
+
+@pytest.mark.integration
+@pytest.mark.requires_db
+async def test_search_labels_fallback_to_contains(neo4j_storage):
+    """
+    Test that search_labels falls back to CONTAINS search if the index fails.
+    This can happen with older Neo4j versions or if the index is not yet available.
+    """
+    storage = neo4j_storage
+
+    # Insert test nodes
+    test_nodes = [
+        {
+            "entity_id": "Test Node Alpha",
+            "description": "Test node",
+            "keywords": "test",
+            "entity_type": "Test",
+        },
+        {
+            "entity_id": "Test Node Beta",
+            "description": "Test node",
+            "keywords": "test",
+            "entity_type": "Test",
+        },
+    ]
+
+    for node_data in test_nodes:
+        await storage.upsert_node(node_data["entity_id"], node_data)
+
+    # Even if the full-text index is not available, CONTAINS should work
+    results = await storage.search_labels("Alpha", limit=10)
+
+    # Should find the node using fallback CONTAINS search
+    assert len(results) > 0, "Fallback CONTAINS search should return results"
+    assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"
+
+
+@pytest.mark.integration
+@pytest.mark.requires_db
+async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
+    """
+    Test that different workspaces have their own separate indexes.
+    """
+    from lightrag.kg.neo4j_impl import Neo4JStorage
+
+    # Create storage for workspace 1
+    storage1 = neo4j_storage
+
+    # Create storage for workspace 2
+    global_config = {
+        "embedding_batch_num": 10,
+        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
+        "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
+    }
+
+    storage2 = Neo4JStorage(
+        namespace="test_fulltext_index",
+        workspace="test_workspace_2",
+        global_config=global_config,
+        embedding_func=mock_embedding_func,
+    )
+
+    await storage2.initialize()
+    await storage2.drop()
+
+    try:
+        # Check that both workspaces have their own indexes
+        async with storage1._driver.session(database=storage1._DATABASE) as session:
+            result = await session.run("SHOW FULLTEXT INDEXES")
+            indexes = await result.data()
+            await result.consume()
+
+            index_names = [idx["name"] for idx in indexes]
+            workspace1_index = storage1._get_fulltext_index_name(
+                storage1._get_workspace_label()
+            )
+            workspace2_index = storage2._get_fulltext_index_name(
+                storage2._get_workspace_label()
+            )
+
+            assert (
+                workspace1_index in index_names
+            ), f"Workspace 1 index '{workspace1_index}' should exist"
+            assert (
+                workspace2_index in index_names
+            ), f"Workspace 2 index '{workspace2_index}' should exist"
+
+    finally:
+        # Clean up: drop the fulltext index created for workspace 2 to prevent accumulation
+        try:
+            async with storage2._driver.session(database=storage2._DATABASE) as session:
+                index_name = storage2._get_fulltext_index_name(
+                    storage2._get_workspace_label()
+                )
+                drop_query = f"DROP INDEX {index_name} IF EXISTS"
+                result = await session.run(drop_query)
+                await result.consume()
+        except Exception as cleanup_error:
+            print(f"Warning: failed to drop index for workspace 2: {cleanup_error}")
+        await storage2.drop()
+        await storage2.finalize()
+
+
+if __name__ == "__main__":
+    # Run tests with pytest
+    pytest.main([__file__, "-v", "--run-integration"])