概述

工作流(workflow)的一个关键特征是它们支持分支和循环逻辑,比基于图形的方法更简单、更灵活。

本文实现了一个工作流,该工作流会根据工作流结点的结果值来判断和选择要执行的后续工作流结点。从而能够更好的控制工作的执行过程,实现更加强大的业务逻辑。

实现逻辑

我们创建了 4 个新的事件类型。 start 随机决定采用一个分支或另一个分支,然后每个分支中的多个步骤完成工作流程。

当然,您可以按任意顺序组合分支和循环,以满足您的应用程序的需求。

实现代码

1.定义start函数,这是工作流的起点。在其中产生随机数,根据随机数的值来选择不通的执行分支。

2.定义后续的执行步骤,并根据业务逻辑来定义不通结点的事件结果。


from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
​
from llama_index.core.memory import VectorMemory
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings
from llama_index.llms.ollama import Ollama
​
import asyncio
import random
​
# 使用本地大模型
Settings.llm = Ollama(model="llama3.2", request_timeout=360)
​
# 定义分支处理类
class BranchA1Event(Event):
    payload: str
​
class BranchA2Event(Event):
    payload: str
​
class BranchB1Event(Event):
    payload: str
​
class BranchB2Event(Event):
    payload: str
​
​
# 主类
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")
​
    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print("a1: " + ev.payload)
        return BranchA2Event(payload=ev.payload)
​
    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print("b1: " + ev.payload)
        return BranchB2Event(payload=ev.payload)
​
    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print("a2: " + ev.payload)
        return StopEvent(result="Branch A complete.")
​
    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print("b2" + ev.payload)
        return StopEvent(result="Branch B complete.")
​
# 画出工作流图形
from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(BranchWorkflow, filename="workflow_branch.html")
​
async def main():
    w = BranchWorkflow(timeout=10, verbose=False)
    result = await w.run(first_input="Start the workflow.")
    print(result)
​
# run main
if __name__ == '__main__':
    asyncio.run(main())

通过以上代码画出来的工作流逻辑如下图所示:

小结

本文实现了一个worflow的分支处理结构。根据本文的例子进行修改,可以满足具有分支业务处理逻辑的需要。从而让工作流更加灵活。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐