From 375f6432142f462fe8a052c1ea7138dea4755fed Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 03:29:32 +0000 Subject: [PATCH] refactor: triggering realtime events with queueForRealtime --- app/controllers/api/functions.php | 52 ++++++--------- app/worker.php | 5 ++ src/Appwrite/Event/Event.php | 23 +++++++ src/Appwrite/Event/Realtime.php | 2 +- src/Appwrite/Platform/Workers/Databases.php | 74 +++++++++------------ 5 files changed, 80 insertions(+), 76 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 14255ef7a4..0f9f5211f4 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -6,13 +6,13 @@ use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; use Appwrite\Functions\Validator\RuntimeSpecification; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -194,9 +194,10 @@ App::post('/v1/functions') ->inject('user') ->inject('queueForEvents') ->inject('queueForBuilds') + ->inject('queueForRealtime') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; // Temporary abuse check @@ -386,18 +387,18 @@ App::post('/v1/functions') ])) ); - /** Trigger Webhook */ $ruleModel = new Rule(); $ruleCreate = $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME); + ->setProject($project) + ->setEvent('rules.[ruleId].create') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); + /** Trigger Webhook */ $ruleCreate - ->setProject($project) - ->setEvent('rules.[ruleId].create') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->setClass(Event::WEBHOOK_CLASS_NAME) + ->setQueue(Event::WEBHOOK_QUEUE_NAME) ->trigger(); /** Trigger Functions */ @@ -406,31 +407,16 @@ App::post('/v1/functions') ->setQueue(Event::FUNCTIONS_QUEUE_NAME) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].create', [ - 'ruleId' => $rule->getId(), - ]); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($ruleCreate) + ->setProjectId('console') + ->trigger(); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->from($ruleCreate) + ->setProjectId($project->getId()) + ->trigger(); } $queueForEvents->setParam('functionId', $function->getId()); diff --git a/app/worker.php b/app/worker.php index 605474e9f1..ad6bf475f9 100644 --- a/app/worker.php +++ b/app/worker.php @@ -13,6 +13,7 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsageDump; /** remove */ @@ -317,6 +318,10 @@ Server::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); +Server::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); + Server::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 0edffdf4dc..8085a836a8 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -66,6 +66,7 @@ class Event protected ?Document $project = null; protected ?Document $user = null; protected ?string $userId = null; + protected ?string $projectId = null; protected bool $paused = false; /** @@ -151,6 +152,18 @@ class Event return $this; } + /** + * Set projectId for this event. + * + * @param string $projectId + * @return self + */ + public function setProjectId(string $projectId): self + { + $this->projectId = $projectId; + return $this; + } + /** * Get project for this event. * @@ -161,6 +174,16 @@ class Event return $this->project; } + /** + * Get projectId for this event. + * + * @return ?string + */ + public function getProjectId(): ?string + { + return $this->projectId; + } + /** * Set user for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index f4f00b59d4..8c302bbabf 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -54,7 +54,7 @@ class Realtime extends Event ); RealtimeAdapter::send( - projectId: $target['projectId'] ?? $this->getProject()->getId(), + projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(), payload: $this->getRealtimePayload(), events: $allEvents, channels: $target['channels'], diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 441b09b4cc..50a3fa52f3 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -3,7 +3,7 @@ namespace Appwrite\Platform\Workers; use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Database\Database; @@ -38,7 +38,8 @@ class Databases extends Action ->inject('dbForPlatform') ->inject('dbForProject') ->inject('log') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log)); + ->inject('queueForRealtime') + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime)); } /** @@ -47,10 +48,11 @@ class Databases extends Action * @param Database $dbForPlatform * @param Database $dbForProject * @param Log $log + * @param Realtime $queueForRealtime * @return void * @throws \Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void { $payload = $message->getPayload() ?? []; @@ -75,10 +77,10 @@ class Databases extends Action match (\strval($type)) { DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), - DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), + DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), default => throw new \Exception('No database operation for type: ' . \strval($type)), }; } @@ -90,13 +92,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable */ - private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -106,12 +109,6 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing informations. @@ -200,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -217,13 +214,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable **/ - private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -312,7 +310,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -358,7 +356,7 @@ class Databases extends Action } if ($exists) { // Delete the duplicate if created, else update in db - $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject); + $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject, $queueForRealtime); } else { $dbForProject->updateDocument('indexes', $index->getId(), $index); } @@ -381,6 +379,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -388,7 +387,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -430,7 +429,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $index, $project, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -442,6 +441,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -449,7 +449,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -490,7 +490,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $index, $project, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -617,26 +617,16 @@ class Databases extends Action Document $collection, Document $attribute, Document $project, - string $projectId, - array $events + Realtime $queueForRealtime ): void { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $attribute, - project: $project, - ); - Realtime::send( - projectId: 'console', - payload: $attribute->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update') + ->setParam('databaseId', $database->getId()) + ->setParam('collectionId', $collection->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setPayload($attribute->getArrayCopy()) + ->trigger(); } }