LazyLLM黑科技 | 一条 pipeline 为什么能自己把服务全拉起来?聊聊 Flow 的设计之道
LazyLLM黑科技 | 一条 pipeline 为什么能自己把服务全拉起来?聊聊 Flow 的设计之道
很多 AI 应用在原型阶段都很顺:模型能调通,RAG 能跑起来,Agent 也能串起来。但一旦进入工程化部署,问题就变了。
真正麻烦的,往往不是“某个模块能不能工作”,而是:
- 多个模块怎么在同一条链路里协同;
- 下游步骤如何拿到原始输入和上游中间结果;
- 一条 pipeline 里哪些模块需要部署,谁来启动,URL 怎么回填;
- tracing、hook、状态管理,怎么和业务流程放在一起。
这正是 LazyLLM 设计 Flow 组件时要解决的问题。它不是只想做一个“更好写”的 pipeline,而是希望把算法编排和工程部署放进同一个抽象里。
遇到的问题,也是难点所在
AI 应用开发里,最常见的难题其实集中在两件事上。
1. 数据流不是简单的 A -> B -> C。
以 RAG 为例,Retriever 负责召回,Reranker 既要拿到召回结果,也要拿到原始 query,最后的 Formatter 和 LLM 还要继续消费这些上下文。只靠普通函数串联,很快就会堆出大量临时变量和胶水代码。
2. 部署关系和调用关系是缠在一起的。
一条数据管道里可能同时有文档解析、Embedding、Reranker、LLM 推理模块。它们不仅有数据依赖,还有服务启动依赖。如果还要开发者手动逐个启动服务、配置 URL,再把地址填回主流程,工程成本会迅速失控。
所以,真正难的不是“把函数连起来”,而是让一条 AI 链路既能表达数据流,又能表达部署关系。
LazyLLM 的解决方案
LazyLLM 的思路很直接:
- 用
Flow描述应用执行拓扑; - 用
Bind解决跨步骤参数引用和透传; - 再把 Flow 接入 Module 体系,让模块发现、部署启动、服务发现一起成立。
一个典型的 RAG 写法大致是这样:
import lazyllm
from lazyllm import pipeline, parallel, bind, Document, Retriever, Reranker, SentenceSplitter
prompt = (
'你是一个问答助手,请基于给定上下文回答问题。\n'
'已知上下文:{context_str}\n'
'问题:{query}\n'
'回答:'
)
embed_model = lazyllm.TrainableModule('bge-m3')
rerank_model = lazyllm.TrainableModule('bge-reranker-large')
llm = lazyllm.TrainableModule('Qwen2.5-32B-Instruct').deploy_method(lazyllm.deploy.vllm)
documents = Document(dataset_path='rag_master', embed=embed_model, manager=False)
documents.create_node_group(name='sentences', transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
with pipeline() as rag:
with parallel().sum as rag.retrieve:
rag.retrieve.vec = Retriever(documents, group_name='sentences', similarity='cosine', topk=3)
rag.retrieve.bm25 = Retriever(documents, 'CoarseChunk', 'bm25_chinese', 0.003, topk=3)
rag.rerank = Reranker(
'ModuleReranker', model=rerank_model, topk=1, output_format='content', join=True
) | bind(query=rag.input)
rag.formatter = (lambda nodes, query: {'context_str': nodes, 'query': query}) | bind(query=rag.input)
rag.answer = llm.prompt(lazyllm.ChatPrompter(prompt, extra_keys=['context_str']))
这段代码里最重要的不是语法本身,而是它背后的设计:
pipeline和parallel负责组织数据流;bind(query=rag.input)让原始 query 能跨步骤透传;TrainableModule、Document、Reranker这些模块既参与算法执行,也能进入后续部署链路。
也就是说,Flow 在 LazyLLM 里解决的不只是“怎么传数据”,还解决了“怎么让这条链路变成可部署系统”。
技术剖析一:LazyLLM 如何在 pipeline 里发现并部署所有模块
这一块可以压缩成一条非常清楚的源码链路:
pipeline.start()并不会直接启动 Flow,而是被改写成ActionModule(self).start()。ActionModule.submodules会递归遍历整个 Flow,把里面所有ModuleBase类型的节点找出来。ModuleBase._update(mode=['server'])会对这些子模块做 DFS,逐个调用_get_deploy_tasks(),收集部署任务。- 框架统一执行这些部署任务,并由各模块在完成部署后回填 URL。
这里最关键的设计点有两个。
第一,ActionModule 让 Flow 接上了 Module 系统。
一旦 Flow 被包装成 ActionModule,它就不再只是执行结构,而是一棵可以递归扫描的模块树。
这件事在源码里非常直接:
def flow_start(self):
# 关键:Flow 不自己部署,而是先包装成 ActionModule
ActionModule(self).start()
return self
# 把 Flow.start 重定向到上面的逻辑
lazyllm.ReprRule.add_rule('Module', 'Action', 'Flow')
lazyllm.LazyLLMFlowsBase.start = flow_start
第二,具体怎么部署,由模块自己定义。
例如:
TrainableModule._get_deploy_tasks()会返回一个小的部署 pipeline;ServerModule._get_deploy_tasks()也会返回自己的部署 pipeline;- 这些 pipeline 的最后一步,都会把部署出来的 URL 写回模块对象。
于是,LazyLLM 的工程闭环就成立了:
Flow 负责描述拓扑,ActionModule 负责发现模块,ModuleBase 负责调度部署,模块自身负责声明部署方式。
而 ActionModule 是如何发现 Flow 里的模块的?关键就在 submodules:
class ActionModule(ModuleBase):
def __init__(self, *action, return_trace=False):
super().__init__(return_trace=return_trace)
...
self.action = action
def forward(self, *args, **kw):
return self.action(*args, **kw)
@property
def submodules(self):
# 递归扫描整个 Flow,把所有 ModuleBase 节点提取出来
if isinstance(self.action, FlowBase):
submodule = []
self.action.for_each(lambda x: isinstance(x, ModuleBase), lambda x: submodule.append(x))
return submodule
return super().submodules
这段代码的意思很简单:Flow 本身不直接维护一张“待部署模块表”,而是在需要启动时递归扫描整个 Flow,把其中所有 ModuleBase 节点提取出来。
真正执行部署调度的,则是 ModuleBase._update():
def _update(self, *, mode: Optional[Union[str, List[str]]] = None, recursive: bool = True):
...
stack, visited = [(self, iter(self.submodules if recursive else []))], set()
while len(stack) > 0:
try:
top = next(stack[-1][1])
stack.append((top, iter(top.submodules)))
except StopIteration:
top = stack.pop()[0]
if top._module_id in visited: continue
visited.add(top._module_id)
# 核心:逐个模块收集部署任务
if 'server' in mode: deploy_tasks.absorb(top._get_deploy_tasks())
# 收集完之后统一执行
if 'server' in mode and len(deploy_tasks) > 0:
if redis_client:
Parallel(*deploy_tasks).set_sync(False)()
else:
Parallel.sequential(*deploy_tasks)()
return self
可以看到,框架真正做的事情并不复杂:DFS 遍历模块树,逐个调用 _get_deploy_tasks(),最后统一执行。
以 TrainableModule 为例,它自己返回的部署任务就是一段小的 deployment pipeline:
@lazyllm.once_wrapper
def _get_deploy_tasks(self):
if self._deploy is None: return None
...
self._deployer = self._deploy(**kwargs, **self._deploy_args)
def before_deploy(*no_use_args):
# 准备模型路径和部署输入
target_path = ...
return lazyllm.package(target_path, self._base_model)
# 返回一段部署流水线,最后把 URL 写回模块
return Pipeline(before_deploy, self._prepare_deploy, self._deployer, self._set_url)
这也是 LazyLLM 很关键的一点:部署逻辑不是写死在 Flow 里的,而是由每个模块自己声明,Flow 只负责把它们发现出来并组织调度。
技术剖析二:Bind 为什么能把参数透传做得既灵活又可控
Bind 的关键,不是“提前把值塞进去”,而是先描述“这个参数应该从哪里取”。
在 LazyLLM 里,p.input、p.kwargs['x']、p.output('step1') 这些写法,本质上都会被表示成一份引用信息,包括:
- 来自哪个 pipeline;
- 目标是
input、kwargs还是某个节点输出; - 是否还要继续做下标、属性访问或
unpack。
这部分源码也很直观:
class Args(object):
class _None: pass
class Unpack(package): pass
def __init__(self, source_id: str, target_id: str = 'input', *, unpack: bool = False):
# source_id: 来自哪个 pipeline
# target_id: 取 input / kwargs / 某个节点输出
self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
self._source_id, self._target_id = source_id, target_id
self._unpack = unpack
def __getitem__(self, key: str):
# 支持 p.input['query']
self._item_key = key
return self
def __getattr__(self, key: str):
# 支持 p.output('step1').field
self._attr_key = key
return self
等到 pipeline 真正运行时,Pipeline._run() 会维护一份 bind_args_source,把:
- 当前输入;
- 当前 kwargs;
- 每一步执行后的输出;
都记录下来。下游节点执行 Bind.__call__() 时,再按这份记录去解析参数。
最终取值发生在 get_arg():
def get_arg(self, source):
# 如果当前上下文没有,就去保存过的 bind_args 里回溯
if not source or source['source'] != self._source_id:
if self._source_id in locals['bind_args']:
source = locals['bind_args'][self._source_id]
else:
raise RuntimeError('Unable to find the bound parameter...')
input = result = source[self._target_id]
# 继续做下标 / 属性 / unpack 解析
if self._item_key is not Bind.Args._None: result = input[self._item_key]
elif self._attr_key is not Bind.Args._None: result = getattr(input, self._attr_key)
if self._unpack and isinstance(result, package): result = Bind.Args.Unpack(result)
return result
这套设计有两个好处:
- 直接成员绑定足够自然,写起来就像普通 Python;
- 嵌套 Flow 默认不会无限制穿透外层上下文,边界是清楚的。
如果确实需要在嵌套场景下引用外层输入或中间结果,LazyLLM 还提供了 save_pipeline_result(),显式把外层 pipeline 的绑定上下文保存起来。这样既保留了灵活性,也避免了参数来源失控。
总结
如果只把 LazyLLM 的 Flow 看成一个更好写的 pipeline,就低估它了。
它真正解决的是:当 AI 应用从 Demo 走向工程化部署时,如何把数据流编排、参数透传、模块发现、服务启动和服务发现,统一进同一个抽象里。
LazyLLM 给出的答案是:
- 用
Flow组织算法链路; - 用
Bind组织参数关系; - 用
ActionModule + ModuleBase把整条链路接入部署体系。
所以,Flow 在 LazyLLM 里从来不只是“数据管道”,而是 AI 应用从原型走向工程系统的一座桥。
欢迎升级体验 LazyLLM 最新版本,请大家去 github 上点一个免费的 star,支持一下~技术讨论欢迎关注 “LazyLLM” gzh!
LazyLLM 项目仓库链接🔗:
更多推荐




所有评论(0)