diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index ff7ed253..f0acb72d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -3294,36 +3294,7 @@ class LightRAG:
                     logger.error(f"Failed to delete chunks: {e}")
                     raise Exception(f"Failed to delete document chunks: {e}") from e
 
-            # 6. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # 7. Delete relationships that have no remaining sources
+            # 6. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
                     # Delete from vector database
@@ -3360,6 +3331,66 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e
 
+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_still_exist = 0
+                    for entity in entities_to_delete:
+                        edges = (
+                            await self.chunk_entity_relation_graph.get_node_edges(
+                                entity
+                            )
+                        )
+                        if edges:
+                            for src, tgt in edges:
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {tgt} --> {src}"
+                                    )
+                                edges_still_exist += 1
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} edges still exist before entity deletion"
+                        )
+
+                    # Delete from graph
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
+
+                    # Delete from vector database
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
+
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
+
             # Persist changes to graph database before releasing graph database lock
             await self._insert_done()
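The swap above (relationships now deleted in step 6, entities in step 7) matters because removing nodes first can leave, or silently cascade-delete, edges whose endpoints are gone, depending on the graph backend; the new debug check asserts the invariant that no edges remain when nodes are removed. A minimal sketch of that invariant, using a toy in-memory graph rather than LightRAG's storage API (all names here are hypothetical):

# Illustrative only: a toy graph showing why relationship deletion must
# precede entity deletion. Class and method names are hypothetical.
class ToyGraph:
    def __init__(self) -> None:
        self.nodes: set[str] = set()
        self.edges: set[tuple[str, str]] = set()

    def remove_edges(self, pairs: list[tuple[str, str]]) -> None:
        self.edges -= set(pairs)

    def remove_nodes(self, names: list[str]) -> None:
        # Mirrors the debug check in the diff: nodes slated for deletion
        # should have no remaining edges at this point.
        dangling = [e for e in self.edges if e[0] in names or e[1] in names]
        if dangling:
            raise RuntimeError(f"edges still exist: {dangling}")
        self.nodes -= set(names)

g = ToyGraph()
g.nodes, g.edges = {"A", "B"}, {("A", "B")}
g.remove_edges([("A", "B")])  # step 6: relationships first
g.remove_nodes(["A", "B"])    # step 7: entities second, no dangling edges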
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 9a66f309..3354a2ae 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -2138,7 +2138,11 @@ async def _merge_edges_then_upsert(
 
     # 11. Update both graph and vector db
     for need_insert_id in [src_id, tgt_id]:
-        if not (await knowledge_graph_inst.has_node(need_insert_id)):
+        # Optimization: Use get_node instead of has_node + get_node
+        existing_node = await knowledge_graph_inst.get_node(need_insert_id)
+
+        if existing_node is None:
+            # Node doesn't exist - create new node
             node_created_at = int(time.time())
             node_data = {
                 "entity_id": need_insert_id,
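The hunk below adds an update branch that needs the node's stored data anyway, so a single `get_node` now serves as both the existence check and the data fetch, instead of `has_node` followed by `get_node`. A sketch of the pattern in isolation, with the storage interface assumed from the calls visible in this diff:

# Assumed interface: get_node returns a dict or None; upsert_node writes.
# Mirrors the branch structure added in the hunks above and below.
async def upsert_or_update(graph, node_id: str, fresh: dict) -> None:
    existing = await graph.get_node(node_id)  # one round trip, not two
    if existing is None:
        # Create path: node is new, write it as-is.
        await graph.upsert_node(node_id, node_data=fresh)
    else:
        # Update path: merge fresh fields over the stored node.
        await graph.upsert_node(node_id, node_data={**existing, **fresh})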
@@ -2195,6 +2199,109 @@ async def _merge_edges_then_upsert(
                 "created_at": node_created_at,
             }
             added_entities.append(entity_data)
+        else:
+            # Node exists - update its source_ids by merging with new source_ids
+            updated = False  # Track if any update occurred
+
+            # 1. Get existing full source_ids from entity_chunks_storage
+            existing_full_source_ids = []
+            if entity_chunks_storage is not None:
+                stored_chunks = await entity_chunks_storage.get_by_id(need_insert_id)
+                if stored_chunks and isinstance(stored_chunks, dict):
+                    existing_full_source_ids = [
+                        chunk_id
+                        for chunk_id in stored_chunks.get("chunk_ids", [])
+                        if chunk_id
+                    ]
+
+            # If not in entity_chunks_storage, get from graph database
+            if not existing_full_source_ids:
+                if existing_node.get("source_id"):
+                    existing_full_source_ids = existing_node["source_id"].split(
+                        GRAPH_FIELD_SEP
+                    )
+
+            # 2. Merge with new source_ids from this relationship
+            new_source_ids_from_relation = [
+                chunk_id for chunk_id in source_ids if chunk_id
+            ]
+            merged_full_source_ids = merge_source_ids(
+                existing_full_source_ids, new_source_ids_from_relation
+            )
+
+            # 3. Save merged full list to entity_chunks_storage (conditional)
+            if (
+                entity_chunks_storage is not None
+                and merged_full_source_ids != existing_full_source_ids
+            ):
+                updated = True
+                await entity_chunks_storage.upsert(
+                    {
+                        need_insert_id: {
+                            "chunk_ids": merged_full_source_ids,
+                            "count": len(merged_full_source_ids),
+                        }
+                    }
+                )
+
+            # 4. Apply source_ids limit for graph and vector db
+            limit_method = global_config.get(
+                "source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
+            )
+            max_source_limit = global_config.get("max_source_ids_per_entity")
+            limited_source_ids = apply_source_ids_limit(
+                merged_full_source_ids,
+                max_source_limit,
+                limit_method,
+                identifier=f"`{need_insert_id}`",
+            )
+
+            # 5. Update graph database and vector database with limited source_ids (conditional)
+            limited_source_id_str = GRAPH_FIELD_SEP.join(limited_source_ids)
+
+            if limited_source_id_str != existing_node.get("source_id", ""):
+                updated = True
+                updated_node_data = {
+                    **existing_node,
+                    "source_id": limited_source_id_str,
+                }
+                await knowledge_graph_inst.upsert_node(
+                    need_insert_id, node_data=updated_node_data
+                )
+
+                # Update vector database
+                if entity_vdb is not None:
+                    entity_vdb_id = compute_mdhash_id(need_insert_id, prefix="ent-")
+                    entity_content = (
+                        f"{need_insert_id}\n{existing_node.get('description', '')}"
+                    )
+                    vdb_data = {
+                        entity_vdb_id: {
+                            "content": entity_content,
+                            "entity_name": need_insert_id,
+                            "source_id": limited_source_id_str,
+                            "entity_type": existing_node.get("entity_type", "UNKNOWN"),
+                            "file_path": existing_node.get(
+                                "file_path", "unknown_source"
+                            ),
+                        }
+                    }
+                    await safe_vdb_operation_with_exception(
+                        operation=lambda payload=vdb_data: entity_vdb.upsert(payload),
+                        operation_name="existing_entity_update",
+                        entity_name=need_insert_id,
+                        max_retries=3,
+                        retry_delay=0.1,
+                    )
+
+            # 6. Log once at the end if any update occurred
+            if updated:
+                status_message = f"Chunks appended from relation: `{need_insert_id}`"
+                logger.info(status_message)
+                if pipeline_status is not None and pipeline_status_lock is not None:
+                    async with pipeline_status_lock:
+                        pipeline_status["latest_message"] = status_message
+                        pipeline_status["history_messages"].append(status_message)
 
     edge_created_at = int(time.time())
     await knowledge_graph_inst.upsert_edge(
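The update branch keeps two tiers of provenance: the full chunk-id list in entity_chunks_storage, and a capped list in the graph and vector databases. It leans on `merge_source_ids` and `apply_source_ids_limit`, defined elsewhere in the codebase; below is a hedged reading of their contracts, inferred only from the call sites in this diff (the real implementations may differ):

# Assumed semantics, inferred from the call sites above; not the actual
# LightRAG implementations.
def merge_source_ids(existing: list[str], new: list[str]) -> list[str]:
    # Order-preserving union: keep existing ids, append unseen new ones.
    seen = set(existing)
    merged = list(existing)
    for chunk_id in new:
        if chunk_id not in seen:
            merged.append(chunk_id)
            seen.add(chunk_id)
    return merged

def apply_source_ids_limit(
    ids: list[str], limit: int | None, method: str, identifier: str = ""
) -> list[str]:
    # Cap the list written to graph/vector storage; the full list still
    # lives in entity_chunks_storage. "KEEP" is assumed to retain the
    # oldest entries, any other method the newest.
    if not limit or len(ids) <= limit:
        return ids
    return ids[:limit] if method == "KEEP" else ids[-limit:]

# Example: an entity already sourced from chunk-1/chunk-2 gains chunk-3.
full = merge_source_ids(["chunk-1", "chunk-2"], ["chunk-2", "chunk-3"])
assert full == ["chunk-1", "chunk-2", "chunk-3"]
assert apply_source_ids_limit(full, 2, "KEEP") == ["chunk-1", "chunk-2"]

Note the design choice the `updated` flag supports: storage writes and the pipeline-status log fire only when the merged or limited lists actually changed, so re-processing a document that contributes no new chunks is a no-op.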