refactor: triggering realtime events with queueForRealtime

This commit is contained in:
Chirag Aggarwal
2025-02-11 03:29:32 +00:00
parent 2f95b0e2e0
commit 375f643214
5 changed files with 80 additions and 76 deletions
+19 -33
View File
@@ -6,13 +6,13 @@ use Appwrite\Event\Build;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Validator\FunctionEvent;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\Validator\Headers;
use Appwrite\Functions\Validator\RuntimeSpecification;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Platform\Tasks\ScheduleExecutions;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -194,9 +194,10 @@ App::post('/v1/functions')
->inject('user')
->inject('queueForEvents')
->inject('queueForBuilds')
->inject('queueForRealtime')
->inject('dbForPlatform')
->inject('gitHub')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
// Temporary abuse check
@@ -386,18 +387,18 @@ App::post('/v1/functions')
]))
);
/** Trigger Webhook */
$ruleModel = new Rule();
$ruleCreate =
$queueForEvents
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME);
->setProject($project)
->setEvent('rules.[ruleId].create')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
/** Trigger Webhook */
$ruleCreate
->setProject($project)
->setEvent('rules.[ruleId].create')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->trigger();
/** Trigger Functions */
@@ -406,31 +407,16 @@ App::post('/v1/functions')
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('rules.[ruleId].create', [
'ruleId' => $rule->getId(),
]);
/** Trigger Realtime Events */
$queueForRealtime
->from($ruleCreate)
->setProjectId('console')
->trigger();
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $rule,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->from($ruleCreate)
->setProjectId($project->getId())
->trigger();
}
$queueForEvents->setParam('functionId', $function->getId());
+5
View File
@@ -13,6 +13,7 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\StatsUsageDump;
/** remove */
@@ -317,6 +318,10 @@ Server::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
Server::setResource('queueForRealtime', function () {
return new Realtime();
}, []);
Server::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
+23
View File
@@ -66,6 +66,7 @@ class Event
protected ?Document $project = null;
protected ?Document $user = null;
protected ?string $userId = null;
protected ?string $projectId = null;
protected bool $paused = false;
/**
@@ -151,6 +152,18 @@ class Event
return $this;
}
/**
* Set projectId for this event.
*
* @param string $projectId
* @return self
*/
public function setProjectId(string $projectId): self
{
$this->projectId = $projectId;
return $this;
}
/**
* Get project for this event.
*
@@ -161,6 +174,16 @@ class Event
return $this->project;
}
/**
* Get projectId for this event.
*
* @return ?string
*/
public function getProjectId(): ?string
{
return $this->projectId;
}
/**
* Set user for this event.
*
+1 -1
View File
@@ -54,7 +54,7 @@ class Realtime extends Event
);
RealtimeAdapter::send(
projectId: $target['projectId'] ?? $this->getProject()->getId(),
projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(),
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
+32 -42
View File
@@ -3,7 +3,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Exception;
use Utopia\CLI\Console;
use Utopia\Database\Database;
@@ -38,7 +38,8 @@ class Databases extends Action
->inject('dbForPlatform')
->inject('dbForProject')
->inject('log')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log));
->inject('queueForRealtime')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime));
}
/**
@@ -47,10 +48,11 @@ class Databases extends Action
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Log $log
* @param Realtime $queueForRealtime
* @return void
* @throws \Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void
{
$payload = $message->getPayload() ?? [];
@@ -75,10 +77,10 @@ class Databases extends Action
match (\strval($type)) {
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
default => throw new \Exception('No database operation for type: ' . \strval($type)),
};
}
@@ -90,13 +92,14 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
* @throws \Exception
* @throws \Throwable
*/
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@@ -106,12 +109,6 @@ class Databases extends Action
}
$projectId = $project->getId();
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [
'databaseId' => $database->getId(),
'collectionId' => $collection->getId(),
'attributeId' => $attribute->getId()
]);
/**
* TODO @christyjacob4 verify if this is still the case
* Fetch attribute from the database, since with Resque float values are loosing informations.
@@ -200,7 +197,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
$this->trigger($database, $collection, $attribute, $project, $queueForRealtime);
if (! $relatedCollection->isEmpty()) {
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
@@ -217,13 +214,14 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
* @throws \Exception
* @throws \Throwable
**/
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@@ -312,7 +310,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
$this->trigger($database, $collection, $attribute, $project, $queueForRealtime);
}
// The underlying database removes/rebuilds indexes when attribute is removed
@@ -358,7 +356,7 @@ class Databases extends Action
}
if ($exists) { // Delete the duplicate if created, else update in db
$this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject);
$this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject, $queueForRealtime);
} else {
$dbForProject->updateDocument('indexes', $index->getId(), $index);
}
@@ -381,6 +379,7 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
@@ -388,7 +387,7 @@ class Databases extends Action
* @throws DatabaseException
* @throws \Throwable
*/
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@@ -430,7 +429,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events);
$this->trigger($database, $collection, $index, $project, $queueForRealtime);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
}
}
@@ -442,6 +441,7 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
@@ -449,7 +449,7 @@ class Databases extends Action
* @throws DatabaseException
* @throws \Throwable
*/
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@@ -490,7 +490,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events);
$this->trigger($database, $collection, $index, $project, $queueForRealtime);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
}
}
@@ -617,26 +617,16 @@ class Databases extends Action
Document $collection,
Document $attribute,
Document $project,
string $projectId,
array $events
Realtime $queueForRealtime
): void {
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $events[0],
payload: $attribute,
project: $project,
);
Realtime::send(
projectId: 'console',
payload: $attribute->getArrayCopy(),
events: $events,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
);
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update')
->setParam('databaseId', $database->getId())
->setParam('collectionId', $collection->getId())
->setParam('attributeId', $attribute->getId())
->setPayload($attribute->getArrayCopy())
->trigger();
}
}