Use Channel-based polling in all() implementation

Instead of relying on then() callbacks which may not fire correctly,
poll the adopted promise state directly using Swoole Coroutine::sleep
for proper yielding. Each promise is waited on in its own coroutine.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jake Barnby
2026-01-21 22:21:07 +13:00
parent 3078e6bff5
commit ef46367fa3
@@ -44,47 +44,54 @@ class Swoole extends Adapter
return $this->createFulfilled([]);
}
// Create the combined promise without an executor (won't start a coroutine)
$combinedPromise = new SwoolePromise();
// Create a promise whose executor uses Channel to wait for all input promises
$promise = new SwoolePromise(function ($resolve, $reject) use ($promisesOrValues, $total) {
$channel = new \Swoole\Coroutine\Channel($total);
$result = [];
$error = null;
// Shared state across callbacks
$count = 0;
$result = [];
$rejected = false;
foreach ($promisesOrValues as $index => $promiseOrValue) {
if ($promiseOrValue instanceof GQLPromise) {
$result[$index] = null;
// Spawn a coroutine to wait for each promise
\go(function () use ($promiseOrValue, $index, &$result, &$error, $channel) {
/** @var SwoolePromise $adopted */
$adopted = $promiseOrValue->adoptedPromise;
$resolveAllWhenFinished = static function () use (&$count, $total, &$result, &$rejected, $combinedPromise): void {
if (!$rejected && $count === $total) {
\ksort($result);
$combinedPromise->resolve($result);
}
};
// Register then callbacks on each input promise
foreach ($promisesOrValues as $index => $promiseOrValue) {
if ($promiseOrValue instanceof GQLPromise) {
$result[$index] = null;
$promiseOrValue->then(
static function ($value) use (&$result, $index, &$count, $resolveAllWhenFinished): void {
$result[$index] = $value;
++$count;
$resolveAllWhenFinished();
},
static function ($error) use (&$rejected, $combinedPromise): void {
if (!$rejected) {
$rejected = true;
$combinedPromise->reject($error);
// Poll until the promise is settled
while ($adopted->isPending()) {
\Swoole\Coroutine::sleep(0.001);
}
}
);
} else {
$result[$index] = $promiseOrValue;
++$count;
if ($adopted->isFulfilled()) {
$result[$index] = $adopted->getResult();
} else {
if ($error === null) {
$error = $adopted->getResult();
}
}
$channel->push(true);
});
} else {
$result[$index] = $promiseOrValue;
$channel->push(true);
}
}
}
// Check if all non-promise values already resolved everything
$resolveAllWhenFinished();
// Wait for all coroutines to complete
for ($i = 0; $i < $total; $i++) {
$channel->pop();
}
$channel->close();
return new GQLPromise($combinedPromise, $this);
if ($error !== null) {
$reject($error);
} else {
\ksort($result);
$resolve($result);
}
});
return new GQLPromise($promise, $this);
}
}