如何用langgraph实现的循环嵌套子图功能?

摘要:目录 1. 环境准备 2. 统一约定(建议先看) 3. test1:单节点最小图 4. test2:单节点内做策略分流 5. test3:多节点串行执行 6. test4:条件分支图(add_conditional_edges) 7. te
目录 1. 环境准备 2. 统一约定(建议先看) 3. test1:单节点最小图 4. test2:单节点内做策略分流 5. test3:多节点串行执行 6. test4:条件分支图(add_conditional_edges) 7. test5:循环图(猜数字) 8. test6:父图 + 子图(字段不一致时怎么接) 9. 常见报错与排查 1. 环境准备 # 在当前 notebook 内核里安装/升级到 1.x %pip install -U "langgraph>=1.0.0,<2.0.0" from langgraph.graph import StateGraph, START, END from typing import TypedDict, List, NotRequired, Literal 2. 统一约定(建议先看) 2.1 节点函数尽量返回“增量字典” 推荐: def node(state): # 只返回这个节点更新的字段 return {"x": 123} 不推荐(虽然很多时候也能跑): def node(state): state["x"] = 123 return state 原因很简单:后面一旦有并发分支、reducer、子图拼接,返回增量更稳,也更容易看出每个节点到底改了什么。 2.2 START / END 写法 用 graph.add_edge(START, "nodeA") 指定入口 用 graph.add_edge("nodeZ", END) 指定结束 这套写法和 add_conditional_edges(...) 配合最顺手。 2.3 编译时机 add_node / add_edge / add_conditional_edges 都在 compile() 之前完成 图结构改了以后要重新 compile() 3. test1:单节点最小图 目标:输入名字,输出一句问候。 链路说明:START -> greeting -> END 功能说明: 输入:messages(用户名字或原始文案)。 输出:更新后的 messages(在前面拼接问候语)。 结构:只有 1 个节点 greeting,链路是 START -> greeting -> END。 适用场景:先验证环境、验证图是否能跑通、熟悉最小 StateGraph 代码骨架。 from langgraph.graph import StateGraph, START, END from typing import TypedDict class AgentState(TypedDict): # 这个 demo 只有一个字段,最小可运行 messages: str def greeting_node(state: AgentState): # 节点里只做一件事:拼接问候语 # 返回增量字典,表示只更新 messages return {"messages": "Hello! " + state["messages"]} graph = StateGraph(AgentState) # 注册节点:节点名 + 节点函数 graph.add_node("greeting", greeting_node) # 入口从 START 进 greeting graph.add_edge(START, "greeting") # greeting 执行完就结束 graph.add_edge("greeting", END) app = graph.compile() result = app.invoke({"messages": "kunyashaw"}) print(result) 可视化: from IPython.display import Image, display display(Image(app.get_graph().draw_mermaid_png())) 4. test2:单节点内做策略分流 目标:同一个节点里按 operation 选择加法或乘法。 链路说明:START -> smartNode -> END(分流发生在 smartNode 节点函数内部) 功能说明: 输入:numList(数字列表)和 operation(+ 或 *)。 输出:theResult(计算结果)。 结构:图上仍是单节点,但节点内部有分支判断逻辑。 适用场景:不想把流程拆成多个节点,只想在一个节点里根据参数走不同算法。 from langgraph.graph import StateGraph, START, END from typing import TypedDict, List, NotRequired class AgentState(TypedDict): numList: List[int] operation: str # 运行后才写入的字段,用 NotRequired 更合理 theResult: NotRequired[int] def smart_node(state: AgentState): # 根据 operation 走不同逻辑 if state["operation"] == "+": return {"theResult": sum(state["numList"])} if state["operation"] == "*": result = 1 for num in state["numList"]: result *= num return {"theResult": result} # 明确抛错,避免默默返回不完整状态 raise ValueError("operation 仅支持 '+' 或 '*'") graph = StateGraph(AgentState) graph.add_node("smartNode", smart_node) graph.add_edge(START, "smartNode") graph.add_edge("smartNode", END) app = graph.compile() print(app.invoke({"numList": [1, 2, 3, 4], "operation": "+"})["theResult"]) # 10 print(app.invoke({"numList": [1, 2, 3, 4], "operation": "*"})["theResult"]) # 24 5. test3:多节点串行执行 目标:把一句自我介绍拆成 3 个节点按顺序执行。 链路说明:START -> firstNode -> secondNode -> thirdNode -> END 功能说明: 输入:name、age、skills。 输出:final(拼接后的完整介绍文本)。 结构:firstNode -> secondNode -> thirdNode 串行执行,每个节点只拼接一段。 适用场景:把一个大任务拆成多个可维护的小步骤,便于后续插入校验、日志或重试。 from langgraph.graph import StateGraph, START, END from typing import TypedDict, List, NotRequired class AgentState(TypedDict): name: str age: int skills: List[str] final: NotRequired[str] def first_node(state: AgentState): # 第一段:名字 return {"final": f"Hi, I'm {state['name']}. "} def second_node(state: AgentState): # 第二段:年龄(读取前面节点写入的 final) return {"final": state["final"] + f"I'm {state['age']} years old. "} def third_node(state: AgentState): # 第三段:技能 skills_text = ", ".join(state["skills"]) return {"final": state["final"] + f"My skills are {skills_text}."} graph = StateGraph(AgentState) graph.add_node("firstNode", first_node) graph.add_node("secondNode", second_node) graph.add_node("thirdNode", third_node) graph.add_edge(START, "firstNode") graph.add_edge("firstNode", "secondNode") graph.add_edge("secondNode", "thirdNode") graph.add_edge("thirdNode", END) app = graph.compile() result = app.invoke({ "name": "Alice", "age": 30, "skills": ["Python", "JavaScript", "PHP"], }) print(result["final"]) 6. test4:条件分支图(add_conditional_edges) 目标:第一段计算分一次支,第二段再分一次支。 链路说明:START -(route1)-> add_node1/substract_node -(route2)-> add_node2/substract_node2 -> END 下面用 1.x 比较干净的写法:路由函数直接返回“下一跳节点名”。 功能说明: 输入:两组运算参数(number1/number2/operation 和 number3/number4/operation2)。 输出:finalNumber、finalNumber2 两段运算结果。 结构:先做第一次路由(加/减),再做第二次路由(加/减),最后统一收敛到 END。 适用场景:一个流程中有多段决策,每段决策都由不同条件控制下一跳节点。 from langgraph.graph import StateGraph, START, END from typing import TypedDict, NotRequired, Literal class AgentState(TypedDict): number1: int operation: str number2: int number3: int operation2: str number4: int finalNumber: NotRequired[int] finalNumber2: NotRequired[int] def add_node1(state: AgentState): return {"finalNumber": state["number1"] + state["number2"]} def substract_node(state: AgentState): return {"finalNumber": state["number1"] - state["number2"]} def add_node2(state: AgentState): return {"finalNumber2": state["number3"] + state["number4"]} def substract_node2(state: AgentState): return {"finalNumber2": state["number3"] - state["number4"]} def route1(state: AgentState) -> Literal["add_node1", "substract_node"]: # 第一段路由:根据 operation 决定走 + 还是 - if state["operation"] == "+": return "add_node1" if state["operation"] == "-": return "substract_node" raise ValueError("operation 仅支持 + 或 -") def route2(state: AgentState) -> Literal["add_node2", "substract_node2"]: # 第二段路由:根据 operation2 决定走 + 还是 - if state["operation2"] == "+": return "add_node2" if state["operation2"] == "-": return "substract_node2" raise ValueError("operation2 仅支持 + 或 -") graph = StateGraph(AgentState) graph.add_node("add_node1", add_node1) graph.add_node("substract_node", substract_node) graph.add_node("add_node2", add_node2) graph.add_node("substract_node2", substract_node2) # 从 START 直接做第一次条件路由 graph.add_conditional_edges(START, route1) # 第一次分支算完后,进入第二次路由 graph.add_conditional_edges("add_node1", route2) graph.add_conditional_edges("substract_node", route2) # 第二次分支算完后结束 graph.add_edge("add_node2", END) graph.add_edge("substract_node2", END) app = graph.compile() result = app.invoke({ "number1": 10, "operation": "+", "number2": 5, "number3": 20, "operation2": "-", "number4": 8, }) print(result["finalNumber"]) # 15 print(result["finalNumber2"]) # 12 7. test5:循环图(猜数字) 目标:初始化一次,然后反复猜,猜中或次数到上限就结束。 链路说明:START -> setUpNode -> guessNode -> hintRouteNode -> (continueBranch 回到 guessNode | endBranch 到 END) 功能说明: 输入:玩家名、上下界、初始尝试次数、初始猜测列表。 输出:最终 attempts、guesses,以及是否在上限内猜中。 结构:setUpNode 初始化后进入 guessNode,再通过 hintRoute 决定“继续猜”还是“结束”。 适用场景:需要循环执行同一批节点,直到满足退出条件。 from langgraph.graph import StateGraph, START, END from typing import TypedDict, List, Literal import random class AgentState(TypedDict): name: str target_number: int guesses: List[int] attempts: int lower_bound: int upper_bound: int def setUpNode(state: AgentState): # 开局初始化:重置猜测列表 + 随机目标数 target = random.randint(state["lower_bound"], state["upper_bound"]) print("target_number =", target) return {"guesses": [], "target_number": target} def guessNode(state: AgentState): # 每次猜测:次数 +1,追加一个随机猜测 g = list(state["guesses"]) g.append(random.randint(state["lower_bound"], state["upper_bound"])) print("guesses =", g) return {"attempts": state["attempts"] + 1, "guesses": g} def hintRoute(state: AgentState) -> Literal["endBranch", "continueBranch"]: # 达到最大次数直接结束 if state["attempts"] >= 7: print("Game Over! attempts exhausted.") return "endBranch" last_guess = state["guesses"][-1] # 猜中直接结束 if last_guess == state["target_number"]: print(f"{state['name']}, congratulations! You guessed it.") return "endBranch" # 没猜中继续循环 if last_guess < state["target_number"]: print(f"{state['name']}, a little low.") else: print(f"{state['name']}, a little high.") return "continueBranch" graph = StateGraph(AgentState) graph.add_node("setUpNode", setUpNode) graph.add_node("guessNode", guessNode) graph.add_node("hintRouteNode", lambda state: state) # 专门承接路由判断 graph.add_edge(START, "setUpNode") graph.add_edge("setUpNode", "guessNode") graph.add_edge("guessNode", "hintRouteNode") graph.add_conditional_edges( "hintRouteNode", hintRoute, { "endBranch": END, "continueBranch": "guessNode", }, ) app = graph.compile() result = app.invoke({ "name": "kunyashaw", "target_number": 0, # 初始占位,setUpNode 里会被覆盖 "guesses": [], "attempts": 0, "lower_bound": 1, "upper_bound": 20, }) 注意:循环图里不要再额外加 graph.add_edge("hintRouteNode", END), 否则会和条件分支逻辑打架,图意图也会变得不清楚。 8. test6:父图 + 子图(ChildState 版本) 这一版把“自我介绍子流程”拆进子图,然后在父图里先执行子图,再进入猜数字流程。 核心点不变:父图和子图字段不同,必须用一个适配节点做映射。 链路说明:父图 START -> childNode(内部调用子图) -> setUpNode -> guessNode -> hintRoute123 -> (continueBranch 回到 guessNode | endBranch 到 END);子图内部是 START -> firstNode -> secondNode -> thirdNode 功能说明: 输入:父图输入 AgentState,其中 childState 作为子图输入容器。 输出:childState.final(子图产物)+ 猜数字流程产生的 gusses/attempts/target_number。 结构:父图先走 childNode 调用子图,再进入猜数字循环;childNode 负责父子状态映射。 适用场景:一个主流程里嵌套可复用子流程,且两边状态字段不完全一致。 from langgraph.graph import StateGraph, START, END from typing import TypedDict, List import random # ----------------------------- # 子图状态(当前命名:ChildState) # ----------------------------- class ChildState(TypedDict): name: str age: int skills: List[str] final: str # ----------------------------- # 父图状态(当前命名:AgentState) # childState 字段用于承接子图输入输出 # ----------------------------- class AgentState(TypedDict): childState: ChildState name: str target_number: int gusses: List[int] # 这里沿用 test6 当前字段名 attempts: int lower_bound: int upper_bound: int # ===== 子图节点 ===== def first_node(state: ChildState) -> ChildState: state["final"] = f"hi,i'm {state['name']}" return state def second_node(state: ChildState) -> ChildState: state["final"] = state["final"] + f"i'm {state['age']} years old" return state def third_node(state: ChildState) -> ChildState: state["final"] = state["final"] + f"my skills are {','.join(state['skills'])}" return state # ===== 父图节点 ===== def setUpNode(state: AgentState) -> AgentState: state["gusses"] = [] state["target_number"] = random.randint(state["lower_bound"], state["upper_bound"]) print("the target_numebr is " + str(state["target_number"])) return state def guessNode(state: AgentState) -> AgentState: state["attempts"] += 1 state["gusses"].append(random.randint(state["lower_bound"], state["upper_bound"])) print("state[gusses])", state["gusses"]) return state def hintRoute(state: AgentState) -> str: if state["attempts"] == 7: print("Game Over! You've used all your attempts.") return "endBranch" print("Good try! You've still have " + str(7 - state["attempts"]) + " attempts left.") if state["gusses"][-1] < state["lower_bound"]: print(state["name"] + ",you guess too low") return "continueBranch" elif state["gusses"][-1] > state["upper_bound"]: print(state["name"] + ",you guess too high") return "continueBranch" elif state["gusses"][-1] == state["target_number"]: print(state["name"] + ",congratulations! You guessed the number!") return "endBranch" elif state["gusses"][-1] < state["target_number"]: print(state["name"] + " you are close ,you guess a little low") return "continueBranch" else: print(state["name"] + " you are close ,you guess a little high") return "continueBranch" # ----------------------------- # 子图:先编译 # ----------------------------- subgraph = StateGraph(ChildState) subgraph.add_node("firstNode", first_node) subgraph.add_node("secondNode", second_node) subgraph.add_node("thirdNode", third_node) subgraph.add_edge(START, "firstNode") subgraph.add_edge("firstNode", "secondNode") subgraph.add_edge("secondNode", "thirdNode") subApp = subgraph.compile() # ----------------------------- # 适配节点:把父图 state["childState"] 交给子图 # ----------------------------- def childNode(state: AgentState): child_out = subApp.invoke(state["childState"]) return {"childState": child_out} # ----------------------------- # 父图:再编译 # ----------------------------- graph = StateGraph(AgentState) graph.add_node("hintRoute123", lambda state: state) graph.add_node("setUpNode", setUpNode) graph.add_node("guessNode", guessNode) graph.add_node("childNode", childNode) graph.add_edge(START, "childNode") graph.add_edge("childNode", "setUpNode") graph.add_edge("setUpNode", "guessNode") graph.add_edge("guessNode", "hintRoute123") graph.add_conditional_edges( "hintRoute123", hintRoute, { "endBranch": END, "continueBranch": "guessNode", }, ) app = graph.compile() child_state = { "name": "Alice", "age": 30, "skills": ["Python", "JavaScript", "PHP"], "final": "", } result = app.invoke({ "childState": child_state, "name": "kunyashaw", "target_number": 0, "gusses": [], # 注意:和类定义保持一致 "attempts": 0, "lower_bound": 1, "upper_bound": 20, }) 8.1 两张图并排显示 test6 里主图和子图都值得看,直接左右排开会更直观: from IPython.display import HTML, display import base64 app_png = app.get_graph().draw_mermaid_png() sub_png = subApp.get_graph().draw_mermaid_png() def to_data_uri(png_bytes): return "data:image/png;base64," + base64.b64encode(png_bytes).decode("utf-8") display(HTML(f""" <div style="display:flex; gap:16px; align-items:flex-start;"> <img src="{to_data_uri(sub_png)}" style="max-width:48%; height:auto; border:1px solid #ddd;" /> <img src="{to_data_uri(app_png)}" style="max-width:48%; height:auto; border:1px solid #ddd;" /> </div> """)) 这个排版下,左边看子图,右边看父图,链路关系一眼就能对上。 9. 常见报错与排查 9.1 同时加了条件边和普通边导致流程异常 例如循环路由节点已经有 add_conditional_edges(...),就不要再额外 add_edge("routeNode", END)。 9.2 add_node / add_edge / add_conditional_edges 的先后 实操里推荐顺序: 先 add_node(...) 再 add_edge(...) 和 add_conditional_edges(...) 最后 compile() 这样最不容易在大图里看漏节点名拼写问题。