- Expand CJK regex to extensions A-F - Use DROP INDEX IF EXISTS - Add cleanup in multi-workspace test - Safely handle legacy index drops
315 lines
10 KiB
Python
315 lines
10 KiB
Python
#!/usr/bin/env python
|
|
"""
|
|
Test Neo4j full-text index functionality, specifically:
|
|
1. Workspace-specific index naming
|
|
2. Legacy index migration
|
|
3. search_labels functionality with workspace-specific indexes
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import pytest
|
|
import numpy as np
|
|
|
|
# Add the project root directory to the Python path
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from lightrag.kg.shared_storage import initialize_share_data
|
|
|
|
|
|
# Mock embedding function that returns random vectors
|
|
async def mock_embedding_func(texts):
|
|
return np.random.rand(len(texts), 10) # Return 10-dimensional random vectors
|
|
|
|
|
|
@pytest.fixture
|
|
async def neo4j_storage():
|
|
"""
|
|
Initialize Neo4j storage for testing.
|
|
Requires Neo4j to be running and configured via environment variables.
|
|
"""
|
|
# Check if Neo4j is configured
|
|
if not os.getenv("NEO4J_URI"):
|
|
pytest.skip("Neo4j not configured (NEO4J_URI not set)")
|
|
|
|
from lightrag.kg.neo4j_impl import Neo4JStorage
|
|
|
|
# Initialize shared_storage for locks
|
|
initialize_share_data()
|
|
|
|
global_config = {
|
|
"embedding_batch_num": 10,
|
|
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
|
|
"working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
|
|
}
|
|
|
|
storage = Neo4JStorage(
|
|
namespace="test_fulltext_index",
|
|
workspace="test_workspace",
|
|
global_config=global_config,
|
|
embedding_func=mock_embedding_func,
|
|
)
|
|
|
|
# Initialize the connection
|
|
await storage.initialize()
|
|
|
|
# Clean up any existing data
|
|
await storage.drop()
|
|
|
|
yield storage
|
|
|
|
# Cleanup
|
|
await storage.drop()
|
|
await storage.finalize()
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.requires_db
|
|
async def test_fulltext_index_creation(neo4j_storage):
|
|
"""
|
|
Test that the full-text index is created with the workspace-specific name.
|
|
"""
|
|
storage = neo4j_storage
|
|
workspace_label = storage._get_workspace_label()
|
|
expected_index_name = f"entity_id_fulltext_idx_{workspace_label}"
|
|
|
|
# Query Neo4j to check if the index exists
|
|
async with storage._driver.session(database=storage._DATABASE) as session:
|
|
result = await session.run("SHOW FULLTEXT INDEXES")
|
|
indexes = await result.data()
|
|
await result.consume()
|
|
|
|
# Check if the workspace-specific index exists
|
|
index_names = [idx["name"] for idx in indexes]
|
|
assert (
|
|
expected_index_name in index_names
|
|
), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}"
|
|
|
|
# Check if the legacy index doesn't exist (should be migrated if it was there)
|
|
legacy_index_name = "entity_id_fulltext_idx"
|
|
if legacy_index_name in index_names:
|
|
# If legacy index exists, it should be for a different workspace
|
|
# or it means migration didn't happen
|
|
print(
|
|
f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'"
|
|
)
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.requires_db
|
|
async def test_search_labels_with_workspace_index(neo4j_storage):
|
|
"""
|
|
Test that search_labels uses the workspace-specific index and returns results.
|
|
"""
|
|
storage = neo4j_storage
|
|
|
|
# Insert test nodes
|
|
test_nodes = [
|
|
{
|
|
"entity_id": "Artificial Intelligence",
|
|
"description": "AI field",
|
|
"keywords": "AI,ML,DL",
|
|
"entity_type": "Technology",
|
|
},
|
|
{
|
|
"entity_id": "Machine Learning",
|
|
"description": "ML subfield",
|
|
"keywords": "supervised,unsupervised",
|
|
"entity_type": "Technology",
|
|
},
|
|
{
|
|
"entity_id": "Deep Learning",
|
|
"description": "DL subfield",
|
|
"keywords": "neural networks",
|
|
"entity_type": "Technology",
|
|
},
|
|
{
|
|
"entity_id": "Natural Language Processing",
|
|
"description": "NLP field",
|
|
"keywords": "text,language",
|
|
"entity_type": "Technology",
|
|
},
|
|
]
|
|
|
|
for node_data in test_nodes:
|
|
await storage.upsert_node(node_data["entity_id"], node_data)
|
|
|
|
# Give the index time to become consistent (eventually consistent index)
|
|
await asyncio.sleep(2)
|
|
|
|
# Test search_labels
|
|
results = await storage.search_labels("Learning", limit=10)
|
|
|
|
# Should find nodes with "Learning" in them
|
|
assert len(results) > 0, "search_labels should return results for 'Learning'"
|
|
assert any(
|
|
"Learning" in result for result in results
|
|
), "Results should contain 'Learning'"
|
|
|
|
# Test case-insensitive search
|
|
results_lower = await storage.search_labels("learning", limit=10)
|
|
assert len(results_lower) > 0, "search_labels should be case-insensitive"
|
|
|
|
# Test partial match
|
|
results_partial = await storage.search_labels("Intelli", limit=10)
|
|
assert (
|
|
len(results_partial) > 0
|
|
), "search_labels should support partial matching with wildcard"
|
|
assert any(
|
|
"Intelligence" in result for result in results_partial
|
|
), "Should find 'Artificial Intelligence'"
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.requires_db
|
|
async def test_search_labels_chinese_text(neo4j_storage):
|
|
"""
|
|
Test that search_labels works with Chinese text using the CJK analyzer.
|
|
"""
|
|
storage = neo4j_storage
|
|
|
|
# Insert Chinese test nodes
|
|
chinese_nodes = [
|
|
{
|
|
"entity_id": "人工智能",
|
|
"description": "人工智能领域",
|
|
"keywords": "AI,机器学习",
|
|
"entity_type": "技术",
|
|
},
|
|
{
|
|
"entity_id": "机器学习",
|
|
"description": "机器学习子领域",
|
|
"keywords": "监督学习,无监督学习",
|
|
"entity_type": "技术",
|
|
},
|
|
{
|
|
"entity_id": "深度学习",
|
|
"description": "深度学习子领域",
|
|
"keywords": "神经网络",
|
|
"entity_type": "技术",
|
|
},
|
|
]
|
|
|
|
for node_data in chinese_nodes:
|
|
await storage.upsert_node(node_data["entity_id"], node_data)
|
|
|
|
# Give the index time to become consistent
|
|
await asyncio.sleep(2)
|
|
|
|
# Test Chinese text search
|
|
results = await storage.search_labels("学习", limit=10)
|
|
|
|
# Should find nodes with "学习" in them
|
|
assert len(results) > 0, "search_labels should return results for Chinese text"
|
|
assert any(
|
|
"学习" in result for result in results
|
|
), "Results should contain Chinese characters '学习'"
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.requires_db
|
|
async def test_search_labels_fallback_to_contains(neo4j_storage):
|
|
"""
|
|
Test that search_labels falls back to CONTAINS search if the index fails.
|
|
This can happen with older Neo4j versions or if the index is not yet available.
|
|
"""
|
|
storage = neo4j_storage
|
|
|
|
# Insert test nodes
|
|
test_nodes = [
|
|
{
|
|
"entity_id": "Test Node Alpha",
|
|
"description": "Test node",
|
|
"keywords": "test",
|
|
"entity_type": "Test",
|
|
},
|
|
{
|
|
"entity_id": "Test Node Beta",
|
|
"description": "Test node",
|
|
"keywords": "test",
|
|
"entity_type": "Test",
|
|
},
|
|
]
|
|
|
|
for node_data in test_nodes:
|
|
await storage.upsert_node(node_data["entity_id"], node_data)
|
|
|
|
# Even if the full-text index is not available, CONTAINS should work
|
|
results = await storage.search_labels("Alpha", limit=10)
|
|
|
|
# Should find the node using fallback CONTAINS search
|
|
assert len(results) > 0, "Fallback CONTAINS search should return results"
|
|
assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'"
|
|
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.requires_db
|
|
async def test_multiple_workspaces_have_separate_indexes(neo4j_storage):
|
|
"""
|
|
Test that different workspaces have their own separate indexes.
|
|
"""
|
|
from lightrag.kg.neo4j_impl import Neo4JStorage
|
|
|
|
# Create storage for workspace 1
|
|
storage1 = neo4j_storage
|
|
|
|
# Create storage for workspace 2
|
|
global_config = {
|
|
"embedding_batch_num": 10,
|
|
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5},
|
|
"working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
|
|
}
|
|
|
|
storage2 = Neo4JStorage(
|
|
namespace="test_fulltext_index",
|
|
workspace="test_workspace_2",
|
|
global_config=global_config,
|
|
embedding_func=mock_embedding_func,
|
|
)
|
|
|
|
await storage2.initialize()
|
|
await storage2.drop()
|
|
|
|
try:
|
|
# Check that both workspaces have their own indexes
|
|
async with storage1._driver.session(database=storage1._DATABASE) as session:
|
|
result = await session.run("SHOW FULLTEXT INDEXES")
|
|
indexes = await result.data()
|
|
await result.consume()
|
|
|
|
index_names = [idx["name"] for idx in indexes]
|
|
workspace1_index = (
|
|
f"entity_id_fulltext_idx_{storage1._get_workspace_label()}"
|
|
)
|
|
workspace2_index = (
|
|
f"entity_id_fulltext_idx_{storage2._get_workspace_label()}"
|
|
)
|
|
|
|
assert (
|
|
workspace1_index in index_names
|
|
), f"Workspace 1 index '{workspace1_index}' should exist"
|
|
assert (
|
|
workspace2_index in index_names
|
|
), f"Workspace 2 index '{workspace2_index}' should exist"
|
|
|
|
finally:
|
|
# Clean up: drop the fulltext index created for workspace 2 to prevent accumulation
|
|
try:
|
|
async with storage2._driver.session(database=storage2._DATABASE) as session:
|
|
index_name = storage2._get_fulltext_index_name(
|
|
storage2._get_workspace_label()
|
|
)
|
|
drop_query = f"DROP INDEX {index_name} IF EXISTS"
|
|
result = await session.run(drop_query)
|
|
await result.consume()
|
|
except Exception:
|
|
pass # Ignore errors during cleanup
|
|
await storage2.drop()
|
|
await storage2.finalize()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run tests with pytest
|
|
pytest.main([__file__, "-v", "--run-integration"])
|