Fix Neo4j fulltext index name mismatch between creation and query
This commit is contained in:
@@ -231,6 +231,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||
):
|
||||
"""Create a full-text index on the entity_id property with Chinese tokenizer support."""
|
||||
index_name = f"entity_id_fulltext_idx_{workspace_label}"
|
||||
legacy_index_name = "entity_id_fulltext_idx"
|
||||
try:
|
||||
async with driver.session(database=database) as session:
|
||||
# Check if the full-text index exists and get its configuration
|
||||
@@ -240,11 +241,34 @@ class Neo4JStorage(BaseGraphStorage):
|
||||
await result.consume()
|
||||
|
||||
existing_index = None
|
||||
legacy_index = None
|
||||
for idx in indexes:
|
||||
if idx["name"] == index_name:
|
||||
existing_index = idx
|
||||
elif idx["name"] == legacy_index_name:
|
||||
legacy_index = idx
|
||||
# Break early if we found both indexes
|
||||
if existing_index and legacy_index:
|
||||
break
|
||||
|
||||
# Handle legacy index migration
|
||||
if legacy_index and not existing_index:
|
||||
logger.info(
|
||||
f"[{self.workspace}] Found legacy index '{legacy_index_name}'. Migrating to '{index_name}'."
|
||||
)
|
||||
try:
|
||||
# Drop the legacy index
|
||||
drop_query = f"DROP INDEX {legacy_index_name}"
|
||||
result = await session.run(drop_query)
|
||||
await result.consume()
|
||||
logger.info(
|
||||
f"[{self.workspace}] Dropped legacy index '{legacy_index_name}'"
|
||||
)
|
||||
except Exception as drop_error:
|
||||
logger.warning(
|
||||
f"[{self.workspace}] Failed to drop legacy index: {str(drop_error)}"
|
||||
)
|
||||
|
||||
# Check if index exists and is online
|
||||
if existing_index:
|
||||
index_state = existing_index.get("state", "UNKNOWN")
|
||||
@@ -1641,7 +1665,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||
|
||||
query_lower = query_strip.lower()
|
||||
is_chinese = self._is_chinese_text(query_strip)
|
||||
index_name = "entity_id_fulltext_idx"
|
||||
index_name = f"entity_id_fulltext_idx_{workspace_label}"
|
||||
|
||||
# Attempt to use the full-text index first
|
||||
try:
|
||||
|
||||
299
tests/test_neo4j_fulltext_index.py
Normal file
299
tests/test_neo4j_fulltext_index.py
Normal file
@@ -0,0 +1,299 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Test Neo4j full-text index functionality, specifically:
|
||||
1. Workspace-specific index naming
|
||||
2. Legacy index migration
|
||||
3. search_labels functionality with workspace-specific indexes
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
import pytest
|
||||
import numpy as np
|
||||
|
||||
# Add the project root directory to the Python path
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from lightrag.kg.shared_storage import initialize_share_data
|
||||
|
||||
|
||||
# Stand-in embedding function: the tests never inspect vector values,
# so uniformly random 10-dim embeddings are sufficient.
async def mock_embedding_func(texts):
    """Return one random 10-dimensional vector per input text."""
    batch_size = len(texts)
    return np.random.rand(batch_size, 10)
|
||||
|
||||
|
||||
@pytest.fixture
async def neo4j_storage():
    """
    Initialize Neo4j storage for testing.
    Requires Neo4j to be running and configured via environment variables.
    """
    # Skip outright when no Neo4j endpoint is configured.
    if not os.getenv("NEO4J_URI"):
        pytest.skip("Neo4j not configured (NEO4J_URI not set)")

    from lightrag.kg.neo4j_impl import Neo4JStorage

    # shared_storage must be set up first so storage-level locks exist.
    initialize_share_data()

    storage = Neo4JStorage(
        namespace="test_fulltext_index",
        workspace="test_workspace",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
            "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
        },
        embedding_func=mock_embedding_func,
    )

    # Open the driver connection, then start every test from an empty graph.
    await storage.initialize()
    await storage.drop()

    yield storage

    # Teardown: wipe test data and release the driver.
    await storage.drop()
    await storage.finalize()
|
||||
|
||||
|
||||
@pytest.mark.integration
@pytest.mark.requires_db
async def test_fulltext_index_creation(neo4j_storage):
    """
    Test that the full-text index is created with the workspace-specific name.
    """
    workspace_label = neo4j_storage._get_workspace_label()
    expected_index_name = f"entity_id_fulltext_idx_{workspace_label}"

    # Ask Neo4j directly which full-text indexes currently exist.
    async with neo4j_storage._driver.session(
        database=neo4j_storage._DATABASE
    ) as session:
        result = await session.run("SHOW FULLTEXT INDEXES")
        indexes = await result.data()
        await result.consume()

    index_names = [entry["name"] for entry in indexes]

    # The workspace-scoped index must be present.
    assert (
        expected_index_name in index_names
    ), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}"

    # A leftover unscoped (legacy) index is tolerated but flagged, since it
    # suggests migration did not run — or it belongs to another workspace.
    legacy_index_name = "entity_id_fulltext_idx"
    if legacy_index_name in index_names:
        print(
            f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
        )
|
||||
|
||||
|
||||
@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_with_workspace_index(neo4j_storage):
    """
    Test that search_labels uses the workspace-specific index and returns results.
    """
    storage = neo4j_storage

    # Seed the graph with a few technology entities.
    seed = [
        ("Artificial Intelligence", "AI field", "AI,ML,DL"),
        ("Machine Learning", "ML subfield", "supervised,unsupervised"),
        ("Deep Learning", "DL subfield", "neural networks"),
        ("Natural Language Processing", "NLP field", "text,language"),
    ]
    for entity_id, description, keywords in seed:
        await storage.upsert_node(
            entity_id,
            {
                "entity_id": entity_id,
                "description": description,
                "keywords": keywords,
                "entity_type": "Technology",
            },
        )

    # Full-text indexes are eventually consistent; give Neo4j time to catch up.
    await asyncio.sleep(2)

    # Word search should hit the seeded nodes.
    results = await storage.search_labels("Learning", limit=10)
    assert len(results) > 0, "search_labels should return results for 'Learning'"
    assert any(
        "Learning" in result for result in results
    ), "Results should contain 'Learning'"

    # A lower-cased query must match the same nodes.
    results_lower = await storage.search_labels("learning", limit=10)
    assert len(results_lower) > 0, "search_labels should be case-insensitive"

    # A prefix query should match via wildcard expansion.
    results_partial = await storage.search_labels("Intelli", limit=10)
    assert (
        len(results_partial) > 0
    ), "search_labels should support partial matching with wildcard"
    assert any(
        "Intelligence" in result for result in results_partial
    ), "Should find 'Artificial Intelligence'"
|
||||
|
||||
|
||||
@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_chinese_text(neo4j_storage):
    """
    Test that search_labels works with Chinese text using the CJK analyzer.
    """
    storage = neo4j_storage

    # Seed the graph with Chinese-named entities.
    seed = [
        ("人工智能", "人工智能领域", "AI,机器学习"),
        ("机器学习", "机器学习子领域", "监督学习,无监督学习"),
        ("深度学习", "深度学习子领域", "神经网络"),
    ]
    for entity_id, description, keywords in seed:
        await storage.upsert_node(
            entity_id,
            {
                "entity_id": entity_id,
                "description": description,
                "keywords": keywords,
                "entity_type": "技术",
            },
        )

    # Allow the eventually-consistent index to pick up the new nodes.
    await asyncio.sleep(2)

    # A CJK query should match the seeded Chinese entity names.
    results = await storage.search_labels("学习", limit=10)

    assert len(results) > 0, "search_labels should return results for Chinese text"
    assert any(
        "学习" in result for result in results
    ), "Results should contain Chinese characters '学习'"
|
||||
|
||||
|
||||
@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_fallback_to_contains(neo4j_storage):
    """
    Test that search_labels falls back to CONTAINS search if the index fails.
    This can happen with older Neo4j versions or if the index is not yet available.
    """
    storage = neo4j_storage

    # Two minimal nodes are enough to exercise the substring path.
    for entity_id in ("Test Node Alpha", "Test Node Beta"):
        await storage.upsert_node(
            entity_id,
            {
                "entity_id": entity_id,
                "description": "Test node",
                "keywords": "test",
                "entity_type": "Test",
            },
        )

    # Deliberately no sleep: even if the full-text index has not caught up,
    # the CONTAINS fallback should still locate the node.
    results = await storage.search_labels("Alpha", limit=10)

    assert len(results) > 0, "Fallback CONTAINS search should return results"
    assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"
|
||||
|
||||
|
||||
@pytest.mark.integration
@pytest.mark.requires_db
async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
    """
    Test that different workspaces have their own separate indexes.
    """
    from lightrag.kg.neo4j_impl import Neo4JStorage

    storage1 = neo4j_storage

    # Spin up a second storage bound to a different workspace so we can
    # compare the indexes each one creates.
    storage2 = Neo4JStorage(
        namespace="test_fulltext_index",
        workspace="test_workspace_2",
        global_config={
            "embedding_batch_num": 10,
            "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
            "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
        },
        embedding_func=mock_embedding_func,
    )
    await storage2.initialize()
    await storage2.drop()

    try:
        # List every full-text index known to the database.
        async with storage1._driver.session(
            database=storage1._DATABASE
        ) as session:
            result = await session.run("SHOW FULLTEXT INDEXES")
            indexes = await result.data()
            await result.consume()

        index_names = [entry["name"] for entry in indexes]
        workspace1_index = f"entity_id_fulltext_idx_{storage1._get_workspace_label()}"
        workspace2_index = f"entity_id_fulltext_idx_{storage2._get_workspace_label()}"

        # Both workspace-scoped indexes must coexist.
        assert (
            workspace1_index in index_names
        ), f"Workspace 1 index '{workspace1_index}' should exist"
        assert (
            workspace2_index in index_names
        ), f"Workspace 2 index '{workspace2_index}' should exist"
    finally:
        await storage2.drop()
        await storage2.finalize()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Run tests with pytest
    # NOTE(review): "--run-integration" is not a built-in pytest flag; it is
    # presumably a custom option registered in the project's conftest.py to
    # enable the @pytest.mark.integration tests — confirm it exists there.
    pytest.main([__file__, "-v", "--run-integration"])
|
||||
Reference in New Issue
Block a user