symfony/processでコールバックが実行されるタイミングを知らないと、並列処理で標準出力がうまくいかない。

symfony/processをマルチプロセスにして、並列ストリーム出力する。 - mitsuru793のブログ

symfony/processを使い、並列処理を実装しました。次にProcessのコレクションクラスを作ったのですが、コレクション全体のisRunningを確認するとメソッドで詰まりました。

このライブラリはプロセスの情報を更新するメソッドを、よく呼び出します。これが呼び出されないと標準出力のためのコールバックが実行されません。それではコードを見ていきましょう。

<?php
declare(strict_types=1);

use Symfony\Component\Process\Process;

require_once __DIR__ . '/vendor/autoload.php';

$script = __DIR__ . "/tmp.sh";

file_put_contents($script, <<<'BASH'
echo "start $1"
sleep $2
echo "end $1"
BASH
);

$processes = [
    new Process(['bash', $script, 1, 3]),
    new Process(['bash', $script, 2, 1]),
];

foreach ($processes as $i => $process) {
    $process->start(function (string $type, string $data) {
        echo 'in callback';
    });
}

このようにstartには、出力のバッファを処理するコールバックが渡せます。runメソッドにも渡せます。 このコールバックが実行されるタイミングは、Process->readPipesというprivateメソッドが呼び出された時です。ここでしか呼び出されません。コールバックをセットしても、内部でこのメソッドが呼び出されない限りは、上記のコードは標準出力しません。

<?php
// 呼び出し例
$this->readPipes($running && *$blocking*, ‘\\’ !== \*DIRECTORY_SEPARATOR*|| !$running);
<?php
/**
 * Reads pipes, executes callback.
 *
 * @param bool $blocking Whether to use blocking calls or not
 * @param bool $close    Whether to close file handles or not
 */
private function readPipes(bool *$blocking*, bool *$close*)
{
    $result = $this->processPipes->readAndWrite(*$blocking*, *$close*);

    $callback = $this->callback;
    foreach ($result as $type => $data) {
        if (3 !== $type) {
            $callback(self::*STDOUT*=== $type ? self::*OUT*: self::*ERR*, $data);
        } elseif (!isset($this->fallbackStatus['signaled'])) {
            $this->fallbackStatus['exitcode'] = (int) $data;
        }
    }
}

readPipesは次のメソッドで呼び出されます。

  • wait
  • updateStatus

waitはstartした外部コマンドの実行が終わるのをwhileで待機します。runメソッドで、startに続き呼び出されています。

updateStatusの呼び出し元は次のメソッドです。

  • start
  • wait
  • getExitCode
  • isRunning
  • isTerminated
  • getStatus
  • readPipesForOutput(これだけprivate)

Startメソッドを使った際は、waitは呼び出されないため上記のどれかのメソッドを通じてupdateStatusを呼び出されないとコールバックが実行されません。

Processのコレクションを作る際に次の間違いをしました。

<?php
final class Processes
{
    /** @var Process[] */
    public $items;

    public function __construct(array $items)
    {
        $this->items = $items;
    }

    public function areSomeRunning()
    {
        foreach ($this->items as $process) {
            /** @var Process $process */
            if ($process->isRunning()) {
                return true;
            }
            return false;
        }
    }
}

1つでもプロセスが起動中ならtrueを返すareSomeRunning()を作りました。返す条件は正しいですが、1つ間違いがあります。return true;でbreakすると、他のプロセスのisRunningが呼び出されないので、updateStatusが実行されません。つまり、他のプロセス出力のコールバックが実行されません。標準出力されないことになります。

<?php
public function areSomeRunning()
{
    $isRunning = false;
    foreach ($this->items as $process) {
        /** @var Process $process */
        if ($process->isRunning()) {
            $isRunning = true;
        }

        return $isRunning;
    }
}

返すbool値が分かったとしても、全プロセスの標準出力のために最後までループを回す必要があります。