大模型开发手记(七):LangChain Agent 流式输出:实时交互体验
·
前言
- 什么是流式传输,就是不会等任务完成后直接返回结果,而是把任务进行中的进展实时返回,主要用途就是让用户实时看到进展,避免等待太长时间影响用户体验。
- LangChain 的 流式输出 (Streaming),就是来解决这个问题的。它能将 Agent 的思考过程、中间步骤、甚至生成的每一个字,像直播一样实时推送到你面前。
- 这篇博客主要讲述langchain agent的流式输出怎么用,显示效果是怎样的。
一、什么是流式输出?
- 简单来说,流式输出就是把 Agent 的工作过程和最终结果,分成无数个小片段(chunk),按顺序实时地发送给用户。你的应用界面不再是“死等”一个巨大的响应,而是可以逐字显示回复,或者实时更新 Agent 的执行状态。
- LangChain 的流式系统非常强大,他能做到:
- Agent 的进度:比如“正在调用天气工具”、“工具返回结果”、“正在生成最终回答”。
- LLM 的 Token:模型生成的每一个字,甚至工具调用的参数,都可以逐个字符地显示出来。
- 模型的“思考”过程:某些模型(如 Claude)在给出答案前会有内部的推理/思考步骤,这些思考过程也可以实时展示。
- 自定义更新:你可以在工具里自己定义流式传输的内容,比如“已下载 10/100 个文件”。
二、流式输出的三种核心模式
LangChain 提供了三种主要的流式传输模式。核心就是 agent.stream() 方法里的 stream_mode 参数。
| 模式 | 直播内容 | 形象比喻 |
|---|---|---|
| updates | Agent 的步骤更新。每执行完一个步骤(如调用一个工具),就发送一次完整的步骤结果(就是一个message)。 | 就像看直播带货,主播每介绍完一个商品(步骤),就给你看这个商品的完整信息。 |
| custom | 自定义数据流。在工具或中间件代码中,主动用 writer() 发送任意数据。 | 就像主播(工具)自己拿手机,直播后台的准备情况。 |
| 组合模式 | 一般是custom 和 updates 组合 |
三、updates模式、values模式
-
这两个模式本质上是以agent的每一个message为基本单位返回,包含ToolMessage、HumanMessage、AIMessage三类,updates模式会只返回增量的BaseMessage,而values属于叠加,除了新增的这个,之前返回的BaseMessage也会叠加在当前chunk中。
# 只返回一个BaseMessage实例 chunk_iterator = agent.stream( {"messages": [{"role": "user", "content": "旧金山天气如何?"}]}, stream_mode="updates" ) for chunk in chunk_iterator: print(chunk)
-
我们可以再看一下values的模式,我们看一下chunk长什么样子
# 只返回一个BaseMessage实例 chunk_iterator = agent.stream( {"messages": [{"role": "user", "content": "旧金山天气如何?"}]}, stream_mode="values" ) for chunk in chunk_iterator: print(chunk)
for chunk in self.agent.stream(input_dict, stream_mode="values"): msg = chunk["messages"][-1] if isinstance(msg, HumanMessage): print(f"【人类问题】:{msg.content}") # 1. 模型思考/最终回答 if isinstance(msg,AIMessage): if hasattr(msg, "content") and msg.content: print(f"🤖 思考/回答: {msg.content}") # 2. 工具调用 if hasattr(msg, "tool_calls") and msg.tool_calls: for tc in msg.tool_calls: tool_name = tc.get("name", "unknown") tool_args = tc.get("args", {}) print(f"【🤖agent大脑】决定调用调用工具: {tool_name}") print(f" 入参: {tool_args}") # 3. 工具返回结果 if isinstance(msg,ToolMessage): # if hasattr(msg, "additional_kwargs") and "tool_call_id" in msg.additional_kwargs: print(f"✅ 🛠️ 工具返回内容: {msg.content}")
四、custom模式
-
我们可以通过custom模式,在工具内部控制流式输出的内容,主要是通过组件
get_stream_writer完成def get_weather(city: str) -> str: """Get weather for a given city.""" writer = get_stream_writer() # 获取流式写入器 writer(f"🔍 正在查询城市 '{city}' 的数据...") # 模拟耗时操作 # ... 执行一些操作 ... writer(f"✅ 数据查询完成,正在处理...") return f"{city},阳光明媚!" # 模拟返回 agent = create_agent( model=model, # 假设的模型 tools=[get_weather], ) iterator = agent.stream( input={"messages": [{"role": "user", "content": "旧金山天气如何?"}]}, stream_mode="custom", ) for chunk in iterator: print(chunk)
五、组合模式
-
我们一般生产上,可以指定组合模式,一般是updates和custom,这样不仅能实时传输agent的每一步的message,也可以实时传输我们在工具中定义的内容。
iterator = agent.stream( input={"messages": [{"role": "user", "content": "旧金山天气如何?"}]}, stream_mode=["updates", "custom"], )

更多推荐




所有评论(0)