roc_open

与 shell_exec 等函数不同,proc_open 是创建进程的丰富工具。PHP 核心甚至为它引入了特殊的"hack"来正确处理管道。管道是进程间通信的最佳方式之一,也是最便捷的方式。唯一更好的方案是共享内存加文件事件,这仅仅是因为内存区域位于操作系统内核之外。

为什么管道方便?因为它们"假装"是文件。可以像普通文件一样用 fread 读取、fwrite 写入。这对开发者来说非常有价值,因为代码变得更通用、更可移植。

通信协议

管道是字节流,它不知道消息的概念,只是简单地在父子进程间传输字节。为了让代码正常工作,需要一种方式来界定消息边界。最简单的格式是 NDJSON:每条消息占一行,行与行之间用 \n 分隔。

{"id":1,"value":42}
{"id":2,"value":17}
{"id":3,"value":88}

为什么这种格式很合适?因为 PHP 对 JSON 支持很好,而且 PHP 有 fgets 函数可以从流中读取行。

$line = fgets(STDIN);          // 读取到 \n 为止
$task = json_decode($line);    // 解析 JSON

下面编写普通同步代码,打开进程并通过 NDJSON 协议与之通信:

function run_worker(string $script, array $tasks): void
{
    $process = proc_open(
        [PHP_BINARY, $script],
        [
            0 => ['pipe', 'r'],   // 子进程 STDIN
            1 => ['pipe', 'w'],   // 子进程 STDOUT
            2 => STDERR,
        ],
        $pipes
    );

    foreach ($tasks as $task) {
        // 发送任务
        fwrite($pipes[0], json_encode($task) . "\n");

        // 等待响应 ← PHP 在这里阻塞,直到读取到一行
        $line   = fgets($pipes[1]);
        $result = json_decode(trim($line), true);

        echo "Task #{$result['task_id']}: {$result['result']}\n";
    }

    fclose($pipes[0]);   // 关闭 STDIN → worker.php 将退出
    fclose($pipes[1]);
    proc_close($process);
}

run_worker('worker.php', [
    ['id' => 1, 'value' => 42],
    ['id' => 2, 'value' => 17],
    ['id' => 3, 'value' => 88],
]);

现在把这段代码放到协程里,不是放一个,而是放 10 个。

$tasks = [
    ['id' => 1, 'value' => 42],
    ['id' => 2, 'value' => 17],
    // ... 还有 28 个任务
];

for ($i = 0; $i < 10; $i++) {
    spawn(run_worker(...), 'worker.php', $tasks);
}

只写了一行异步代码:spawn(run_worker(...)),但实际上 proc_openfgets 和 fclose 从一开始就变成了非阻塞 I/O 操作。现在十个协程(实际上是 11 个,因为有主代码)竞争 CPU 时间。大部分代码仍保持顺序执行。

Channels

目前每个协程都收到相同的任务数组,这显然不是期望的行为。例如,可能想从 Twig 模板生成静态站点的 HTML,给定文件列表后让每个文件由独立的 worker 处理;或者可能需要解析日志。

Workers 相互独立运行,因为工作量可能不同,而且不知道 worker 是否忙碌。为此编写任务分发层相当复杂,也不理想。需要一个快速、便捷的解决方案,无需过多思考。

为此,TrueAsync 提供了 Channel,正是为这类任务设计的。Channel 是一个队列,用于在协程间交换数据。Channel 中的每条消息可以是一个任务。协程从同一个 Channel 读取并执行任务。无需检查 worker 是否忙碌,只需把任务发到 Channel,worker 准备好时就会执行。

$taskQueue = new Async\Channel(10);

for ($i = 0; $i < 10; $i++) {
    spawn(run_worker(...), 'worker.php', $taskQueue);
}

foreach ($tasks as $task) {
    $channel->send($task);
}

$channel->close();

当然,这段代码没有错误处理或边界检查,但它尽可能简单且易读。然而在底层,一些非平凡的事情正在发生。假设有 100 个任务和 10 个进程的池。当所有 worker 都忙碌时会发生什么?$channel->send($task) 会挂起协程,直到队列有空间。队列大小与池大小匹配并非巧合;任务供给函数会暂停,防止内存被尚无法处理的任务填满。

TaskGroup

已编写的代码离生产就绪还很远。不仅跳过了错误处理,而且没有控制协程。它们悬在空中,如果应用开始关闭或出现问题(异常),无法保证协程不会因逻辑错误而挂起。这很糟糕,而且在生产环境中代码挂起却无从排查原因,非常令人不快。

为最小化意外错误的风险,使用 TaskGroup 模式。它专门设计用于一起启动协程、一起等待它们、一起销毁它们。

$taskQueue = new Async\Channel(10);
$group     = new Async\TaskGroup();

try {
    // 在组管理下启动 workers
    for ($i = 0; $i < 10; $i++) {
        $group->spawn(run_worker(...), 'worker.php', $taskQueue);
    }

    // 向 Channel 投喂任务
    foreach ($tasks as $task) {
        $taskQueue->send($task);
    }
} finally {
    // 关闭任务 Channel —— 尝试读取的协程
    // 会收到 `ChannelClosedException` 并优雅关闭
    $taskQueue->close();
    // 封印组,防止意外添加新协程
    $group->seal();                // 防止添加新协程
    // 等待尚未完成的任务
    $group->all()->await();        // 等待所有 worker 完成
}

增强代码的容错性

前一个版本的代码不适合真实场景。如果 worker 进程崩溃,fwrite($pipes[0], json_encode($task) . "\n") 可能失败。

而且代码还隐藏着一个隐患。

function run_worker(string $script, Channel $taskQueue): void
{
    return;
}

$taskQueue = new Channel(POOL_SIZE);
$group     = new TaskGroup();

try {
    for ($i = 0; $i < POOL_SIZE; $i++) {
        $group->spawn(run_worker(...), __DIR__ . '/worker.php', $taskQueue);
    }

    foreach (generate_tasks(TASK_COUNT) as $task) {
        $taskQueue->send($task);
    }
} finally {
    $taskQueue->close();
    $group->seal();
    $group->all()->await();
    echo "\nDone.\n";
}

结果会是死锁错误,输出类似这样:

=== DEADLOCK REPORT START ===
Coroutines waiting: 1, active_events: 0

Coroutine 4 spawned at main:0, suspended at main.php:96 (main)
  waiting for:
    - Channel(capacity=3, receivers=0, senders=1)

=== DEADLOCK REPORT END ===

死锁调试输出可通过 async.debug_deadlock ini 指令控制。详见 https://true-async.github.io/en/docs/reference/ini-settings.html

原因是主协程试图向已满的 Channel 发送任务,但没有人会去读取。为避免此错误,需要在逻辑上将 TaskGroup 的生命周期与 $taskQueue 绑定。这可以通过 TaskGroup::all()->await() 等待所有活跃任务来实现。

spawn(function () use ($taskQueue, $group) {
    try {
        $group->all()->await();
    } finally {
        $taskQueue->close();
    }
});

更简洁的实现方式:

foreach (generate_tasks(TASK_COUNT) as $task) {
    $taskQueue->send($task, $group->all());
}

Channel::send 方法的第二个参数是取消令牌,允许在任意条件下取消操作。$group->all() 返回一个 Future,当所有任务完成时解析。

不幸的是,即使 $taskQueue->send($task, $group->all()) 也不是 100% 正确,因为它没有处理 TaskGroup 中所有协程已完成但新协程尚未启动的情况。Channel 类的语义——在 TrueAsync 0.6.0+ 中与 Go 非常相似——是最有问题的实现之一,值得单独关注。

现在改进进程池代码本身的错误处理。

foreach ($taskQueue as $task) {
    $encoded = json_encode($task);
    if ($encoded === false) {
        echo "[worker] json_encode failed for task #{$task['id']}: " . json_last_error_msg() . "\n";
        return;
    }

    if (fwrite($pipes[0], $encoded . "\n")) {
        echo "[worker] fwrite failed for task #{$task['id']}: pipe may be broken\n";
        return;
    }

    $line = fgets($pipes[1]);

    if ($line === false && stream_get_meta_data($pipes[1])['timed_out']) {
        echo "[worker] timeout waiting for response on task #{$task['id']}\n";
        return;
    } else if ($line === false) {
        echo "[worker] fgets failed for task #{$task['id']}: pipe closed or EOF\n";
        return;
    }

    $result = json_decode(trim($line), true);
    if ($result === null) {
        echo "[worker] json_decode failed for task #{$task['id']}: " . json_last_error_msg() . " (raw: " . trim($line) . ")\n";
        return;
    }
}

在循环前添加超时设置:stream_set_timeout($pipes[1], 5)——五秒——防止父进程失控。

故意在 worker_fail.php 中破坏 worker 代码(例如添加 sleep(10))看看会发生什么:

php.exe E:\php\examples\workers_process_cli\main2.php
[worker] fwrite failed for task #1: pipe may be broken
[worker] fwrite failed for task #2: pipe may be broken
[worker] fwrite failed for task #3: pipe may be broken

Done.
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐