Merge pull request #10299 from appwrite/feat-functions-redis-queue

Feat: Redis functions queue (reapply with fixes)
This commit is contained in:
Matej Bačo
2025-08-11 16:55:02 +02:00
committed by GitHub
10 changed files with 34 additions and 10 deletions
+3
View File
@@ -194,6 +194,9 @@ CLI::setResource('publisher', function (Group $pools) {
CLI::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
+3 -3
View File
@@ -686,12 +686,12 @@ App::get('/v1/health/queue/functions')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherFunctions')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
$size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
+4 -2
View File
@@ -29,6 +29,7 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
@@ -408,6 +409,7 @@ App::init()
->inject('project')
->inject('user')
->inject('publisher')
->inject('publisherFunctions')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
@@ -423,7 +425,7 @@ App::init()
->inject('plan')
->inject('devKey')
->inject('telemetry')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, BrokerPool $publisherFunctions, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) {
$route = $utopia->getRoute();
@@ -535,7 +537,7 @@ App::init()
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisher);
$queueForRealtime = new Realtime();
+6
View File
@@ -83,6 +83,9 @@ App::setResource('publisher', function (Group $pools) {
App::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
@@ -95,6 +98,9 @@ App::setResource('consumer', function (Group $pools) {
App::setResource('consumerDatabases', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerFunctions', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerMigrations', function (BrokerPool $consumer) {
return $consumer;
}, ['publisher']);
+8
View File
@@ -251,6 +251,10 @@ Server::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
@@ -267,6 +271,10 @@ Server::setResource('consumerDatabases', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
Server::setResource('consumerFunctions', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
Server::setResource('consumerMigrations', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
Generated
+2 -2
View File
@@ -8403,7 +8403,7 @@
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
@@ -8427,5 +8427,5 @@
"platform-overrides": {
"php": "8.3"
},
"plugin-api-version": "2.6.0"
"plugin-api-version": "2.3.0"
}
+4 -1
View File
@@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action
protected BrokerPool $publisher;
protected BrokerPool $publisherMigrations;
protected BrokerPool $publisherFunctions;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
@@ -45,6 +46,7 @@ abstract class ScheduleBase extends Action
->desc("Execute {$type}s scheduled in Appwrite")
->inject('publisher')
->inject('publisherMigrations')
->inject('publisherFunctions')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('telemetry')
@@ -67,13 +69,14 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
$this->publisher = $publisher;
$this->publisherMigrations = $publisherMigrations;
$this->publisherFunctions = $publisherFunctions;
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');
@@ -30,7 +30,7 @@ class ScheduleExecutions extends ScheduleBase
{
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
$queueForFunctions = new Func($this->publisher);
$queueForFunctions = new Func($this->publisherFunctions);
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@@ -90,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($this->publisher);
$queueForFunctions = new Func($this->publisherFunctions);
$queueForFunctions
->setType('schedule')
@@ -3,6 +3,7 @@
namespace Tests\E2E\Services\Messaging;
use Appwrite\Messaging\Status as MessageStatus;
use Appwrite\Tests\Retry;
use CURLFile;
use Tests\E2E\Client;
use Utopia\Database\DateTime;
@@ -1190,6 +1191,7 @@ trait MessagingBase
$this->assertEquals(MessageStatus::FAILED, $message['body']['status']);
}
#[Retry(count: 3)]
public function testUpdateScheduledAt(): void
{
// Create user