symfony/processをマルチプロセスにして、並列ストリーム出力する。

プロセスの制御には組み込み関数ではなく、symfony/processを使います。内部では、組み込み関数proc_openが使われています。

プロセスの実行には2種類のメソッドがあります。

  • run(同期的で、内部でwaitメソッドを呼び出す)
  • start(続いてwaitメソッドを呼び出す、runと同じになる)

同時のプロセスを動かしたいので今回はstartを使います。runを使うと、1つのプロセスが終わるまで次のプロセスは実行されません。

<?php
declare(strict_types=1);

use Symfony\Component\Process\Process;

require_once __DIR__ . '/vendor/autoload.php';

$script = __DIR__ . "/tmp.sh";

file_put_contents($script, <<<'BASH'
echo "start $1"
sleep $2
echo "end $1"
BASH
);

$processes = [
    new Process(['bash', $script, 1, 3]),
    new Process(['bash', $script, 2, 1]),
];

foreach ($processes as $i => $process) {
    $process->start(); // bashを実行
    sleep(1); // プロセスが順番に起動することを保証
}

do {
    $isRunning = false;
    foreach ($processes as $i => $process) {
        echo $process->getIncrementalOutput(); // 標準出力
        if ($process->isRunning()) {
            $isRunning = true;
        }
    }
} while($isRunning);

unlink($script);

標準出力

start 1
start 2
end 2
end 1

1個目のプロセスは3秒かかり、2個目は1秒です。1個目のプロセスをstartした後にsleep(1)を入れておかないと、2個目が先に起動することがあります。おそらくstartメソッドを先に呼び出しても、実際にbashが実行されるまでほんの少し時間がかかってしまうことがあるのではと思います。

プロセスが出力した文字列は、getOutputではなくgetIncrementalOutputを使います。getOutputだとそれまでに出力されたものが、毎回全て出力されてしまいます。トータルのバッファを出力ということですね。getIncrementalOutputだと、前回のこのメソッド呼び出しから新たに出力されたものを取得できます。

実はgetIncrementalOutputを使わなくても、run, startメソッドにcallbackを渡して実現可能です。

<?php
foreach ($processes as $i => $process) {
    $process->start(function (string $type, string $data) {
        echo $data;
    });
    sleep(1);
}

do {
    $isRunning = false;
    foreach ($processes as $i => $process) {
//        echo $process->getIncrementalOutput();
        if ($process->isRunning()) {
            $isRunning = true;
        }
    }
} while($isRunning);

コールバックの引数$typeはProcessのクラス定数です。

<?php
class Process implements \IteratorAggregate
{
    const ERR = 'err';
    const OUT = 'out';
}

if ($type === Process::ERR) { echo $data; }

このように標準・エラー出力を切り替えることができます。

注意点として、コールバックが呼び出させれるのはプロセスに出力があった時です。

<?php

file_put_contents($script, <<<'BASH'
sleep $2
BASH
);

foreach ($processes as $i => $process) {
    $process->start(function (string $type, string $data) {
        echo 'in callback';
    });
}

一部省略しています。bashsleepだけにして、常にin callbackと出力するようにcallbackを設定しているのですが何も出力されません。