lightrag/tests/test_neo4j_fulltext_index.py

#!/usr/bin/env python
"""
Test Neo4j full-text index functionality, specifically:
1. Workspace-specific index naming
2. Legacy index migration
3. search_labels functionality with workspace-specific indexes
"""

import asyncio
import os
import sys

import pytest
import numpy as np

# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from lightrag.kg.shared_storage import initialize_share_data


# Mock embedding function that returns random vectors
async def mock_embedding_func(texts):
    return np.random.rand(len(texts), 10)  # Return 10-dimensional random vectors


@pytest.fixture
async def neo4j_storage():
    """
    Initialize Neo4j storage for testing.
    Requires Neo4j to be running and configured via environment variables.
    """
    # Check if Neo4j is configured
    if not os.getenv("NEO4J_URI"):
        pytest.skip("Neo4j not configured (NEO4J_URI not set)")

    from lightrag.kg.neo4j_impl import Neo4JStorage

    # Initialize shared_storage for locks
    initialize_share_data()

    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
        "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
    }

    storage = Neo4JStorage(
        namespace="test_fulltext_index",
        workspace="test_workspace",
        global_config=global_config,
        embedding_func=mock_embedding_func,
    )

    # Initialize the connection
    await storage.initialize()

    # Clean up any existing data
    await storage.drop()

    yield storage

    # Cleanup
    await storage.drop()
    await storage.finalize()
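
# Running these tests requires a reachable Neo4j instance. A minimal example
# configuration is sketched below; only NEO4J_URI is checked explicitly above,
# and the username/password variable names are assumptions based on common
# Neo4j driver conventions rather than a verified list of what lightrag reads:
#
#   export NEO4J_URI="neo4j://localhost:7687"
#   export NEO4J_USERNAME="neo4j"
#   export NEO4J_PASSWORD="<password>"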


@pytest.mark.integration
@pytest.mark.requires_db
async def test_fulltext_index_creation(neo4j_storage):
    """
    Test that the full-text index is created with the workspace-specific name.
    """
    storage = neo4j_storage
    workspace_label = storage._get_workspace_label()
    expected_index_name = f"entity_id_fulltext_idx_{workspace_label}"

    # Query Neo4j to check if the index exists
    async with storage._driver.session(database=storage._DATABASE) as session:
        result = await session.run("SHOW FULLTEXT INDEXES")
        indexes = await result.data()
        await result.consume()

        # Check that the workspace-specific index exists
        index_names = [idx["name"] for idx in indexes]
        assert (
            expected_index_name in index_names
        ), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}"

        # Check that the legacy index is gone (it should have been migrated if it existed)
        legacy_index_name = "entity_id_fulltext_idx"
        if legacy_index_name in index_names:
            # If the legacy index still exists, it either belongs to a different
            # workspace or the migration did not happen
            print(
                f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
            )
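
# For reference, the workspace-specific index checked above is presumably
# created by Neo4JStorage with a statement roughly like the sketch below. This
# is for illustration only; the placeholder <workspace_label> and the analyzer
# option are assumptions (a CJK-aware analyzer is implied by the Chinese-text
# test further down), not a copy of the implementation:
#
#   CREATE FULLTEXT INDEX entity_id_fulltext_idx_<workspace_label> IF NOT EXISTS
#   FOR (n:<workspace_label>) ON EACH [n.entity_id]
#   OPTIONS {indexConfig: {`fulltext.analyzer`: 'cjk'}}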


@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_with_workspace_index(neo4j_storage):
    """
    Test that search_labels uses the workspace-specific index and returns results.
    """
    storage = neo4j_storage

    # Insert test nodes
    test_nodes = [
        {
            "entity_id": "Artificial Intelligence",
            "description": "AI field",
            "keywords": "AI,ML,DL",
            "entity_type": "Technology",
        },
        {
            "entity_id": "Machine Learning",
            "description": "ML subfield",
            "keywords": "supervised,unsupervised",
            "entity_type": "Technology",
        },
        {
            "entity_id": "Deep Learning",
            "description": "DL subfield",
            "keywords": "neural networks",
            "entity_type": "Technology",
        },
        {
            "entity_id": "Natural Language Processing",
            "description": "NLP field",
            "keywords": "text,language",
            "entity_type": "Technology",
        },
    ]

    for node_data in test_nodes:
        await storage.upsert_node(node_data["entity_id"], node_data)

    # Give the index time to become consistent (eventually consistent index)
    await asyncio.sleep(2)

    # Test search_labels
    results = await storage.search_labels("Learning", limit=10)

    # Should find nodes with "Learning" in them
    assert len(results) > 0, "search_labels should return results for 'Learning'"
    assert any(
        "Learning" in result for result in results
    ), "Results should contain 'Learning'"

    # Test case-insensitive search
    results_lower = await storage.search_labels("learning", limit=10)
    assert len(results_lower) > 0, "search_labels should be case-insensitive"

    # Test partial match
    results_partial = await storage.search_labels("Intelli", limit=10)
    assert (
        len(results_partial) > 0
    ), "search_labels should support partial matching with wildcard"
    assert any(
        "Intelligence" in result for result in results_partial
    ), "Should find 'Artificial Intelligence'"


@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_chinese_text(neo4j_storage):
    """
    Test that search_labels works with Chinese text using the CJK analyzer.
    """
    storage = neo4j_storage

    # Insert Chinese test nodes
    chinese_nodes = [
        {
            "entity_id": "人工智能",
            "description": "人工智能领域",
            "keywords": "AI,机器学习",
            "entity_type": "技术",
        },
        {
            "entity_id": "机器学习",
            "description": "机器学习子领域",
            "keywords": "监督学习,无监督学习",
            "entity_type": "技术",
        },
        {
            "entity_id": "深度学习",
            "description": "深度学习子领域",
            "keywords": "神经网络",
            "entity_type": "技术",
        },
    ]

    for node_data in chinese_nodes:
        await storage.upsert_node(node_data["entity_id"], node_data)

    # Give the index time to become consistent
    await asyncio.sleep(2)

    # Test Chinese text search
    results = await storage.search_labels("学习", limit=10)

    # Should find nodes with "学习" in them
    assert len(results) > 0, "search_labels should return results for Chinese text"
    assert any(
        "学习" in result for result in results
    ), "Results should contain Chinese characters '学习'"


@pytest.mark.integration
@pytest.mark.requires_db
async def test_search_labels_fallback_to_contains(neo4j_storage):
    """
    Test that search_labels falls back to CONTAINS search if the index fails.
    This can happen with older Neo4j versions or if the index is not yet available.
    """
    storage = neo4j_storage

    # Insert test nodes
    test_nodes = [
        {
            "entity_id": "Test Node Alpha",
            "description": "Test node",
            "keywords": "test",
            "entity_type": "Test",
        },
        {
            "entity_id": "Test Node Beta",
            "description": "Test node",
            "keywords": "test",
            "entity_type": "Test",
        },
    ]

    for node_data in test_nodes:
        await storage.upsert_node(node_data["entity_id"], node_data)

    # Even if the full-text index is not available, CONTAINS should work
    results = await storage.search_labels("Alpha", limit=10)

    # Should find the node using fallback CONTAINS search
    assert len(results) > 0, "Fallback CONTAINS search should return results"
    assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"


@pytest.mark.integration
@pytest.mark.requires_db
async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
    """
    Test that different workspaces have their own separate indexes.
    """
    from lightrag.kg.neo4j_impl import Neo4JStorage

    # Create storage for workspace 1
    storage1 = neo4j_storage

    # Create storage for workspace 2
    global_config = {
        "embedding_batch_num": 10,
        "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
        "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
    }
    storage2 = Neo4JStorage(
        namespace="test_fulltext_index",
        workspace="test_workspace_2",
        global_config=global_config,
        embedding_func=mock_embedding_func,
    )
    await storage2.initialize()
    await storage2.drop()

    try:
        # Check that both workspaces have their own indexes
        async with storage1._driver.session(database=storage1._DATABASE) as session:
            result = await session.run("SHOW FULLTEXT INDEXES")
            indexes = await result.data()
            await result.consume()

            index_names = [idx["name"] for idx in indexes]
            workspace1_index = (
                f"entity_id_fulltext_idx_{storage1._get_workspace_label()}"
            )
            workspace2_index = (
                f"entity_id_fulltext_idx_{storage2._get_workspace_label()}"
            )

            assert (
                workspace1_index in index_names
            ), f"Workspace 1 index '{workspace1_index}' should exist"
            assert (
                workspace2_index in index_names
            ), f"Workspace 2 index '{workspace2_index}' should exist"
    finally:
        # Clean up: drop the fulltext index created for workspace 2 to prevent accumulation
        try:
            async with storage2._driver.session(database=storage2._DATABASE) as session:
                index_name = storage2._get_fulltext_index_name(
                    storage2._get_workspace_label()
                )
                drop_query = f"DROP INDEX {index_name} IF EXISTS"
                result = await session.run(drop_query)
                await result.consume()
        except Exception:
            pass  # Ignore errors during cleanup

        await storage2.drop()
        await storage2.finalize()
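
# Note on the cleanup above: `DROP INDEX <name> IF EXISTS` is the named-index
# drop syntax available in Neo4j 4.x and later; the IF EXISTS clause makes the
# drop idempotent, so teardown does not fail if the index was never created.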


if __name__ == "__main__":
    # Run tests with pytest
    pytest.main([__file__, "-v", "--run-integration"])