内容参考于:图灵AI大模型全栈

上一个内容中写了LCEL链,通过__or__函数实现,在LangChain中它把这个东西封装成了Runable类型,在LangChain我们引入的常用的类都是继承于Runable类型

下方是它的使用示例,用了很多管道的写法

# ====================== 导入依赖库 ======================
# LangChain OpenAI兼容接口客户端:用于调用支持OpenAI API规范的大模型(此处为阿里云通义千问)
from langchain_openai import ChatOpenAI
# 系统消息类:向大模型传递固定角色设定和规则
from langchain_core.messages import SystemMessage
# 聊天提示词模板:结构化构建大模型的输入,支持参数动态填充
from langchain_core.prompts import ChatPromptTemplate
# JSON输出解析器:将大模型的文本输出自动解析为Python字典,保证结构化输出
from langchain_core.output_parsers import JsonOutputParser
# 内存型向量存储:轻量级向量数据库,数据仅存于内存,无需持久化,适合演示和小数据量场景
from langchain_core.vectorstores import InMemoryVectorStore
# LangChain可运行组件:用于构建流式、可组合的AI流程管道
# RunnableMap:并行执行多个任务,统一输入,输出合并为字典
# RunnableBranch:根据条件判断执行不同分支,实现流程分支逻辑
# RunnableLambda:将普通Python函数包装为可运行组件,接入管道
from langchain_core.runnables import RunnableMap, RunnableBranch, RunnableLambda
# Tavily搜索工具:专为AI代理设计的实时搜索引擎,返回结构化、适合LLM处理的网页内容
# 官网:https://www.tavily.com/,支持实时搜索、深度研究、内容提取等功能
from langchain_tavily import TavilySearch
# HuggingFace嵌入模型封装:加载本地或HuggingFace Hub上的文本嵌入模型,将文本转为高维向量
from langchain_huggingface import HuggingFaceEmbeddings
# 加载.env文件工具:从环境变量文件读取敏感信息(API密钥),避免硬编码
from dotenv import load_dotenv
# 操作系统接口:用于读取系统环境变量
import os

# 加载当前目录下.env文件中的环境变量
# .env文件格式示例:
# DASHSCOPE_API_KEY=你的阿里云通义千问API密钥
# TAVILY_API_KEY=你的Tavily搜索API密钥
load_dotenv()


class TravelQASystem:
    def __init__(self, openai_api_key, tavily_api_key, embed_path):
        """
        初始化旅游问答系统核心组件
        入参说明:
        1. openai_api_key: str
           - 含义:阿里云DashScope平台的API密钥
           - 值来源:.env文件中的DASHSCOPE_API_KEY,需在https://dashscope.console.aliyun.com/申请
        2. tavily_api_key: str
           - 含义:Tavily搜索平台的API密钥(原代码变量名serpapi_api_key为错误命名,已修正)
           - 值来源:.env文件中的TAVILY_API_KEY,需在https://www.tavily.com/注册申请
        3. embed_path: str
           - 含义:文本嵌入模型的本地路径或HuggingFace Hub模型名
           - 值来源:用户本地下载的模型文件夹路径(如示例中的BGE中文模型)
           - 可选值:也可直接传"HuggingFace Hub模型名"(如"BAAI/bge-large-zh-v1.5"),会自动下载
        """
        # ====================== 1. 初始化大语言模型 ======================
        # 机制:通过OpenAI兼容接口调用阿里云通义千问
        # 阿里云DashScope提供了与OpenAI完全兼容的API端点,因此可直接复用ChatOpenAI类
        self.llm = ChatOpenAI(
            api_key=openai_api_key,  # 通义千问API密钥
            # API基础URL:阿里云DashScope OpenAI兼容模式固定端点
            # 原文档解析失败但URL本身有效,为官方标准地址
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
            model="qwen-plus"  # 调用的模型名称,可选值:qwen-turbo/qwen-plus/qwen-max等
        )

        # ====================== 2. 初始化实时搜索工具 ======================
        # 机制:调用Tavily API获取互联网实时数据(如天气、最新景点信息)
        # Tavily优势:专为LLM优化,返回内容已分块、去重、结构化,延迟低(p50仅180ms)
        self.search = TavilySearch(tavily_api_key=tavily_api_key)

        # ====================== 3. 初始化文本嵌入模型 ======================
        # 机制:将文本转换为高维向量,用于后续向量相似度检索
        # 示例使用BGE中文大模型,是目前效果最好的中文嵌入模型之一
        self.embeddings = HuggingFaceEmbeddings(model_name=embed_path)

        # ====================== 4. 构建本地景点知识库 ======================
        # 本地静态景点数据,实际项目中可从数据库、CSV文件或爬虫获取
        self.attraction_data = [
            "故宫:北京地标,明清皇宫,开放时间8:30-17:00",
            "颐和园:皇家园林,昆明湖、长廊等景点",
            "八达岭长城:距离市区70公里,建议游览3-4小时"
        ]

        # ====================== 5. 初始化内存向量存储 ======================
        # 机制:将本地景点文本转换为向量并存储在内存中,实现快速相似度检索
        # from_texts方法:自动批量向量化文本并创建向量存储实例
        self.vector_store = InMemoryVectorStore.from_texts(
            texts=self.attraction_data,  # 要向量化的文本列表
            embedding=self.embeddings,   # 用于向量化的嵌入模型
            k=1  # 检索时返回的最相似结果数量,此处只返回最匹配的1条
        )

    def setup_runnable_pipeline(self):
        """定义LangChain Runnable流程管道,实现问答全流程编排"""
        # ====================== 阶段1:问题解析模块 ======================
        # 功能:从用户自然语言问题中提取结构化信息(地点、查询类型)
        # 提示词模板:明确要求大模型以指定JSON格式输出,保证解析稳定性
        parse_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="你是旅游助手,需从用户问题中提取地点和查询类型(天气/景点介绍/行程规划)"),
            ("user", """问题:{user_question}请以JSON格式返回:{{"location": "地点", "type": "查询类型"}}""")
        ])
        # 管道串联:提示词模板 → 大模型 → JSON解析器
        # 输出:Python字典,格式如{"location": "北京", "type": "天气"}
        parse_module = parse_prompt | self.llm | JsonOutputParser()

        # ====================== 阶段2:数据获取模块 ======================
        # 子模块1:实时天气查询(调用Tavily搜索)
        # RunnableLambda:将lambda函数包装为可运行组件,接入管道
        weather_query = RunnableLambda(
            lambda x: self.search.invoke(f"{x['location']} 今日天气")
        )

        # 子模块2:本地景点信息检索(向量相似度检索)
        # 管道:提取location → 向量检索器 → 提取检索结果的文本内容
        # self.vector_store.as_retriever()是Runable类型,Runable类型会前后自动转换所以把(lambda x: x["location"])转换成了RunnableLambda
        attraction_retrieval = (lambda x: x["location"]) | self.vector_store.as_retriever() | (
            lambda x: x[0].page_content
        )

        # 并行数据获取:RunnableMap同时执行天气查询和景点检索
        # 机制:多个任务并行执行,共享输入,输出合并为字典,提高效率
        data_acquisition = RunnableMap({
            "weather": weather_query,       # 键:weather,值:Tavily返回的天气结果
            "attraction": attraction_retrieval,  # 键:attraction,值:本地景点文本
            "location": (lambda x: x["location"])  # 透传地点信息,供后续使用
        })

        # ====================== 阶段3:回答生成模块 ======================
        # 功能:整合所有获取到的信息,生成自然语言回答
        generate_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="你是专业旅游顾问,需结合景点信息和天气生成建议"),
            ("user", """地点:{location}
                景点信息:{attraction}
                天气情况:{weather}
                请生成1条行程建议,包含注意事项(如天气相关准备)""")
        ])
        # 管道:提示词模板 → 大模型 → 提取回答内容并去除首尾空白
        generate_module = generate_prompt | self.llm | (lambda x: x.content.strip())

        # ====================== 阶段4:全流程串联(含条件分支) ======================
        # RunnableBranch机制:根据输入条件动态选择执行分支
        # 格式:RunnableBranch( (条件函数1, 分支1), (条件函数2, 分支2), 默认分支 )
        self.travel_qa_pipeline = (
            # 第一步:解析用户问题,得到结构化的location和type
            parse_module
            # 统一输出格式,为后续分支做准备
            | (lambda x: {"location": x["location"], "type": x["type"]})
            # 条件分支:根据查询类型选择数据获取路径
            | RunnableBranch(
                # 分支1:查询类型包含"天气" → 并行获取天气+景点信息
                (lambda x: "天气" in x["type"], data_acquisition),
                # 默认分支:仅查询景点 → 只获取本地景点信息
                lambda x: {"location": x["location"], "attraction": attraction_retrieval.invoke(x)}
            )
            # 最后一步:生成最终回答
            | generate_module
        )

    def process_user_question(self, user_question):
        """
        处理用户提问并返回最终回答
        入参:
        - user_question: str,用户的自然语言提问
        返回:
        - str,AI生成的回答内容
        """
        input_data = {"user_question": user_question}
        # 调用Runnable管道的invoke方法,执行全流程
        # 实际项目中建议添加try-except异常处理,捕获API调用失败、解析错误等情况
        response = self.travel_qa_pipeline.invoke(input_data)
        return response


# 示例用法
if __name__ == "__main__":
    # 从环境变量读取API密钥
    OPENAI_API_KEY = os.getenv("DASHSCOPE_API_KEY")
    TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")  # 原代码变量名serpapi_api_key已修正
    # 本地嵌入模型路径,替换为你自己的实际路径
    embed_path = r"D:\LLM\Local_model\BAAI\bge-large-zh-v1___5"

    # 初始化旅游问答系统
    travel_qa = TravelQASystem(OPENAI_API_KEY, TAVILY_API_KEY, embed_path)
    # 构建Runnable流程管道
    travel_qa.setup_runnable_pipeline()

    # 测试用例1:查询天气+景点建议(触发并行数据获取分支)
    question1 = "今天故宫的天气怎么样?"
    answer1 = travel_qa.process_user_question(question1)
    print(f"User Question: {question1}\nAI Answer: {answer1}\n")

    # 测试用例2:仅查询景点介绍(触发默认分支)
    # question2 = "颐和园有什么好玩的?"
    # answer2 = travel_qa.process_user_question(question2)
    # print(f"User Question: {question2}\nAI Answer: {answer2}\n")

多个参数

上方使用的invoke来调用的大模型,它只能传递一个参数,如果要传递多个参数就要使用batch,如下图,传递了input_list这样的一个数组当入参,然后返回的数据也是一个列表


from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()


# 创建模型实例
template = PromptTemplate(
    input_variables=["role", "fruit"],
    template="{role}喜欢吃{fruit}?",
)

# 创建LLM
llm = ChatOpenAI(api_key=os.getenv("DASHSCOPE_API_KEY"),
                 base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                 model='qwen-plus',
                 temperature=0)


llm_chain = template | llm

# 输入列表
input_list = [
    {"role": "猪八戒", "fruit": "人参果"}, {"role": "孙悟空", "fruit": "仙桃"}
]

# 调用LLMChain,返回结果
result = llm_chain.batch(input_list)
print("第一个============================================================")
print(result[0].content)
print("第二个============================================================")
print(result[1].content)


img

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐