Fix Entity Source IDs Tracking Problem
- Handle existing node updates properly in the edge-merging stage
- Fix source_ids merging logic
- Reorder entity deletion and optimize node operations
- Delete relationships before entities
- Add edge-existence debugging logs
@@ -3294,36 +3294,7 @@ class LightRAG:
                     logger.error(f"Failed to delete chunks: {e}")
                     raise Exception(f"Failed to delete document chunks: {e}") from e
 
-            # 6. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # 7. Delete relationships that have no remaining sources
+            # 6. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
                     # Delete from vector database
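Note on the reorder above: the old flow deleted entities (step 6) before relationships (step 7). In a graph store, removing a node typically drops its incident edges as well, so the later relationship pass could no longer see those edges in the graph while their entries lingered in the relationship vector store. A toy illustration with networkx (the default LightRAG graph backend is networkx-based; this snippet is not part of the commit):

```python
# Minimal sketch: why relationship cleanup should run before node removal.
# Removing a node silently removes its incident edges, so any edge
# bookkeeping done afterwards finds nothing left to clean up.
import networkx as nx

g = nx.Graph()
g.add_edge("EntityA", "EntityB", source_id="chunk-1")

g.remove_node("EntityA")  # the incident edge (EntityA, EntityB) is dropped too
print(g.has_edge("EntityA", "EntityB"))  # False - edge gone without cleanup
print(list(g.edges()))                   # []
```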
@@ -3360,6 +3331,66 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e
 
+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_still_exist = 0
+                    for entity in entities_to_delete:
+                        edges = (
+                            await self.chunk_entity_relation_graph.get_node_edges(
+                                entity
+                            )
+                        )
+                        if edges:
+                            for src, tgt in edges:
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {tgt} --> {src}"
+                                    )
+                                edges_still_exist += 1
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} edges still exist before node deletion"
+                        )
+
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
+
+                    # Delete from vector database
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
+
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
+
             # Persist changes to graph database before releasing graph database lock
             await self._insert_done()
 
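The new debug pass performs up to three `x in entities_to_delete` checks per edge. The diff does not show how `entities_to_delete` is built; if it is a list, each check is O(n). A standalone sketch of the same check using a set, which also avoids counting an edge once from each endpoint; `get_node_edges` returning (src, tgt) pairs follows the diff, while `FakeGraph` and `count_residual_edges` are hypothetical names for illustration:

```python
import asyncio

class FakeGraph:
    """Stand-in for chunk_entity_relation_graph; get_node_edges mimics the diff."""
    def __init__(self, edges):
        self._edges = edges

    async def get_node_edges(self, node):
        return [(s, t) for s, t in self._edges if node in (s, t)] or None

async def count_residual_edges(graph, entities_to_delete):
    doomed = set(entities_to_delete)  # O(1) membership vs O(n) on a list
    seen = set()                      # don't count an edge from both of its ends
    residual = 0
    for entity in doomed:
        for src, tgt in (await graph.get_node_edges(entity)) or []:
            key = frozenset((src, tgt))
            if key in seen:
                continue
            seen.add(key)
            print(f"Edge still exists: {src} --> {tgt}")
            residual += 1
    return residual

g = FakeGraph([("A", "B"), ("B", "C")])
print(asyncio.run(count_residual_edges(g, ["A", "B"])))  # logs both edges, prints 2
```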
@@ -2138,7 +2138,11 @@ async def _merge_edges_then_upsert(
 
     # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
-        if not (await knowledge_graph_inst.has_node(need_insert_id)):
+        # Optimization: Use get_node instead of has_node + get_node
+        existing_node = await knowledge_graph_inst.get_node(need_insert_id)
+
+        if existing_node is None:
+            # Node doesn't exist - create new node
             node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
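As the hunk's comment says, a single `get_node` replaces a `has_node` probe followed by a `get_node` fetch, saving one storage round-trip whenever the node already exists. A toy call counter (not LightRAG code) makes the saving concrete:

```python
import asyncio

class FakeGraph:
    """Toy storage that counts backend lookups."""
    def __init__(self):
        self.nodes = {}
        self.calls = 0

    async def has_node(self, nid):
        self.calls += 1
        return nid in self.nodes

    async def get_node(self, nid):
        self.calls += 1
        return self.nodes.get(nid)

async def main():
    g = FakeGraph()
    g.nodes["A"] = {"entity_id": "A"}

    # Old pattern: probe, then fetch - two calls when the node exists.
    if await g.has_node("A"):
        _ = await g.get_node("A")
    old_calls, g.calls = g.calls, 0

    # New pattern from the diff: fetch once, branch on None.
    node = await g.get_node("A")
    if node is None:
        pass  # create-new-node branch
    print(old_calls, g.calls)  # 2 1

asyncio.run(main())
```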
@@ -2195,6 +2199,109 @@ async def _merge_edges_then_upsert(
                 "created_at": node_created_at,
             }
             added_entities.append(entity_data)
+        else:
+            # Node exists - update its source_ids by merging with new source_ids
+            updated = False  # Track if any update occurred
+
+            # 1. Get existing full source_ids from entity_chunks_storage
+            existing_full_source_ids = []
+            if entity_chunks_storage is not None:
+                stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
+                if stored_chunks and isinstance(stored_chunks, dict):
+                    existing_full_source_ids = [
+                        chunk_id
+                        for chunk_id in stored_chunks.get("chunk_ids", [])
+                        if chunk_id
+                    ]
+
+            # If not in entity_chunks_storage, get from graph database
+            if not existing_full_source_ids:
+                if existing_node.get("source_id"):
+                    existing_full_source_ids = existing_node["source_id"].split(
+                        GRAPH_FIELD_SEP
+                    )
+
+            # 2. Merge with new source_ids from this relationship
+            new_source_ids_from_relation = [
+                chunk_id for chunk_id in source_ids if chunk_id
+            ]
+            merged_full_source_ids = merge_source_ids(
+                existing_full_source_ids, new_source_ids_from_relation
+            )
+
+            # 3. Save merged full list to entity_chunks_storage (conditional)
+            if (
+                entity_chunks_storage is not None
+                and merged_full_source_ids != existing_full_source_ids
+            ):
+                updated = True
+                await entity_chunks_storage.upsert(
+                    {
+                        need_insert_id: {
+                            "chunk_ids": merged_full_source_ids,
+                            "count": len(merged_full_source_ids),
+                        }
+                    }
+                )
+
+            # 4. Apply source_ids limit for graph and vector db
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            max_source_limit = global_config.get("max_source_ids_per_entity")
+            limited_source_ids = apply_source_ids_limit(
+                merged_full_source_ids,
+                max_source_limit,
+                limit_method,
+                identifier=f"`{need_insert_id}`",
+            )
+
+            # 5. Update graph database and vector database with limited source_ids (conditional)
+            limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)
+
+            if limited_source_id_str != existing_node.get("source_id", ""):
+                updated = True
+                updated_node_data = {
+                    **existing_node,
+                    "source_id": limited_source_id_str,
+                }
+                await knowledge_graph_inst.upsert_node(
+                    need_insert_id, node_data=updated_node_data
+                )
+
+                # Update vector database
+                if entity_vdb is not None:
+                    entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                    entity_content = (
+                        f"{need_insert_id}\n{existing_node.get('description', '')}"
+                    )
+                    vdb_data = {
+                        entity_vdb_id: {
+                            "content": entity_content,
+                            "entity_name": need_insert_id,
+                            "source_id": limited_source_id_str,
+                            "entity_type": existing_node.get("entity_type", "UNKNOWN"),
+                            "file_path": existing_node.get(
+                                "file_path", "unknown_source"
+                            ),
+                        }
+                    }
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                        operation_name="existing_entity_update",
+                        entity_name=need_insert_id,
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+            # 6. Log once at the end if any update occurred
+            if updated:
+                status_message = f"Chunks appended from relation: `{need_insert_id}`"
+                logger.info(status_message)
+                if pipeline_status is not None and pipeline_status_lock is not None:
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = status_message
+                        pipeline_status["history_messages"].append(status_message)
+
         edge_created_at = int(time.time())
         await knowledge_graph_inst.upsert_edge(
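`merge_source_ids` is called in step 2 of this hunk but not defined in the diff. A minimal sketch consistent with the call site, assuming an order-preserving union with existing ids first and new ids appended; the real helper may behave differently:

```python
# Hypothetical sketch of merge_source_ids (not shown in the commit):
# order-preserving union, duplicates and falsy ids dropped.
def merge_source_ids(existing: list[str], new: list[str]) -> list[str]:
    seen: set[str] = set()
    merged: list[str] = []
    for chunk_id in [*existing, *new]:
        if chunk_id and chunk_id not in seen:
            seen.add(chunk_id)
            merged.append(chunk_id)
    return merged

assert merge_source_ids(["c1", "c2"], ["c2", "c3"]) == ["c1", "c2", "c3"]
```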
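Likewise, `apply_source_ids_limit` and `SOURCE_IDS_LIMIT_METHOD_KEEP` only appear as references in step 4. A sketch matching the call signature used here; the constant's value and the cap semantics (keep the oldest ids under the "keep" method, the newest otherwise) are assumptions, not taken from the commit:

```python
import logging

logger = logging.getLogger(__name__)

SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"  # assumed value; only the name appears in the diff

def apply_source_ids_limit(
    source_ids: list[str],
    max_limit: int | None,
    method: str = SOURCE_IDS_LIMIT_METHOD_KEEP,
    identifier: str = "",
) -> list[str]:
    # No limit configured, or already within it: pass the full list through.
    if not max_limit or len(source_ids) <= max_limit:
        return source_ids
    logger.info(f"Source ids capped at {max_limit} for {identifier} ({method})")
    if method == SOURCE_IDS_LIMIT_METHOD_KEEP:
        return source_ids[:max_limit]  # keep the oldest ids (assumed semantics)
    return source_ids[-max_limit:]     # otherwise keep the most recent ids
```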