S08 后台任务详解笔记

基于 s08_background_tasks.py 源码逐行分析,配合 s08-background-tasks.md 设计思路。


一、问题:同步执行阻塞一切

前面 7 章的 agent 有一个共同假设:执行命令时,agent 等着它完成,然后才继续run_bash 里的 subprocess.run 是同步的——Python 线程会冻结,直到命令返回或超时。

这对快速命令(lscat)没问题。但如果是 pip install torch(下载 2GB,编译 10 分钟)呢?agent 什么都不能做,只能干等。120 秒超时后,命令被杀死,任务失败。

更糟的是:很多任务天然适合并行。"装依赖"和"写代码"是可以同时进行的——但同步执行强制它们串行。

二、解决方案:后台线程 + 通知队列

s08 的方案:让长时间命令在后台线程中运行,主循环不阻塞。命令完成后,结果通过通知队列在下一次 API 调用前注入到对话中。

主线程                       后台线程
+-----------------+        +-----------------+
| agent loop      |        | pip install ... |
| ...             |        | (跑了 8 分钟)    |
| 读 inbox        |        | ...             |
| [LLM call] <----+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

模型视角:发起后台任务 → 继续做其他事 → 过几轮收到 “[bg:abc123] completed: Successfully installed torch…”。


三、和 s07 相比,多了什么?

组件 s07 s08
执行模型 同步阻塞 + 后台线程(非阻塞)
新工具 task_* (4个) background_runcheck_background
并发 threading.Thread
agent_loop 简单循环 + 通知队列排空

四、BackgroundManager:线程池 + 通知队列

4.1 数据结构

class BackgroundManager:
    def __init__(self):
        self.tasks = {}                  # task_id → {status, result, command}
        self._notification_queue = []    # 已完成任务的结果
        self._lock = threading.Lock()    # 保护 notification_queue 的线程锁

三个字段:tasks 字典追踪所有后台任务的状态,_notification_queue 是已完成任务的"待投递"列表,_lock 确保多线程同时写队列时不会数据错乱。

初学者注意threading.Lock() 是 Python 的互斥锁。多个线程同时访问 _notification_queue 时,with self._lock: 确保同一时刻只有一个线程在操作它——否则可能出现两个线程同时 append,导致一个结果丢失。

4.2 run() — 启动后台任务

def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]      # 生成 8 位随机 ID
    self.tasks[task_id] = {"status": "running", "result": None, "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True
    )
    thread.start()                        # 启动线程后立刻返回
    return f"Background task {task_id} started: {command[:80]}"

daemon=True 是关键:守护线程在主程序退出时自动被杀,不会阻止程序关闭。如果忘了设 daemon=True,主程序退出后这些线程可能变成僵尸进程。

uuid.uuid4()[:8]:UUID4 是随机生成的唯一标识符,取前 8 位足够区分任务(碰撞概率极低),而且比完整 UUID 好读。

4.3 _execute() — 线程真正干活的函数

def _execute(self, task_id: str, command: str):
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300   # ← 5 分钟超时
        )
        output = (r.stdout + r.stderr).strip()[:50000]
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    except Exception as e:
        output = f"Error: {e}"
        status = "error"

    # 更新状态
    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output or "(no output)"

    # 入队通知
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "status": status,
            "command": command[:80], "result": (output or "(no output)")[:500],
        })

注意 timeout=300——后台任务给了 5 分钟,是同步 bash 的 2.5 倍。因为后台任务预期就是长任务(装包、跑测试套件),短超时没有意义。

通知中 result[:500] 截断到 500 字符——这和 micro_compact 是同一思路:模型不需要看完整的 50000 字符安装日志,500 字符的尾部通常包含成功/失败的关键信息。

4.4 check() — 查询任务状态

def check(self, task_id: str = None) -> str:
    if task_id:
        t = self.tasks.get(task_id)
        if not t:
            return f"Error: Unknown task {task_id}"
        return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
    # 不传 task_id → 列出所有
    lines = []
    for tid, t in self.tasks.items():
        lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
    return "\n".join(lines) if lines else "No background tasks."

check() 不是必须的——通知队列会自动投递完成结果。但模型可能想知道"那个安装 pytorch 的任务跑到哪了?",所以提供了主动查询的接口。

4.5 drain_notifications() — 通知队列的排空

def drain_notifications(self) -> list:
    with self._lock:
        notifs = list(self._notification_queue)
        self._notification_queue.clear()
    return notifs

每次 API 调用前,agent_loop 调一次 drain_notifications()。取走所有积压通知并清空队列——每个通知只投递一次。这是"消费-清除"模式,确保模型不会重复看到同一条通知。


五、agent_loop 的改造

def agent_loop(messages: list):
    while True:
        # 排空通知队列,注入到对话中
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}"
                for n in notifs
            )
            messages.append({
                "role": "user",
                "content": f"<background-results>\n{notif_text}\n</background-results>"
            })

        response = client.messages.create(...)
        # ...后续流程不变...

通知以 <background-results> 标签包裹——和 <reminder><skill> 一样的语义标记模式。模型看到这个标签就知道"这是后台任务的结果,不是我或用户说的话"。

注意注入的时机:在 API 调用之前,不是在工具执行之后。这意味着模型在每轮推理时都能看到最新完成的后台任务——不需要等到下一轮。


六、完整流程走读

假设用户说:“装 pytorch,同时在后台跑,我先写代码。”

第 1 轮

模型调用 background_run("pip install torch") → 返回 "Background task a1b2c3d4 started: pip install torch"。后台线程启动,开始下载和安装。

第 2-5 轮

模型继续工作(写代码、读文件),每轮 API 调用前 drain_notifications() 检查队列——目前还是空的(安装还在进行中)。

第 6 轮

安装完成。_execute 把结果入队。下一轮 API 调用前:

messages.append({
    "role": "user",
    "content": "<background-results>\n[bg:a1b2c3d4] completed: Successfully installed torch-2.5.0...\n</background-results>"
})

模型看到 pytorch 装好了,继续后续工作。


七、设计洞察

7.1 异步不改变循环结构

agent_loop 的 while 循环还是那个 while 循环。后台执行只是把 subprocess.run 从"阻塞当前线程"变成"在另一个线程里跑"——执行模型变了,控制流没变。和前面每一章一样:扩展能力 = 加工具 + 改 handler,循环不动。

7.2 通知队列是解耦层

后台线程不知道 agent_loop 的存在,它只往队列里塞数据。agent_loop 不知道哪些线程在跑,它只从队列里取数据。队列是生产者和消费者之间唯一的耦合点。 这种解耦意味着以后可以换执行方式(进程池、远程执行)而不影响循环逻辑。

7.3 结果截断是必要的仁慈

通知中 result[:500] 截断到 500 字符——和 s06 的 micro_compact 是一脉相承的设计。安装日志可能有 50000 字符,但 99% 的进度条输出对模型毫无意义。只保留尾部(通常包含成功/失败信息),是对上下文空间的尊重。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐