LangGraph实战2026:用状态机思维构建复杂AI工作流
·
当你的Agent不只是"问一问、查一查、答一答",而是需要"条件分支、循环重试、并行执行、中途暂停等待人工确认"时,你就需要LangGraph了。
## 为什么需要LangGraph

传统的LLM链(Chain)是线性的:输入 → 处理1 → 处理2 → 输出。这对简单任务够用,但遇到以下场景就捉襟见肘:

- 条件逻辑:根据分析结果走不同处理路径
- 循环执行:直到满足某个条件才停止
- 并行处理:多个独立子任务同时执行
- 人工介入:暂停工作流,等待人工审批
- 错误恢复:失败后从某个检查点重试

LangGraph将这些需求系统化地解决,核心思想是将Agent工作流建模为有向图(Directed Graph),每个节点是一个处理步骤,边是转换条件。

## 核心概念

LangGraph的三个核心概念:

- **State(状态)**:工作流执行过程中的数据容器,所有节点共享。
- **Node(节点)**:执行具体操作的函数,接收当前State,返回需要更新的字段(一个只包含部分键的字典),由LangGraph合并回State。
- **Edge(边)**:节点间的连接,可以是固定的(总是从A到B)或条件的(根据State决定去B还是C)。

## 快速开始:一个代码审查工作流

````python
import json
import operator
from typing import Annotated, List, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph

# 说明:llm 是示意用的异步LLM客户端,这里假设 await llm.complete(prompt, ...) 返回字符串,
# 请替换为你自己的实现。


# 1. 定义状态
class CodeReviewState(TypedDict):
    code: str                                # 待审查的代码
    language: str                            # 编程语言
    security_issues: List[str]               # 安全问题列表
    performance_issues: List[str]            # 性能问题列表
    style_issues: List[str]                  # 代码风格问题
    overall_score: Optional[int]             # 综合评分(0-100)
    review_complete: bool                    # 审查是否完成
    human_approval_needed: bool              # 是否需要人工确认
    messages: Annotated[List, operator.add]  # 对话历史(operator.add:新消息追加而非覆盖)


# 各检查节点统一的输出格式要求。
# 解析代码读取的是 json.loads(...).get("issues"),即要求模型输出JSON对象;
# 如果提示词要求输出JSON数组,.get 会在 list 上报错,所以提示词和解析必须一致。
ISSUES_FORMAT = '以JSON对象格式输出:{"issues": ["问题描述1", "问题描述2"]};没有问题时输出 {"issues": []}。'


# 2. 定义节点(每个节点是一个函数)
async def detect_language(state: CodeReviewState) -> dict:
    """识别编程语言"""
    code = state["code"]
    # 简单的语言检测逻辑(实际可以用LLM)
    if "def " in code and "import " in code:
        language = "python"
    elif "function " in code or "const " in code:
        language = "javascript"
    else:
        language = "unknown"
    return {"language": language}


async def _ask_issues(prompt: str) -> List[str]:
    """调用LLM并解析 {"issues": [...]} 格式的输出,返回问题描述列表(三个检查节点共用)"""
    result = await llm.complete(prompt, response_format={"type": "json_object"})
    return json.loads(result).get("issues", [])


async def check_security(state: CodeReviewState) -> dict:
    """安全问题检查"""
    prompt = f"""
    检查以下{state['language']}代码的安全问题:
    ```{state['language']}
    {state['code']}
    ```
    列出所有安全漏洞,如SQL注入、XSS、硬编码密码等。
    {ISSUES_FORMAT}
    """
    return {"security_issues": await _ask_issues(prompt)}


async def check_performance(state: CodeReviewState) -> dict:
    """性能问题检查"""
    prompt = f"""
    检查以下代码的性能问题:
    ```{state['language']}
    {state['code']}
    ```
    重点检查:N+1查询、不必要的循环、内存泄漏等。
    {ISSUES_FORMAT}
    """
    return {"performance_issues": await _ask_issues(prompt)}


async def check_style(state: CodeReviewState) -> dict:
    """代码风格检查"""
    prompt = f"""
    检查以下代码的风格和可读性问题:
    ```{state['language']}
    {state['code']}
    ```
    检查命名规范、注释完整性、代码结构等。
    {ISSUES_FORMAT}
    """
    return {"style_issues": await _ask_issues(prompt)}


async def calculate_score(state: CodeReviewState) -> dict:
    """根据各项检查结果计算综合评分"""
    security_count = len(state["security_issues"])
    performance_count = len(state["performance_issues"])
    style_count = len(state["style_issues"])

    # 评分逻辑
    base_score = 100
    base_score -= security_count * 20  # 安全问题扣分最重
    base_score -= performance_count * 10
    base_score -= style_count * 5
    score = max(0, base_score)

    # 存在安全问题时需要人工审查
    human_needed = security_count > 0

    return {
        "overall_score": score,
        "human_approval_needed": human_needed,
        "review_complete": not human_needed,  # 没有安全问题则直接完成
    }


def _format_issues(issues: List[str]) -> str:
    """把问题列表渲染为Markdown列表;列表为空时返回“无”。"""
    return "\n".join(f"- {issue}" for issue in issues) or "无"


async def generate_report(state: CodeReviewState) -> dict:
    """生成最终审查报告"""
    report = f"""# 代码审查报告

## 综合评分:{state['overall_score']}/100

## 安全问题({len(state['security_issues'])}个)
{_format_issues(state['security_issues'])}

## 性能问题({len(state['performance_issues'])}个)
{_format_issues(state['performance_issues'])}

## 代码风格({len(state['style_issues'])}个)
{_format_issues(state['style_issues'])}
"""
    return {
        "messages": [{"role": "assistant", "content": report}],
        "review_complete": True,
    }


# 3. 定义条件函数(决定走哪条边)
def should_human_review(state: CodeReviewState) -> str:
    """判断是否需要人工审查"""
    if state.get("human_approval_needed"):
        return "human_review"  # 需要人工介入
    return "generate_report"  # 直接生成报告


# 4. 构建图
def create_review_workflow() -> StateGraph:
    """创建(未编译的)代码审查状态图。

    与编译分开,便于用不同的检查点存储(内存 / SQLite / PostgreSQL)编译同一张图。
    human_review_node 与 route_after_human_review 定义见下文「人工介入」一节。
    """
    workflow = StateGraph(CodeReviewState)

    # 添加节点
    workflow.add_node("detect_language", detect_language)
    workflow.add_node("check_security", check_security)
    workflow.add_node("check_performance", check_performance)
    workflow.add_node("check_style", check_style)
    workflow.add_node("calculate_score", calculate_score)
    workflow.add_node("human_review", human_review_node)
    workflow.add_node("generate_report", generate_report)

    # 设置入口节点
    workflow.set_entry_point("detect_language")

    # 语言检测完成后,并行执行三个检查(扇出)
    workflow.add_edge("detect_language", "check_security")
    workflow.add_edge("detect_language", "check_performance")
    workflow.add_edge("detect_language", "check_style")

    # 三个检查都完成后,汇聚到评分节点(扇入)
    # 传入节点列表表示:等列表中的节点全部完成后,calculate_score 才执行(且只执行一次)
    workflow.add_edge(
        ["check_security", "check_performance", "check_style"], "calculate_score"
    )

    # 条件边:根据是否需要人工审查分叉
    # 注意:若把 "human_review" 直接映射到 END,工作流会直接结束而不是"暂停";
    # 真正的暂停由 human_review 节点内的 interrupt 实现
    workflow.add_conditional_edges(
        "calculate_score",
        should_human_review,
        {
            "human_review": "human_review",        # 进入人工审查节点(在其中暂停)
            "generate_report": "generate_report",  # 直接生成报告
        },
    )
    # 人工批准后生成报告;驳回或要求修复则结束
    workflow.add_conditional_edges(
        "human_review",
        route_after_human_review,
        {"generate_report": "generate_report", END: END},
    )
    workflow.add_edge("generate_report", END)
    return workflow


def build_review_graph(checkpointer=None):
    """编译代码审查图。

    Args:
        checkpointer: 检查点存储。为 None 时使用 MemorySaver,状态只保存在当前进程内存中;
            需要跨进程断点续传时,传入 AsyncSqliteSaver / AsyncPostgresSaver 等持久化实现。
            由于节点都是 async 函数、需要用 astream/ainvoke 驱动,应选择支持异步接口的检查点。
    """
    if checkpointer is None:
        checkpointer = MemorySaver()
    return create_review_workflow().compile(checkpointer=checkpointer)
````

## 人工介入(Human-in-the-Loop)

LangGraph最强大的特性之一是支持在工作流中暂停等待人工操作。`interrupt` 依赖检查点保存暂停时的状态,因此图在编译时必须配置 checkpointer,恢复时也要使用同一个检查点存储和同一个 thread_id:

```python
from langgraph.graph import END
from langgraph.types import Command, interrupt  # interrupt / Command 位于 langgraph.types


async def human_review_node(state: CodeReviewState) -> dict:
    """人工审查节点:暂停工作流,等待人工决策"""
    # 展示需要人工审查的信息
    security_summary = "\n".join(state["security_issues"])

    # interrupt 会暂停工作流,等待外部通过 Command(resume=...) 提供输入;
    # 恢复时本节点会从头重新执行,因此 interrupt 之前不要放有副作用的操作
    human_decision = interrupt(
        value={
            "type": "human_review_required",
            "message": f"发现{len(state['security_issues'])}个安全问题,需要人工确认:\n{security_summary}",
            "options": ["approve", "reject", "request_fix"],
        }
    )

    # 根据人工决策继续处理
    if human_decision == "approve":
        return {"review_complete": True, "human_approval_needed": False}
    elif human_decision == "reject":
        return {"review_complete": False, "overall_score": 0}
    else:  # request_fix
        return {"review_complete": False, "human_approval_needed": False}


def route_after_human_review(state: CodeReviewState) -> str:
    """人工审查后的路由:批准则生成报告;驳回或要求修复则结束本轮审查"""
    return "generate_report" if state.get("review_complete") else END


# human_review 节点和上面的路由已在 create_review_workflow() 中注册。
# 不要在函数外直接写 workflow.add_node(...):workflow 是构建函数内的局部变量。


# 恢复被暂停的工作流
async def resume_after_human_review(thread_id: str, human_decision: str):
    """人工完成审查后恢复工作流。

    恢复时必须使用与暂停时相同的持久化检查点和 thread_id,否则找不到被暂停的状态;
    这里复用下文「持久化与断点续传」中的 build_production_graph()。
    """
    config = {"configurable": {"thread_id": thread_id}}
    async with build_production_graph() as graph:
        # Command(resume=...) 中的值会作为 interrupt(...) 的返回值
        async for event in graph.astream(Command(resume=human_decision), config=config):
            print(event)
```

## 循环和重试模式

```python
import json
import operator
from typing import Annotated, List, Optional, TypedDict

from langgraph.graph import END, StateGraph


class ResearchState(TypedDict):
    query: str                                           # 用户原始问题
    search_query: Optional[str]                          # 下一轮搜索词(由分析节点给出)
    search_results: Annotated[List[dict], operator.add]  # 各轮搜索结果累积,而不是互相覆盖
    current_answer: Optional[str]
    confidence_score: float
    iteration_count: int
    max_iterations: int


async def search_node(state: ResearchState) -> dict:
    """搜索节点:优先使用分析节点给出的补充搜索词,第一轮使用原始问题"""
    # 如果每轮都用同一个 query 搜索,结果不会变化,循环也就失去了意义
    query = state.get("search_query") or state["query"]
    results = await search_engine.search(query)  # 示意:假设返回 list[dict]
    return {"search_results": results}


async def analyze_node(state: ResearchState) -> dict:
    """分析搜索结果,生成答案、置信度和下一轮搜索词"""
    prompt = f"""
    基于以下搜索结果回答问题:{state['query']}
    搜索结果:
    {json.dumps(state['search_results'], ensure_ascii=False)}
    以JSON对象格式输出:
    - answer: 答案
    - confidence: 置信度(0-1)
    - needs_more_research: 是否需要更多资料(bool)
    - follow_up_query: 如果需要更多资料,下一轮应使用的搜索词
    """
    result = json.loads(
        await llm.complete(prompt, response_format={"type": "json_object"})
    )
    return {
        "current_answer": result["answer"],
        "confidence_score": float(result["confidence"]),
        "search_query": result.get("follow_up_query"),
        "iteration_count": state["iteration_count"] + 1,
    }


def should_continue_research(state: ResearchState) -> str:
    """决定是否继续搜索"""
    if state["confidence_score"] >= 0.85:
        return "sufficient"  # 置信度够高,完成
    elif state["iteration_count"] >= state["max_iterations"]:
        return "max_reached"  # 达到最大迭代次数,强制完成
    else:
        return "need_more"  # 需要继续搜索


# 构建带循环的图
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("analyze", analyze_node)
research_graph.set_entry_point("search")
research_graph.add_edge("search", "analyze")

# 条件边实现循环
research_graph.add_conditional_edges(
    "analyze",
    should_continue_research,
    {
        "need_more": "search",  # 回到搜索节点,形成循环
        "sufficient": END,
        "max_reached": END,
    },
)
research_app = research_graph.compile()

# 调用时需提供计数器初始值,例如:
# await research_app.ainvoke({"query": "...", "iteration_count": 0, "max_iterations": 3})
```

## 并行执行与结果聚合

```python
import json
import operator
from typing import Annotated, List, Optional, TypedDict

from langgraph.graph import END, StateGraph
from langgraph.types import Send  # 早期版本从 langgraph.constants 导入


class MultiAgentState(TypedDict):
    main_task: str
    subtasks: List[str]
    # 多个 execute_subtask 会在同一步并行写入该字段,必须声明归并函数(这里按键合并字典);
    # 否则 LangGraph 会因同一步收到多个更新而抛出 InvalidUpdateError
    subtask_results: Annotated[dict, operator.or_]
    final_answer: Optional[str]  # aggregate_results 的输出,需在状态中声明才能从最终状态读取


async def task_decomposer(state: MultiAgentState) -> dict:
    """将主任务分解为子任务(要求LLM只返回JSON字符串数组,便于 json.loads 解析)"""
    prompt = (
        f"将以下任务分解为3-5个独立的子任务:{state['main_task']}\n"
        '只输出JSON字符串数组,例如:["子任务1", "子任务2"]'
    )
    subtasks = json.loads(await llm.complete(prompt))
    return {"subtasks": subtasks}


# 使用Send实现动态并行
def spawn_subtask_agents(state: MultiAgentState):
    """为每个子任务创建独立的执行节点"""
    return [
        Send("execute_subtask", {"task": task, "task_id": i})
        for i, task in enumerate(state["subtasks"])
    ]


async def execute_subtask(state: dict) -> dict:
    """执行单个子任务(state 是 Send 传入的 {"task", "task_id"})"""
    result = await specialized_agent.run(state["task"])
    return {"subtask_results": {state["task_id"]: result}}


async def aggregate_results(state: MultiAgentState) -> dict:
    """聚合所有子任务结果"""
    all_results = state["subtask_results"]
    synthesis_prompt = f"""
    综合以下子任务结果,生成最终答案:
    主任务:{state['main_task']}
    子任务结果:
    {json.dumps(all_results, ensure_ascii=False)}
    """
    final_answer = await llm.complete(synthesis_prompt)
    return {"final_answer": final_answer}


# 构建并行图
parallel_graph = StateGraph(MultiAgentState)
parallel_graph.add_node("decompose", task_decomposer)
parallel_graph.add_node("execute_subtask", execute_subtask)
parallel_graph.add_node("aggregate", aggregate_results)
parallel_graph.set_entry_point("decompose")

# 动态生成并行边:spawn_subtask_agents 返回的每个 Send 都会启动一个 execute_subtask
parallel_graph.add_conditional_edges("decompose", spawn_subtask_agents, ["execute_subtask"])
# 所有 execute_subtask 完成后再汇总
parallel_graph.add_edge("execute_subtask", "aggregate")
parallel_graph.add_edge("aggregate", END)
parallel_app = parallel_graph.compile()
```

## 持久化与断点续传

生产环境使用PostgreSQL作为检查点存储。注意 `AsyncPostgresSaver.from_conn_string` 返回的是异步上下文管理器:连接只在 `async with` 块内有效,图的执行也必须放在块内。

```python
from contextlib import asynccontextmanager

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

DB_URI = "postgresql://user:pass@localhost/langgraph_db"


@asynccontextmanager
async def build_production_graph():
    """以PostgreSQL为检查点存储编译审查图。用法:async with build_production_graph() as graph: ...

    不能在 async with 内直接 return 编译好的图:函数返回时上下文退出、连接被关闭,
    之后图将无法读写检查点。build_review_graph() 返回的已经是编译后的图,不能再次 compile,
    因此通过它的 checkpointer 参数传入检查点。
    """
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as saver:
        await saver.setup()  # 创建检查点所需的表,首次使用前必须调用
        yield build_review_graph(checkpointer=saver)


# 启动工作流(指定thread_id以支持断点续传)
async def run_workflow(code: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    # 只包含 CodeReviewState 中声明的字段
    initial_state = {
        "code": code,
        "language": "",
        "security_issues": [],
        "performance_issues": [],
        "style_issues": [],
        "overall_score": None,
        "review_complete": False,
        "human_approval_needed": False,
        "messages": [],
    }

    async with build_production_graph() as graph:
        async for event in graph.astream(initial_state, config=config):
            print(f"节点 {list(event.keys())[0]} 执行完成")

        # 获取最终状态;若停在 human_review 的 interrupt 处,next 中会列出待执行的节点
        final_state = await graph.aget_state(config)
        if final_state.next:
            print(f"工作流已暂停,等待人工审查(thread_id={thread_id})")
        return final_state.values
```

## 何时用LangGraph vs 简单Chain

| 场景 | 简单Chain | LangGraph |
|------|-----------|-----------|
| 线性流水线 | ✅ 推荐 | 过度设计 |
| 条件分支 | ❌ 难实现 | ✅ 推荐 |
| 循环重试 | ❌ 难实现 | ✅ 推荐 |
| 并行执行 | ❌ | ✅ 推荐 |
| 人工介入 | ❌ | ✅ 推荐 |
| 断点续传 | ❌ | ✅ 推荐 |
| 复杂多Agent | ❌ | ✅ 推荐 |

LangGraph的学习曲线比普通Chain高,但在需要复杂流程控制的场景下,它能让代码逻辑清晰得多。用状态图来表达复杂工作流,远比用嵌套的if/else和try/except清晰。
更多推荐



所有评论(0)