# Source file: biz-bud/src/biz_bud/graphs/examples/human_feedback_example.py
# Last commit: 64f0f49d56 (Travis Vasceannie, 2025-08-07 18:51:42 -04:00)
#   fix: resolve pre-commit hooks configuration and update dependencies
#   - Clean and reinstall pre-commit hooks to fix corrupted cache
#   - Update isort to v6.0.1 to resolve deprecation warnings
#   - Fix pytest PT012 error by separating pytest.raises from context managers
#   - Fix pytest PT011 errors by using GraphInterrupt instead of generic Exception
#   - Fix formatting and trailing whitespace issues automatically applied by hooks

"""Example of integrating human feedback into a LangGraph workflow.
This example demonstrates how to use the refactored human feedback nodes
in a real workflow with proper interrupt handling.
"""
from typing import Any, Literal
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from biz_bud.graphs.research.nodes.synthesis import synthesize_search_results
from biz_bud.nodes.validation.content import validate_content_output
from biz_bud.nodes.validation.human_feedback import (
human_feedback_node,
prepare_human_feedback_request,
should_apply_refinement,
should_request_feedback,
)
from biz_bud.states.unified import BusinessBuddyState
# Example refinement node that would use LLM to apply feedback
async def refine_with_feedback(state: BusinessBuddyState, config: RunnableConfig) -> dict[str, Any]:
    """Apply refinement based on human feedback.

    Mock implementation: appends the reviewer's instructions to the current
    synthesis. A production version would instead call an LLM with those
    instructions to rewrite the draft.
    """
    instructions = state.get("refinement_instructions", "")
    draft = state.get("synthesis", "")

    # Mock refinement: tag the draft with the instructions that shaped it.
    refined = f"{draft}\n\n[Refined based on: {instructions}]"

    return {
        "synthesis": refined,
        "final_result": refined,
        "refinement_applied": True,
        "status": "success",
    }
# Edge functions
def route_after_validation(
    state: BusinessBuddyState,
) -> Literal["prepare_feedback", "complete"]:
    """Route based on validation results and feedback requirements.

    Sends the run into the feedback branch when the shared helper says a
    human review is needed; otherwise finishes the graph.
    """
    if should_request_feedback(state):
        return "prepare_feedback"
    return "complete"
def route_after_feedback(state: BusinessBuddyState) -> Literal["refine", "complete"]:
    """Route based on human feedback.

    Enters the refinement node when the shared helper indicates the reviewer
    requested changes; otherwise finishes the graph.
    """
    if should_apply_refinement(state):
        return "refine"
    return "complete"
def create_human_feedback_workflow() -> StateGraph[BusinessBuddyState]:
    """Create a workflow with human feedback integration.

    This workflow demonstrates:
    1. Research workflow (search -> synthesize -> validate)
    2. Human feedback request when validation fails or confidence is low
    3. Refinement based on human feedback
    4. Proper interrupt handling
    """
    # Note: This example uses a placeholder for search. In production, use
    # optimized_search_node from the research workflow instead.
    async def placeholder_search(state: BusinessBuddyState) -> dict[str, Any]:
        """Replace with optimized_search_node in production."""
        return {"search_results": [], "status": "success"}

    graph = StateGraph(BusinessBuddyState)

    # Register all nodes in one pass.
    nodes = {
        "search": placeholder_search,
        "synthesize": synthesize_search_results,
        "validate": validate_content_output,
        "prepare_feedback": prepare_human_feedback_request,
        "human_feedback": human_feedback_node,
        "refine": refine_with_feedback,
    }
    for node_name, node_fn in nodes.items():
        graph.add_node(node_name, node_fn)

    # Main research spine.
    graph.set_entry_point("search")
    graph.add_edge("search", "synthesize")
    graph.add_edge("synthesize", "validate")

    # Detour into the human-feedback branch only when validation requests it.
    graph.add_conditional_edges(
        "validate",
        route_after_validation,
        {"prepare_feedback": "prepare_feedback", "complete": END},
    )
    graph.add_edge("prepare_feedback", "human_feedback")
    graph.add_conditional_edges(
        "human_feedback", route_after_feedback, {"refine": "refine", "complete": END}
    )

    # After refinement, could loop back to validation or end; this single-pass
    # variant simply ends.
    graph.add_edge("refine", END)
    return graph
# Example usage
async def run_example() -> None:
    """Run the human feedback workflow example end to end.

    Compiles the workflow with an in-memory checkpointer (required for
    interrupt/resume), invokes it once, and — if the run paused for human
    feedback — resumes it on the same thread with a simulated reviewer
    message.
    """
    # Create workflow with memory for interrupt handling: the checkpointer
    # persists graph state so the run can be resumed on the same thread_id.
    workflow = create_human_feedback_workflow()
    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    # Initial state
    initial_state: BusinessBuddyState = {
        "messages": [],
        "errors": [],
        "config": {
            "enabled": True,
            # Note: ConfigDict features field only accepts dict[str, bool]
            # human_feedback_threshold and max_search_results would need
            # to be handled differently in the actual implementation
        },
        "thread_id": "example-thread-001",
        "status": "pending",
        "query": "What are the latest trends in AI safety research?",
        "organization": [{"name": "Example Corp", "zip_code": "12345"}],
        "parsed_input": {
            "raw_payload": {"query": "example"},
            "user_query": "What are the latest trends in AI safety research?",
        },
        "input_metadata": {"session_id": "example-session", "user_id": "example-user"},
        "context": {
            "id": "example-context-1",
            "type": "context",
            "content": {"run_type": "example"},
        },
        "is_complete": False,
        "requires_interrupt": False,
    }

    # The thread_id ties any resumed invocation back to the checkpointed run.
    config: RunnableConfig = {"configurable": {"thread_id": "example-thread-001"}}

    # Run until the graph completes or interrupts for feedback. ainvoke
    # already returns the final state dict, so no typing.cast wrappers are
    # needed here (they were runtime no-ops that only added noise).
    result = await app.ainvoke(initial_state, config)

    # If interrupted for feedback
    if result.get("requires_interrupt"):
        # In a real application, you would get actual user input.
        # For this example, we'll simulate it.
        user_feedback = "Please add more information about alignment research and constitutional AI"

        # Resume workflow with feedback.
        # The interrupt() call in human_feedback_node will receive this value.
        # NOTE(review): this resumes by passing a state update; if
        # human_feedback_node uses interrupt(), resuming may instead require
        # Command(resume=...) — confirm against that node's implementation.
        await app.ainvoke({"feedback_message": user_feedback}, config)
# More complex example with multiple feedback rounds
def create_iterative_feedback_workflow() -> StateGraph[BusinessBuddyState]:
    """Create a workflow that can handle multiple rounds of feedback.

    This demonstrates a more complex pattern where refinement
    can loop back to validation for another round of feedback, bounded by a
    configurable maximum number of iterations.
    """
    # Using placeholder search - in production use optimized search.
    async def placeholder_search_v2(state: BusinessBuddyState) -> dict[str, Any]:
        """Search and return placeholder results."""
        return {
            "search_results": [],
            "sources": [],
            "extracted_info": {},
            "status": "success",
        }

    async def check_iterations(state: BusinessBuddyState, config: RunnableConfig) -> dict[str, Any]:
        """Bump the feedback-round counter and flag when the cap is hit."""
        prev = state.get("feedback_iterations", 0)
        if not isinstance(prev, int):
            # Defensive: state values may arrive untyped.
            prev = 0
        rounds = prev + 1
        raw_cap = state.get("config", {}).get("max_feedback_iterations", 3)
        cap = int(raw_cap) if isinstance(raw_cap, (int, float, str)) else 3
        return {
            "feedback_iterations": rounds,
            "max_iterations_reached": rounds >= cap,
        }

    def route_after_iteration_check(
        state: BusinessBuddyState,
    ) -> Literal["validate", "complete"]:
        """Stop once the iteration cap is reached; otherwise re-validate."""
        if state.get("max_iterations_reached", False):
            return "complete"
        return "validate"

    graph = StateGraph(BusinessBuddyState)

    # Register all nodes, including the iteration counter.
    for node_name, node_fn in (
        ("search", placeholder_search_v2),
        ("synthesize", synthesize_search_results),
        ("validate", validate_content_output),
        ("prepare_feedback", prepare_human_feedback_request),
        ("human_feedback", human_feedback_node),
        ("refine", refine_with_feedback),
        ("check_iterations", check_iterations),
    ):
        graph.add_node(node_name, node_fn)

    # Define edges with iteration logic.
    graph.set_entry_point("search")
    graph.add_edge("search", "synthesize")
    graph.add_edge("synthesize", "validate")

    # After validation, check if feedback is needed.
    graph.add_conditional_edges(
        "validate",
        route_after_validation,
        {"prepare_feedback": "prepare_feedback", "complete": END},
    )
    graph.add_edge("prepare_feedback", "human_feedback")

    # After feedback, check if refinement is needed.
    graph.add_conditional_edges(
        "human_feedback", route_after_feedback, {"refine": "refine", "complete": END}
    )

    # After refinement, count the round and either loop back or stop.
    graph.add_edge("refine", "check_iterations")
    graph.add_conditional_edges(
        "check_iterations",
        route_after_iteration_check,
        {
            "validate": "validate",  # Loop back for another round
            "complete": END,
        },
    )
    return graph
if __name__ == "__main__":
    import asyncio

    # Run the single-pass feedback example; the iterative workflow above is
    # constructed for demonstration but not exercised here.
    asyncio.run(run_example())