symfony/processでコールバックが実行されるタイミングを知らないと、並列処理で標準出力がうまくいかない。
symfony/processをマルチプロセスにして、並列ストリーム出力する。 - mitsuru793のブログ
symfony/processを使い、並列処理を実装しました。次にProcessのコレクションクラスを作ったのですが、コレクション全体のisRunning
を確認するとメソッドで詰まりました。
このライブラリはプロセスの情報を更新するメソッドを、よく呼び出します。これが呼び出されないと標準出力のためのコールバックが実行されません。それではコードを見ていきましょう。
<?php declare(strict_types=1); use Symfony\Component\Process\Process; require_once __DIR__ . '/vendor/autoload.php'; $script = __DIR__ . "/tmp.sh"; file_put_contents($script, <<<'BASH' echo "start $1" sleep $2 echo "end $1" BASH ); $processes = [ new Process(['bash', $script, 1, 3]), new Process(['bash', $script, 2, 1]), ]; foreach ($processes as $i => $process) { $process->start(function (string $type, string $data) { echo 'in callback'; }); }
このようにstartには、出力のバッファを処理するコールバックが渡せます。runメソッドにも渡せます。 このコールバックが実行されるタイミングは、Process->readPipesというprivateメソッドが呼び出された時です。ここでしか呼び出されません。コールバックをセットしても、内部でこのメソッドが呼び出されない限りは、上記のコードは標準出力しません。
<?php // 呼び出し例 $this->readPipes($running && *$blocking*, ‘\\’ !== \*DIRECTORY_SEPARATOR*|| !$running);
<?php /** * Reads pipes, executes callback. * * @param bool $blocking Whether to use blocking calls or not * @param bool $close Whether to close file handles or not */ private function readPipes(bool *$blocking*, bool *$close*) { $result = $this->processPipes->readAndWrite(*$blocking*, *$close*); $callback = $this->callback; foreach ($result as $type => $data) { if (3 !== $type) { $callback(self::*STDOUT*=== $type ? self::*OUT*: self::*ERR*, $data); } elseif (!isset($this->fallbackStatus['signaled'])) { $this->fallbackStatus['exitcode'] = (int) $data; } } }
readPipesは次のメソッドで呼び出されます。
- wait
- updateStatus
waitはstartした外部コマンドの実行が終わるのをwhileで待機します。runメソッドで、startに続き呼び出されています。
updateStatusの呼び出し元は次のメソッドです。
- start
- wait
- getExitCode
- isRunning
- isTerminated
- getStatus
- readPipesForOutput(これだけprivate)
Startメソッドを使った際は、waitは呼び出されないため上記のどれかのメソッドを通じてupdateStatusを呼び出されないとコールバックが実行されません。
Processのコレクションを作る際に次の間違いをしました。
<?php final class Processes { /** @var Process[] */ public $items; public function __construct(array $items) { $this->items = $items; } public function areSomeRunning() { foreach ($this->items as $process) { /** @var Process $process */ if ($process->isRunning()) { return true; } return false; } } }
1つでもプロセスが起動中ならtrueを返すareSomeRunning()
を作りました。返す条件は正しいですが、1つ間違いがあります。return true;
でbreakすると、他のプロセスのisRunning
が呼び出されないので、updateStatus
が実行されません。つまり、他のプロセス出力のコールバックが実行されません。標準出力されないことになります。
<?php public function areSomeRunning() { $isRunning = false; foreach ($this->items as $process) { /** @var Process $process */ if ($process->isRunning()) { $isRunning = true; } return $isRunning; } }
返すbool値が分かったとしても、全プロセスの標準出力のために最後までループを回す必要があります。