前言

  1. 什么是流式传输,就是不会等任务完成后直接返回结果,而是把任务进行中的进展实时返回,主要用途就是让用户实时看到进展,避免等待太长时间影响用户体验。
  2. LangChain 的 流式输出 (Streaming),就是来解决这个问题的。它能将 Agent 的思考过程、中间步骤、甚至生成的每一个字,像直播一样实时推送到你面前。
  3. 这篇博客主要讲述langchain agent的流式输出怎么用,显示效果是怎样的。

一、什么是流式输出?

  1. 简单来说,流式输出就是把 Agent 的工作过程和最终结果,分成无数个小片段(chunk),按顺序实时地发送给用户。你的应用界面不再是“死等”一个巨大的响应,而是可以逐字显示回复,或者实时更新 Agent 的执行状态。
  2. LangChain 的流式系统非常强大,他能做到:
    • Agent 的进度:比如“正在调用天气工具”、“工具返回结果”、“正在生成最终回答”。
    • LLM 的 Token:模型生成的每一个字,甚至工具调用的参数,都可以逐个字符地显示出来。
    • 模型的“思考”过程:某些模型(如 Claude)在给出答案前会有内部的推理/思考步骤,这些思考过程也可以实时展示。
    • 自定义更新:你可以在工具里自己定义流式传输的内容,比如“已下载 10/100 个文件”。

二、流式输出的三种核心模式

LangChain 提供了三种主要的流式传输模式。核心就是 agent.stream() 方法里的 stream_mode 参数。

模式 直播内容 形象比喻
updates Agent 的步骤更新。每执行完一个步骤(如调用一个工具),就发送一次完整的步骤结果(就是一个message)。 就像看直播带货,主播每介绍完一个商品(步骤),就给你看这个商品的完整信息。
custom 自定义数据流。在工具或中间件代码中,主动用 writer() 发送任意数据。 就像主播(工具)自己拿手机,直播后台的准备情况。
组合模式 一般是custom 和 updates 组合

三、updates模式、values模式

  1. 这两个模式本质上是以agent的每一个message为基本单位返回,包含ToolMessage、HumanMessage、AIMessage三类,updates模式会只返回增量的BaseMessage,而values属于叠加,除了新增的这个,之前返回的BaseMessage也会叠加在当前chunk中。

    # 只返回一个BaseMessage实例
    chunk_iterator = agent.stream(
        {"messages": [{"role": "user", "content": "旧金山天气如何?"}]},
        stream_mode="updates"
    )
    for chunk in chunk_iterator:
        print(chunk)
    

  2. 我们可以再看一下values的模式,我们看一下chunk长什么样子

    # 只返回一个BaseMessage实例
    chunk_iterator = agent.stream(
        {"messages": [{"role": "user", "content": "旧金山天气如何?"}]},
        stream_mode="values"
    )
    for chunk in chunk_iterator:
        print(chunk)
    

    在这里插入图片描述

            for chunk in self.agent.stream(input_dict, stream_mode="values"):
                msg = chunk["messages"][-1]
                if isinstance(msg, HumanMessage):
                    print(f"【人类问题】:{msg.content}")
                # 1. 模型思考/最终回答
                if isinstance(msg,AIMessage):
                    if hasattr(msg, "content") and msg.content:
                        print(f"🤖 思考/回答: {msg.content}")
    
                    # 2. 工具调用
                    if hasattr(msg, "tool_calls") and msg.tool_calls:
                        for tc in msg.tool_calls:
                            tool_name = tc.get("name", "unknown")
                            tool_args = tc.get("args", {})
                            print(f"【🤖agent大脑】决定调用调用工具: {tool_name}")
                            print(f"   入参: {tool_args}")
    
                # 3. 工具返回结果
                if isinstance(msg,ToolMessage):
                    # if hasattr(msg, "additional_kwargs") and "tool_call_id" in msg.additional_kwargs:
                        print(f"✅ 🛠️ 工具返回内容: {msg.content}")
    

四、custom模式

  1. 我们可以通过custom模式,在工具内部控制流式输出的内容,主要是通过组件get_stream_writer完成

    def get_weather(city: str) -> str:
        """Get weather for a given city."""
        writer = get_stream_writer()  # 获取流式写入器
        writer(f"🔍 正在查询城市 '{city}' 的数据...")
        # 模拟耗时操作
        # ... 执行一些操作 ...
        writer(f"✅ 数据查询完成,正在处理...")
        return f"{city},阳光明媚!"  # 模拟返回
    
    agent = create_agent(
        model=model,  # 假设的模型
        tools=[get_weather],
    )
    
    iterator = agent.stream(
        input={"messages": [{"role": "user", "content": "旧金山天气如何?"}]},
        stream_mode="custom",
    )
    
    for chunk in iterator:
        print(chunk)
    

    在这里插入图片描述

五、组合模式

  1. 我们一般生产上,可以指定组合模式,一般是updates和custom,这样不仅能实时传输agent的每一步的message,也可以实时传输我们在工具中定义的内容。

    iterator = agent.stream(
        input={"messages": [{"role": "user", "content": "旧金山天气如何?"}]},
        stream_mode=["updates", "custom"],
    )
    

    在这里插入图片描述
    在这里插入图片描述

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐