diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 2bdaea3c2c..43368b8d3f 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -523,11 +523,16 @@ App::get('/v1/health/queue/databases') ->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('publisher') + ->inject('publisherRedis') ->inject('response') - ->action(function (string $name, int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (string $name, int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue($name)); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'databases'); + + $size = $isRedisFallback + ? $publisherRedis->getQueueSize(new Queue($name)) + : $publisher->getQueueSize(new Queue($name)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -655,11 +660,16 @@ App::get('/v1/health/queue/migrations') )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('publisher') + ->inject('publisherRedis') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'migrations'); + + $size = $isRedisFallback + ? $publisherRedis->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)) + : $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); diff --git a/app/worker.php b/app/worker.php index 845914c923..c1fdd93f2c 100644 --- a/app/worker.php +++ b/app/worker.php @@ -247,10 +247,18 @@ Server::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +Server::setResource('publisherRedis', function () { + return null; +}); + Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); +Server::setResource('consumerRedis', function () { + return null; +}); + Server::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']);