LazyLLM黑科技 | 一条 pipeline 为什么能自己把服务全拉起来?聊聊 Flow 的设计之道

很多 AI 应用在原型阶段都很顺:模型能调通,RAG 能跑起来,Agent 也能串起来。但一旦进入工程化部署,问题就变了。

真正麻烦的,往往不是“某个模块能不能工作”,而是:

  • 多个模块怎么在同一条链路里协同;
  • 下游步骤如何拿到原始输入和上游中间结果;
  • 一条 pipeline 里哪些模块需要部署,谁来启动,URL 怎么回填;
  • tracing、hook、状态管理,怎么和业务流程放在一起。

这正是 LazyLLM 设计 Flow 组件时要解决的问题。它不是只想做一个“更好写”的 pipeline,而是希望把算法编排和工程部署放进同一个抽象里。

遇到的问题,也是难点所在

AI 应用开发里,最常见的难题其实集中在两件事上。

1. 数据流不是简单的 A -> B -> C

以 RAG 为例,Retriever 负责召回,Reranker 既要拿到召回结果,也要拿到原始 query,最后的 Formatter 和 LLM 还要继续消费这些上下文。只靠普通函数串联,很快就会堆出大量临时变量和胶水代码。

2. 部署关系和调用关系是缠在一起的。

一条数据管道里可能同时有文档解析、Embedding、Reranker、LLM 推理模块。它们不仅有数据依赖,还有服务启动依赖。如果还要开发者手动逐个启动服务、配置 URL,再把地址填回主流程,工程成本会迅速失控。

所以,真正难的不是“把函数连起来”,而是让一条 AI 链路既能表达数据流,又能表达部署关系

LazyLLM 的解决方案

LazyLLM 的思路很直接:

  • Flow 描述应用执行拓扑;
  • Bind 解决跨步骤参数引用和透传;
  • 再把 Flow 接入 Module 体系,让模块发现、部署启动、服务发现一起成立。

一个典型的 RAG 写法大致是这样:

import lazyllm
from lazyllm import pipeline, parallel, bind, Document, Retriever, Reranker, SentenceSplitter

prompt = (
    '你是一个问答助手,请基于给定上下文回答问题。\n'
    '已知上下文:{context_str}\n'
    '问题:{query}\n'
    '回答:'
)

embed_model = lazyllm.TrainableModule('bge-m3')
rerank_model = lazyllm.TrainableModule('bge-reranker-large')
llm = lazyllm.TrainableModule('Qwen2.5-32B-Instruct').deploy_method(lazyllm.deploy.vllm)

documents = Document(dataset_path='rag_master', embed=embed_model, manager=False)
documents.create_node_group(name='sentences', transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)

with pipeline() as rag:
    with parallel().sum as rag.retrieve:
        rag.retrieve.vec = Retriever(documents, group_name='sentences', similarity='cosine', topk=3)
        rag.retrieve.bm25 = Retriever(documents, 'CoarseChunk', 'bm25_chinese', 0.003, topk=3)

    rag.rerank = Reranker(
        'ModuleReranker', model=rerank_model, topk=1, output_format='content', join=True
    ) | bind(query=rag.input)

    rag.formatter = (lambda nodes, query: {'context_str': nodes, 'query': query}) | bind(query=rag.input)

    rag.answer = llm.prompt(lazyllm.ChatPrompter(prompt, extra_keys=['context_str']))



这段代码里最重要的不是语法本身,而是它背后的设计:

  • pipelineparallel 负责组织数据流;
  • bind(query=rag.input) 让原始 query 能跨步骤透传;
  • TrainableModuleDocumentReranker 这些模块既参与算法执行,也能进入后续部署链路。

也就是说,Flow 在 LazyLLM 里解决的不只是“怎么传数据”,还解决了“怎么让这条链路变成可部署系统”。

技术剖析一:LazyLLM 如何在 pipeline 里发现并部署所有模块

这一块可以压缩成一条非常清楚的源码链路:

  1. pipeline.start() 并不会直接启动 Flow,而是被改写成 ActionModule(self).start()
  2. ActionModule.submodules 会递归遍历整个 Flow,把里面所有 ModuleBase 类型的节点找出来。
  3. ModuleBase._update(mode=['server']) 会对这些子模块做 DFS,逐个调用 _get_deploy_tasks(),收集部署任务。
  4. 框架统一执行这些部署任务,并由各模块在完成部署后回填 URL。

这里最关键的设计点有两个。

第一,ActionModule 让 Flow 接上了 Module 系统。
一旦 Flow 被包装成 ActionModule,它就不再只是执行结构,而是一棵可以递归扫描的模块树。

这件事在源码里非常直接:

def flow_start(self):
    # 关键:Flow 不自己部署,而是先包装成 ActionModule
    ActionModule(self).start()
    return self

# 把 Flow.start 重定向到上面的逻辑
lazyllm.ReprRule.add_rule('Module', 'Action', 'Flow')
lazyllm.LazyLLMFlowsBase.start = flow_start



第二,具体怎么部署,由模块自己定义。
例如:

  • TrainableModule._get_deploy_tasks() 会返回一个小的部署 pipeline;
  • ServerModule._get_deploy_tasks() 也会返回自己的部署 pipeline;
  • 这些 pipeline 的最后一步,都会把部署出来的 URL 写回模块对象。

于是,LazyLLM 的工程闭环就成立了:
Flow 负责描述拓扑,ActionModule 负责发现模块,ModuleBase 负责调度部署,模块自身负责声明部署方式。

ActionModule 是如何发现 Flow 里的模块的?关键就在 submodules

class ActionModule(ModuleBase):
    def __init__(self, *action, return_trace=False):
        super().__init__(return_trace=return_trace)
        ...
        self.action = action

    def forward(self, *args, **kw):
        return self.action(*args, **kw)

    @property
    def submodules(self):
        # 递归扫描整个 Flow,把所有 ModuleBase 节点提取出来
        if isinstance(self.action, FlowBase):
            submodule = []
            self.action.for_each(lambda x: isinstance(x, ModuleBase), lambda x: submodule.append(x))
            return submodule
        return super().submodules



这段代码的意思很简单:Flow 本身不直接维护一张“待部署模块表”,而是在需要启动时递归扫描整个 Flow,把其中所有 ModuleBase 节点提取出来。

真正执行部署调度的,则是 ModuleBase._update()

def _update(self, *, mode: Optional[Union[str, List[str]]] = None, recursive: bool = True):
    ...
    stack, visited = [(self, iter(self.submodules if recursive else []))], set()
    while len(stack) > 0:
        try:
            top = next(stack[-1][1])
            stack.append((top, iter(top.submodules)))
        except StopIteration:
            top = stack.pop()[0]
            if top._module_id in visited: continue
            visited.add(top._module_id)
            # 核心:逐个模块收集部署任务
            if 'server' in mode: deploy_tasks.absorb(top._get_deploy_tasks())

    # 收集完之后统一执行
    if 'server' in mode and len(deploy_tasks) > 0:
        if redis_client:
            Parallel(*deploy_tasks).set_sync(False)()
        else:
            Parallel.sequential(*deploy_tasks)()
    return self



可以看到,框架真正做的事情并不复杂:DFS 遍历模块树,逐个调用 _get_deploy_tasks(),最后统一执行。

TrainableModule 为例,它自己返回的部署任务就是一段小的 deployment pipeline:

@lazyllm.once_wrapper
def _get_deploy_tasks(self):
    if self._deploy is None: return None
    ...
    self._deployer = self._deploy(**kwargs, **self._deploy_args)

    def before_deploy(*no_use_args):
        # 准备模型路径和部署输入
        target_path = ...
        return lazyllm.package(target_path, self._base_model)

    # 返回一段部署流水线,最后把 URL 写回模块
    return Pipeline(before_deploy, self._prepare_deploy, self._deployer, self._set_url)



这也是 LazyLLM 很关键的一点:部署逻辑不是写死在 Flow 里的,而是由每个模块自己声明,Flow 只负责把它们发现出来并组织调度。

技术剖析二:Bind 为什么能把参数透传做得既灵活又可控

Bind 的关键,不是“提前把值塞进去”,而是先描述“这个参数应该从哪里取”。

在 LazyLLM 里,p.inputp.kwargs['x']p.output('step1') 这些写法,本质上都会被表示成一份引用信息,包括:

  • 来自哪个 pipeline;
  • 目标是 inputkwargs 还是某个节点输出;
  • 是否还要继续做下标、属性访问或 unpack

这部分源码也很直观:

class Args(object):
    class _None: pass
    class Unpack(package): pass

    def __init__(self, source_id: str, target_id: str = 'input', *, unpack: bool = False):
        # source_id: 来自哪个 pipeline
        # target_id: 取 input / kwargs / 某个节点输出
        self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
        self._source_id, self._target_id = source_id, target_id
        self._unpack = unpack

    def __getitem__(self, key: str):
        # 支持 p.input['query']
        self._item_key = key
        return self

    def __getattr__(self, key: str):
        # 支持 p.output('step1').field
        self._attr_key = key
        return self



等到 pipeline 真正运行时,Pipeline._run() 会维护一份 bind_args_source,把:

  • 当前输入;
  • 当前 kwargs;
  • 每一步执行后的输出;

都记录下来。下游节点执行 Bind.__call__() 时,再按这份记录去解析参数。

最终取值发生在 get_arg()

def get_arg(self, source):
    # 如果当前上下文没有,就去保存过的 bind_args 里回溯
    if not source or source['source'] != self._source_id:
        if self._source_id in locals['bind_args']:
            source = locals['bind_args'][self._source_id]
        else:
            raise RuntimeError('Unable to find the bound parameter...')

    input = result = source[self._target_id]

    # 继续做下标 / 属性 / unpack 解析
    if self._item_key is not Bind.Args._None: result = input[self._item_key]
    elif self._attr_key is not Bind.Args._None: result = getattr(input, self._attr_key)
    if self._unpack and isinstance(result, package): result = Bind.Args.Unpack(result)
    return result



这套设计有两个好处:

  • 直接成员绑定足够自然,写起来就像普通 Python;
  • 嵌套 Flow 默认不会无限制穿透外层上下文,边界是清楚的。

如果确实需要在嵌套场景下引用外层输入或中间结果,LazyLLM 还提供了 save_pipeline_result(),显式把外层 pipeline 的绑定上下文保存起来。这样既保留了灵活性,也避免了参数来源失控。

总结

如果只把 LazyLLM 的 Flow 看成一个更好写的 pipeline,就低估它了。

它真正解决的是:当 AI 应用从 Demo 走向工程化部署时,如何把数据流编排、参数透传、模块发现、服务启动和服务发现,统一进同一个抽象里。

LazyLLM 给出的答案是:

  • Flow 组织算法链路;
  • Bind 组织参数关系;
  • ActionModule + ModuleBase 把整条链路接入部署体系。

所以,Flow 在 LazyLLM 里从来不只是“数据管道”,而是 AI 应用从原型走向工程系统的一座桥。

欢迎升级体验 LazyLLM 最新版本,请大家去 github 上点一个免费的 star,支持一下~技术讨论欢迎关注 “LazyLLM” gzh!

LazyLLM 项目仓库链接🔗:

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐