【LangChain】 Runnable 链式调用深度解析:从 `itemgetter` 到 `RunnableLambda`
LangChain Runnable 链式调用深度解析:从 itemgetter 到 RunnableLambda
本文基于 LangChain 框架,深入解析 Runnable 链式调用中的核心机制,重点剖析
itemgetter、|管道符以及RunnableLambda的用法与设计哲学。
一、从一个典型示例说起
先看一段典型的 LangChain 链式代码:
from operator import itemgetter
from langchain_core.runnables import RunnableLambda
from langchain.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser
# 计算字符串长度的普通函数
def length_function(text):
return len(text)
# Prompt 模板
prompt = ChatPromptTemplate.from_template("{a} + {b} = ? 计算结果是多少?")
# 大模型
model = ChatTongyi()
# 输出解析器
out = StrOutputParser()
# 构建链
chain = (
{"a": itemgetter("k1") | RunnableLambda(length_function),
"b": itemgetter("k2") | RunnableLambda(length_function)}
| prompt | model | out
)
# 调用
result = chain.invoke({"k1": "hello", "k2": "world"})
# 结果:模型回答 "10"
这段代码的核心是将数据提取、函数处理、Prompt 构造、大模型推理串联成一个流水线。下面逐层拆解。
二、itemgetter:从标准库走来的"提取器"
2.1 基本用法
itemgetter 来自 Python 标准库 operator 模块,用于从可迭代对象中提取指定索引或键的值:
from operator import itemgetter
# 从字典提取
data = {"k1": "hello", "k2": "world"}
getter = itemgetter("k1")
print(getter(data)) # 输出: hello
# 从列表提取(按索引)
arr = ["a", "b", "c"]
getter = itemgetter(0, 2)
print(getter(arr)) # 输出: ('a', 'c')
2.2 本质:一个可调用对象
itemgetter("k1") 返回的是一个函数对象,等价于:
lambda x: x["k1"]
它本身不是 LangChain 的组件,也不懂什么是 Runnable、什么是管道符 |。
三、| 管道符:LangChain 的链式编排语法糖
3.1 Python 运算符重载机制
Python 执行 A | B 时,按以下顺序尝试:
1. 调用 A.__or__(B) ← 左边对象的"或"方法
2. 如果返回 NotImplemented
调用 B.__ror__(A) ← 右边对象的"反向或"方法
Python 只负责"打电话",接不接、怎么接,完全由对象自己决定。(ror 就是 right or)
何时返回 NotImplemented
一句话总结NotImplemented 是"我处理不了,让别人试试"的信号。在 A | B 中:
A.__or__(B) 返回 NotImplemented → 尝试 B.__ror__(A)
两边都返回 NotImplemented → 抛 TypeError
方法根本不存在 → 等同于返回 NotImplemented ,直接进入下一步LangChain 的 Runnable.__ror__ 之所以能自动包装左边,正是因为 Python 的这个回退机制:普通函数没有 __or__ ,所以解释器自动去尝试 RunnableLambda.__ror__() ,LangChain 在那里拦截并包装。
3.2 LangChain 的双向策略
LangChain 的 Runnable 基类实现了 __or__ 和 __ror__,但两边策略截然不同:
| 位置 | 处理逻辑 | 原因 |
|---|---|---|
左边(__ror__) |
如果不是 Runnable,自动包一层 RunnableLambda |
入口要兼容各种原始数据格式 |
右边(__or__) |
如果不是 Runnable,直接抛 TypeError |
中间环节必须类型安全 |
# 源码逻辑示意(伪代码)
class Runnable:
def __ror__(self, other): # other 在左边
if not isinstance(other, Runnable):
other = RunnableLambda(other) # 自动包装
return RunnableSequence(other, self)
def __or__(self, other): # other 在右边
if not isinstance(other, Runnable):
raise TypeError(f"Expected a Runnable, got {type(other)}")
return RunnableSequence(self, other)
3.3 为什么是"左包右不包"?
这是 LangChain 的设计选择,不是 Python 语法限制:
- 左边是"数据源":需要兼容字典、函数、常量等各种输入形式,自动包装降低使用门槛
- 右边是"处理环节":链的中间节点必须是可控的 Runnable,避免隐式转换带来的调试困难
- 显式优于隐式:右边想用普通函数?请显式声明
RunnableLambda(func)
四、RunnableLambda:让普通函数融入流水线
4.1 核心作用
RunnableLambda 是 LangChain 提供的适配器,将普通 Python 函数包装成符合 Runnable 接口的对象:
from langchain_core.runnables import RunnableLambda
def my_func(x):
return x.upper()
# 包装前:普通函数
my_func("hello") # 直接调用
# 包装后:Runnable 对象
runnable = RunnableLambda(my_func)
runnable.invoke("hello") # 通过 Runnable 接口调用
包装后,该函数就具备了 Runnable 的全部能力:
invoke()—— 同步单条执行batch()—— 批量执行stream()—— 流式输出(如果函数支持生成器)ainvoke()—— 异步执行- 可以用
|与其他 Runnable 串联
4.2 完整用法示例
示例 1:基础包装与调用
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
# 各种调用方式
print(runnable.invoke(5)) # 6
print(runnable.batch([1, 2, 3])) # [2, 3, 4]
# 异步调用
import asyncio
async def main():
result = await runnable.ainvoke(5)
print(result) # 6
asyncio.run(main())
示例 2:函数接收字典输入
def extract_and_count(data: dict) -> int:
text = data.get("text", "")
return len(text)
runnable = RunnableLambda(extract_and_count)
print(runnable.invoke({"text": "hello"})) # 5
示例 3:在链中串联使用
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
chain = (
RunnablePassthrough() # 透传输入
| RunnableLambda(lambda x: x * 2) # 乘以 2
| RunnableLambda(lambda x: x + 1) # 加 1
| RunnableLambda(lambda x: f"结果: {x}") # 格式化
)
print(chain.invoke(5)) # 结果: 11 (5*2+1=11)
示例 4:与 itemgetter 配合使用(本文开头示例的变体)
from operator import itemgetter
from langchain_core.runnables import RunnableLambda
# 原始数据
data = {"user": {"name": "Alice", "age": 30}}
# 构建链:提取嵌套字段 -> 格式化
def format_user(name: str) -> str:
return f"用户名: {name}"
# itemgetter 提取嵌套值
chain = (
itemgetter("user") # 提取 {"name": "Alice", "age": 30}
| itemgetter("name") # 提取 "Alice"
| RunnableLambda(format_user) # 格式化为 "用户名: Alice"
)
print(chain.invoke(data)) # 用户名: Alice
4.3 输入输出类型标注
LangChain 支持通过类型提示自动推断输入输出模式:
from typing import TypedDict
class Input(TypedDict):
text: str
class Output(TypedDict):
length: int
def count_chars(data: Input) -> Output:
return {"length": len(data["text"])}
runnable = RunnableLambda(count_chars)
# LangChain 会自动识别 Input/Output 的结构
4.4 错误处理与重试
RunnableLambda 继承 Runnable 的全部能力,包括重试机制:
from langchain_core.runnables import RunnableLambda
def flaky_function(x):
import random
if random.random() < 0.5:
raise ValueError("随机失败")
return x * 2
# 配置重试
runnable = RunnableLambda(flaky_function).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
print(runnable.invoke(5)) # 自动重试最多 3 次
五、完整数据流回顾
以本文开头的示例为例,完整数据流如下:
输入: {"k1": "hello", "k2": "world"}
↓
┌─────────────────────────────────────────────┐
│ 并行分支处理(字典构造) │
│ ├─ "k1" → itemgetter("k1") → "hello" │
│ │ ↓ │
│ │ RunnableLambda(length_function) │
│ │ ↓ │
│ │ 5 ────────────┐ │
│ │ │ │
│ └─ "k2" → itemgetter("k2") → "world" │
│ ↓ │
│ RunnableLambda(length_function) │
│ ↓ │
│ 5 ─────────────┘ │
│ ↓ │
│ {"a": 5, "b": 5} │
└─────────────────────────────────────────────┘
↓
ChatPromptTemplate.from_template("{a} + {b} = ? ...")
↓
"5 + 5 = ? 计算结果是多少?"
↓
ChatTongyi() 大模型推理
↓
StrOutputParser() 解析输出
↓
最终结果: "10"
六、@chain 装饰器:RunnableLambda 的语法糖
除了直接使用 RunnableLambda 构造函数,LangChain 还提供了 @chain 装饰器,它是 RunnableLambda 的装饰器语法封装,两者底层完全等价,但装饰器写法更符合 Python 习惯。
6.1 基本用法
from langchain_core.runnables import chain
# 用 @chain 装饰器定义 Runnable
@chain
def double_value(x: int) -> int:
return x * 2
# 调用方式与 RunnableLambda 完全一致
print(double_value.invoke(5)) # 10
print(double_value.batch([1, 2, 3])) # [2, 4, 6]
本质:@chain 装饰器内部就是 RunnableLambda(func) 的封装:
# 伪代码
def chain(func):
return RunnableLambda(func)
6.2 两种写法完全等价
from langchain_core.runnables import RunnableLambda, chain
def add_one(x):
return x + 1
# 写法 A:构造函数
r1 = RunnableLambda(add_one)
# 写法 B:装饰器(覆盖原函数名为 Runnable 对象)
@chain
def r2(x):
return x + 1
# 类型和结果完全一致
assert type(r1) == type(r2) # 都是 RunnableLambda
assert r1.invoke(5) == r2.invoke(5) # 都是 6
6.3 装饰器构建多环节链
装饰器返回的本身就是 Runnable,可以直接用 | 串联:
from langchain_core.runnables import chain
@chain
def extract_text(data: dict) -> str:
return data.get("content", "")
@chain
def count_words(text: str) -> int:
return len(text.split())
@chain
def format_result(count: int) -> str:
return f"字数统计: {count} 字"
# 串联成流水线
pipeline = extract_text | count_words | format_result
print(pipeline.invoke({"content": "hello world foo bar"}))
# 输出: 字数统计: 4 字
对比 RunnableLambda 写法:
# 等价但可读性稍差
extract_text = RunnableLambda(lambda d: d.get("content", ""))
count_words = RunnableLambda(lambda t: len(t.split()))
format_result = RunnableLambda(lambda c: f"字数统计: {c} 字")
pipeline = extract_text | count_words | format_result
6.4 链式配置能力
装饰器返回的对象仍然是 Runnable,支持全部配置方法:
from langchain_core.runnables import chain
@chain
def risky_operation(x):
import random
if random.random() < 0.3:
raise ValueError("随机失败")
return x * 2
# 绑定重试策略
safe_pipeline = risky_operation.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
print(safe_pipeline.invoke(5)) # 失败时自动重试最多 3 次
6.5 对比总结
| 特性 | RunnableLambda(func) |
@chain 装饰器 |
|---|---|---|
| 本质 | 类构造函数 | 函数装饰器(返回 RunnableLambda) |
| 语法 | r = RunnableLambda(my_func) |
@chain + def my_func |
| 函数保留 | 原函数不变,新建对象 | 覆盖原函数名为 Runnable 对象 |
| 可读性 | 一般,显式构造 | 好,像普通函数定义 |
| 类型提示 | 支持 | 支持(保留函数签名) |
| 链式配置 | .with_retry() 等 |
.with_retry() 等(完全一样) |
| 推荐场景 | 临时 lambda、动态包装 | 定义可复用的链组件 |
LangChain 推荐在新代码中使用 @chain,代码更简洁、意图更明确。
注意: @chain 是牺牲原函数的可调用性(无法当作普通函数使用了),换取链式编排的便利性。如果你需要两边都用,要么分开命名,要么不用装饰器、显式构造 RunnableLambda 。
七、总结
| 概念 | 核心要点 |
|---|---|
itemgetter |
Python 标准库函数,用于提取字典键/列表索引值 |
| 管道符 |
LangChain 的链式编排语法,依赖 Python 运算符重载 |
| 左包右不包 | LangChain 设计选择:__ror__ 自动包装左边,__or__ 要求右边必须是 Runnable |
RunnableLambda |
将普通 Python 函数包装为 Runnable,使其可接入流水线 |
| 设计哲学 | 入口宽松(兼容原始数据),中间严格(保证类型安全),显式优于隐式 |
RunnableLambda 是连接"普通 Python 代码"与"LangChain 流水线"的桥梁,理解它的用法,是掌握 LangChain 链式编排的关键一步。
本文基于 LangChain 框架源码及实践总结,如有错误欢迎指正。
更多推荐


所有评论(0)