(refactor): Migrate queueForFunctions to FunctionPublisher and FunctionMessage

This commit is contained in:
Chirag Aggarwal
2026-05-13 17:35:35 +05:30
parent f6d2314e11
commit 85e2cf7d8b
37 changed files with 369 additions and 174 deletions
+5 -4
View File
@@ -4,8 +4,8 @@ require_once __DIR__ . '/init.php';
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Publisher\Certificate as CertificatePublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Platform\Appwrite;
@@ -281,9 +281,10 @@ $container->set('publisherForStatsResources', fn (Publisher $publisher) => new S
$publisher,
new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME))
), ['publisher']);
$container->set('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
$container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher(
$publisher,
new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL)
), ['publisher']);
$container->set('queueForDeletes', function (Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
+15 -12
View File
@@ -7,10 +7,11 @@ use Appwrite\Event\Context\Audit as AuditContext;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Audit as AuditMessage;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Message\Usage as UsageMessage;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
@@ -488,7 +489,7 @@ Http::init()
->inject('queueForDeletes')
->inject('queueForDatabase')
->inject('usage')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('dbForProject')
->inject('timelimit')
->inject('resourceToken')
@@ -500,7 +501,7 @@ Http::init()
->inject('platform')
->inject('authorization')
->inject('cacheControlForStorage')
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Context $usage, Func $queueForFunctions, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) {
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Context $usage, FunctionPublisher $publisherForFunctions, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) {
$response->setUser($user);
$request->setUser($user);
@@ -613,10 +614,6 @@ Http::init()
/* Auto-set projects */
$queueForDeletes->setProject($project);
$queueForDatabase->setProject($project);
$queueForFunctions->setProject($project);
/* Auto-set platforms */
$queueForFunctions->setPlatform($platform);
$useCache = $route->getLabel('cache', false);
$storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load');
@@ -808,7 +805,7 @@ Http::shutdown()
->inject('publisherForUsage')
->inject('queueForDeletes')
->inject('queueForDatabase')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('queueForRealtime')
->inject('dbForProject')
@@ -818,7 +815,7 @@ Http::shutdown()
->inject('bus')
->inject('apiKey')
->inject('mode')
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) {
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) {
$responsePayload = $response->getPayload();
@@ -847,9 +844,15 @@ Http::shutdown()
if (! empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions
->from($queueForEvents)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
break;
}
}
+5
View File
@@ -5,6 +5,7 @@ use Appwrite\Event\Publisher\Audit as AuditPublisher;
use Appwrite\Event\Publisher\Build as BuildPublisher;
use Appwrite\Event\Publisher\Certificate as CertificatePublisher;
use Appwrite\Event\Publisher\Execution as ExecutionPublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Mail as MailPublisher;
use Appwrite\Event\Publisher\Messaging as MessagingPublisher;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
@@ -108,6 +109,10 @@ $container->set('publisherForExecutions', fn (Publisher $publisher) => new Execu
$publisher,
new Queue(System::getEnv('_APP_EXECUTIONS_QUEUE_NAME', Event::EXECUTIONS_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher(
$publisher,
new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL)
), ['publisher']);
$container->set('publisherForMigrations', fn (Publisher $publisher) => new MigrationPublisher(
$publisher,
new Queue(System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME))
+20 -10
View File
@@ -8,7 +8,8 @@ use Appwrite\Event\Context\Audit as AuditContext;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
@@ -48,6 +49,7 @@ use Utopia\Locale\Locale;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Storage\Device;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
@@ -113,7 +115,10 @@ return function (Container $context): void {
$context->set('queueForRealtime', fn () => new Realtime(), []);
$context->set('usage', fn () => new UsageContext(), []);
$context->set('auditContext', fn () => new AuditContext(), []);
$context->set('queueForFunctions', fn (Publisher $publisher) => new Func($publisher), ['publisher']);
$context->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher(
$publisher,
new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL)
), ['publisher']);
$context->set('eventProcessor', fn () => new EventProcessor(), []);
$context->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) {
$adapter = new DatabasePool($pools->get('console'));
@@ -635,7 +640,7 @@ return function (Container $context): void {
return;
}, ['user', 'store', 'proofForToken']);
$context->set('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, UsageContext $usage, Authorization $authorization, Request $request) {
$context->set('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, FunctionPublisher $publisherForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, UsageContext $usage, Authorization $authorization, Request $request) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForPlatform;
}
@@ -691,7 +696,7 @@ return function (Container $context): void {
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, FunctionPublisher $publisherForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
return;
@@ -703,9 +708,15 @@ return function (Container $context): void {
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions, webhooks, and realtime events
$queueForFunctions
->from($queueForEvents)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
/** Trigger webhooks events only if a project has them enabled */
if (! empty($project->getAttribute('webhooks'))) {
@@ -885,7 +896,6 @@ return function (Container $context): void {
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisherWebhooks);
$queueForRealtime = new Realtime();
@@ -900,7 +910,7 @@ return function (Container $context): void {
$document,
$response,
$queueForEventsClone->from($queueForEvents),
$queueForFunctions->from($queueForEvents),
$publisherForFunctions,
$queueForWebhooks->from($queueForEvents),
$queueForRealtime->from($queueForEvents)
))
@@ -909,7 +919,7 @@ return function (Container $context): void {
->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database));
return $database;
}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'usage', 'authorization', 'request']);
}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'publisherForFunctions', 'queueForWebhooks', 'queueForRealtime', 'usage', 'authorization', 'request']);
$context->set('schema', function ($utopia, $dbForProject, $authorization) {
+6 -4
View File
@@ -3,7 +3,7 @@
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Usage\Context;
@@ -23,6 +23,7 @@ use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Telemetry as TelemetryDevice;
use Utopia\System\System;
@@ -343,9 +344,10 @@ return function (Container $container): void {
return new Webhook($publisher);
}, ['publisher']);
$container->set('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
$container->set('publisherForFunctions', fn (Publisher $publisher) => new FunctionPublisher(
$publisher,
new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL)
), ['publisher']);
$container->set('queueForRealtime', function () {
return new Realtime();
+92
View File
@@ -0,0 +1,92 @@
<?php
namespace Appwrite\Event\Message;
use Appwrite\Event\Event;
use Utopia\Config\Config;
use Utopia\Database\Document;
final class Func extends Base
{
public function __construct(
public readonly ?Document $project = null,
public readonly ?Document $user = null,
public readonly ?string $userId = null,
public readonly ?Document $function = null,
public readonly ?string $functionId = null,
public readonly ?Document $execution = null,
public readonly string $type = '',
public readonly string $jwt = '',
public readonly array $payload = [],
public readonly array $events = [],
public readonly string $body = '',
public readonly string $path = '',
public readonly array $headers = [],
public readonly string $method = '',
public readonly array $platform = [],
) {
}
public static function fromEvent(
string $event,
array $params,
?Document $project = null,
?Document $user = null,
?string $userId = null,
array $payload = [],
array $platform = [],
): static {
return new self(
project: $project,
user: $user,
userId: $userId,
payload: $payload,
events: $event !== '' ? Event::generateEvents($event, $params) : [],
platform: $platform,
);
}
public function toArray(): array
{
$platform = !empty($this->platform) ? $this->platform : Config::getParam('platform', []);
return [
'project' => $this->project?->getArrayCopy(),
'user' => $this->user?->getArrayCopy(),
'userId' => $this->userId,
'function' => $this->function?->getArrayCopy(),
'functionId' => $this->functionId,
'execution' => $this->execution?->getArrayCopy(),
'type' => $this->type,
'jwt' => $this->jwt,
'payload' => $this->payload,
'events' => $this->events,
'body' => $this->body,
'path' => $this->path,
'headers' => $this->headers,
'method' => $this->method,
'platform' => $platform,
];
}
public static function fromArray(array $data): static
{
return new self(
project: !empty($data['project']) ? new Document($data['project']) : null,
user: !empty($data['user']) ? new Document($data['user']) : null,
userId: $data['userId'] ?? null,
function: !empty($data['function']) ? new Document($data['function']) : null,
functionId: $data['functionId'] ?? null,
execution: !empty($data['execution']) ? new Document($data['execution']) : null,
type: $data['type'] ?? '',
jwt: $data['jwt'] ?? '',
payload: $data['payload'] ?? [],
events: $data['events'] ?? [],
body: $data['body'] ?? '',
path: $data['path'] ?? '',
headers: $data['headers'] ?? [],
method: $data['method'] ?? '',
platform: $data['platform'] ?? [],
);
}
}
+27
View File
@@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Func as FunctionMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Func extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue,
) {
parent::__construct($publisher);
}
public function enqueue(FunctionMessage $message, ?Queue $queue = null): string|bool
{
return $this->publish($queue ?? $this->queue, $message);
}
public function getSize(bool $failed = false, ?Queue $queue = null): int
{
return $this->getQueueSize($queue ?? $this->queue, $failed);
}
}
@@ -3,6 +3,8 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction;
@@ -421,7 +423,7 @@ abstract class Action extends DatabasesAction
* @param Document[] $documents
* @param Event $queueForEvents
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Event $queueForWebhooks
* @param Database $dbForProject
* @param EventProcessor $eventProcessor
@@ -434,7 +436,7 @@ abstract class Action extends DatabasesAction
array $documents,
Event $queueForEvents,
Event $queueForRealtime,
Event $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Event $queueForWebhooks,
Database $dbForProject,
EventProcessor $eventProcessor
@@ -472,9 +474,15 @@ abstract class Action extends DatabasesAction
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions
->from($queueForEvents)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
break;
}
}
@@ -494,7 +502,6 @@ abstract class Action extends DatabasesAction
$queueForEvents->reset();
$queueForRealtime->reset();
$queueForFunctions->reset();
$queueForWebhooks->reset();
}
}
@@ -3,6 +3,7 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
@@ -80,14 +81,14 @@ class Delete extends Action
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@@ -206,7 +207,7 @@ class Delete extends Action
$documents,
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$publisherForFunctions,
$queueForWebhooks,
$dbForProject,
$eventProcessor
@@ -3,6 +3,7 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
@@ -84,14 +85,14 @@ class Update extends Action
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@@ -237,7 +238,7 @@ class Update extends Action
$documents,
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$publisherForFunctions,
$queueForWebhooks,
$dbForProject,
$eventProcessor
@@ -3,6 +3,7 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Bulk;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
@@ -82,14 +83,14 @@ class Upsert extends Action
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, Context $usage, Event $queueForEvents, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@@ -212,7 +213,7 @@ class Upsert extends Action
$upserted,
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$publisherForFunctions,
$queueForWebhooks,
$dbForProject,
$eventProcessor
@@ -3,6 +3,7 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
@@ -137,7 +138,7 @@ class Create extends Action
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
@@ -145,7 +146,7 @@ class Create extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, Event $queueForEvents, Context $usage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@@ -517,7 +518,7 @@ class Create extends Action
$created,
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$publisherForFunctions,
$queueForWebhooks,
$dbForProject,
$eventProcessor
@@ -5,6 +5,8 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions;
use Appwrite\Databases\TransactionState;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
@@ -77,7 +79,7 @@ class Update extends Action
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
@@ -97,7 +99,7 @@ class Update extends Action
* @param Event $queueForEvents
* @param Context $usage
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Event $queueForWebhooks
* @param EventProcessor $eventProcessor
* @return void
@@ -108,7 +110,7 @@ class Update extends Action
* @throws StructureException
* @throws \Utopia\Http\Exception
*/
public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void
public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
@@ -461,7 +463,15 @@ class Update extends Action
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions->from($queueForEvents)->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
break;
}
}
@@ -480,7 +490,6 @@ class Update extends Action
$queueForEvents->reset();
$queueForRealtime->reset();
$queueForFunctions->reset();
$queueForWebhooks->reset();
}
}
@@ -63,7 +63,7 @@ class Delete extends DocumentsDelete
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -65,7 +65,7 @@ class Update extends DocumentsUpdate
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -65,7 +65,7 @@ class Upsert extends DocumentsUpsert
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -112,7 +112,7 @@ class Create extends DocumentCreate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
@@ -60,7 +60,7 @@ class Update extends TransactionsUpdate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
@@ -65,7 +65,7 @@ class Delete extends DocumentsDelete
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -67,7 +67,7 @@ class Update extends DocumentsUpdate
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -67,7 +67,7 @@ class Upsert extends DocumentsUpsert
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -109,7 +109,7 @@ class Create extends DocumentCreate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
@@ -61,7 +61,7 @@ class Update extends TransactionsUpdate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
@@ -63,7 +63,7 @@ class Delete extends DocumentsDelete
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -65,7 +65,7 @@ class Update extends DocumentsUpdate
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -65,7 +65,7 @@ class Upsert extends DocumentsUpsert
->inject('usage')
->inject('queueForEvents')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
@@ -106,7 +106,7 @@ class Create extends DocumentCreate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
@@ -60,7 +60,7 @@ class Update extends TransactionsUpdate
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
@@ -5,7 +5,8 @@ namespace Appwrite\Platform\Modules\Functions\Http\Executions;
use Ahc\Jwt\JWT;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\Validator\Headers;
@@ -95,7 +96,7 @@ class Create extends Base
->inject('user')
->inject('queueForEvents')
->inject('usage')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('geodb')
->inject('store')
->inject('proofForToken')
@@ -123,7 +124,7 @@ class Create extends Base
User $user,
Event $queueForEvents,
Context $usage,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Reader $geodb,
Store $store,
Token $proofForToken,
@@ -294,20 +295,19 @@ class Create extends Base
if ($async) {
if (is_null($scheduledAt)) {
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
$queueForFunctions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setBody($body)
->setHeaders($headers)
->setPath($path)
->setMethod($method)
->setJWT($jwt)
->setProject($project)
->setUser($user)
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->trigger();
$publisherForFunctions->enqueue(new FunctionMessage(
project: $project,
user: $user,
function: $function,
functionId: $function->getId(),
execution: $execution,
type: 'http',
jwt: $jwt,
body: $body,
path: $path,
headers: $headers,
method: $method,
));
} else {
$data = [
'headers' => $headers,
@@ -3,9 +3,10 @@
namespace Appwrite\Platform\Modules\Functions\Http\Functions;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Build as BuildMessage;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Build as BuildPublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Validator\FunctionEvent;
use Appwrite\Event\Webhook;
@@ -119,7 +120,7 @@ class Create extends Base
->inject('publisherForBuilds')
->inject('queueForRealtime')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('dbForPlatform')
->inject('request')
->inject('gitHub')
@@ -161,7 +162,7 @@ class Create extends Base
BuildPublisher $publisherForBuilds,
Realtime $queueForRealtime,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Database $dbForPlatform,
Request $request,
GitHub $github,
@@ -423,9 +424,15 @@ class Create extends Base
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($ruleCreate)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $ruleCreate->getEvent(),
params: $ruleCreate->getParams(),
project: $ruleCreate->getProject(),
user: $ruleCreate->getUser(),
userId: $ruleCreate->getUserId(),
payload: $ruleCreate->getPayload(),
platform: $ruleCreate->getPlatform(),
));
/** Trigger Realtime Events */
$queueForRealtime
@@ -4,8 +4,9 @@ namespace Appwrite\Platform\Modules\Functions\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Message\Usage as UsageMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Screenshot;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Realtime;
@@ -63,7 +64,7 @@ class Builds extends Action
->inject('queueForEvents')
->inject('publisherForScreenshots')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForRealtime')
->inject('usage')
->inject('publisherForUsage')
@@ -89,7 +90,7 @@ class Builds extends Action
Event $queueForEvents,
Screenshot $publisherForScreenshots,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Context $usage,
UsagePublisher $publisherForUsage,
@@ -131,7 +132,7 @@ class Builds extends Action
$deviceForFiles,
$publisherForScreenshots,
$queueForWebhooks,
$queueForFunctions,
$publisherForFunctions,
$queueForRealtime,
$queueForEvents,
$usage,
@@ -167,7 +168,7 @@ class Builds extends Action
Device $deviceForFiles,
Screenshot $publisherForScreenshots,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Event $queueForEvents,
Context $usage,
@@ -570,9 +571,15 @@ class Builds extends Action
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($deploymentUpdate)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $deploymentUpdate->getEvent(),
params: $deploymentUpdate->getParams(),
project: $deploymentUpdate->getProject(),
user: $deploymentUpdate->getUser(),
userId: $deploymentUpdate->getUserId(),
payload: $deploymentUpdate->getPayload(),
platform: $deploymentUpdate->getPlatform(),
));
/** Trigger Realtime Event */
$queueForRealtime
@@ -5,10 +5,10 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed;
use Appwrite\Event\Database;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Event\Publisher\Build as BuildPublisher;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Mail as MailPublisher;
use Appwrite\Event\Publisher\Messaging as MessagingPublisher;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
@@ -78,7 +78,7 @@ class Get extends Base
->inject('queueForDeletes')
->inject('publisherForAudits')
->inject('publisherForMails')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('publisherForStatsResources')
->inject('publisherForUsage')
->inject('queueForWebhooks')
@@ -98,7 +98,7 @@ class Get extends Base
Delete $queueForDeletes,
Audit $publisherForAudits,
MailPublisher $publisherForMails,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
StatsResourcesPublisher $publisherForStatsResources,
UsagePublisher $publisherForUsage,
Webhook $queueForWebhooks,
@@ -115,7 +115,7 @@ class Get extends Base
System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes,
System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits,
System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $publisherForMails,
System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions,
System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $publisherForFunctions,
System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources,
System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage,
System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks,
@@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Functions;
use Appwrite\Event\Func;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Func $queueForFunctions, Response $response): void
public function action(int|string $threshold, FunctionPublisher $publisherForFunctions, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForFunctions->getSize();
$size = $publisherForFunctions->getSize();
$this->assertQueueThreshold($size, $threshold);
@@ -2,7 +2,8 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
@@ -36,7 +37,10 @@ class ScheduleExecutions extends ScheduleBase
{
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
$queueForFunctions = new Func($this->publisherFunctions);
$publisherForFunctions = new FunctionPublisher(
$this->publisherFunctions,
new \Utopia\Queue\Queue(\Utopia\System\System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', \Appwrite\Event\Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', \Appwrite\Event\Event::FUNCTIONS_QUEUE_TTL)
);
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@@ -63,23 +67,22 @@ class ScheduleExecutions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
\go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data, $dbForPlatform) {
\go(function () use ($publisherForFunctions, $schedule, $scheduledAt, $delay, $data, $dbForPlatform) {
if ($delay > 0) {
Co::sleep($delay);
}
$queueForFunctions->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['resourceId'])
->setExecution($schedule['resource'])
->setMethod($data['method'] ?? 'POST')
->setPath($data['path'] ?? '/')
->setHeaders($data['headers'] ?? [])
->setBody($data['body'] ?? '')
->setProject($schedule['project'])
->setUserId($data['userId'] ?? '')
->trigger();
$publisherForFunctions->enqueue(new FunctionMessage(
project: $schedule['project'],
userId: $data['userId'] ?? '',
functionId: $schedule['resource']['resourceId'],
execution: $schedule['resource'],
type: 'schedule',
body: $data['body'] ?? '',
path: $data['path'] ?? '/',
headers: $data['headers'] ?? [],
method: $data['method'] ?? 'POST',
));
$dbForPlatform->deleteDocument(
'schedules',
@@ -2,7 +2,8 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Cron\CronExpression;
use Utopia\Console;
use Utopia\Database\Database;
@@ -97,14 +98,10 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($this->publisherFunctions);
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project']);
$publisherForFunctions = new FunctionPublisher(
$this->publisherFunctions,
new \Utopia\Queue\Queue(\Utopia\System\System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', \Appwrite\Event\Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', \Appwrite\Event\Event::FUNCTIONS_QUEUE_TTL)
);
Span::init('schedule.functions.enqueue');
try {
@@ -112,7 +109,13 @@ class ScheduleFunctions extends ScheduleBase
Span::add('function.id', $schedule['resource']->getId());
Span::add('schedule.id', $schedule['$id'] ?? '');
$queueForFunctions->trigger();
$publisherForFunctions->enqueue(new FunctionMessage(
project: $schedule['project'],
function: $schedule['resource'],
type: 'schedule',
method: 'POST',
path: '/',
));
$this->recordEnqueueDelay($delayConfig['nextDate']);
} finally {
+24 -17
View File
@@ -4,9 +4,10 @@ namespace Appwrite\Platform\Workers;
use Appwrite\Certificates\Adapter as CertificatesAdapter;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Message\Mail as MailMessage;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Mail as MailPublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
@@ -54,7 +55,7 @@ class Certificates extends Action
->inject('publisherForMails')
->inject('queueForEvents')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForRealtime')
->inject('publisherForCertificates')
->inject('log')
@@ -70,7 +71,7 @@ class Certificates extends Action
* @param MailPublisher $publisherForMails
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Realtime $queueForRealtime
* @param Certificate $publisherForCertificates
* @param Log $log
@@ -87,7 +88,7 @@ class Certificates extends Action
MailPublisher $publisherForMails,
Event $queueForEvents,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Certificate $publisherForCertificates,
Log $log,
@@ -113,11 +114,11 @@ class Certificates extends Action
switch ($action) {
case \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION:
$this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain);
$this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain);
break;
case \Appwrite\Event\Certificate::ACTION_GENERATION:
$this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $publisherForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain);
$this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $publisherForMails, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain);
break;
default:
@@ -130,7 +131,7 @@ class Certificates extends Action
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Realtime $queueForRealtime
* @param Certificate $publisherForCertificates
* @param Log $log
@@ -146,7 +147,7 @@ class Certificates extends Action
Database $dbForPlatform,
Event $queueForEvents,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Certificate $publisherForCertificates,
Log $log,
@@ -185,7 +186,7 @@ class Certificates extends Action
$rule->setAttribute('logs', $logs);
} finally {
// Update rule and emit events
$this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
$this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime);
}
// Issue a TLS certificate when domain is verified
@@ -213,7 +214,7 @@ class Certificates extends Action
* @param MailPublisher $publisherForMails
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Realtime $queueForRealtime
* @param Log $log
* @param CertificatesAdapter $certificates
@@ -237,7 +238,7 @@ class Certificates extends Action
MailPublisher $publisherForMails,
Event $queueForEvents,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Log $log,
CertificatesAdapter $certificates,
@@ -370,7 +371,7 @@ class Certificates extends Action
// Update rule and emit events
$rule->setAttribute('certificateId', $certificate->getId());
$rule->setAttribute('logs', $logs);
$this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
$this->updateRuleAndSendEvents($rule, $dbForPlatform, $queueForEvents, $queueForWebhooks, $publisherForFunctions, $queueForRealtime);
}
}
@@ -416,7 +417,7 @@ class Certificates extends Action
* @param Database $dbForPlatform Database connection for console
* @param Event $queueForEvents Event publisher for events
* @param Webhook $queueForWebhooks Webhook publisher for webhooks
* @param Func $queueForFunctions Function publisher for functions
* @param FunctionPublisher $publisherForFunctions Function publisher for functions
* @param Realtime $queueForRealtime Realtime publisher for realtime events
*
* @return void
@@ -426,7 +427,7 @@ class Certificates extends Action
Database $dbForPlatform,
Event $queueForEvents,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime
): void {
$rule = $dbForPlatform->updateDocument('rules', $rule->getId(), new Document([
@@ -459,9 +460,15 @@ class Certificates extends Action
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($queueForEvents)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
/** Trigger Realtime Events */
$queueForRealtime
+33 -25
View File
@@ -5,7 +5,8 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Bus\Events\ExecutionCompleted;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception as AppwriteException;
@@ -46,7 +47,7 @@ class Functions extends Action
->inject('message')
->inject('dbForProject')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('publisherForFunctions')
->inject('queueForRealtime')
->inject('queueForEvents')
->inject('bus')
@@ -61,7 +62,7 @@ class Functions extends Action
Message $message,
Database $dbForProject,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Event $queueForEvents,
Bus $bus,
@@ -78,7 +79,8 @@ class Functions extends Action
);
}
$type = $payload['type'] ?? '';
$functionMessage = FunctionMessage::fromArray($payload);
$type = $functionMessage->type;
Span::add('project.id', $project->getId());
Span::add('payload.type', $type);
@@ -86,18 +88,18 @@ class Functions extends Action
Span::add('queue.name', $message->getQueue());
Span::add('message.timestamp', (string) $message->getTimestamp());
$events = $payload['events'] ?? [];
$data = $payload['body'] ?? '';
$eventData = $payload['payload'] ?? '';
$platform = $payload['platform'] ?? Config::getParam('platform', []);
$function = new Document($payload['function'] ?? []);
$functionId = $payload['functionId'] ?? '';
$user = new Document($payload['user'] ?? []);
$userId = $payload['userId'] ?? '';
$method = $payload['method'] ?? 'POST';
$headers = $payload['headers'] ?? [];
$path = $payload['path'] ?? '/';
$jwt = $payload['jwt'] ?? '';
$events = $functionMessage->events;
$data = $functionMessage->body;
$eventData = $functionMessage->payload;
$platform = !empty($functionMessage->platform) ? $functionMessage->platform : Config::getParam('platform', []);
$function = $functionMessage->function ?? new Document();
$functionId = $functionMessage->functionId ?? '';
$user = $functionMessage->user ?? new Document();
$userId = $functionMessage->userId ?? '';
$method = $functionMessage->method ?: 'POST';
$headers = $functionMessage->headers;
$path = $functionMessage->path ?: '/';
$jwt = $functionMessage->jwt;
if ($user->isEmpty() && !empty($userId)) {
$user = $dbForProject->getDocument('users', $userId);
@@ -166,7 +168,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
publisherForFunctions: $publisherForFunctions,
queueForRealtime: $queueForRealtime,
queueForEvents: $queueForEvents,
bus: $bus,
@@ -185,7 +187,7 @@ class Functions extends Action
user: $user,
jwt: null,
event: $events[0],
eventData: \is_string($eventData) ? $eventData : \json_encode($eventData),
eventData: \json_encode($eventData) ?: null,
executionId: null,
);
Console::success('Triggered function: ' . $events[0]);
@@ -210,7 +212,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
publisherForFunctions: $publisherForFunctions,
queueForRealtime: $queueForRealtime,
queueForEvents: $queueForEvents,
bus: $bus,
@@ -236,7 +238,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
publisherForFunctions: $publisherForFunctions,
queueForRealtime: $queueForRealtime,
queueForEvents: $queueForEvents,
bus: $bus,
@@ -331,7 +333,7 @@ class Functions extends Action
/**
* @param Log $log
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param FunctionPublisher $publisherForFunctions
* @param Realtime $queueForRealtime
* @param Event $queueForEvents
* @param Document $project
@@ -353,7 +355,7 @@ class Functions extends Action
Log $log,
Database $dbForProject,
Webhook $queueForWebhooks,
Func $queueForFunctions,
FunctionPublisher $publisherForFunctions,
Realtime $queueForRealtime,
Event $queueForEvents,
Bus $bus,
@@ -649,9 +651,15 @@ class Functions extends Action
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($queueForEvents)
->trigger();
$publisherForFunctions->enqueue(FunctionMessage::fromEvent(
event: $queueForEvents->getEvent(),
params: $queueForEvents->getParams(),
project: $queueForEvents->getProject(),
user: $queueForEvents->getUser(),
userId: $queueForEvents->getUserId(),
payload: $queueForEvents->getPayload(),
platform: $queueForEvents->getPlatform(),
));
/** Trigger Realtime Events */
$queueForRealtime