From 85e2cf7d8b0752c968d54d9600ea2e2f77abcd70 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Wed, 13 May 2026 17:35:35 +0530 Subject: [PATCH] (refactor): Migrate queueForFunctions to FunctionPublisher and FunctionMessage --- app/cli.php | 9 +- app/controllers/shared/api.php | 27 +++--- app/init/resources.php | 5 + app/init/resources/request.php | 30 ++++-- app/init/worker/message.php | 10 +- src/Appwrite/Event/Message/Func.php | 92 +++++++++++++++++++ src/Appwrite/Event/Publisher/Func.php | 27 ++++++ .../Collections/Documents/Action.php | 19 ++-- .../Collections/Documents/Bulk/Delete.php | 7 +- .../Collections/Documents/Bulk/Update.php | 7 +- .../Collections/Documents/Bulk/Upsert.php | 7 +- .../Collections/Documents/Create.php | 7 +- .../Http/Databases/Transactions/Update.php | 19 +++- .../Collections/Documents/Bulk/Delete.php | 2 +- .../Collections/Documents/Bulk/Update.php | 2 +- .../Collections/Documents/Bulk/Upsert.php | 2 +- .../Collections/Documents/Create.php | 2 +- .../Http/DocumentsDB/Transactions/Update.php | 2 +- .../Http/TablesDB/Tables/Rows/Bulk/Delete.php | 2 +- .../Http/TablesDB/Tables/Rows/Bulk/Update.php | 2 +- .../Http/TablesDB/Tables/Rows/Bulk/Upsert.php | 2 +- .../Http/TablesDB/Tables/Rows/Create.php | 2 +- .../Http/TablesDB/Transactions/Update.php | 2 +- .../Collections/Documents/Bulk/Delete.php | 2 +- .../Collections/Documents/Bulk/Update.php | 2 +- .../Collections/Documents/Bulk/Upsert.php | 2 +- .../Collections/Documents/Create.php | 2 +- .../Http/VectorsDB/Transactions/Update.php | 2 +- .../Functions/Http/Executions/Create.php | 34 +++---- .../Functions/Http/Functions/Create.php | 19 ++-- .../Modules/Functions/Workers/Builds.php | 23 +++-- .../Health/Http/Health/Queue/Failed/Get.php | 8 +- .../Http/Health/Queue/Functions/Get.php | 8 +- .../Platform/Tasks/ScheduleExecutions.php | 33 ++++--- .../Platform/Tasks/ScheduleFunctions.php | 23 +++-- .../Platform/Workers/Certificates.php | 41 +++++---- src/Appwrite/Platform/Workers/Functions.php | 58 +++++++----- 37 files changed, 369 insertions(+), 174 deletions(-) create mode 100644 src/Appwrite/Event/Message/Func.php create mode 100644 src/Appwrite/Event/Publisher/Func.php diff --git a/app/cli.php b/app/cli.php index ada155c4dc..8dfbaf4e9a 100644 --- a/app/cli.php +++ b/app/cli.php @@ -4,8 +4,8 @@ require_once __DIR__ . '/init.php'; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; use Appwrite\Event\Publisher\Certificate as CertificatePublisher; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Platform\Appwrite; @@ -281,9 +281,10 @@ $container->set('publisherForStatsResources', fn (Publisher $publisher) => new S $publisher, new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) ), ['publisher']); -$container->set('queueForFunctions', function (Publisher $publisher) { - return new Func($publisher); -}, ['publisher']); +$container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher( + $publisher, + new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) +), ['publisher']); $container->set('queueForDeletes', function (Publisher $publisher) { return new Delete($publisher); }, ['publisher']); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 8365274e98..4445d0db0f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -7,10 +7,11 @@ use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; use Appwrite\Event\Message\Audit as AuditMessage; +use Appwrite\Event\Message\Func as FunctionMessage; use Appwrite\Event\Message\Usage as UsageMessage; use Appwrite\Event\Publisher\Audit; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -488,7 +489,7 @@ Http::init() ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('usage') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -500,7 +501,7 @@ Http::init() ->inject('platform') ->inject('authorization') ->inject('cacheControlForStorage') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Context $usage, Func $queueForFunctions, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Context $usage, FunctionPublisher $publisherForFunctions, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) { $response->setUser($user); $request->setUser($user); @@ -613,10 +614,6 @@ Http::init() /* Auto-set projects */ $queueForDeletes->setProject($project); $queueForDatabase->setProject($project); - $queueForFunctions->setProject($project); - - /* Auto-set platforms */ - $queueForFunctions->setPlatform($platform); $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); @@ -808,7 +805,7 @@ Http::shutdown() ->inject('publisherForUsage') ->inject('queueForDeletes') ->inject('queueForDatabase') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') @@ -818,7 +815,7 @@ Http::shutdown() ->inject('bus') ->inject('apiKey') ->inject('mode') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -847,9 +844,15 @@ Http::shutdown() if (! empty($functionsEvents)) { foreach ($generatedEvents as $event) { if (isset($functionsEvents[$event])) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); break; } } diff --git a/app/init/resources.php b/app/init/resources.php index a626b612cb..dbaa89b21d 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -5,6 +5,7 @@ use Appwrite\Event\Publisher\Audit as AuditPublisher; use Appwrite\Event\Publisher\Build as BuildPublisher; use Appwrite\Event\Publisher\Certificate as CertificatePublisher; use Appwrite\Event\Publisher\Execution as ExecutionPublisher; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; use Appwrite\Event\Publisher\Messaging as MessagingPublisher; use Appwrite\Event\Publisher\Migration as MigrationPublisher; @@ -108,6 +109,10 @@ $container->set('publisherForExecutions', fn (Publisher $publisher) => new Execu $publisher, new Queue(System::getEnv('_APP_EXECUTIONS_QUEUE_NAME', Event::EXECUTIONS_QUEUE_NAME)) ), ['publisher']); +$container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher( + $publisher, + new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) +), ['publisher']); $container->set('publisherForMigrations', fn (Publisher $publisher) => new MigrationPublisher( $publisher, new Queue(System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME)) diff --git a/app/init/resources/request.php b/app/init/resources/request.php index 68a5a3edf5..68f5968519 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -8,7 +8,8 @@ use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; @@ -48,6 +49,7 @@ use Utopia\Locale\Locale; use Utopia\Logger\Log; use Utopia\Pools\Group; use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; use Utopia\Storage\Device; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; @@ -113,7 +115,10 @@ return function (Container $context): void { $context->set('queueForRealtime', fn () => new Realtime(), []); $context->set('usage', fn () => new UsageContext(), []); $context->set('auditContext', fn () => new AuditContext(), []); - $context->set('queueForFunctions', fn (Publisher $publisher) => new Func($publisher), ['publisher']); + $context->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher( + $publisher, + new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) + ), ['publisher']); $context->set('eventProcessor', fn () => new EventProcessor(), []); $context->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); @@ -635,7 +640,7 @@ return function (Container $context): void { return; }, ['user', 'store', 'proofForToken']); - $context->set('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, UsageContext $usage, Authorization $authorization, Request $request) { + $context->set('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, FunctionPublisher $publisherForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, UsageContext $usage, Authorization $authorization, Request $request) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForPlatform; } @@ -691,7 +696,7 @@ return function (Container $context): void { * Accounts can be created in many ways beyond `createAccount` * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. */ - $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, FunctionPublisher $publisherForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { // Only trigger events for user creation with the database listener. if ($document->getCollection() !== 'users') { return; @@ -703,9 +708,15 @@ return function (Container $context): void { ->setPayload($response->output($document, Response::MODEL_USER)); // Trigger functions, webhooks, and realtime events - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); /** Trigger webhooks events only if a project has them enabled */ if (! empty($project->getAttribute('webhooks'))) { @@ -885,7 +896,6 @@ return function (Container $context): void { // Clone the queues, to prevent events triggered by the database listener // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisherFunctions); $queueForWebhooks = new Webhook($publisherWebhooks); $queueForRealtime = new Realtime(); @@ -900,7 +910,7 @@ return function (Container $context): void { $document, $response, $queueForEventsClone->from($queueForEvents), - $queueForFunctions->from($queueForEvents), + $publisherForFunctions, $queueForWebhooks->from($queueForEvents), $queueForRealtime->from($queueForEvents) )) @@ -909,7 +919,7 @@ return function (Container $context): void { ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)); return $database; - }, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'usage', 'authorization', 'request']); + }, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'publisherForFunctions', 'queueForWebhooks', 'queueForRealtime', 'usage', 'authorization', 'request']); $context->set('schema', function ($utopia, $dbForProject, $authorization) { diff --git a/app/init/worker/message.php b/app/init/worker/message.php index 791bf5edf0..d4aea0c51e 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -3,7 +3,7 @@ use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; use Appwrite\Usage\Context; @@ -23,6 +23,7 @@ use Utopia\DSN\DSN; use Utopia\Logger\Log; use Utopia\Pools\Group; use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; use Utopia\Registry\Registry; use Utopia\Storage\Device\Telemetry as TelemetryDevice; use Utopia\System\System; @@ -343,9 +344,10 @@ return function (Container $container): void { return new Webhook($publisher); }, ['publisher']); - $container->set('queueForFunctions', function (Publisher $publisher) { - return new Func($publisher); - }, ['publisher']); + $container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher( + $publisher, + new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) + ), ['publisher']); $container->set('queueForRealtime', function () { return new Realtime(); diff --git a/src/Appwrite/Event/Message/Func.php b/src/Appwrite/Event/Message/Func.php new file mode 100644 index 0000000000..2a2ae9d90f --- /dev/null +++ b/src/Appwrite/Event/Message/Func.php @@ -0,0 +1,92 @@ +platform) ? $this->platform : Config::getParam('platform', []); + + return [ + 'project' => $this->project?->getArrayCopy(), + 'user' => $this->user?->getArrayCopy(), + 'userId' => $this->userId, + 'function' => $this->function?->getArrayCopy(), + 'functionId' => $this->functionId, + 'execution' => $this->execution?->getArrayCopy(), + 'type' => $this->type, + 'jwt' => $this->jwt, + 'payload' => $this->payload, + 'events' => $this->events, + 'body' => $this->body, + 'path' => $this->path, + 'headers' => $this->headers, + 'method' => $this->method, + 'platform' => $platform, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: !empty($data['project']) ? new Document($data['project']) : null, + user: !empty($data['user']) ? new Document($data['user']) : null, + userId: $data['userId'] ?? null, + function: !empty($data['function']) ? new Document($data['function']) : null, + functionId: $data['functionId'] ?? null, + execution: !empty($data['execution']) ? new Document($data['execution']) : null, + type: $data['type'] ?? '', + jwt: $data['jwt'] ?? '', + payload: $data['payload'] ?? [], + events: $data['events'] ?? [], + body: $data['body'] ?? '', + path: $data['path'] ?? '', + headers: $data['headers'] ?? [], + method: $data['method'] ?? '', + platform: $data['platform'] ?? [], + ); + } +} diff --git a/src/Appwrite/Event/Publisher/Func.php b/src/Appwrite/Event/Publisher/Func.php new file mode 100644 index 0000000000..46f748a59f --- /dev/null +++ b/src/Appwrite/Event/Publisher/Func.php @@ -0,0 +1,27 @@ +publish($queue ?? $this->queue, $message); + } + + public function getSize(bool $failed = false, ?Queue $queue = null): int + { + return $this->getQueueSize($queue ?? $this->queue, $failed); + } +} diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index 8100a2c51b..d62782f95e 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -3,6 +3,8 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents; use Appwrite\Event\Event; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; @@ -421,7 +423,7 @@ abstract class Action extends DatabasesAction * @param Document[] $documents * @param Event $queueForEvents * @param Event $queueForRealtime - * @param Event $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Event $queueForWebhooks * @param Database $dbForProject * @param EventProcessor $eventProcessor @@ -434,7 +436,7 @@ abstract class Action extends DatabasesAction array $documents, Event $queueForEvents, Event $queueForRealtime, - Event $queueForFunctions, + FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Database $dbForProject, EventProcessor $eventProcessor @@ -472,9 +474,15 @@ abstract class Action extends DatabasesAction if (!empty($functionsEvents)) { foreach ($generatedEvents as $event) { if (isset($functionsEvents[$event])) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); break; } } @@ -494,7 +502,6 @@ abstract class Action extends DatabasesAction $queueForEvents->reset(); $queueForRealtime->reset(); - $queueForFunctions->reset(); $queueForWebhooks->reset(); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php index 267a54adb0..2dc3100046 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; @@ -80,14 +81,14 @@ class Delete extends Action ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void + public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -206,7 +207,7 @@ class Delete extends Action $documents, $queueForEvents, $queueForRealtime, - $queueForFunctions, + $publisherForFunctions, $queueForWebhooks, $dbForProject, $eventProcessor diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php index da3adf1192..393590d1e6 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; @@ -84,14 +85,14 @@ class Update extends Action ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void + public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -237,7 +238,7 @@ class Update extends Action $documents, $queueForEvents, $queueForRealtime, - $queueForFunctions, + $publisherForFunctions, $queueForWebhooks, $dbForProject, $eventProcessor diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index 5a5ebf48ee..d69298919b 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; @@ -82,14 +83,14 @@ class Upsert extends Action ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void + public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -212,7 +213,7 @@ class Upsert extends Action $upserted, $queueForEvents, $queueForRealtime, - $queueForFunctions, + $publisherForFunctions, $queueForWebhooks, $dbForProject, $eventProcessor diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index 633a2bbc86..2ade0b2b79 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; @@ -137,7 +138,7 @@ class Create extends Action ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') @@ -145,7 +146,7 @@ class Create extends Action ->callback($this->action(...)); } - public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, Event $queueForEvents, Context $usage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void + public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -517,7 +518,7 @@ class Create extends Action $created, $queueForEvents, $queueForRealtime, - $queueForFunctions, + $publisherForFunctions, $queueForWebhooks, $dbForProject, $eventProcessor diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index c4d51e6c64..4f91ba3f94 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -5,6 +5,8 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions; use Appwrite\Databases\TransactionState; use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; @@ -77,7 +79,7 @@ class Update extends Action ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('authorization') ->inject('eventProcessor') @@ -97,7 +99,7 @@ class Update extends Action * @param Event $queueForEvents * @param Context $usage * @param Event $queueForRealtime - * @param Event $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Event $queueForWebhooks * @param EventProcessor $eventProcessor * @return void @@ -108,7 +110,7 @@ class Update extends Action * @throws StructureException * @throws \Utopia\Http\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void + public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -461,7 +463,15 @@ class Update extends Action if (!empty($functionsEvents)) { foreach ($generatedEvents as $event) { if (isset($functionsEvents[$event])) { - $queueForFunctions->from($queueForEvents)->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); break; } } @@ -480,7 +490,6 @@ class Update extends Action $queueForEvents->reset(); $queueForRealtime->reset(); - $queueForFunctions->reset(); $queueForWebhooks->reset(); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Delete.php index 09ad9a5741..6b2910aac4 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Delete.php @@ -63,7 +63,7 @@ class Delete extends DocumentsDelete ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Update.php index c723f1bc30..f395d0b490 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Update.php @@ -65,7 +65,7 @@ class Update extends DocumentsUpdate ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Upsert.php index d5b62ec903..5acc4626af 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Bulk/Upsert.php @@ -65,7 +65,7 @@ class Upsert extends DocumentsUpsert ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Create.php index 532ae826e2..2df96958ad 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Collections/Documents/Create.php @@ -112,7 +112,7 @@ class Create extends DocumentCreate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php index b4c0c2ffab..97eff24508 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php @@ -60,7 +60,7 @@ class Update extends TransactionsUpdate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('authorization') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php index 37a3db01db..8315a8d04b 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php @@ -65,7 +65,7 @@ class Delete extends DocumentsDelete ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php index bb839b752e..a31ebc15e0 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php @@ -67,7 +67,7 @@ class Update extends DocumentsUpdate ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php index 364bf4a928..543de8c4bc 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php @@ -67,7 +67,7 @@ class Upsert extends DocumentsUpsert ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php index 26649accfb..ea9e3e0b03 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php @@ -109,7 +109,7 @@ class Create extends DocumentCreate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php index 872927d533..c41186b5c3 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php @@ -61,7 +61,7 @@ class Update extends TransactionsUpdate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('authorization') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Delete.php index a4d640b423..4c7d97aa55 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Delete.php @@ -63,7 +63,7 @@ class Delete extends DocumentsDelete ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Update.php index 2784fa220a..18e441ede7 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Update.php @@ -65,7 +65,7 @@ class Update extends DocumentsUpdate ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Upsert.php index cfbf6c9158..c26e61d716 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Bulk/Upsert.php @@ -65,7 +65,7 @@ class Upsert extends DocumentsUpsert ->inject('usage') ->inject('queueForEvents') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Create.php index 563b5f60ef..dee8d8e85f 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Collections/Documents/Create.php @@ -106,7 +106,7 @@ class Create extends DocumentCreate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php index f4bd4d67f5..d6399e6bc0 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php @@ -60,7 +60,7 @@ class Update extends TransactionsUpdate ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForWebhooks') ->inject('authorization') ->inject('eventProcessor') diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php index 02dd76294e..f2f51a90e6 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php @@ -5,7 +5,8 @@ namespace Appwrite\Platform\Modules\Functions\Http\Executions; use Ahc\Jwt\JWT; use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; @@ -95,7 +96,7 @@ class Create extends Base ->inject('user') ->inject('queueForEvents') ->inject('usage') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('geodb') ->inject('store') ->inject('proofForToken') @@ -123,7 +124,7 @@ class Create extends Base User $user, Event $queueForEvents, Context $usage, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Reader $geodb, Store $store, Token $proofForToken, @@ -294,20 +295,19 @@ class Create extends Base if ($async) { if (is_null($scheduledAt)) { $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - $queueForFunctions - ->setType('http') - ->setExecution($execution) - ->setFunction($function) - ->setBody($body) - ->setHeaders($headers) - ->setPath($path) - ->setMethod($method) - ->setJWT($jwt) - ->setProject($project) - ->setUser($user) - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->trigger(); + $publisherForFunctions->enqueue(new FunctionMessage( + project: $project, + user: $user, + function: $function, + functionId: $function->getId(), + execution: $execution, + type: 'http', + jwt: $jwt, + body: $body, + path: $path, + headers: $headers, + method: $method, + )); } else { $data = [ 'headers' => $headers, diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Create.php index 00a91141fb..148f0945ac 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Create.php @@ -3,9 +3,10 @@ namespace Appwrite\Platform\Modules\Functions\Http\Functions; use Appwrite\Event\Event; -use Appwrite\Event\Func; use Appwrite\Event\Message\Build as BuildMessage; +use Appwrite\Event\Message\Func as FunctionMessage; use Appwrite\Event\Publisher\Build as BuildPublisher; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Validator\FunctionEvent; use Appwrite\Event\Webhook; @@ -119,7 +120,7 @@ class Create extends Base ->inject('publisherForBuilds') ->inject('queueForRealtime') ->inject('queueForWebhooks') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('dbForPlatform') ->inject('request') ->inject('gitHub') @@ -161,7 +162,7 @@ class Create extends Base BuildPublisher $publisherForBuilds, Realtime $queueForRealtime, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Database $dbForPlatform, Request $request, GitHub $github, @@ -423,9 +424,15 @@ class Create extends Base ->trigger(); /** Trigger Functions */ - $queueForFunctions - ->from($ruleCreate) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $ruleCreate->getEvent(), + params: $ruleCreate->getParams(), + project: $ruleCreate->getProject(), + user: $ruleCreate->getUser(), + userId: $ruleCreate->getUserId(), + payload: $ruleCreate->getPayload(), + platform: $ruleCreate->getPlatform(), + )); /** Trigger Realtime Events */ $queueForRealtime diff --git a/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php b/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php index e120da4e3d..5aa95d3bf2 100644 --- a/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php +++ b/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php @@ -4,8 +4,9 @@ namespace Appwrite\Platform\Modules\Functions\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; use Appwrite\Event\Message\Usage as UsageMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Screenshot; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; @@ -63,7 +64,7 @@ class Builds extends Action ->inject('queueForEvents') ->inject('publisherForScreenshots') ->inject('queueForWebhooks') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForRealtime') ->inject('usage') ->inject('publisherForUsage') @@ -89,7 +90,7 @@ class Builds extends Action Event $queueForEvents, Screenshot $publisherForScreenshots, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Context $usage, UsagePublisher $publisherForUsage, @@ -131,7 +132,7 @@ class Builds extends Action $deviceForFiles, $publisherForScreenshots, $queueForWebhooks, - $queueForFunctions, + $publisherForFunctions, $queueForRealtime, $queueForEvents, $usage, @@ -167,7 +168,7 @@ class Builds extends Action Device $deviceForFiles, Screenshot $publisherForScreenshots, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Event $queueForEvents, Context $usage, @@ -570,9 +571,15 @@ class Builds extends Action ->trigger(); /** Trigger Functions */ - $queueForFunctions - ->from($deploymentUpdate) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $deploymentUpdate->getEvent(), + params: $deploymentUpdate->getParams(), + project: $deploymentUpdate->getProject(), + user: $deploymentUpdate->getUser(), + userId: $deploymentUpdate->getUserId(), + payload: $deploymentUpdate->getPayload(), + platform: $deploymentUpdate->getPlatform(), + )); /** Trigger Realtime Event */ $queueForRealtime diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php index 70d7713280..0429118e41 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php @@ -5,10 +5,10 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed; use Appwrite\Event\Database; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; use Appwrite\Event\Publisher\Audit; use Appwrite\Event\Publisher\Build as BuildPublisher; use Appwrite\Event\Publisher\Certificate; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; use Appwrite\Event\Publisher\Messaging as MessagingPublisher; use Appwrite\Event\Publisher\Migration as MigrationPublisher; @@ -78,7 +78,7 @@ class Get extends Base ->inject('queueForDeletes') ->inject('publisherForAudits') ->inject('publisherForMails') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('publisherForStatsResources') ->inject('publisherForUsage') ->inject('queueForWebhooks') @@ -98,7 +98,7 @@ class Get extends Base Delete $queueForDeletes, Audit $publisherForAudits, MailPublisher $publisherForMails, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, StatsResourcesPublisher $publisherForStatsResources, UsagePublisher $publisherForUsage, Webhook $queueForWebhooks, @@ -115,7 +115,7 @@ class Get extends Base System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes, System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits, System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $publisherForMails, - System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions, + System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $publisherForFunctions, System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources, System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage, System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks, diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Functions/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Functions/Get.php index 1d10b8d1a0..29c7a7c859 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Functions/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Functions/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Functions; -use Appwrite\Event\Func; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Func $queueForFunctions, Response $response): void + public function action(int|string $threshold, FunctionPublisher $publisherForFunctions, Response $response): void { $threshold = (int) $threshold; - $size = $queueForFunctions->getSize(); + $size = $publisherForFunctions->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index cd7873bab6..49dd851b6d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Swoole\Coroutine as Co; use Utopia\Database\Database; @@ -36,7 +37,10 @@ class ScheduleExecutions extends ScheduleBase { $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); - $queueForFunctions = new Func($this->publisherFunctions); + $publisherForFunctions = new FunctionPublisher( + $this->publisherFunctions, + new \Utopia\Queue\Queue(\Utopia\System\System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', \Appwrite\Event\Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', \Appwrite\Event\Event::FUNCTIONS_QUEUE_TTL) + ); foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -63,23 +67,22 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data, $dbForPlatform) { + \go(function () use ($publisherForFunctions, $schedule, $scheduledAt, $delay, $data, $dbForPlatform) { if ($delay > 0) { Co::sleep($delay); } - $queueForFunctions->setType('schedule') - // Set functionId instead of function as we don't have $dbForProject - // TODO: Refactor to use function instead of functionId - ->setFunctionId($schedule['resource']['resourceId']) - ->setExecution($schedule['resource']) - ->setMethod($data['method'] ?? 'POST') - ->setPath($data['path'] ?? '/') - ->setHeaders($data['headers'] ?? []) - ->setBody($data['body'] ?? '') - ->setProject($schedule['project']) - ->setUserId($data['userId'] ?? '') - ->trigger(); + $publisherForFunctions->enqueue(new FunctionMessage( + project: $schedule['project'], + userId: $data['userId'] ?? '', + functionId: $schedule['resource']['resourceId'], + execution: $schedule['resource'], + type: 'schedule', + body: $data['body'] ?? '', + path: $data['path'] ?? '/', + headers: $data['headers'] ?? [], + method: $data['method'] ?? 'POST', + )); $dbForPlatform->deleteDocument( 'schedules', diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6dd9cd0351..c1a1891386 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Cron\CronExpression; use Utopia\Console; use Utopia\Database\Database; @@ -97,14 +98,10 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($this->publisherFunctions); - - $queueForFunctions - ->setType('schedule') - ->setFunction($schedule['resource']) - ->setMethod('POST') - ->setPath('/') - ->setProject($schedule['project']); + $publisherForFunctions = new FunctionPublisher( + $this->publisherFunctions, + new \Utopia\Queue\Queue(\Utopia\System\System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', \Appwrite\Event\Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', \Appwrite\Event\Event::FUNCTIONS_QUEUE_TTL) + ); Span::init('schedule.functions.enqueue'); try { @@ -112,7 +109,13 @@ class ScheduleFunctions extends ScheduleBase Span::add('function.id', $schedule['resource']->getId()); Span::add('schedule.id', $schedule['$id'] ?? ''); - $queueForFunctions->trigger(); + $publisherForFunctions->enqueue(new FunctionMessage( + project: $schedule['project'], + function: $schedule['resource'], + type: 'schedule', + method: 'POST', + path: '/', + )); $this->recordEnqueueDelay($delayConfig['nextDate']); } finally { diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index af3d145f85..4a31216599 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -4,9 +4,10 @@ namespace Appwrite\Platform\Workers; use Appwrite\Certificates\Adapter as CertificatesAdapter; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; use Appwrite\Event\Message\Mail as MailMessage; use Appwrite\Event\Publisher\Certificate; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -54,7 +55,7 @@ class Certificates extends Action ->inject('publisherForMails') ->inject('queueForEvents') ->inject('queueForWebhooks') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForRealtime') ->inject('publisherForCertificates') ->inject('log') @@ -70,7 +71,7 @@ class Certificates extends Action * @param MailPublisher $publisherForMails * @param Event $queueForEvents * @param Webhook $queueForWebhooks - * @param Func $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Realtime $queueForRealtime * @param Certificate $publisherForCertificates * @param Log $log @@ -87,7 +88,7 @@ class Certificates extends Action MailPublisher $publisherForMails, Event $queueForEvents, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Certificate $publisherForCertificates, Log $log, @@ -113,11 +114,11 @@ class Certificates extends Action switch ($action) { case \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION: - $this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain); + $this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain); break; case \Appwrite\Event\Certificate::ACTION_GENERATION: - $this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $publisherForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain); + $this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $publisherForMails, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain); break; default: @@ -130,7 +131,7 @@ class Certificates extends Action * @param Database $dbForPlatform * @param Event $queueForEvents * @param Webhook $queueForWebhooks - * @param Func $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Realtime $queueForRealtime * @param Certificate $publisherForCertificates * @param Log $log @@ -146,7 +147,7 @@ class Certificates extends Action Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Certificate $publisherForCertificates, Log $log, @@ -185,7 +186,7 @@ class Certificates extends Action $rule->setAttribute('logs', $logs); } finally { // Update rule and emit events - $this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); + $this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime); } // Issue a TLS certificate when domain is verified @@ -213,7 +214,7 @@ class Certificates extends Action * @param MailPublisher $publisherForMails * @param Event $queueForEvents * @param Webhook $queueForWebhooks - * @param Func $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Realtime $queueForRealtime * @param Log $log * @param CertificatesAdapter $certificates @@ -237,7 +238,7 @@ class Certificates extends Action MailPublisher $publisherForMails, Event $queueForEvents, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, @@ -370,7 +371,7 @@ class Certificates extends Action // Update rule and emit events $rule->setAttribute('certificateId', $certificate->getId()); $rule->setAttribute('logs', $logs); - $this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); + $this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime); } } @@ -416,7 +417,7 @@ class Certificates extends Action * @param Database $dbForPlatform Database connection for console * @param Event $queueForEvents Event publisher for events * @param Webhook $queueForWebhooks Webhook publisher for webhooks - * @param Func $queueForFunctions Function publisher for functions + * @param FunctionPublisher $publisherForFunctions Function publisher for functions * @param Realtime $queueForRealtime Realtime publisher for realtime events * * @return void @@ -426,7 +427,7 @@ class Certificates extends Action Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime ): void { $rule = $dbForPlatform->updateDocument('rules', $rule->getId(), new Document([ @@ -459,9 +460,15 @@ class Certificates extends Action ->trigger(); /** Trigger Functions */ - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); /** Trigger Realtime Events */ $queueForRealtime diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 93b500b98b..73c1db9444 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -5,7 +5,8 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Bus\Events\ExecutionCompleted; use Appwrite\Event\Event; -use Appwrite\Event\Func; +use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception as AppwriteException; @@ -46,7 +47,7 @@ class Functions extends Action ->inject('message') ->inject('dbForProject') ->inject('queueForWebhooks') - ->inject('queueForFunctions') + ->inject('publisherForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('bus') @@ -61,7 +62,7 @@ class Functions extends Action Message $message, Database $dbForProject, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Event $queueForEvents, Bus $bus, @@ -78,7 +79,8 @@ class Functions extends Action ); } - $type = $payload['type'] ?? ''; + $functionMessage = FunctionMessage::fromArray($payload); + $type = $functionMessage->type; Span::add('project.id', $project->getId()); Span::add('payload.type', $type); @@ -86,18 +88,18 @@ class Functions extends Action Span::add('queue.name', $message->getQueue()); Span::add('message.timestamp', (string) $message->getTimestamp()); - $events = $payload['events'] ?? []; - $data = $payload['body'] ?? ''; - $eventData = $payload['payload'] ?? ''; - $platform = $payload['platform'] ?? Config::getParam('platform', []); - $function = new Document($payload['function'] ?? []); - $functionId = $payload['functionId'] ?? ''; - $user = new Document($payload['user'] ?? []); - $userId = $payload['userId'] ?? ''; - $method = $payload['method'] ?? 'POST'; - $headers = $payload['headers'] ?? []; - $path = $payload['path'] ?? '/'; - $jwt = $payload['jwt'] ?? ''; + $events = $functionMessage->events; + $data = $functionMessage->body; + $eventData = $functionMessage->payload; + $platform = !empty($functionMessage->platform) ? $functionMessage->platform : Config::getParam('platform', []); + $function = $functionMessage->function ?? new Document(); + $functionId = $functionMessage->functionId ?? ''; + $user = $functionMessage->user ?? new Document(); + $userId = $functionMessage->userId ?? ''; + $method = $functionMessage->method ?: 'POST'; + $headers = $functionMessage->headers; + $path = $functionMessage->path ?: '/'; + $jwt = $functionMessage->jwt; if ($user->isEmpty() && !empty($userId)) { $user = $dbForProject->getDocument('users', $userId); @@ -166,7 +168,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, - queueForFunctions: $queueForFunctions, + publisherForFunctions: $publisherForFunctions, queueForRealtime: $queueForRealtime, queueForEvents: $queueForEvents, bus: $bus, @@ -185,7 +187,7 @@ class Functions extends Action user: $user, jwt: null, event: $events[0], - eventData: \is_string($eventData) ? $eventData : \json_encode($eventData), + eventData: \json_encode($eventData) ?: null, executionId: null, ); Console::success('Triggered function: ' . $events[0]); @@ -210,7 +212,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, - queueForFunctions: $queueForFunctions, + publisherForFunctions: $publisherForFunctions, queueForRealtime: $queueForRealtime, queueForEvents: $queueForEvents, bus: $bus, @@ -236,7 +238,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, - queueForFunctions: $queueForFunctions, + publisherForFunctions: $publisherForFunctions, queueForRealtime: $queueForRealtime, queueForEvents: $queueForEvents, bus: $bus, @@ -331,7 +333,7 @@ class Functions extends Action /** * @param Log $log * @param Database $dbForProject - * @param Func $queueForFunctions + * @param FunctionPublisher $publisherForFunctions * @param Realtime $queueForRealtime * @param Event $queueForEvents * @param Document $project @@ -353,7 +355,7 @@ class Functions extends Action Log $log, Database $dbForProject, Webhook $queueForWebhooks, - Func $queueForFunctions, + FunctionPublisher $publisherForFunctions, Realtime $queueForRealtime, Event $queueForEvents, Bus $bus, @@ -649,9 +651,15 @@ class Functions extends Action ->trigger(); /** Trigger Functions */ - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $publisherForFunctions->enqueue(FunctionMessage::fromEvent( + event: $queueForEvents->getEvent(), + params: $queueForEvents->getParams(), + project: $queueForEvents->getProject(), + user: $queueForEvents->getUser(), + userId: $queueForEvents->getUserId(), + payload: $queueForEvents->getPayload(), + platform: $queueForEvents->getPlatform(), + )); /** Trigger Realtime Events */ $queueForRealtime