LangChain Runnable 链式调用深度解析:从 itemgetterRunnableLambda

本文基于 LangChain 框架,深入解析 Runnable 链式调用中的核心机制,重点剖析 itemgetter| 管道符以及 RunnableLambda 的用法与设计哲学。


一、从一个典型示例说起

先看一段典型的 LangChain 链式代码:

from operator import itemgetter
from langchain_core.runnables import RunnableLambda
from langchain.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatTongyi
from langchain_core.output_parsers import StrOutputParser

# 计算字符串长度的普通函数
def length_function(text):
    return len(text)

# Prompt 模板
prompt = ChatPromptTemplate.from_template("{a} + {b} = ? 计算结果是多少?")

# 大模型
model = ChatTongyi()

# 输出解析器
out = StrOutputParser()

# 构建链
chain = (
    {"a": itemgetter("k1") | RunnableLambda(length_function),
     "b": itemgetter("k2") | RunnableLambda(length_function)}
    | prompt | model | out
)

# 调用
result = chain.invoke({"k1": "hello", "k2": "world"})
# 结果:模型回答 "10"

这段代码的核心是将数据提取、函数处理、Prompt 构造、大模型推理串联成一个流水线。下面逐层拆解。


二、itemgetter:从标准库走来的"提取器"

2.1 基本用法

itemgetter 来自 Python 标准库 operator 模块,用于从可迭代对象中提取指定索引或键的值

from operator import itemgetter

# 从字典提取
data = {"k1": "hello", "k2": "world"}
getter = itemgetter("k1")
print(getter(data))  # 输出: hello

# 从列表提取(按索引)
arr = ["a", "b", "c"]
getter = itemgetter(0, 2)
print(getter(arr))   # 输出: ('a', 'c')

2.2 本质:一个可调用对象

itemgetter("k1") 返回的是一个函数对象,等价于:

lambda x: x["k1"]

它本身不是 LangChain 的组件,也不懂什么是 Runnable、什么是管道符 |


三、| 管道符:LangChain 的链式编排语法糖

3.1 Python 运算符重载机制

Python 执行 A | B 时,按以下顺序尝试:

1. 调用 A.__or__(B)          ← 左边对象的"或"方法
2. 如果返回 NotImplemented
   调用 B.__ror__(A)         ← 右边对象的"反向或"方法

Python 只负责"打电话",接不接、怎么接,完全由对象自己决定。ror 就是 right or

何时返回 NotImplemented

一句话总结
NotImplemented 是"我处理不了,让别人试试"的信号。
在 A | B 中:

A.__or__(B) 返回 NotImplemented → 尝试 B.__ror__(A)

两边都返回 NotImplemented → 抛 TypeError

方法根本不存在 → 等同于返回 NotImplemented ,直接进入下一步
LangChain 的 Runnable.__ror__ 之所以能自动包装左边,正是因为 Python 的这个回退机制:普通函数没有 __or__ ,所以解释器自动去尝试 RunnableLambda.__ror__() ,LangChain 在那里拦截并包装。

3.2 LangChain 的双向策略

LangChain 的 Runnable 基类实现了 __or____ror__,但两边策略截然不同

位置 处理逻辑 原因
左边__ror__ 如果不是 Runnable,自动包一层 RunnableLambda 入口要兼容各种原始数据格式
右边__or__ 如果不是 Runnable,直接抛 TypeError 中间环节必须类型安全
# 源码逻辑示意(伪代码)
class Runnable:
    def __ror__(self, other):  # other 在左边
        if not isinstance(other, Runnable):
            other = RunnableLambda(other)  # 自动包装
        return RunnableSequence(other, self)

    def __or__(self, other):   # other 在右边
        if not isinstance(other, Runnable):
            raise TypeError(f"Expected a Runnable, got {type(other)}")
        return RunnableSequence(self, other)

3.3 为什么是"左包右不包"?

这是 LangChain 的设计选择,不是 Python 语法限制:

  • 左边是"数据源":需要兼容字典、函数、常量等各种输入形式,自动包装降低使用门槛
  • 右边是"处理环节":链的中间节点必须是可控的 Runnable,避免隐式转换带来的调试困难
  • 显式优于隐式:右边想用普通函数?请显式声明 RunnableLambda(func)

四、RunnableLambda:让普通函数融入流水线

4.1 核心作用

RunnableLambda 是 LangChain 提供的适配器,将普通 Python 函数包装成符合 Runnable 接口的对象:

from langchain_core.runnables import RunnableLambda

def my_func(x):
    return x.upper()

# 包装前:普通函数
my_func("hello")  # 直接调用

# 包装后:Runnable 对象
runnable = RunnableLambda(my_func)
runnable.invoke("hello")  # 通过 Runnable 接口调用

包装后,该函数就具备了 Runnable 的全部能力:

  • invoke() —— 同步单条执行
  • batch() —— 批量执行
  • stream() —— 流式输出(如果函数支持生成器)
  • ainvoke() —— 异步执行
  • 可以用 | 与其他 Runnable 串联

4.2 完整用法示例

示例 1:基础包装与调用
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

# 各种调用方式
print(runnable.invoke(5))           # 6
print(runnable.batch([1, 2, 3]))      # [2, 3, 4]

# 异步调用
import asyncio
async def main():
    result = await runnable.ainvoke(5)
    print(result)  # 6
asyncio.run(main())
示例 2:函数接收字典输入
def extract_and_count(data: dict) -> int:
    text = data.get("text", "")
    return len(text)

runnable = RunnableLambda(extract_and_count)
print(runnable.invoke({"text": "hello"}))  # 5
示例 3:在链中串联使用
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

chain = (
    RunnablePassthrough()  # 透传输入
    | RunnableLambda(lambda x: x * 2)      # 乘以 2
    | RunnableLambda(lambda x: x + 1)      # 加 1
    | RunnableLambda(lambda x: f"结果: {x}")  # 格式化
)

print(chain.invoke(5))  # 结果: 11  (5*2+1=11)
示例 4:与 itemgetter 配合使用(本文开头示例的变体)
from operator import itemgetter
from langchain_core.runnables import RunnableLambda

# 原始数据
data = {"user": {"name": "Alice", "age": 30}}

# 构建链:提取嵌套字段 -> 格式化
def format_user(name: str) -> str:
    return f"用户名: {name}"

# itemgetter 提取嵌套值
chain = (
    itemgetter("user")           # 提取 {"name": "Alice", "age": 30}
    | itemgetter("name")         # 提取 "Alice"
    | RunnableLambda(format_user)  # 格式化为 "用户名: Alice"
)

print(chain.invoke(data))  # 用户名: Alice

4.3 输入输出类型标注

LangChain 支持通过类型提示自动推断输入输出模式:

from typing import TypedDict

class Input(TypedDict):
    text: str

class Output(TypedDict):
    length: int

def count_chars(data: Input) -> Output:
    return {"length": len(data["text"])}

runnable = RunnableLambda(count_chars)
# LangChain 会自动识别 Input/Output 的结构

4.4 错误处理与重试

RunnableLambda 继承 Runnable 的全部能力,包括重试机制:

from langchain_core.runnables import RunnableLambda

def flaky_function(x):
    import random
    if random.random() < 0.5:
        raise ValueError("随机失败")
    return x * 2

# 配置重试
runnable = RunnableLambda(flaky_function).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

print(runnable.invoke(5))  # 自动重试最多 3 次

五、完整数据流回顾

以本文开头的示例为例,完整数据流如下:

输入: {"k1": "hello", "k2": "world"}
    ↓
┌─────────────────────────────────────────────┐
│  并行分支处理(字典构造)                      │
│  ├─ "k1" → itemgetter("k1") → "hello"       │
│  │           ↓                               │
│  │      RunnableLambda(length_function)      │
│  │           ↓                               │
│  │           5  ────────────┐                │
│  │                          │                │
│  └─ "k2" → itemgetter("k2") → "world"       │
│              ↓                               │
│         RunnableLambda(length_function)      │
│              ↓                               │
│              5  ─────────────┘                │
│                          ↓                    │
│              {"a": 5, "b": 5}                │
└─────────────────────────────────────────────┘
    ↓
ChatPromptTemplate.from_template("{a} + {b} = ? ...")
    ↓
"5 + 5 = ? 计算结果是多少?"
    ↓
ChatTongyi() 大模型推理
    ↓
StrOutputParser() 解析输出
    ↓
最终结果: "10"

六、@chain 装饰器:RunnableLambda 的语法糖

除了直接使用 RunnableLambda 构造函数,LangChain 还提供了 @chain 装饰器,它是 RunnableLambda 的装饰器语法封装,两者底层完全等价,但装饰器写法更符合 Python 习惯。

6.1 基本用法

from langchain_core.runnables import chain

# 用 @chain 装饰器定义 Runnable
@chain
def double_value(x: int) -> int:
    return x * 2

# 调用方式与 RunnableLambda 完全一致
print(double_value.invoke(5))        # 10
print(double_value.batch([1, 2, 3]))   # [2, 4, 6]

本质@chain 装饰器内部就是 RunnableLambda(func) 的封装:

# 伪代码
def chain(func):
    return RunnableLambda(func)

6.2 两种写法完全等价

from langchain_core.runnables import RunnableLambda, chain

def add_one(x):
    return x + 1

# 写法 A:构造函数
r1 = RunnableLambda(add_one)

# 写法 B:装饰器(覆盖原函数名为 Runnable 对象)
@chain
def r2(x):
    return x + 1

# 类型和结果完全一致
assert type(r1) == type(r2)            # 都是 RunnableLambda
assert r1.invoke(5) == r2.invoke(5)    # 都是 6

6.3 装饰器构建多环节链

装饰器返回的本身就是 Runnable,可以直接用 | 串联:

from langchain_core.runnables import chain

@chain
def extract_text(data: dict) -> str:
    return data.get("content", "")

@chain
def count_words(text: str) -> int:
    return len(text.split())

@chain
def format_result(count: int) -> str:
    return f"字数统计: {count} 字"

# 串联成流水线
pipeline = extract_text | count_words | format_result

print(pipeline.invoke({"content": "hello world foo bar"}))
# 输出: 字数统计: 4 字

对比 RunnableLambda 写法

# 等价但可读性稍差
extract_text = RunnableLambda(lambda d: d.get("content", ""))
count_words = RunnableLambda(lambda t: len(t.split()))
format_result = RunnableLambda(lambda c: f"字数统计: {c} 字")
pipeline = extract_text | count_words | format_result

6.4 链式配置能力

装饰器返回的对象仍然是 Runnable,支持全部配置方法:

from langchain_core.runnables import chain

@chain
def risky_operation(x):
    import random
    if random.random() < 0.3:
        raise ValueError("随机失败")
    return x * 2

# 绑定重试策略
safe_pipeline = risky_operation.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

print(safe_pipeline.invoke(5))  # 失败时自动重试最多 3 次

6.5 对比总结

特性 RunnableLambda(func) @chain 装饰器
本质 类构造函数 函数装饰器(返回 RunnableLambda
语法 r = RunnableLambda(my_func) @chain + def my_func
函数保留 原函数不变,新建对象 覆盖原函数名为 Runnable 对象
可读性 一般,显式构造 好,像普通函数定义
类型提示 支持 支持(保留函数签名)
链式配置 .with_retry() .with_retry() 等(完全一样)
推荐场景 临时 lambda、动态包装 定义可复用的链组件

LangChain 推荐在新代码中使用 @chain,代码更简洁、意图更明确。

注意: @chain 是牺牲原函数的可调用性(无法当作普通函数使用了),换取链式编排的便利性。如果你需要两边都用,要么分开命名,要么不用装饰器、显式构造 RunnableLambda 。


七、总结

概念 核心要点
itemgetter Python 标准库函数,用于提取字典键/列表索引值
| 管道符 LangChain 的链式编排语法,依赖 Python 运算符重载
左包右不包 LangChain 设计选择:__ror__ 自动包装左边,__or__ 要求右边必须是 Runnable
RunnableLambda 将普通 Python 函数包装为 Runnable,使其可接入流水线
设计哲学 入口宽松(兼容原始数据),中间严格(保证类型安全),显式优于隐式

RunnableLambda 是连接"普通 Python 代码"与"LangChain 流水线"的桥梁,理解它的用法,是掌握 LangChain 链式编排的关键一步。


本文基于 LangChain 框架源码及实践总结,如有错误欢迎指正。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐