如何将LCEL从序列执行改为并行执行?

摘要:1. 引言 在 LLM 应用的开发浪潮中,LangChain 框架迅速成为最受欢迎的工具之一。随着应用的主键复杂化,复杂的 RAG、智能体(Agent)、多步骤推理等需求变得普遍,早期的编程范式逐渐显露出了局限性,面向对象的链式构建方式在应
1. 引言 在 LLM 应用的开发浪潮中,LangChain 框架迅速成为最受欢迎的工具之一。随着应用的主键复杂化,复杂的 RAG、智能体(Agent)、多步骤推理等需求变得普遍,早期的编程范式逐渐显露出了局限性,面向对象的链式构建方式在应对这种复杂性时显得非常笨拙和难以维护。所以 LangChain 在 2023 年 8 月推出了 LCEL(LangChain Expression Language, LangChain 表达式语言),进行了范式的升级。 1.1. 从命令式到声明式 在没有 LCEL 之前,构建链的方式很传统,通常依赖于命令式的、面向对象的方式,例如使用 LLMChain 类,开发者需要实例化类,配置参数,然后显式地调用执行方法,就像用基本的积木一块一块地手动拼接。这种方式虽然直观,但在构建复杂应用时会导致代码冗长、逻辑流难以追踪,并且深度定制化相对困难。 LCEL 引入了一种声明式的组合方法,开发者不再需要详细描述“如何”执行每一步,而是通过一种简洁的语法来描述“做什么”——定义数据应该如何在一个组件网络中流动。这种声明式的特性将执行优化的责任交给了 LangChain 框架本身。开发者只需构建一个由 Runnable 对象组成的计算图,框架就能在运行时智能地决定最高效的执行策略。这不仅极大地简化了代码,也为一系列强大的生产级特性奠定了基础。 做个对比吧。 LLMChain 的工作很简单,接受一个输入,使用提示模板(PromptTemplate)格式化输入,然后将格式化后的提示词发送给 LLM,最后返回 LLM 的输出。 from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain # 1. 实例化 LLM 模型 llm = OpenAI(temperature = 0.7) # 2. 构建提示词模板 prompt = PromptTemplate( input_variables = ["product"], template = "What is a good name for a company that makes {product}?", ) # 3. 构建链,将零件通过参数传入 chain = LLMChain(llm = llm, prompt = prompt) # 4. 运行链,显示调用 run() result = chain.run("colorful socks") print(result) 可以看到有这么几个问题: 很简单的调用 LLM 回答问题,也需要四个步骤,那么对于复杂应用,嵌套和组合多个链会让代码变得很长 业务逻辑流程其实是比较难追踪的。比如 A 链的输出要作为 B 链的输入,就需要手动处理这个传递过程,当这类过程变得复杂,整体结构会变得不清晰。 若要深度定制也会有困难。如果想在链的执行过程中加入一些自定义的逻辑,比如对 LLM 的输出进行清洗之后再传递给下一步,通常就需要创建一个全新的、继承自 LLMChain 的子类,重写它的方法。这就显得很重了。 现在同样的需求,使用 LCEL 写一遍。 # 使用新的模块 from langchain_core.prompts import PromptTemplate from langchain_openai import ChatOpenAI # 提示词与 LLM 实例化与之前类似 prompt = PromptTemplate.from_template( "What is a good name for a company that makes {product}?" ) llm = ChatOpentAI(temperature = 0.7) # 使用 | 操作符声明式组合链 # 含义:创建一个流程,数据先流向 prompt 模板进行格式化,然后其结果自动流向 llm 模型进行调用 chain = prompt | llm # 使用统一的 invoke 方法执行 result = chain.invoke({"product": "colorful socks"}) print(result.content) 代码看起来真的清爽很多。 重点在 | 操作符,意味着『将左边组件的输出,作为右边组件的输入』。 以及 LCEL 链有一个统一的接口 .invoke() 进行执行。 LCEL 引入了一种全新的声明式编程模型,不用再关心“如何一步步执行”,而是声明”各个组件之间关系如何连接“,LCEL 会自动构建执行流程。 特性 LLMChain LCEL 优势 范式 命令式、面向对象 声明式、函数式 LCEL 更简洁,专注于定义“是什么”而不是“怎么做” 组合方式 通过类的构造函数传入参数 使用 ` ` 管道操作符 代码长度 冗长,步骤多 简洁 链的复杂度越高,LCEL 的优势越明显 逻辑清晰度 逻辑分散在多个实例化和调用中 逻辑在管道中一目了然 数据流清晰可见,易于理解和调试 定制化 困难,需创建子类 简单,使用标准接口 可以轻松插入任何实现了 .invoke() 方法的自定义函数到管道中 功能特性 基础 内置强大功能 LCEL 链原生支持异步、流式输出、批量处理等,无需额外代码 1.2. LCEL 原生能力 LCEL 之所以被誉为“为生产而生”,是因为它为所有通过它构建的链赋予了一系列开箱即用的核心能力,而这些能力在传统编程模式下需要大量的手动实现: 异步支持 任何使用 LCEL 构建的链都天然支持异步调用。通过 invoke()、abatch()、astream() 等异步方法,可以轻松的将 LangChain 应用集成到现代异步 Python Web 框架中比如 FastAPI,构建能处理高并发请求的服务。 并行执行 当数据流中存在可以并发处理分支的时候,LCEL 会自动利用 RunnableParallel 或 batch API 来并行执行这些任务,对于 IO 密集型操作比如多次调用 API,能显著降低应用的整体延迟。 流式处理 流式响应是提升 LLM 应用用户体验的关键。LCEL 让流式处理变得十分简单。任何链都可以通过 stream() 或这是 astream() 方法进行调用,框架会自动处理从模型到最终输出的整个过程的数据流,以最小化“首个令牌时间”(Time-to-First-Token)让用户能即时看到结果的生成过程。 统一接口 LCEL 的所有组件都遵循一个名为 Runnable 的标准协议。无论是模型、提示模板、输出解析器还是检索器,他们都共享一套标准方法,比如 invoke()、batch()、stream() 等。这种一致性大大增强了代码的可预测性和可组合性,让开发者可以像拼接乐高积木一样构建复杂的应用。 LCEL 的诞生并非简单的语法升级,而是 LangChain 框架走向成熟的标志。它体现了一种控制反转的设计哲学,即开发者将执行流程的控制权交给框架,以换取生产环境所必须的性能、健壮性和可观测性(比如与 LangSmith 平台的无缝集成),通过定义一个声明式的数据流图,框架获得了对整个执行过程的全局视角,从而能够施展那些在命令式代码中难以自动实现的优化,例如在遇到并行步骤时自动启用线程池(同步执行)或利用 asyncio.gather(异步执行) 2. RunnableSequence 与 | 管道操作符 LCEL 有两个核心概念: 作为通用接口的 Runnable 协议 用于顺序组合的 RunnableSequence 2.1. Runnable 协议 在 LCEL 的世界里,万物皆 Runnable。从最基础的提示词模板(PromptTemplate)和语言模型(LLM/ChatModel)到功能性的组件比如输出解析器(OutputParser)和文档检索器(Retriever)都实现了 Runnable 接口,这意味着他们都遵循一套共同的交互标准。即拥有了: invoke,单输入单输出 batch,多输入多输出 stream,流式输出 ...等等标准方法 正是这种统一的接口,使得不同的组件可以被自由、无缝的组合在一起,构成 LCEL 强大可组合的基础。 2.2. RunnableSequence RunnableSequence is the most important composition operator in LangChain as it is used in virtually every chain. RunnableSequence 是 LangChain 中最重要的组合操作符,因为它几乎在每个链中都使用。 上面是官方文档原文。 RunnableSequence 的作用非常纯粹:将多个 Runnable 组件按照顺序链接起来,形成一个线性的处理管道,在这个管道中前一个组件的输出会直接作为后一个组件的输入。 可以通过构造函数 RunnableSequence(first=..., middle=..., last=...) 或 RunnableSequence(steps=[...]) 来显式地创建一个序列,但实际开发中几乎不这么用。绝大多数情况下都会使用一种更简洁、更具表达力的语法糖,即管道操作符 |。 2.3. | 操作符:优雅背后的魔法 | 管道操作符允许开发者以一种极其直观的方式将 Runnable 组件链接在一起,就像在 Linux 命令行中使用管道一样。例如: chain = prompt | model | output_parser 这种优雅语法的背后是 Python 的操作符重载机制在发挥作用。 当解释器遇到 runnable1 | runnable2 这样的表达式时,它会调用 runnable1 对象的 __or__ 特殊方法。在 LangChain 的 Runnable 基类中 __or__ 方法的实现逻辑就是创建一个新的 RunnableSequence 实例,将 self (即 runnable1) 作为第一个元素,other (即 runnable2) 作为第二个元素。同样Runnable 类也实现了 __ror__ 方法,这使得 dict | runnable 这样的表达式(其中字典本身不是 Runnable)也能正确工作。 说明很绕,可以看如下图。 2.4. 代码实践 构建一个经典的 Prompt | Model | Parser 链。 # 1. 导入必要的库 from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 2. 初始化模型(请确保已设置 OPENAI_API_KEY 环境变量) model = ChatOpenAI(model="gpt-3.5-turbo") # 3. 创建提示模板 prompt = ChatPromptTemplate.from_template("讲一个关于 {topic} 的笑话。") # 4. 创建输出解析器 output_parser = StrOutputParser() # 5. 使用 | 操作符构建 RunnableSequence chain = prompt | model | output_parser # 6. 调用链(invoke 方法会同步执行整个链) response = chain.invoke({"topic": "程序员"}) print(response) 可以看到prompt | model | output_parser 这行代码就隐式地创建了一个 RunnableSequence。 当调用 chain.invoke({"topic": "程序员"}) 时,数据流程是这样的: | 操作符和其背后的“强制转换”(Coercion)机制是 LCEL 优秀用户体验的核心。当 | 的右侧是一个非 Runnable 对象时,例如一个普通的 Python 函数或一个字典,LCEL 会尝试将其自动转换为一个合适的 Runnable 类型。例如,函数会被包装成 RunnableLambda,字典会被包装成 RunnableParallel。这种隐式转换极大地降低了开发者的心智负担,使得代码更加简洁。理解这一自动化过程的存在是很重要的,在调试复杂链时知道背后可能发生的隐式转换可以帮助我们更快地定位问题。 3. 数据管道中的关键节点:RunnablePassthrough RunnableSequence 的默认行为是替换,每一步的输入完全取代上一步的输出,但时常会遇到这样的需求:怎么在一个线性的数据流中既处理数据 又保留原始未处理的数据?这个时候就要用到 RunnablePassthrough。 RunnablePassthrough 的基本行为类似于一个恒等函数,也就是输入什么就输出什么,用途主要是两个: 在链中传递数据 当链的后续步骤需要用到之前某个步骤的原始输入,而不仅仅是其直接上游的输出时,RunnablePassthrough 可以作为一个信使,将原始数据携带到需要他的地方 作为并行分支的占位符 在 RunnableParallel 中 RunnablePassthrough 通常被用来将原始输入原封不动的包含在并行处理的输出结果中。 3.1. assign() 方法的用法 RunnablePassthrough 的 assign() 方法是一个很强大的功能。这个方法允许在保留原始输入字典的同时,动态的计算并添加新的键值对。 assign() 接收一个或多个关键字参数,其值可以是 Runnable、callable 或普通值,对于每个关键字参数assign() 都会并行地执行其对应的 Runnable(以原始输入作为其输入),然后将执行结果作为新的值,关键字作为新的键,合并到原始输入字典中,最后返回这个扩充后的新字典。 这个“扩充而非替换”的行为是 .assign() 与 | 操作符的核心区别。 3.2. 代码实践 检索增强生成(RAG)是 RunnablePassthrough 和 .assign() 发挥巨大作用的典型场景。在一个 RAG 流程中,需要根据用户的问题(question)去检索相关文档(context),然后将问题和文档一起提供给 LLM 生成答案。这意味着,最终传给提示模板的必须同时包含 question 和 context。 先看一个利用 RunnableParallel(字典字面量)的做法: from langchain_core.runnables import RunnablePassthrough from operator import itemgetter # 假设 retriever 是一个已经配置好的 Runnable # retriever =... # 使用字典字面量(隐式的 RunnableParallel) rag_chain_v1 = { "context": retriever, "question": RunnablePassthrough() } | prompt | model | output_parser RunnablePassthrough() 接收整个输入字典(如 {"question": "..."}),然后又原样输出。其实是有些冗余的。 可以使用更优雅的方式,即刚刚说的 RunnablePassthrough.assign(): from langchain_core.runnables import RunnablePassthrough from operator import itemgetter # 假设 retriever 和其他组件已定义 # retriever =... # prompt =... # model =... # output_parser =... # 使用.assign() 扩充数据流 rag_chain_v2 = RunnablePassthrough.assign( context=itemgetter("question") | retriever ) | prompt | model | output_parser # 调用链 # response = rag_chain_v2.invoke({"question": "LCEL 是什么?"}) # print(response) assign() 实际上做了这些事情: invoke 的初始输入是 {"question": "LCEL 是什么?"}。 RunnablePassthrough.assign 接收到这个输入,并准备将其透传下去。 同时,它开始计算 context 字段。它执行 itemgetter("question") | retriever 这个子链。 itemgetter("question") 从原始输入中提取出字符串 "LCEL 是什么?"。 这个字符串被传递给 retriever,后者执行检索并返回一个文档列表 [...]。 .assign 将计算出的 context(文档列表)与原始输入合并,生成一个新的字典:{"question": "LCEL 是什么?", "context": [...]}。 这个完美格式化的字典,正好包含了 prompt 所需的全部变量,被传递给管道的下一步。 可以查看如下图: .assign 方法优雅地解决了在函数式的、不可变的管道中如何“添加”信息而不是“替换”信息的根本问题。这种将“扩充状态”行为本身封装成一个 Runnable 的做法,使得数据流的管理变得更加明确和模块化,是构建复杂、可维护的链的关键所在。 4. RunnableLambda 的高级用法与陷阱 RunnableLambda 是一个 Runnable 包装器,它可以将任何 Python 的 callable 对象(如普通函数、lambda 表达式等)转换成 LCEL 链的一个合法部分。 一旦将一个函数包装成 RunnableLambda,它就继承了所有 Runnable 的标准方法: .invoke(input, config=None): 对单个输入执行函数。 .batch(inputs, config=None): 对一批输入执行批量处理。 .stream(input, config=None): (如果可能)流式地输出结果。 .with_config(config): 为 Runnable 添加配置。 | (管道操作符): 用于将多个 Runnable 连接成一个序列。 RunnableLambda 会自动处理输入和输出的类型。 4.1. 为什么需要它? 在没有 RunnableLambda 之前,如果想在一个链中插入一个简单的数据转换步骤,可能需要创建一个完整的自定义 Chain 类,这显得有些重。RunnableLambda 提供了一种极其轻量级和便捷的方法来实现这一点。 主要用途包括: 数据预处理/后处理:在将输入传递给 LLM 之前对其进行格式化,或者在将 LLM 的输出传递给下一个步骤之前进行解析。 自定义逻辑:在链的执行流程中插入任何你需要的 Python 逻辑。 简化集成:将现有的、非 LangChain 的函数快速改造为可以与 LangChain 生态其他部分协作的组件。 4.2. 如何使用 创建 RunnableLambda 有两种方式: 显式创建: RunnableLambda(my_function) 隐式创建: 在 | 管道中直接使用函数名 ... | my_function |...。LCEL 会自动将其强制转换为一个 RunnableLambda。隐式创建是更常见、更简洁的做法。 # 原始的、需要多个参数的业务逻辑函数 def combine_text(text1: str, text2: str) -> str: """将两个文本用连字符拼接起来""" return f"{text1} - {text2}" # 兼容 LCEL 的包装函数,接收单个字典参数 def wrapped_combine_text(inputs: dict) -> str: """从输入字典中解包参数并调用原始函数""" return combine_text(inputs['key1'], inputs['key2']) # 在链中使用 # 假设 runnable1 和 runnable2 是两个上游组件 #... # 它们的输出将被 RunnableParallel 组合成一个字典 multi_param_chain = { "key1": runnable1, "key2": runnable2 } | RunnableLambda(wrapped_combine_text) 4.3. 注意点:函数为何必须接收单个参数? RunnableLambda 有一个很重要的使用约束:被包装的函数必须只接收一个参数。 这个设计决策根植于 LCEL 的核心设计哲学。LCEL 链本质上是一个线性的数据管道,其中每个 Runnable 组件都接收其上游组件的单一输出作为自己的单一输入。这个严格的“单输入/单输出”接口约定,是保证任意两个 Runnable 之间都可以无缝链接、维持整个系统可组合性的基石。 但如果业务逻辑函数天然需要多个参数怎么办? 标准解决方案是编写一个包装函数。这个包装函数遵循 LCEL 的约定,只接收一个字典作为其单一参数,然后在函数内部解包(unpack)这个字典,再将提取出的多个值传递给原始的、需要多参数的业务逻辑函数。 具体的示例代码可以参考上一小节如何使用部分的代码。 RunnableLambda 在声明式框架的严格结构和命令式编程的无限灵活性之间取得了平衡。单参数的限制,是开发者为了获得这种灵活性而需要遵守的“契约”,它要求在自定义代码的边界处编写少量的适配器代码(如字典包装器),以此来维护整个 LCEL 系统赖以生存的可组合性和一致性。 4.4. 进阶用法:RunnableConfig 为了实现更高级的控制和可观测性,RunnableLambda 包装的函数可以选择性地接收第二个参数:config: RunnableConfig。 RunnableConfig 是一个特殊的配置对象,它在整个链的调用过程中被传递。它携带了运行时的元数据,例如 callbacks (用于日志和追踪)、tags (用于过滤和识别)、run_name (为某次运行命名) 等。当你的自定义函数内部需要调用另一个 Runnable 时,将这个 config 对象传递给内部的 .invoke() 调用,可以确保整个执行过程的追踪信息是连续和完整的,这对于在 LangSmith 等平台上调试复杂链至关重要。 import json from langchain_core.runnables import RunnableConfig # 一个接收 config 的函数示例 def my_complex_logic(data: dict, config: RunnableConfig) -> str: #... 一些复杂的逻辑... # 在内部调用另一个 runnable 时,传递 config # 这样,internal_chain 的运行就会被正确地追踪,并成为 my_complex_logic 运行的子节点 result = internal_chain.invoke({"input": data["some_key"]}, config=config) return result 5. 效率提升:RunnableParallel 当应用需要对同一个输入执行多个独立的操作时,串行执行不仅效率低下,而且不符合逻辑。这时就需要用上 LCEL 的并行执行原语 RunnableParallel 了。 5.1. RunnableParallel RunnableParallel 是一个组合原语,它接收一个输入,并将这个输入同时(concurrently)分发给它所包含的多个子 Runnable。它会并行地执行所有这些子 Runnable,等待它们全部完成后,将结果收集到一个字典中返回。 输入: 单一输入,与 RunnableSequence 的输入类似。 输出: 一个字典。该字典的键是在构造 RunnableParallel 时指定的键,值是其对应的子 Runnable 的执行结果。 在同步执行环境下,RunnableParallel 通常使用线程池(ThreadPoolExecutor)来实现并发;在异步环境下,它则利用 asyncio.gather 来实现高效的并行处理。 需要明确的是,当在查阅 LangChain 相关文档或书籍时,可能会遇到 RunnableMap。 根据官方 API 文档,RunnableMap 仅仅是 RunnableParallel 的一个别名(alias)。它们指向的是同一个类,功能完全相同。所以在本文中统一都用 RunnableParallel 这个名称。 与 RunnableSequence 类似,使用 RunnableParallel 也有两种方式: 显式实例化: 直接调用 RunnableParallel 的构造函数,可以通过关键字参数或一个字典来定义并行的分支。 from langchain_core.runnables import RunnableParallel parallel_chain = RunnableParallel(key1=runnable1, key2=runnable2) 隐式转换(推荐): 在 | 管道中直接使用一个字典字面量。这是最常见、最简洁、也最符合 LCEL 风格的用法。LCEL 会自动将这个字典强制转换为一个 RunnableParallel 实例。 parallel_chain = {"key1": runnable1, "key2": runnable2} 5.2. 代码实践 from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableParallel # --- 通用组件 --- model = ChatOpenAI(model="gpt-3.5-turbo") parser = StrOutputParser() # --- 分支 A: 笑话生成链 --- prompt_joke = ChatPromptTemplate.from_template("讲一个关于 {topic} 的简短笑话。") joke_chain = prompt_joke | model | parser # --- 分支 B: 诗歌生成链 --- prompt_poem = ChatPromptTemplate.from_template("写一首关于 {topic} 的四行短诗。") poem_chain = prompt_poem | model | parser # --- 使用 RunnableParallel (或字典字面量) 组合 --- # map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain) # 或者更简洁地: map_chain = { "joke": joke_chain, "poem": poem_chain } # --- 调用并行链 --- result = map_chain.invoke({"topic": "机器人"}) # 打印结果 print("--- 笑话 ---") print(result['joke']) print("\n--- 诗歌 ---") print(result['poem']) # result 的结构会是: {"joke": "...", "poem": "..."} 5.3. RunnableParallel vs. RunnableSequence RunnableSequence 定义了串行执行和数据依赖。它适用于一个多步骤的工作流,其中每一步都依赖于前一步的结果。其数据流是线性的,如同装配线:A -> B -> C。 RunnableParallel 定义了并行执行和数据独立。它适用于对同一个输入进行多种不同处理的场景,其中每个处理分支之间互不依赖。其数据流是发散的(或称“扇出”,Fan-out):Input ->。 特性 RunnableSequence RunnableParallel 执行模式 串行 (Sequential) 并行/并发 (Parallel/Concurrent) 数据流 线性管道 (A -> B -> C) 扇出 (Input ->) 依赖关系 步骤间存在数据依赖 步骤间相互独立 输入/输出 单输入 -> 单输出 单输入 -> 字典输出 核心语法 `runnable1 runnable2` 主要用途 构建多步工作流 (如 RAG) 对同一输入进行多任务处理,数据准备 RunnableParallel 不仅仅是一个性能优化工具,它更是 LCEL 中实现“扇出”(Fan-out)数据流模式的结构化原语。它完美地模拟了从一个点发散出去,执行多个独立操作的计算模式。其字典形式的输出,为后续可能的“扇入”(Fan-in)步骤(例如,用一个 RunnableLambda 来聚合所有分支的结果)提供了结构化的数据。因此,RunnableSequence 和 RunnableParallel 共同构成了构建任意复杂执行图(DAG - Directed Acyclic Graph)的基础。Sequence 定义了图中的边(依赖关系),而 Parallel 定义了图中的节点分叉。理解这一点,意味着开发者从“链接组件”的思维,提升到了“构建计算图”的思维。 6. 以 LCEL 思维构建健壮、高效的 LLM 应用 LangChain 表达式语言(LCEL)为构建 LLM 应用提供了一套强大而富有表现力的工具集。通过其声明式的哲学,LCEL 将复杂的执行逻辑(如并行化、异步和流式处理)抽象出来,让开发者能够专注于应用的核心业务逻辑。 本文主要探讨了 LCEL 的概念和四大核心构建模块: RunnableSequence (| 操作符) 是构建线性、顺序工作流的基石。 RunnablePassthrough (特别是其 .assign 方法) 是在数据流中无损地扩充和传递状态的关键。 RunnableLambda 提供了将任意自定义 Python 代码安全注入到声明式链中的“逃生舱口”。 RunnableParallel (字典字面量) 是实现高效并发执行、构建“扇出”式数据流的利器。 我们需要从传统的“编写脚本”思维,转向“构建数据流图”的思维模式。在设计应用时,应首先思考任务之间的关系,熟练掌握并运用 LCEL 的思维模式,将是每一位致力于构建复杂、可维护、高性能 LLM 应用的开发者的必备技能。