如何用langgraph实现的循环嵌套子图功能?
摘要:目录 1. 环境准备 2. 统一约定(建议先看) 3. test1:单节点最小图 4. test2:单节点内做策略分流 5. test3:多节点串行执行 6. test4:条件分支图(add_conditional_edges) 7. te
目录
1. 环境准备
2. 统一约定(建议先看)
3. test1:单节点最小图
4. test2:单节点内做策略分流
5. test3:多节点串行执行
6. test4:条件分支图(add_conditional_edges)
7. test5:循环图(猜数字)
8. test6:父图 + 子图(字段不一致时怎么接)
9. 常见报错与排查
1. 环境准备
# 在当前 notebook 内核里安装/升级到 1.x
%pip install -U "langgraph>=1.0.0,<2.0.0"
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, NotRequired, Literal
2. 统一约定(建议先看)
2.1 节点函数尽量返回“增量字典”
推荐:
def node(state):
# 只返回这个节点更新的字段
return {"x": 123}
不推荐(虽然很多时候也能跑):
def node(state):
state["x"] = 123
return state
原因很简单:后面一旦有并发分支、reducer、子图拼接,返回增量更稳,也更容易看出每个节点到底改了什么。
2.2 START / END 写法
用 graph.add_edge(START, "nodeA") 指定入口
用 graph.add_edge("nodeZ", END) 指定结束
这套写法和 add_conditional_edges(...) 配合最顺手。
2.3 编译时机
add_node / add_edge / add_conditional_edges 都在 compile() 之前完成
图结构改了以后要重新 compile()
3. test1:单节点最小图
目标:输入名字,输出一句问候。
链路说明:START -> greeting -> END
功能说明:
输入:messages(用户名字或原始文案)。
输出:更新后的 messages(在前面拼接问候语)。
结构:只有 1 个节点 greeting,链路是 START -> greeting -> END。
适用场景:先验证环境、验证图是否能跑通、熟悉最小 StateGraph 代码骨架。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class AgentState(TypedDict):
# 这个 demo 只有一个字段,最小可运行
messages: str
def greeting_node(state: AgentState):
# 节点里只做一件事:拼接问候语
# 返回增量字典,表示只更新 messages
return {"messages": "Hello! " + state["messages"]}
graph = StateGraph(AgentState)
# 注册节点:节点名 + 节点函数
graph.add_node("greeting", greeting_node)
# 入口从 START 进 greeting
graph.add_edge(START, "greeting")
# greeting 执行完就结束
graph.add_edge("greeting", END)
app = graph.compile()
result = app.invoke({"messages": "kunyashaw"})
print(result)
可视化:
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))
4. test2:单节点内做策略分流
目标:同一个节点里按 operation 选择加法或乘法。
链路说明:START -> smartNode -> END(分流发生在 smartNode 节点函数内部)
功能说明:
输入:numList(数字列表)和 operation(+ 或 *)。
输出:theResult(计算结果)。
结构:图上仍是单节点,但节点内部有分支判断逻辑。
适用场景:不想把流程拆成多个节点,只想在一个节点里根据参数走不同算法。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, NotRequired
class AgentState(TypedDict):
numList: List[int]
operation: str
# 运行后才写入的字段,用 NotRequired 更合理
theResult: NotRequired[int]
def smart_node(state: AgentState):
# 根据 operation 走不同逻辑
if state["operation"] == "+":
return {"theResult": sum(state["numList"])}
if state["operation"] == "*":
result = 1
for num in state["numList"]:
result *= num
return {"theResult": result}
# 明确抛错,避免默默返回不完整状态
raise ValueError("operation 仅支持 '+' 或 '*'")
graph = StateGraph(AgentState)
graph.add_node("smartNode", smart_node)
graph.add_edge(START, "smartNode")
graph.add_edge("smartNode", END)
app = graph.compile()
print(app.invoke({"numList": [1, 2, 3, 4], "operation": "+"})["theResult"]) # 10
print(app.invoke({"numList": [1, 2, 3, 4], "operation": "*"})["theResult"]) # 24
5. test3:多节点串行执行
目标:把一句自我介绍拆成 3 个节点按顺序执行。
链路说明:START -> firstNode -> secondNode -> thirdNode -> END
功能说明:
输入:name、age、skills。
输出:final(拼接后的完整介绍文本)。
结构:firstNode -> secondNode -> thirdNode 串行执行,每个节点只拼接一段。
适用场景:把一个大任务拆成多个可维护的小步骤,便于后续插入校验、日志或重试。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, NotRequired
class AgentState(TypedDict):
name: str
age: int
skills: List[str]
final: NotRequired[str]
def first_node(state: AgentState):
# 第一段:名字
return {"final": f"Hi, I'm {state['name']}. "}
def second_node(state: AgentState):
# 第二段:年龄(读取前面节点写入的 final)
return {"final": state["final"] + f"I'm {state['age']} years old. "}
def third_node(state: AgentState):
# 第三段:技能
skills_text = ", ".join(state["skills"])
return {"final": state["final"] + f"My skills are {skills_text}."}
graph = StateGraph(AgentState)
graph.add_node("firstNode", first_node)
graph.add_node("secondNode", second_node)
graph.add_node("thirdNode", third_node)
graph.add_edge(START, "firstNode")
graph.add_edge("firstNode", "secondNode")
graph.add_edge("secondNode", "thirdNode")
graph.add_edge("thirdNode", END)
app = graph.compile()
result = app.invoke({
"name": "Alice",
"age": 30,
"skills": ["Python", "JavaScript", "PHP"],
})
print(result["final"])
6. test4:条件分支图(add_conditional_edges)
目标:第一段计算分一次支,第二段再分一次支。
链路说明:START -(route1)-> add_node1/substract_node -(route2)-> add_node2/substract_node2 -> END
下面用 1.x 比较干净的写法:路由函数直接返回“下一跳节点名”。
功能说明:
输入:两组运算参数(number1/number2/operation 和 number3/number4/operation2)。
输出:finalNumber、finalNumber2 两段运算结果。
结构:先做第一次路由(加/减),再做第二次路由(加/减),最后统一收敛到 END。
适用场景:一个流程中有多段决策,每段决策都由不同条件控制下一跳节点。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, NotRequired, Literal
class AgentState(TypedDict):
number1: int
operation: str
number2: int
number3: int
operation2: str
number4: int
finalNumber: NotRequired[int]
finalNumber2: NotRequired[int]
def add_node1(state: AgentState):
return {"finalNumber": state["number1"] + state["number2"]}
def substract_node(state: AgentState):
return {"finalNumber": state["number1"] - state["number2"]}
def add_node2(state: AgentState):
return {"finalNumber2": state["number3"] + state["number4"]}
def substract_node2(state: AgentState):
return {"finalNumber2": state["number3"] - state["number4"]}
def route1(state: AgentState) -> Literal["add_node1", "substract_node"]:
# 第一段路由:根据 operation 决定走 + 还是 -
if state["operation"] == "+":
return "add_node1"
if state["operation"] == "-":
return "substract_node"
raise ValueError("operation 仅支持 + 或 -")
def route2(state: AgentState) -> Literal["add_node2", "substract_node2"]:
# 第二段路由:根据 operation2 决定走 + 还是 -
if state["operation2"] == "+":
return "add_node2"
if state["operation2"] == "-":
return "substract_node2"
raise ValueError("operation2 仅支持 + 或 -")
graph = StateGraph(AgentState)
graph.add_node("add_node1", add_node1)
graph.add_node("substract_node", substract_node)
graph.add_node("add_node2", add_node2)
graph.add_node("substract_node2", substract_node2)
# 从 START 直接做第一次条件路由
graph.add_conditional_edges(START, route1)
# 第一次分支算完后,进入第二次路由
graph.add_conditional_edges("add_node1", route2)
graph.add_conditional_edges("substract_node", route2)
# 第二次分支算完后结束
graph.add_edge("add_node2", END)
graph.add_edge("substract_node2", END)
app = graph.compile()
result = app.invoke({
"number1": 10,
"operation": "+",
"number2": 5,
"number3": 20,
"operation2": "-",
"number4": 8,
})
print(result["finalNumber"]) # 15
print(result["finalNumber2"]) # 12
7. test5:循环图(猜数字)
目标:初始化一次,然后反复猜,猜中或次数到上限就结束。
链路说明:START -> setUpNode -> guessNode -> hintRouteNode -> (continueBranch 回到 guessNode | endBranch 到 END)
功能说明:
输入:玩家名、上下界、初始尝试次数、初始猜测列表。
输出:最终 attempts、guesses,以及是否在上限内猜中。
结构:setUpNode 初始化后进入 guessNode,再通过 hintRoute 决定“继续猜”还是“结束”。
适用场景:需要循环执行同一批节点,直到满足退出条件。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Literal
import random
class AgentState(TypedDict):
name: str
target_number: int
guesses: List[int]
attempts: int
lower_bound: int
upper_bound: int
def setUpNode(state: AgentState):
# 开局初始化:重置猜测列表 + 随机目标数
target = random.randint(state["lower_bound"], state["upper_bound"])
print("target_number =", target)
return {"guesses": [], "target_number": target}
def guessNode(state: AgentState):
# 每次猜测:次数 +1,追加一个随机猜测
g = list(state["guesses"])
g.append(random.randint(state["lower_bound"], state["upper_bound"]))
print("guesses =", g)
return {"attempts": state["attempts"] + 1, "guesses": g}
def hintRoute(state: AgentState) -> Literal["endBranch", "continueBranch"]:
# 达到最大次数直接结束
if state["attempts"] >= 7:
print("Game Over! attempts exhausted.")
return "endBranch"
last_guess = state["guesses"][-1]
# 猜中直接结束
if last_guess == state["target_number"]:
print(f"{state['name']}, congratulations! You guessed it.")
return "endBranch"
# 没猜中继续循环
if last_guess < state["target_number"]:
print(f"{state['name']}, a little low.")
else:
print(f"{state['name']}, a little high.")
return "continueBranch"
graph = StateGraph(AgentState)
graph.add_node("setUpNode", setUpNode)
graph.add_node("guessNode", guessNode)
graph.add_node("hintRouteNode", lambda state: state) # 专门承接路由判断
graph.add_edge(START, "setUpNode")
graph.add_edge("setUpNode", "guessNode")
graph.add_edge("guessNode", "hintRouteNode")
graph.add_conditional_edges(
"hintRouteNode",
hintRoute,
{
"endBranch": END,
"continueBranch": "guessNode",
},
)
app = graph.compile()
result = app.invoke({
"name": "kunyashaw",
"target_number": 0, # 初始占位,setUpNode 里会被覆盖
"guesses": [],
"attempts": 0,
"lower_bound": 1,
"upper_bound": 20,
})
注意:循环图里不要再额外加 graph.add_edge("hintRouteNode", END),
否则会和条件分支逻辑打架,图意图也会变得不清楚。
8. test6:父图 + 子图(ChildState 版本)
这一版把“自我介绍子流程”拆进子图,然后在父图里先执行子图,再进入猜数字流程。
核心点不变:父图和子图字段不同,必须用一个适配节点做映射。
链路说明:父图 START -> childNode(内部调用子图) -> setUpNode -> guessNode -> hintRoute123 -> (continueBranch 回到 guessNode | endBranch 到 END);子图内部是 START -> firstNode -> secondNode -> thirdNode
功能说明:
输入:父图输入 AgentState,其中 childState 作为子图输入容器。
输出:childState.final(子图产物)+ 猜数字流程产生的 gusses/attempts/target_number。
结构:父图先走 childNode 调用子图,再进入猜数字循环;childNode 负责父子状态映射。
适用场景:一个主流程里嵌套可复用子流程,且两边状态字段不完全一致。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List
import random
# -----------------------------
# 子图状态(当前命名:ChildState)
# -----------------------------
class ChildState(TypedDict):
name: str
age: int
skills: List[str]
final: str
# -----------------------------
# 父图状态(当前命名:AgentState)
# childState 字段用于承接子图输入输出
# -----------------------------
class AgentState(TypedDict):
childState: ChildState
name: str
target_number: int
gusses: List[int] # 这里沿用 test6 当前字段名
attempts: int
lower_bound: int
upper_bound: int
# ===== 子图节点 =====
def first_node(state: ChildState) -> ChildState:
state["final"] = f"hi,i'm {state['name']}"
return state
def second_node(state: ChildState) -> ChildState:
state["final"] = state["final"] + f"i'm {state['age']} years old"
return state
def third_node(state: ChildState) -> ChildState:
state["final"] = state["final"] + f"my skills are {','.join(state['skills'])}"
return state
# ===== 父图节点 =====
def setUpNode(state: AgentState) -> AgentState:
state["gusses"] = []
state["target_number"] = random.randint(state["lower_bound"], state["upper_bound"])
print("the target_numebr is " + str(state["target_number"]))
return state
def guessNode(state: AgentState) -> AgentState:
state["attempts"] += 1
state["gusses"].append(random.randint(state["lower_bound"], state["upper_bound"]))
print("state[gusses])", state["gusses"])
return state
def hintRoute(state: AgentState) -> str:
if state["attempts"] == 7:
print("Game Over! You've used all your attempts.")
return "endBranch"
print("Good try! You've still have " + str(7 - state["attempts"]) + " attempts left.")
if state["gusses"][-1] < state["lower_bound"]:
print(state["name"] + ",you guess too low")
return "continueBranch"
elif state["gusses"][-1] > state["upper_bound"]:
print(state["name"] + ",you guess too high")
return "continueBranch"
elif state["gusses"][-1] == state["target_number"]:
print(state["name"] + ",congratulations! You guessed the number!")
return "endBranch"
elif state["gusses"][-1] < state["target_number"]:
print(state["name"] + " you are close ,you guess a little low")
return "continueBranch"
else:
print(state["name"] + " you are close ,you guess a little high")
return "continueBranch"
# -----------------------------
# 子图:先编译
# -----------------------------
subgraph = StateGraph(ChildState)
subgraph.add_node("firstNode", first_node)
subgraph.add_node("secondNode", second_node)
subgraph.add_node("thirdNode", third_node)
subgraph.add_edge(START, "firstNode")
subgraph.add_edge("firstNode", "secondNode")
subgraph.add_edge("secondNode", "thirdNode")
subApp = subgraph.compile()
# -----------------------------
# 适配节点:把父图 state["childState"] 交给子图
# -----------------------------
def childNode(state: AgentState):
child_out = subApp.invoke(state["childState"])
return {"childState": child_out}
# -----------------------------
# 父图:再编译
# -----------------------------
graph = StateGraph(AgentState)
graph.add_node("hintRoute123", lambda state: state)
graph.add_node("setUpNode", setUpNode)
graph.add_node("guessNode", guessNode)
graph.add_node("childNode", childNode)
graph.add_edge(START, "childNode")
graph.add_edge("childNode", "setUpNode")
graph.add_edge("setUpNode", "guessNode")
graph.add_edge("guessNode", "hintRoute123")
graph.add_conditional_edges(
"hintRoute123",
hintRoute,
{
"endBranch": END,
"continueBranch": "guessNode",
},
)
app = graph.compile()
child_state = {
"name": "Alice",
"age": 30,
"skills": ["Python", "JavaScript", "PHP"],
"final": "",
}
result = app.invoke({
"childState": child_state,
"name": "kunyashaw",
"target_number": 0,
"gusses": [], # 注意:和类定义保持一致
"attempts": 0,
"lower_bound": 1,
"upper_bound": 20,
})
8.1 两张图并排显示
test6 里主图和子图都值得看,直接左右排开会更直观:
from IPython.display import HTML, display
import base64
app_png = app.get_graph().draw_mermaid_png()
sub_png = subApp.get_graph().draw_mermaid_png()
def to_data_uri(png_bytes):
return "data:image/png;base64," + base64.b64encode(png_bytes).decode("utf-8")
display(HTML(f"""
<div style="display:flex; gap:16px; align-items:flex-start;">
<img src="{to_data_uri(sub_png)}" style="max-width:48%; height:auto; border:1px solid #ddd;" />
<img src="{to_data_uri(app_png)}" style="max-width:48%; height:auto; border:1px solid #ddd;" />
</div>
"""))
这个排版下,左边看子图,右边看父图,链路关系一眼就能对上。
9. 常见报错与排查
9.1 同时加了条件边和普通边导致流程异常
例如循环路由节点已经有 add_conditional_edges(...),就不要再额外 add_edge("routeNode", END)。
9.2 add_node / add_edge / add_conditional_edges 的先后
实操里推荐顺序:
先 add_node(...)
再 add_edge(...) 和 add_conditional_edges(...)
最后 compile()
这样最不容易在大图里看漏节点名拼写问题。
