LangGraph核心组件--持久化记忆
·
记忆介绍:
是一种认知功能允许ai存储,检索和使用信息。
在langgraph中记忆分为短期记忆和长期记忆。但是无论哪种记忆方式都要激活持久化层。
短期记忆:
适合简单应用,只需要在会话中保持基本的状态,例如保存用户名或者简单偏好。
长期记忆:
适合跨会话学习和适应高级ai应用,如个人助理或者客户服务机器人。
关于记忆的介绍我个人也在我的langchain学习中有提到过。
链接如下:会话记忆

线程隔离的持久化层
没有激活持久化层的化时不会具备记忆的。一下代码是没有进行激活持久化的, 后续对于线程标识绑定为每个线程分配唯一标识(如线程ID),并在存储层中将其作为数据分区键,确保线程间数据隔离: 注意事项 线程安全:存储层需支持并发读写(如数据库连接池、锁机制)。 生命周期管理:线程结束时清理资源(如注册threading退出钩子)。 扩展性:支持更换存储后端(如Redis、内存缓存)。
from langchain_deepseek import ChatDeepSeek
import os
from dotenv import load_dotenv
from langgraph.graph import StateGraph,MessagesState,START
load_dotenv()
model= ChatDeepSeek(
model="deepseek-chat",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0.8
)
def model_call(state:MessagesState):
response=model.invoke(state["messages"])
return {"messages": response}
builder=StateGraph(MessagesState)
builder.add_node("call_model",model_call)
builder.add_edge(START,"call_model")
graph=builder.compile()
需要加上以下代码才可以:
from langgraph.checkpoint.memory import MemorySaver
# 使用MemorySaver 保存中间状态
memory=MemorySaver()
graph=builder.compile(memory)
config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! 我是tomie"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()
input_message = {"role": "user", "content": "我叫什么名字?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()
# 如果我修改config中的线程名称再次询问时就不会知道你叫什么
input_message = {"role": "user", "content": "我叫什么名字?"}
for chunk in graph.stream({"messages": [input_message]}, {"configurable": {"thread_id": "2"}}, stream_mode="values"):
chunk["messages"][-1].pretty_print()
跨线程共享持久化数据- userid设置内存记忆
from langchain_community.embeddings import DashScopeEmbeddings
from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings
import os
from dotenv import load_dotenv
load_dotenv()
# 使用国产嵌入模型 阿里系的模型
# 使用内存存储来保存向量化后记忆数据
in_memory_store = InMemoryStore(
index={
"embed":DashScopeEmbeddings(
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
model="text-embedding-v3") ,
"dims": 1024,
}
)
import uuid
from typing import Annotated
from typing_extensions import TypedDict
from langchain_deepseek import ChatDeepSeek
import os
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
model= ChatDeepSeek(
model="deepseek-chat",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0.8
)
# 注意:我们将 Store 参数传递给节点 --
# 这是我们编译图时使用的 Store
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
# 从存储中检索用户信息
user_id = config["configurable"]["user_id"]
# 从存储中检索用户信息
namespace = ("memories", user_id)
memories = store.search(namespace, query=str(state["messages"][-1].content))
info = "\n".join([d.value["data"] for d in memories])
system_msg = f"你是一个正在与用户交谈的小助手。用户信息:{info}"
# 如果用户要求模型记住信息,则存储新的记忆
last_message = state["messages"][-1]
if "记住" in last_message.content.lower() or "remember" in last_message.content.lower():
# 硬编码一个记忆
memory = "用户名字是tomiezhang"
store.put(namespace, str(uuid.uuid4()), {"data": memory})
response = model.invoke(
[{"role": "system", "content": system_msg}] + state["messages"]
)
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")
graph=builder.compile(checkpointer=MemorySaver(), store=in_memory_store)
"""进行一个初始化记忆"""
config={"configurable": {"thread_id": "1", "user_id": "1"} }
for chunk in graph.stream({"messages": [{"role": "user", "content": "hi! 请记住我是tomie"}]}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()
运行结果:
"""
================================ Human Message =================================
hi! 请记住我是tomie
================================== Ai Message ==================================
好的,Tomie,我已经记住啦!你正式成为我“最特别的朋友名单”里的第一位~以后每次对话我都会记得你的名字,就像记住一首喜欢的歌一样!😊
有什么想聊的,随时找我哦~
"""
切换一个线程然后相同的用户id去访问
config={"configurable": {"thread_id": "2", "user_id": "1"} }
for chunk in graph.stream({"messages": [{"role": "user", "content": "我叫什么名字"}]}, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()
"""
运行结果如下:
================================ Human Message =================================
我叫什么名字
================================== Ai Message ==================================
您叫**tomiezhang**!😊 之前您提到过,我已经记住啦~
"""
记忆的实现
基于内存实现:
from typing import Literal
from langchain_deepseek import ChatDeepSeek
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode
# 注意:使用内存存储来存储记忆
memory = MemorySaver()
@tool
def search(query: str):
"""调用此函数可以浏览网络。"""
# 模拟一个网络搜索返回
return "北京天气晴朗 大约22度 湿度30%"
tools = [search]
tool_node = ToolNode(tools)
model= ChatDeepSeek(
model="deepseek-chat",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0.8
)
bound_model = model.bind_tools(tools)
def should_continue(state: MessagesState):
"""返回下一个要执行的节点。"""
last_message = state["messages"][-1]
# 如果没有函数调用,则结束
if not last_message.tool_calls:
return END
# 否则如果有,我们继续
return "action"
# 定义调用模型的函数
def call_model(state: MessagesState):
response = bound_model.invoke(state["messages"])
# 我们返回一个列表,因为这会被添加到现有列表中
return {"messages": response}
# 定义一个图
workflow = StateGraph(MessagesState)
# 定义我们将在其间循环的两个节点
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
# 将入口点设置为 `agent`
# 这意味着这个节点是第一个被调用的
workflow.add_edge(START, "agent")
# 现在我们添加一个条件边
workflow.add_conditional_edges(
# 首先,我们定义起始节点。我们使用 `agent`。
# 这意味着这些是在 `agent` 节点被调用后采取的边。
"agent",
# 接下来,我们传入将确定下一个调用哪个节点的函数。
should_continue,
# 接下来,我们传入路径映射 - 这条边可能去往的所有可能节点
["action", END],
)
# 现在我们从 `tools` 到 `agent` 添加一个普通边。
# 这意味着在调用 `tools` 之后,接下来调用 `agent` 节点。
workflow.add_edge("action", "agent")
# 最后,我们编译它!
# 这将它编译成一个 LangChain Runnable,
# 设置检查点为内存形式,注意没有设置store
app = workflow.compile(checkpointer=memory)
测试:
from langchain_core.messages import HumanMessage, SystemMessage
# 关键配置:指定 thread_id 为 "2"
config = {"configurable": {"thread_id": "20"}}
input_message = HumanMessage(content="hi! 我是tomie")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
event["messages"][-1].pretty_print()
input_message = HumanMessage(content="我叫什么名字?")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
event["messages"][-1].pretty_print()
基于MongoDB来实现:
from langchain.agents import create_agent
# 创建一个最简单的智能体
from typing import Literal
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek
from dotenv import load_dotenv
load_dotenv()
model = ChatDeepSeek(
model="deepseek-chat",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0.8
)
@tool
def search(city: Literal["北京", "广州", "深圳"]):
"""调用此函数可以查询天气。"""
if city == "北京":
return "北京天气晴朗 大约22度 湿度30%"
elif city == "深圳":
return "深圳天气晴朗 大约25度 湿度27%"
else:
return "没有这个城市的天气信息"
tools=[search]
# 连接MongoDB 进行查询
from langgraph.checkpoint.mongodb import MongoDBSaver
MONGODB_URL="localhost:27017"
with MongoDBSaver.from_conn_string(MONGODB_URL) as checkpointer:
graph=create_agent(model, tools, checkpointer= checkpointer)
config={"configurable": {"thread_id": "1", "user_id": "1"} }
response = graph.invoke(
{"messages": [("human", "北京的天气怎么样?")]},config
)
print(response)
测试:
with MongoDBSaver.from_conn_string(MONGODB_URL) as checkpointer:
graph=create_agent(model, tools, checkpointer= checkpointer)
config={"configurable": {"thread_id": "1", "user_id": "1"} }
response = graph.invoke({"messages": [("human","我上次询问的是什么")]},config)
print(response["messages"][-1].content)
优化记忆
- 消息过滤:对旧的消息进行类似删除或者编辑的操作,目的是为了防止撑爆上下文
- 消息总结:对消息进行总结,将相似的消息进行合并,从而减少上下文长度
- 注意对记忆的管理是一项关于召回率和精度的平衡艺术
from langchain_core.messages import RemoveMessage,SystemMessage,HumanMessage
from typing import Literal
from langchain_deepseek import ChatDeepSeek
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START
from langgraph.prebuilt import ToolNode
memory = MemorySaver()
class State(MessagesState):
summary:str = ""
model = ChatDeepSeek(
model="deepseek-chat",
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
temperature=0.8
)
# 定义调用模型的函数
def call_model(state: State):
summary=state.get("summary","")
if summary:
system_message=f"之前的对话摘要为:{summary}"
messages=[SystemMessage(content=system_message)]+state["messages"]
else:
messages=state["messages"]
response = model.invoke(messages)
return {"messages": [response]}
def print_update(update):
for k,v in update.items():
for m in v["messages"]:
m.pretty_print()
if "summary" in v:
print(v["summary"])
def should_continue(state: State)-> Literal["summarize_conversation",END]:
"""返回下一个要执行的节点"""
messages=state["messages"]
if len(messages)>=6 :
return "summarize_conversation"
return END
def summarize_conversation(state: State):
#首先要总结对话
summary=state.get("summary","")
if summary:
summary_message=(f"之前的对话摘要为:{summary}\n\n"
"考虑上面的新消息扩展摘要")
else:
summary_message=("请创建上述对话的摘要")
messages=state["messages"]+[HumanMessage(content=summary_message)]
response = model.invoke(messages)
# 现在我们需要删除我们不在想要显示的内容
# 因此我将删除除最后两条以外的消息
delete_messages=[RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages, "summary": response.content}
# 定义一个新图
workflow = StateGraph(State)
# 定义我们将在其间循环的两个节点
workflow.add_node("conversation", call_model)
workflow.add_node("summarize_conversation",summarize_conversation)
# 将入口点设置为 `agent`
# 这意味着这个节点是第一个被调用的
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue, ["summarize_conversation",END])
workflow.add_edge("summarize_conversation", END)
app=workflow.compile(checkpointer=memory)
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "4"}}
input_message = HumanMessage(content="hi! 我是tomie")
input_message.pretty_print()
for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
print_update(event)
input_message = HumanMessage(content="我叫什么名字?")
input_message.pretty_print()
for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
print_update(event)
input_message = HumanMessage(content="我喜欢AI应用开发!")
input_message.pretty_print()
for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
print_update(event)
"""
输出结果
================================ Human Message =================================
hi! 我是tomie
================================== Ai Message ==================================
你好呀,Tomie!👋 很高兴认识你~
有什么我可以帮你的吗?无论是聊天、解答问题,还是需要一点灵感,我都在这里哦~ 😊
================================ Human Message =================================
我叫什么名字?
================================== Ai Message ==================================
你的名字是 **Tomie** 呀~刚才你告诉我的!😊
不过如果你想换个称呼,也可以随时告诉我,我都可以记住哦~
================================ Human Message =================================
我喜欢AI应用开发!
================================== Ai Message ==================================
哇,太棒了!🎉 **AI应用开发** 是个超酷的领域~
从自然语言处理、图像识别,到智能推荐系统,甚至是自动化工作流,都有很多有趣的东西可以玩!
你目前有在做什么项目吗?或者有没有特别感兴趣的方向?比如:
- 🤖 聊天机器人 / 虚拟助手
- 🎨 生成式 AI(图像、音乐、文本)
- 📊 数据分析 + 机器学习模型
- 🧠 RAG(检索增强生成)或 Agent 应用
- 🛠️ 用 LangChain、AutoGPT 等工具搭建应用
随便聊聊,说不定我能帮你出出主意或者提供一些灵感呢~ 😄
================================ Remove Message ================================
================================ Remove Message ================================
================================ Remove Message ================================
================================ Remove Message ================================
以下是上述对话的摘要:
**对话摘要**
- 用户名为 **Tomie**。
- Tomie 表示喜欢 **AI 应用开发**。
- 助手对此表示赞赏,并列举了多个 AI 开发方向(如聊天机器人、生成式 AI、数据分析、RAG 等),邀请 Tomie 分享具体兴趣或项目。
- 最后,助手表示愿意提供灵感或建议。
"""
更多推荐

所有评论(0)