1. 引言:为什么我们每天离不开LangChain这五个核心功能

如果你正在构建基于大语言模型的应用,大概率已经听过或者正在使用LangChain。这个框架几乎成了LLM应用开发的事实标准,但它的价值远不止于“又一个AI工具库”。在我们团队的实际生产部署中,从简单的聊天机器人到复杂的多智能体工作流,LangChain提供的几个核心抽象层,实实在在地解决了工程化落地中最头疼的那些问题。今天,我想抛开那些泛泛的概述,直接聊聊我们每天在真实项目中重度依赖的五个LangChain功能,以及如何将它们组合起来,构建一个可以直接上线的、由LangGraph驱动的智能研究助手。

这五个功能分别是: 提示词模板管理 带持久化的对话上下文 基于LCEL的链式组合 开箱即用的检索增强生成 ,以及 用于结构化输出的解析器 。它们共同构成了一个可靠的工具集,让你能专注于业务逻辑,而不是反复造轮子去处理LLM集成、状态管理和数据流编排的琐碎细节。我们会从一个最简单的“研究助手”原型开始,逐步加入这些功能,最后用LangGraph将其重构为一个健壮的生产级应用。无论你是刚开始接触LangChain,还是已经用它做过一些原型,相信这些从一线实战中总结出的模式和技巧,都能帮你少踩几个坑。

2. 核心功能一:用提示词模板实现可维护的提示工程

直接向LLM API发送原始字符串提示词,在原型阶段或许可行,但一旦项目规模扩大,你就会陷入“字符串地狱”——提示词散落在代码各处,修改一个系统指令需要全局搜索,不同模型间的提示格式差异让人头疼。LangChain的 PromptTemplate ChatPromptTemplate 首先解决的就是这个问题: 将提示词定义为可复用、可配置的程序对象

2.1 基础模板:告别硬编码的字符串

一个最基本的提示词模板,允许你将动态部分定义为变量。这样做的好处是双重的:一是实现了关注点分离,业务逻辑代码不用关心提示词的具体措辞;二是提升了可维护性,你可以集中管理所有提示词,甚至将它们外部化到配置文件或数据库中。

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 定义一个研究助手的提示词模板
research_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个研究助手。请根据请求的主题,提供关键要点和核心概念。"),
    ("human", "{input}")
])

# 初始化LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)

# 使用模板,动态注入用户查询
formatted_messages = research_prompt.format_messages(input="什么是LangChain?")
response = llm.invoke(formatted_messages)
print(response.content)

在这个例子中, {input} 就是一个占位符。当你调用 format_messages 时,传入的字典会替换这些占位符,生成最终的提示词列表。这看起来简单,但它奠定了所有复杂功能的基础。模板支持多种消息角色(system, human, ai, function),你可以轻松构建多轮对话的上下文。

注意 ChatPromptTemplate.from_messages 接受一个消息列表。每个消息是一个元组,第一个元素是角色,第二个是内容(或包含变量的模板字符串)。这种结构天然支持复杂的多角色对话编排。

2.2 进阶技巧:少样本提示与模板组合

基础模板解决了变量替换问题,但对于需要引导模型输出特定格式或风格的任务,我们常常需要“少样本提示”。LangChain提供了 FewShotChatMessagePromptTemplate 来优雅地处理这个问题。

假设我们希望研究助手总是以“首先...其次...最后...”的结构来回答。我们可以这样做:

from langchain_core.prompts import FewShotChatMessagePromptTemplate

# 1. 定义示例集
examples = [
    {
        "input": "什么是机器学习?",
        "output": "首先,机器学习是人工智能的一个分支,专注于让计算机从数据中学习。其次,它主要分为监督学习、无监督学习和强化学习。最后,其核心目标是构建能够自动改进经验的算法。"
    },
    {
        "input": "解释一下神经网络",
        "output": "首先,神经网络是受人脑神经元结构启发的计算模型。其次,它由输入层、隐藏层和输出层组成,通过权重和激活函数传递信号。最后,通过反向传播算法调整权重以最小化预测误差。"
    }
]

# 2. 创建示例模板,定义如何将每个示例转化为对话消息
example_prompt = ChatPromptTemplate.from_messages([
    ("human", "{input}"),
    ("ai", "{output}")
])

# 3. 创建少样本提示模板
few_shot_prompt = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt,
    examples=examples,
)

# 4. 将少样本模板作为最终提示的一部分
final_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个研究助手,请严格按照提供的示例格式进行回答。"),
    few_shot_prompt,  # 这里会插入所有的示例对话
    ("human", "{input}")
])

# 使用方式与基础模板完全相同
chain = final_prompt | llm
response = chain.invoke({"input": "什么是RAG?"})

少样本提示能显著提升模型输出的稳定性和质量,尤其是在需要特定格式、风格或解决复杂推理任务时。 FewShotChatMessagePromptTemplate 帮你管理这些示例,让提示词工程变得模块化。

实操心得 :不要把所有示例都硬编码在代码里。对于生产环境,考虑将示例存储在向量数据库中,根据用户当前查询动态检索最相关的几个示例注入提示词,这可以实现更智能、更个性化的少样本学习。

3. 核心功能二:用消息历史管理实现有记忆的对话

没有记忆的聊天机器人就像金鱼——每次对话都是全新的开始。这对于需要上下文连贯的复杂任务来说是致命的。LangChain通过 ChatMessageHistory 及其相关组件,提供了从内存到数据库等多种级别的对话上下文管理方案。

3.1 会话级内存:让对话连贯起来

最简单的需求是让机器人在单次会话中记住之前说过的话。 InMemoryChatMessageHistory 非常适合这个场景,它就像一个临时的对话记录本。

from langchain.memory import ChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage

# 创建一个内存中的消息历史记录
history = ChatMessageHistory()

# 模拟一轮对话
history.add_user_message("LangChain是什么?")
# 假设AI回复了,我们将其加入历史
history.add_ai_message("LangChain是一个用于开发大语言模型应用的框架。")

# 现在进行下一轮对话,历史记录会被自动带入上下文
print(history.messages)
# 输出: [HumanMessage(content='LangChain是什么?'), AIMessage(content='LangChain是一个用于开发大语言模型应用的框架。')]

但真实应用通常要同时服务多个用户或会话。我们需要一个能根据会话ID隔离历史记录的机制。LangChain的 RunnableWithMessageHistory 包装器与一个简单的“会话历史工厂函数”配合,可以完美解决这个问题。

from langchain.memory import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

# 一个简单的内存存储,用字典模拟。生产环境请用Redis、数据库等。
memory_store = {}

def get_session_history(session_id: str) -> ChatMessageHistory:
    """根据session_id获取或创建对应的消息历史记录。"""
    if session_id not in memory_store:
        memory_store[session_id] = ChatMessageHistory()
    return memory_store[session_id]

# 在提示词模板中为历史消息预留位置
prompt_with_history = ChatPromptTemplate.from_messages([
    ("system", "你是一个研究助手。请参考之前的对话历史来回答问题。"),
    MessagesPlaceholder(variable_name="history"),  # 关键:这是一个占位符
    ("human", "{input}")
])

# 创建基础链
chain = prompt_with_history | llm

# 用RunnableWithMessageHistory包装基础链,使其具备会话感知能力
conversational_chain = RunnableWithMessageHistory(
    chain,
    get_session_history,  # 告诉它如何获取历史
    input_messages_key="input",  # 用户输入对应的键
    history_messages_key="history"  # 历史消息在提示词中对应的变量名
)

# 模拟用户“user-123”的两轮对话
response1 = conversational_chain.invoke(
    {"input": "帮我了解一下LangChain"},
    config={"configurable": {"session_id": "user-123"}}  # 指定会话ID
)
print(f"第一轮回复: {response1.content}")

response2 = conversational_chain.invoke(
    {"input": "我刚刚问了你什么?"},  # 这个问题依赖于历史
    config={"configurable": {"session_id": "user-123"}}  # 相同的会话ID
)
print(f"第二轮回复: {response2.content}")  # AI应该能回答出上一轮的问题

RunnableWithMessageHistory 自动完成了最繁琐的工作:在每次调用时,它根据 session_id 从工厂函数获取历史记录,将其格式化后插入到提示词的 MessagesPlaceholder 位置,并在调用结束后将本轮的新消息(用户输入和AI回复)追加回历史记录。你只需要关心业务链和如何存储历史。

3.2 长期记忆与上下文窗口管理

内存存储只适用于单次服务进程的生命周期。一旦服务重启,所有记忆都会消失。对于生产环境,你需要 持久化存储 ,比如Redis、PostgreSQL或MongoDB。LangChain支持多种后端,你只需实现一个类似的 get_session_history 函数,内部从你的数据库中读写即可。

另一个更隐蔽但至关重要的问题是 上下文窗口限制 。像GPT-4这样的模型有固定的token上限(例如128K)。如果对话历史无限增长,最终会超出限制,导致最开始的对话被“遗忘”。常见的解决方案有两种:

  1. 滑动窗口 :只保留最近N条消息或最近N个token的历史。 ChatMessageHistory 本身不提供此功能,但你可以通过自定义 get_session_history 函数,在返回历史前进行截断。
  2. 总结性压缩 :这是更优雅的方案。当历史记录达到一定长度时,调用另一个LLM对早期对话进行总结,然后用总结摘要替换掉详细的历史记录,从而在保留核心信息的同时大幅节省token。
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain

summary_prompt = PromptTemplate.from_template(
    "请将以下对话历史简洁地总结成一段摘要,保留所有关键事实和决定:\n\n{history}\n\n摘要:"
)
summary_chain = summary_prompt | llm

def get_compressed_history(session_id: str, max_messages=10) -> ChatMessageHistory:
    history = get_persistent_history_from_db(session_id)  # 假设从DB获取
    messages = history.messages

    if len(messages) > max_messages:
        # 将超出部分的老消息提取出来用于总结
        to_summarize = messages[:-max_messages]  # 老消息
        recent = messages[-max_messages:]  # 新消息

        # 将老消息格式化成文本进行总结
        history_text = "\n".join([f"{m.type}: {m.content}" for m in to_summarize])
        summary = summary_chain.invoke({"history": history_text})

        # 创建新的历史:总结 + 新消息
        new_history = ChatMessageHistory()
        new_history.add_message(AIMessage(content=f"[先前对话摘要] {summary.content}"))
        for msg in recent:
            new_history.add_message(msg)
        return new_history
    return history

避坑指南 :上下文管理是生产级聊天应用稳定性的基石。务必根据你的业务场景(是短平快客服还是深度学术讨论)设计合适的历史保留策略。无脑保存全部历史,不仅浪费token、增加成本,还可能因为无关信息干扰导致模型性能下降。

4. 核心功能三:用LCEL构建声明式、可组合的工作流

当你把提示词模板、LLM调用、输出解析等步骤组合起来时,代码很容易变成嵌套的回调地狱。LangChain表达式语言(LCEL)通过引入 Runnable 协议和管道操作符( | ),让链的构建变得像搭积木一样直观和声明式。

4.1 LCEL基础:从线性链到复杂管道

任何实现了 .invoke() , .batch() , .stream() 等方法的对象都是 Runnable 。LCEL的核心思想是用 | 操作符将多个 Runnable 连接起来,形成一个新链。这个新链本身也是一个 Runnable

让我们扩展之前的研究助手,让它先进行研究,然后对研究结果进行总结。

from operator import itemgetter

# 1. 定义两个提示词模板
research_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个研究助手。请提供关于请求主题的关键要点和概念。"),
    ("human", "{input}")
])

summary_prompt = ChatPromptTemplate.from_messages([
    ("human", "请用两三句话总结以下信息:{input}")
])

# 2. 使用LCEL组合链
research_and_summarize_chain = (
    research_prompt  # 第一步:格式化研究提示词
    | llm  # 第二步:调用LLM进行研究
    | (lambda x: x.content)  # 第三步:提取LLM回复的文本内容
    | summary_prompt  # 第四步:将上一步结果作为输入,格式化总结提示词
    | llm  # 第五步:再次调用LLM进行总结
    | (lambda x: x.content)  # 第六步:提取最终总结文本
)

# 调用链
result = research_and_summarize_chain.invoke({"input": "什么是检索增强生成?"})
print(result)

这个链清晰展示了数据流:用户输入 -> 研究提示词 -> LLM研究 -> 提取内容 -> 总结提示词 -> LLM总结 -> 提取最终结果。LCEL会自动处理中间结果的传递。

4.2 高级组合:分支、并行与动态路由

简单的线性链不够用?LCEL提供了 RunnableParallel , RunnableBranch 等组件来处理更复杂的逻辑。

  • RunnableParallel :并行执行多个分支,并将结果合并到一个字典中。这在需要同时获取多种信息时非常有用。
from langchain_core.runnables import RunnableParallel

# 定义一个链,同时获取主题的定义和示例
multi_query_chain = RunnableParallel({
    "definition": research_prompt | llm | (lambda x: x.content),
    "example": ChatPromptTemplate.from_template("给出一个关于{input}的具体示例。") | llm | (lambda x: x.content),
})

result = multi_query_chain.invoke({"input": "机器学习"})
print(f"定义: {result['definition']}")
print(f"示例: {result['example']}")
  • RunnableBranch :根据条件动态选择执行哪个分支。这可以用来实现简单的路由逻辑。
from langchain_core.runnables import RunnableBranch

# 定义条件判断函数
def route_based_on_length(input_dict):
    user_input = input_dict.get("input", "")
    if len(user_input) > 100:
        return "long"  # 返回分支的标识符
    else:
        return "short"

# 定义不同分支的处理链
long_chain = ChatPromptTemplate.from_template("用户输入很长,请详细分析:{input}") | llm
short_chain = ChatPromptTemplate.from_template("用户输入简短,请直接回答:{input}") | llm

# 创建分支
branch_chain = RunnableBranch(
    ("long", long_chain),  # 如果route函数返回"long",执行此链
    ("short", short_chain), # 如果返回"short",执行此链
)

# 需要将原始输入先经过路由判断,再进入分支
from langchain_core.runnables import RunnableLambda
full_chain = RunnableLambda(route_based_on_length) | branch_chain

result = full_chain.invoke({"input": "这是一个非常非常长的用户输入..." * 10})

核心建议 :LCEL非常适合构建清晰、可测试的原子化工作流单元。但对于涉及复杂状态循环(比如需要根据LLM输出反复执行工具调用)、多智能体协作或严格需要人工审核步骤的生产级应用,LCEL会显得力不从心。这时, LangGraph 是更合适的选择。你可以将每个LCEL链封装成LangGraph的一个节点,在图的层面管理状态和流程。

5. 核心功能四:用向量存储抽象轻松实现检索增强生成

LLM的知识受限于其训练数据,且存在“幻觉”问题。检索增强生成通过从外部知识库(如你的文档、数据库)中实时检索相关信息,并将其作为上下文提供给LLM,极大地提升了回答的准确性和时效性。LangChain将RAG流程中与向量数据库交互的复杂性封装成了简洁的抽象层。

5.1 构建你的知识库:文档加载、切分与向量化

RAG的第一步是准备知识库。这通常涉及三个步骤:

  1. 加载 :从各种来源(PDF、网页、数据库)加载文档。
  2. 切分 :将大文档切分成适合检索的小片段。
  3. 向量化 :使用嵌入模型将文本片段转化为向量,并存入向量数据库。
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma  # 这里以Chroma为例

# 1. 加载文档(假设有一个docs文件夹存放文本文件)
import os
docs = []
for filename in os.listdir("./docs"):
    if filename.endswith(".txt"):
        loader = TextLoader(f"./docs/{filename}", encoding="utf-8")
        docs.extend(loader.load())

# 2. 切分文档
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,  # 每个片段大约500字符
    chunk_overlap=50,  # 片段间重叠50字符,保持上下文连贯
    separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]  # 中文友好的分隔符
)
all_splits = text_splitter.split_documents(docs)

# 3. 向量化并存储
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")  # 使用OpenAI的嵌入模型
vectorstore = Chroma.from_documents(
    documents=all_splits,
    embedding=embeddings,
    persist_directory="./chroma_db"  # 指定持久化目录
)
# 现在vectorstore就是一个可以检索的检索器了

5.2 集成RAG到LCEL链中

有了向量存储,我们就可以在用户提问时,先检索相关文档片段,再将其作为额外上下文注入提示词。

from langchain_core.runnables import RunnablePassthrough

# 1. 从已创建的向量库创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})  # 每次检索最相关的3个片段

# 2. 定义一个格式化检索结果的函数
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. 更新提示词模板,加入上下文占位符
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个研究助手。请基于以下提供的上下文信息来回答问题。如果上下文信息不足以回答问题,请基于你自己的知识回答,并说明这一点。
上下文:
{context}
问题:"""),
    ("human", "{input}")
])

# 4. 构建RAG链
rag_chain = (
    RunnablePassthrough.assign(  # RunnablePassthrough传递原始输入
        context=lambda x: format_docs(retriever.invoke(x["input"]))  # 检索并格式化上下文
    )
    | rag_prompt  # 将原始输入和检索到的上下文一起格式化到提示词中
    | llm
    | (lambda x: x.content)
)

# 调用链
result = rag_chain.invoke({"input": "LangChain的主要组件有哪些?"})
print(result)

这个链的关键在于 RunnablePassthrough.assign() 。它创建了一个新的字典,其中包含原始输入的所有键值对,并额外添加了一个 context 键,其值是通过检索器获取并格式化后的文档内容。这样,在后续的 rag_prompt 中, {context} {input} 就都有了对应的值。

性能与精度权衡 search_kwargs={"k": 3} 中的 k 值很重要。 k 太小可能遗漏关键信息, k 太大会增加token消耗、成本和可能的无关信息干扰。通常需要根据你的文档平均长度和问题复杂度进行调整。此外,检索器的搜索类型(如 similarity_search , mmr 最大边际相关性搜索)也会影响结果质量, mmr 可以在保证相关性的同时增加结果的多样性。

6. 核心功能五:用输出解析器从非确定性中获取结构化数据

LLM的输出是自由文本,这对于直接展示给用户没问题,但如果你想用它的输出驱动下游逻辑(比如存入数据库、触发另一个API调用、或者进行自动化决策),非结构化的文本就成了噩梦。输出解析器的作用就是 为LLM的输出套上一个“模板” ,强制它返回结构化的数据(如JSON、Pydantic模型),并自动进行验证和类型转换。

6.1 使用Pydantic模型定义输出结构

结合Pydantic,你可以用Python类清晰地定义你期望的输出格式。

from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser

# 1. 定义你期望的数据结构
class ResearchSummary(BaseModel):
    subject: str = Field(description="研究主题,用一个词概括")
    summary: str = Field(description="简要总结")
    key_points: list[str] = Field(description="3-5个关键要点", min_items=3, max_items=5)
    confidence: float = Field(description="回答的置信度,0到1之间", ge=0, le=1)

# 2. 创建输出解析器
parser = PydanticOutputParser(pydantic_object=ResearchSummary)

# 3. 在提示词中告诉模型需要遵循的格式
format_instructions = parser.get_format_instructions()  # 获取格式说明文本
print(format_instructions)  # 你会看到类似“输出应为JSON,包含subject, summary等字段...”的指令

structured_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个研究助手。请根据用户查询进行研究并结构化输出。\n{format_instructions}"),
    ("human", "{input}")
])

# 4. 构建链:提示词 -> LLM -> 解析器
structured_chain = (
    structured_prompt.partial(format_instructions=format_instructions)  # 部分格式化,先注入格式指令
    | llm
    | parser  # 关键:解析器会将LLM的文本输出转换为ResearchSummary对象
)

# 调用链
try:
    result: ResearchSummary = structured_chain.invoke({"input": "解释一下深度学习"})
    print(f"主题: {result.subject}")
    print(f"总结: {result.summary}")
    print(f"关键要点: {result.key_points}")
    print(f"置信度: {result.confidence}")
    # 现在result是一个标准的Pydantic对象,你可以轻松地访问其属性,或转换为dict/json
    print(result.json(indent=2))
except Exception as e:
    print(f"解析失败: {e}")
    # 处理解析错误,例如让模型重试

PydanticOutputParser 会自动将LLM的输出文本解析成你定义的 ResearchSummary 对象。如果LLM的输出不符合格式要求或字段验证失败(比如 confidence 不在0-1之间),解析器会抛出异常,这为你提供了重试或降级处理的机会。

6.2 解析器在复杂工作流中的价值

输出解析器的威力在链式或图式工作流中更能体现。想象一个场景:第一个LLM节点负责分析用户意图并分类,输出一个结构化的“意图对象”;第二个节点根据不同的意图,调用不同的工具链。如果没有输出解析器,你需要在代码里写一堆脆弱的字符串匹配和正则表达式来解析第一个LLM的输出。有了输出解析器,你可以直接拿到一个可靠的、类型安全的Python对象来做条件判断。

class UserIntent(BaseModel):
    intent_type: Literal["query_fact", "request_summary", "generate_code"] = Field(...)
    topic: str = Field(...)
    urgency: Literal["low", "medium", "high"] = Field("medium")

intent_parser = PydanticOutputParser(pydantic_object=UserIntent)
intent_chain = intent_prompt | llm | intent_parser

# 在后续路由逻辑中,可以直接使用解析后的对象
intent_obj = intent_chain.invoke({"input": user_query})
if intent_obj.intent_type == "query_fact":
    # 调用事实查询链
    pass
elif intent_obj.intent_type == "generate_code":
    # 调用代码生成链,并将topic和urgency作为参数传递
    pass

错误处理策略 :LLM的输出是非确定性的,解析失败是常态而非例外。生产代码中必须包裹 try...except 。更健壮的做法是使用LangChain的 with_retry with_fallbacks 方法为链添加重试或降级机制。例如,可以设置解析失败时自动使用一个更简单的提示词让模型重试,或者返回一个包含错误信息的默认结构化对象。

7. 实战整合:构建一个生产级研究助手

现在,让我们把上述所有功能整合起来,构建一个功能相对完整的研究助手。它将具备:1) 基于RAG的知识库查询;2) 会话记忆;3) 结构化输出;4) 调用外部工具(如网络搜索)的能力。我们将先用LCEL实现一个版本,然后讨论其局限性,并引入最终的解决方案——LangGraph。

7.1 基于LCEL的整合实现

这个版本将所有功能串联在一个LCEL链中,逻辑是线性的:检索 -> 研究(可能调用工具)-> 总结 -> 解析。

import os
from operator import itemgetter
from typing import List, Dict, Any
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.messages import HumanMessage, AIMessage
from langchain.memory import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
import requests  # 用于网络搜索工具

# --- 1. 定义数据结构 ---
class ResearchSummary(BaseModel):
    subject: str = Field(description="研究主题")
    summary: str = Field(description="简要总结")
    key_points: List[str] = Field(description="3-5个关键要点")
    sources: List[Dict[str, str]] = Field(description="信息来源,包含title和url", default_factory=list)

# --- 2. 定义工具 ---
@tool
def search_web(query: str) -> List[Dict[str, str]]:
    """使用搜索引擎查询网络上的最新信息。"""
    # 这里使用Brave Search API作为示例,你需要替换成自己的API密钥和端点
    BRAVE_API_KEY = os.getenv("BRAVE_API_KEY")
    if not BRAVE_API_KEY:
        return [{"title": "API Key未配置", "url": ""}]

    try:
        response = requests.get(
            "https://api.search.brave.com/res/v1/web/search",
            headers={"X-Subscription-Token": BRAVE_API_KEY},
            params={"q": query, "count": 3, "country": "us", "search_lang": "en"},
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
        web_results = data.get("web", {}).get("results", [])
        return [{"title": r.get("title"), "url": r.get("url")} for r in web_results[:2]]  # 取前2条
    except Exception as e:
        print(f"网络搜索失败: {e}")
        return [{"title": f"搜索失败: {str(e)}", "url": ""}]

def execute_tool_calls(tool_calls):
    """执行工具调用。"""
    results = []
    for tc in tool_calls:
        if tc["name"] == "search_web":
            # 在实际中,tool_calls可能是更复杂的对象,这里简化为字典
            query = tc.get("args", {}).get("query", "")
            result = search_web.invoke(query)
            results.extend(result)
    return results

# --- 3. 初始化核心组件 ---
llm = ChatOpenAI(model="gpt-4", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 假设向量库已存在,这里加载它
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
parser = PydanticOutputParser(pydantic_object=ResearchSummary)

# --- 4. 定义提示词模板 ---
research_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个高级研究助手。你的任务是根据用户查询进行深入研究。
你可以使用以下工具:
1. 知识库:已为你提供相关上下文。
2. 网络搜索:如需最新信息,可使用搜索工具。
请先理解问题,然后综合利用已有上下文和工具获取的信息,生成全面、准确的研究笔记。
上下文:
{context}
历史对话:
{history}
"""),
    ("human", "用户查询:{input}"),
])

summary_and_parse_prompt = ChatPromptTemplate.from_messages([
    ("system", "请将以下研究笔记整理成结构化的摘要。{format_instructions}"),
    ("human", "研究笔记:{research_notes}"),
])

# --- 5. 构建主LCEL链 ---
def format_docs(docs):
    return "\n---\n".join([d.page_content for d in docs])

# 这是一个复杂的链,包含了条件逻辑(是否调用工具)
main_chain = (
    RunnablePassthrough.assign(
        context=lambda x: format_docs(retriever.invoke(x["input"])),
        history=lambda x: x.get("history", []),  # 从状态中获取历史
    )
    | research_prompt
    | llm.bind_tools([search_web])  # 为LLM绑定工具,使其能生成工具调用请求
    | RunnableLambda(
        lambda msg: {
            "research_notes": msg.content if not hasattr(msg, 'tool_calls') else "",
            "tool_results": execute_tool_calls(getattr(msg, 'tool_calls', [])),
            "raw_ai_msg": msg
        }
    )
    # 注意:这里简化了工具结果整合回LLM的流程。完整流程需要将工具执行结果作为新消息加入历史,再让LLM生成最终回答。
    # 在LCEL中实现多轮工具调用和结果整合比较繁琐,这正是LangGraph的优势所在。
)

# --- 6. 包装记忆功能 ---
memory_store = {}
def get_session_history(session_id: str):
    if session_id not in memory_store:
        memory_store[session_id] = ChatMessageHistory()
    return memory_store[session_id]

# 由于主链现在输出字典,我们需要一个适配器来将其最终结果格式化成AI消息并存入历史
def finalize_chain(state: dict):
    """将主链的输出整合,并准备最终的结构化输出。"""
    research_notes = state["research_notes"]
    tool_results = state["tool_results"]
    raw_msg = state["raw_ai_msg"]

    # 如果有工具调用结果,将其信息融入研究笔记
    final_notes = research_notes
    if tool_results:
        sources_text = "\n网络搜索来源:\n" + "\n".join([f"- {r['title']}: {r['url']}" for r in tool_results])
        final_notes += sources_text

    # 调用总结和解析链
    summary_chain = (
        summary_and_parse_prompt.partial(format_instructions=parser.get_format_instructions())
        | llm
        | parser
    )
    structured_summary = summary_chain.invoke({"research_notes": final_notes})

    # 更新历史(这里简化,实际应添加用户消息和AI的多个消息)
    # 为了演示,我们只返回结构化结果
    return structured_summary

full_chain_with_memory = (
    RunnableWithMessageHistory(
        main_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="history"
    )
    | RunnableLambda(finalize_chain)
)

# --- 7. 调用 ---
if __name__ == "__main__":
    try:
        config = {"configurable": {"session_id": "test_user_001"}}
        result = full_chain_with_memory.invoke(
            {"input": "LangGraph和LCEL的主要区别是什么?各自适合什么场景?"},
            config=config
        )
        print("结构化研究摘要:")
        print(result.json(indent=2))
    except Exception as e:
        print(f"执行过程中出错: {e}")

这个实现已经相当复杂,但它暴露了LCEL在处理 有状态、多步骤、有条件循环 的工作流时的局限性。例如,工具调用的结果需要被送回到LLM进行进一步处理,这可能需要循环。在LCEL中实现这种循环逻辑会非常笨拙且难以维护。

7.2 升级到LangGraph:管理复杂状态与流程

LangGraph是LangChain之上用于构建有状态、多智能体工作流的框架。它将工作流建模为 ,其中节点是处理函数,边定义了执行流。它内置了状态管理、循环、分支和人工干预等高级模式。

下面我们用LangGraph重构上面的研究助手:

from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
import operator

# --- 1. 定义图的状态结构 ---
class AgentState(TypedDict):
    """图的状态,所有节点共享和修改这个字典。"""
    input: str  # 用户输入
    messages: Annotated[Sequence[BaseMessage], operator.add]  # 完整的消息历史(自动追加)
    context: str  # 从知识库检索的上下文
    research_notes: str  # 研究节点生成的初步笔记
    tool_results: list  # 工具执行结果
    final_output: ResearchSummary  # 最终的结构化输出

# --- 2. 定义各个节点函数 ---
def retrieve_node(state: AgentState) -> dict:
    """节点:从知识库检索上下文。"""
    query = state["input"]
    docs = retriever.invoke(query)
    formatted_context = format_docs(docs)
    return {"context": formatted_context}

def research_node(state: AgentState) -> dict:
    """节点:进行研究分析,可能决定调用工具。"""
    # 准备带历史、上下文和工具的LLM
    model_with_tools = llm.bind_tools([search_web])
    # 构建提示词
    prompt = research_prompt.format_messages(
        context=state["context"],
        history=state["messages"],  # LangGraph会自动管理消息列表
        input=state["input"]
    )
    # 调用LLM
    ai_message: AIMessage = model_with_tools.invoke(prompt)
    # 更新消息历史(LangGraph的Annotated会自动处理追加)
    new_messages = [ai_message]
    # 返回结果
    return {
        "messages": new_messages,  # 这会被自动追加到state["messages"]
        "research_notes": ai_message.content if not ai_message.tool_calls else "",
        "tool_calls_to_make": ai_message.tool_calls  # 传递给下一个节点判断
    }

def should_use_tools(state: AgentState) -> str:
    """条件判断函数:决定下一步是调用工具还是直接总结。"""
    # 检查上一步研究节点产生的消息中是否有工具调用
    last_message = state["messages"][-1]
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "call_tools"  # 边名
    else:
        return "summarize"   # 边名

def tools_node(state: AgentState) -> dict:
    """节点:执行工具调用。"""
    last_message = state["messages"][-1]
    tool_calls = last_message.tool_calls
    tool_messages = []
    for tc in tool_calls:
        if tc["name"] == "search_web":
            result = search_web.invoke(tc["args"]["query"])
            # 创建ToolMessage,这是LangGraph/LangChain中工具执行结果的标准格式
            tool_msg = ToolMessage(content=str(result), tool_call_id=tc["id"], name=tc["name"])
            tool_messages.append(tool_msg)
    # 将工具执行结果也作为消息追加,以便后续节点(如LLM)能看到
    return {"messages": tool_messages, "tool_results": [tm.content for tm in tool_messages]}

def summarize_node(state: AgentState) -> dict:
    """节点:总结研究结果并解析为结构化格式。"""
    # 收集所有需要总结的信息:研究笔记 + 工具结果
    notes_to_summarize = state.get("research_notes", "")
    if state.get("tool_results"):
        notes_to_summarize += "\n工具搜索结果:\n" + "\n".join(state["tool_results"])

    # 调用总结链
    summary_chain = (
        summary_and_parse_prompt.partial(format_instructions=parser.get_format_instructions())
        | llm
        | parser
    )
    structured_output: ResearchSummary = summary_chain.invoke({"research_notes": notes_to_summarize})

    # 生成最终的AI回复消息
    final_ai_msg = AIMessage(content=f"研究完成。主题:{structured_output.subject}")
    return {
        "messages": [final_ai_msg],
        "final_output": structured_output
    }

# --- 3. 构建图 ---
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("research", research_node)
workflow.add_node("call_tools", tools_node)
workflow.add_node("summarize", summarize_node)

# 设置入口点
workflow.set_entry_point("retrieve")

# 添加边(定义执行流)
workflow.add_edge("retrieve", "research")  # 检索完后必然进行研究
workflow.add_conditional_edges(
    "research",
    should_use_tools,  # 条件判断函数
    {
        "call_tools": "call_tools",  # 如果需要工具,去tools节点
        "summarize": "summarize",    # 否则,直接去总结节点
    }
)
workflow.add_edge("call_tools", "summarize")  # 工具执行完后去总结
workflow.add_edge("summarize", END)  # 总结完成后结束

# --- 4. 编译图,并添加记忆(检查点)---
memory = MemorySaver()  # 内存检查点,生产环境可用数据库后端
app = workflow.compile(checkpointer=memory)

# --- 5. 运行图 ---
if __name__ == "__main__":
    # 初始化状态
    initial_state = {
        "input": "LangGraph和LCEL的主要区别是什么?各自适合什么场景?",
        "messages": [],  # 初始为空
        "context": "",
        "research_notes": "",
        "tool_results": [],
    }
    config = {"configurable": {"thread_id": "user_001"}}  # thread_id类似session_id

    # 执行图
    final_state = app.invoke(initial_state, config=config)
    print("最终状态:")
    print(final_state.keys())
    if "final_output" in final_state:
        print("\n结构化输出:")
        print(final_state["final_output"].json(indent=2))

LangGraph带来的核心优势:

  1. 显式状态管理 :所有节点都读写一个共享的 AgentState 字典,数据流一目了然,避免了LCEL链中复杂的嵌套字典操作。
  2. 灵活的流程控制 add_conditional_edges 让你能基于LLM的输出或其他状态轻松实现分支。循环(例如,工具调用后回到研究节点进行进一步分析)也可以轻松实现。
  3. 内置的持久化 MemorySaver 等检查点机制可以保存整个图的状态,支持暂停、恢复和异步执行,这对于需要人工审核或长时间运行的任务至关重要。
  4. 更好的可观测性 :每个节点是独立的函数,易于单独测试、调试和监控。图的执行路径也更容易可视化。

8. 总结与核心建议

回顾这五个功能: 提示模板 让提示工程可维护; 消息历史 让对话有了记忆; LCEL 让简单的工作流组合变得优雅; 向量存储抽象 让RAG触手可及; 输出解析器 让非确定性的LLM输出变得可靠。而 LangGraph ,则是将这些乐高积木组装成复杂、健壮、可生产部署的智能体的最终框架。

在实际项目中,我们的选择策略通常是:

  • 快速原型、简单线性流程 :优先使用LCEL,开发速度极快。
  • 涉及复杂状态、循环、分支或多智能体协作 :毫不犹豫地选择LangGraph。
  • 提示词管理、记忆、RAG、结构化输出 :在任何项目中都优先使用LangChain提供的标准组件,避免重复造轮子。

最后,一个关键的体会是: 不要试图用一个超级复杂的LCEL链解决所有问题 。当链的复杂度开始让你感到困惑时,就是应该考虑将其拆分成LangGraph中多个节点的时候了。清晰的模块边界和状态流,是维护长期项目稳定性的基石。希望这些从日常实战中提炼出的模式和代码片段,能帮助你更高效、更自信地构建下一个LLM应用。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐