From 65d1e58e345168870de496f33d30b4276c7b3f0a Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 14 May 2026 14:40:11 +0530 Subject: [PATCH 1/4] refactor: migrate delete queue publisher --- app/cli.php | 9 ++- app/controllers/api/account.php | 53 +++++++------ app/controllers/api/messaging.php | 15 ++-- app/controllers/api/users.php | 27 ++++--- app/controllers/general.php | 41 +++++----- app/controllers/shared/api.php | 12 +-- app/init/resources.php | 5 ++ app/init/resources/request.php | 7 +- app/init/worker/message.php | 10 +-- src/Appwrite/Event/Message/Delete.php | 45 +++++++++++ src/Appwrite/Event/Publisher/Delete.php | 27 +++++++ .../Modules/Advisor/Http/Reports/Delete.php | 15 ++-- .../Http/Databases/Transactions/Delete.php | 17 +++-- .../Http/Databases/Transactions/Update.php | 33 ++++---- .../Http/DocumentsDB/Transactions/Delete.php | 3 +- .../Http/DocumentsDB/Transactions/Update.php | 2 +- .../Http/TablesDB/Transactions/Delete.php | 3 +- .../Http/TablesDB/Transactions/Update.php | 2 +- .../Http/VectorsDB/Transactions/Delete.php | 3 +- .../Http/VectorsDB/Transactions/Update.php | 2 +- .../Functions/Http/Deployments/Delete.php | 15 ++-- .../Functions/Http/Executions/Create.php | 31 ++++---- .../Functions/Http/Functions/Delete.php | 15 ++-- .../Health/Http/Health/Queue/Deletes/Get.php | 8 +- .../Health/Http/Health/Queue/Failed/Get.php | 8 +- .../Modules/Project/Http/Project/Delete.php | 18 +++-- .../Modules/Proxy/Http/Rules/Delete.php | 15 ++-- .../Modules/Sites/Http/Deployments/Delete.php | 15 ++-- .../Modules/Sites/Http/Sites/Delete.php | 15 ++-- .../Modules/Storage/Http/Buckets/Delete.php | 15 ++-- .../Storage/Http/Buckets/Files/Delete.php | 18 +++-- .../Modules/Teams/Http/Teams/Delete.php | 24 +++--- .../Modules/VCS/Http/Installations/Delete.php | 18 +++-- src/Appwrite/Platform/Tasks/Maintenance.php | 75 +++++++++---------- src/Appwrite/Platform/Workers/Deletes.php | 38 +++++----- 35 files changed, 400 insertions(+), 259 deletions(-) create mode 100644 src/Appwrite/Event/Message/Delete.php create mode 100644 src/Appwrite/Event/Publisher/Delete.php diff --git a/app/cli.php b/app/cli.php index 8dfbaf4e9a..c5f436215e 100644 --- a/app/cli.php +++ b/app/cli.php @@ -2,9 +2,9 @@ require_once __DIR__ . '/init.php'; -use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Publisher\Certificate as CertificatePublisher; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; @@ -285,9 +285,10 @@ $container->set('publisherForFunctions', fn (Publisher $publisher) => new Functi $publisher, new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) ), ['publisher']); -$container->set('queueForDeletes', function (Publisher $publisher) { - return new Delete($publisher); -}, ['publisher']); +$container->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( + $publisher, + new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) +), ['publisher']); $container->set('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { Console::error('[Error] Timestamp: ' . date('c', time())); diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index e01c27e45c..e12263a009 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -11,10 +11,11 @@ use Appwrite\Auth\Validator\PersonalData; use Appwrite\Auth\Validator\Phone; use Appwrite\Bus\Events\SessionCreated; use Appwrite\Detector\Detector; -use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Message\Mail as MailMessage; use Appwrite\Event\Message\Messaging as MessagingMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; use Appwrite\Event\Publisher\Messaging as MessagingPublisher; use Appwrite\Extend\Exception; @@ -472,9 +473,9 @@ Http::delete('/v1/account') ->inject('response') ->inject('dbForProject') ->inject('queueForEvents') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('authorization') - ->action(function (Document $user, Document $project, Response $response, Database $dbForProject, Event $queueForEvents, Delete $queueForDeletes, Authorization $authorization) { + ->action(function (Document $user, Document $project, Response $response, Database $dbForProject, Event $queueForEvents, DeletePublisher $publisherForDeletes, Authorization $authorization) { if ($user->isEmpty()) { throw new Exception(Exception::USER_NOT_FOUND); } @@ -498,9 +499,11 @@ Http::delete('/v1/account') $dbForProject->deleteDocument('users', $user->getId()); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($user); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $user, + )); $queueForEvents ->setParam('userId', $user->getId()) @@ -582,12 +585,12 @@ Http::delete('/v1/account/sessions') ->inject('dbForProject') ->inject('locale') ->inject('queueForEvents') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('store') ->inject('proofForToken') ->inject('domainVerification') ->inject('cookieDomain') - ->action(function (Request $request, Response $response, User $user, Database $dbForProject, Locale $locale, Event $queueForEvents, Delete $queueForDeletes, Store $store, ProofsToken $proofForToken, bool $domainVerification, ?string $cookieDomain) { + ->action(function (Request $request, Response $response, User $user, Database $dbForProject, Locale $locale, Event $queueForEvents, DeletePublisher $publisherForDeletes, Store $store, ProofsToken $proofForToken, bool $domainVerification, ?string $cookieDomain) { $protocol = $request->getProtocol(); $sessions = $user->getAttribute('sessions', []); @@ -617,10 +620,11 @@ Http::delete('/v1/account/sessions') $queueForEvents ->setPayload($response->output($session, Response::MODEL_SESSION)); - $queueForDeletes - ->setType(DELETE_TYPE_SESSION_TARGETS) - ->setDocument($session) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_SESSION_TARGETS, + document: $session, + )); } } @@ -714,12 +718,12 @@ Http::delete('/v1/account/sessions/:sessionId') ->inject('dbForProject') ->inject('locale') ->inject('queueForEvents') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('store') ->inject('proofForToken') ->inject('domainVerification') ->inject('cookieDomain') - ->action(function (?string $sessionId, ?\DateTime $requestTimestamp, Request $request, Response $response, User $user, Database $dbForProject, Locale $locale, Event $queueForEvents, Delete $queueForDeletes, Store $store, ProofsToken $proofForToken, bool $domainVerification, ?string $cookieDomain) { + ->action(function (?string $sessionId, ?\DateTime $requestTimestamp, Request $request, Response $response, User $user, Database $dbForProject, Locale $locale, Event $queueForEvents, DeletePublisher $publisherForDeletes, Store $store, ProofsToken $proofForToken, bool $domainVerification, ?string $cookieDomain) { $protocol = $request->getProtocol(); $sessionId = ($sessionId === 'current') @@ -761,10 +765,11 @@ Http::delete('/v1/account/sessions/:sessionId') ->setParam('sessionId', $session->getId()) ->setPayload($response->output($session, Response::MODEL_SESSION)); - $queueForDeletes - ->setType(DELETE_TYPE_SESSION_TARGETS) - ->setDocument($session) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_SESSION_TARGETS, + document: $session, + )); $response->noContent(); return; @@ -4675,13 +4680,13 @@ Http::delete('/v1/account/targets/:targetId/push') )) ->param('targetId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Target ID.', false, ['dbForProject']) ->inject('queueForEvents') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('user') ->inject('request') ->inject('response') ->inject('dbForProject') ->inject('authorization') - ->action(function (string $targetId, Event $queueForEvents, Delete $queueForDeletes, Document $user, Request $request, Response $response, Database $dbForProject, Authorization $authorization) { + ->action(function (string $targetId, Event $queueForEvents, DeletePublisher $publisherForDeletes, Document $user, Request $request, Response $response, Database $dbForProject, Authorization $authorization) { $target = $authorization->skip(fn () => $dbForProject->getDocument('targets', $targetId)); if ($target->isEmpty()) { @@ -4696,9 +4701,11 @@ Http::delete('/v1/account/targets/:targetId/push') $dbForProject->purgeCachedDocument('users', $user->getId()); - $queueForDeletes - ->setType(DELETE_TYPE_TARGET) - ->setDocument($target); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_TARGET, + document: $target, + )); $queueForEvents ->setParam('userId', $user->getId()) diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index f59f606174..d1ffa2e478 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -3,9 +3,10 @@ use Ahc\Jwt\JWT; use Appwrite\Auth\Validator\Phone; use Appwrite\Detector\Detector; -use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Message\Messaging as MessagingMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Messaging as MessagingPublisher; use Appwrite\Extend\Exception; use Appwrite\Messaging\Status as MessageStatus; @@ -2728,9 +2729,9 @@ Http::delete('/v1/messaging/topics/:topicId') ->param('topicId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Topic ID.', false, ['dbForProject']) ->inject('queueForEvents') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('response') - ->action(function (string $topicId, Event $queueForEvents, Database $dbForProject, Delete $queueForDeletes, Response $response) { + ->action(function (string $topicId, Event $queueForEvents, Database $dbForProject, DeletePublisher $publisherForDeletes, Response $response) { $topic = $dbForProject->getDocument('topics', $topicId); if ($topic->isEmpty()) { @@ -2739,9 +2740,11 @@ Http::delete('/v1/messaging/topics/:topicId') $dbForProject->deleteDocument('topics', $topicId); - $queueForDeletes - ->setType(DELETE_TYPE_TOPIC) - ->setDocument($topic); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_TOPIC, + document: $topic, + )); $queueForEvents ->setParam('topicId', $topic->getId()); diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 3f52069609..ccd7cf4661 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -11,8 +11,9 @@ use Appwrite\Auth\Validator\Phone; use Appwrite\Deletes\Identities as DeleteIdentities; use Appwrite\Deletes\Targets as DeleteTargets; use Appwrite\Detector\Detector; -use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Hooks\Hooks; use Appwrite\SDK\AuthType; @@ -2592,8 +2593,8 @@ Http::delete('/v1/users/:userId') ->inject('response') ->inject('dbForProject') ->inject('queueForEvents') - ->inject('queueForDeletes') - ->action(function (string $userId, Response $response, Database $dbForProject, Event $queueForEvents, Delete $queueForDeletes) { + ->inject('publisherForDeletes') + ->action(function (string $userId, Response $response, Database $dbForProject, Event $queueForEvents, DeletePublisher $publisherForDeletes) { $user = $dbForProject->getDocument('users', $userId); @@ -2608,9 +2609,11 @@ Http::delete('/v1/users/:userId') DeleteIdentities::delete($dbForProject, Query::equal('userInternalId', [$user->getSequence()])); DeleteTargets::delete($dbForProject, Query::equal('userInternalId', [$user->getSequence()])); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($clone); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $clone, + )); $queueForEvents ->setParam('userId', $user->getId()) @@ -2643,10 +2646,10 @@ Http::delete('/v1/users/:userId/targets/:targetId') ->param('userId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'User ID.', false, ['dbForProject']) ->param('targetId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Target ID.', false, ['dbForProject']) ->inject('queueForEvents') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('response') ->inject('dbForProject') - ->action(function (string $userId, string $targetId, Event $queueForEvents, Delete $queueForDeletes, Response $response, Database $dbForProject) { + ->action(function (string $userId, string $targetId, Event $queueForEvents, DeletePublisher $publisherForDeletes, Response $response, Database $dbForProject) { $user = $dbForProject->getDocument('users', $userId); if ($user->isEmpty()) { @@ -2666,9 +2669,11 @@ Http::delete('/v1/users/:userId/targets/:targetId') $dbForProject->deleteDocument('targets', $target->getId()); $dbForProject->purgeCachedDocument('users', $user->getId()); - $queueForDeletes - ->setType(DELETE_TYPE_TARGET) - ->setDocument($target); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_TARGET, + document: $target, + )); $queueForEvents ->setParam('userId', $user->getId()) diff --git a/app/controllers/general.php b/app/controllers/general.php index dbcfa7f754..6ca0a63ee2 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -7,9 +7,10 @@ use Ahc\Jwt\JWTException; use Appwrite\Auth\Key; use Appwrite\Bus\Events\ExecutionCompleted; use Appwrite\Bus\Events\RequestCompleted; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Publisher\Certificate; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Network\Cors; use Appwrite\Platform\Appwrite; @@ -74,7 +75,7 @@ use Utopia\Validator\Text; Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE); -function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount) +function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeletePublisher $publisherForDeletes, int $executionsRetentionCount) { $host = $request->getHostname(); if (!empty($previewHostname)) { @@ -790,12 +791,12 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S ? RESOURCE_TYPE_FUNCTIONS : RESOURCE_TYPE_SITES; - $queueForDeletes - ->setProject($project) - ->setResourceType($resourceType) - ->setResource($resource->getSequence()) - ->setType(DELETE_TYPE_EXECUTIONS_LIMIT) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_EXECUTIONS_LIMIT, + resource: (string) $resource->getSequence(), + resourceType: $resourceType, + )); } return true; @@ -856,9 +857,9 @@ Http::init() ->inject('apiKey') ->inject('cors') ->inject('authorization') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, Event $queueForEvents, Bus $bus, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, Event $queueForEvents, Bus $bus, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeletePublisher $publisherForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ @@ -866,7 +867,7 @@ Http::init() $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!\in_array($hostname, $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $publisherForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1167,16 +1168,16 @@ Http::options() ->inject('apiKey') ->inject('cors') ->inject('authorization') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeletePublisher $publisherForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $publisherForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1569,15 +1570,15 @@ Http::get('/robots.txt') ->inject('previewHostname') ->inject('apiKey') ->inject('authorization') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeletePublisher $publisherForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/robots.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $publisherForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1603,15 +1604,15 @@ Http::get('/humans.txt') ->inject('previewHostname') ->inject('apiKey') ->inject('authorization') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeletePublisher $publisherForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/humans.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $publisherForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 6f808296b0..eb99a656c0 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -5,7 +5,6 @@ use Appwrite\Auth\MFA\Type\TOTP; use Appwrite\Bus\Events\RequestCompleted; use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; -use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Message\Audit as AuditMessage; use Appwrite\Event\Message\Func as FunctionMessage; @@ -565,7 +564,6 @@ Http::init() ->inject('user') ->inject('queueForEvents') ->inject('auditContext') - ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('usage') ->inject('publisherForFunctions') @@ -578,7 +576,7 @@ Http::init() ->inject('platform') ->inject('authorization') ->inject('cacheControlForStorage') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Context $usage, FunctionPublisher $publisherForFunctions, Database $dbForProject, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, EventDatabase $queueForDatabase, Context $usage, FunctionPublisher $publisherForFunctions, Database $dbForProject, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Telemetry $telemetry, array $platform, Authorization $authorization, callable $cacheControlForStorage) { $response->setUser($user); $request->setUser($user); @@ -625,7 +623,6 @@ Http::init() } /* Auto-set projects */ - $queueForDeletes->setProject($project); $queueForDatabase->setProject($project); $useCache = $route->getLabel('cache', false); @@ -818,7 +815,6 @@ Http::shutdown() ->inject('publisherForAudits') ->inject('usage') ->inject('publisherForUsage') - ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('publisherForFunctions') ->inject('queueForWebhooks') @@ -830,7 +826,7 @@ Http::shutdown() ->inject('bus') ->inject('apiKey') ->inject('mode') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, EventDatabase $queueForDatabase, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -977,10 +973,6 @@ Http::shutdown() $publisherForAudits->enqueue(AuditMessage::fromContext($auditContext)); } - if (! empty($queueForDeletes->getType())) { - $queueForDeletes->trigger(); - } - if (! empty($queueForDatabase->getType())) { $queueForDatabase->trigger(); } diff --git a/app/init/resources.php b/app/init/resources.php index dbaa89b21d..64f16b4c05 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -4,6 +4,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Publisher\Audit as AuditPublisher; use Appwrite\Event\Publisher\Build as BuildPublisher; use Appwrite\Event\Publisher\Certificate as CertificatePublisher; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Execution as ExecutionPublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; @@ -125,6 +126,10 @@ $container->set('publisherForBuilds', fn (Publisher $publisher) => new BuildPubl $publisher, new Queue(System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME)) ), ['publisher']); +$container->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( + $publisher, + new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) +), ['publisher']); $container->set('publisherForMails', fn (Publisher $publisher) => new MailPublisher( $publisher, new Queue(System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME)) diff --git a/app/init/resources/request.php b/app/init/resources/request.php index 68f5968519..a6ce80b316 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -6,9 +6,9 @@ use Appwrite\Auth\Key; use Appwrite\Databases\TransactionState; use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; -use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -109,7 +109,6 @@ return function (Container $context): void { // Per-request queue resources (stateful, accumulate event data during request) $context->set('queueForDatabase', fn (Publisher $publisher) => new EventDatabase($publisher), ['publisher']); - $context->set('queueForDeletes', fn (Publisher $publisher) => new Delete($publisher), ['publisher']); $context->set('queueForEvents', fn (Publisher $publisher) => new Event($publisher), ['publisher']); $context->set('queueForWebhooks', fn (Publisher $publisher) => new Webhook($publisher), ['publisher']); $context->set('queueForRealtime', fn () => new Realtime(), []); @@ -119,6 +118,10 @@ return function (Container $context): void { $publisher, new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) ), ['publisher']); + $context->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( + $publisher, + new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) + ), ['publisher']); $context->set('eventProcessor', fn () => new EventProcessor(), []); $context->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); diff --git a/app/init/worker/message.php b/app/init/worker/message.php index d4aea0c51e..16f180b2c2 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -1,8 +1,8 @@ set('queueForDeletes', function (Publisher $publisher) { - return new Delete($publisher); - }, ['publisher']); - $container->set('queueForEvents', function (Publisher $publisher) { return new Event($publisher); }, ['publisher']); @@ -348,6 +344,10 @@ return function (Container $container): void { $publisher, new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) ), ['publisher']); + $container->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( + $publisher, + new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) + ), ['publisher']); $container->set('queueForRealtime', function () { return new Realtime(); diff --git a/src/Appwrite/Event/Message/Delete.php b/src/Appwrite/Event/Message/Delete.php new file mode 100644 index 0000000000..6866cf3f02 --- /dev/null +++ b/src/Appwrite/Event/Message/Delete.php @@ -0,0 +1,45 @@ + $this->project?->getArrayCopy(), + 'type' => $this->type, + 'document' => $this->document?->getArrayCopy(), + 'resource' => $this->resource, + 'resourceType' => $this->resourceType, + 'datetime' => $this->datetime, + 'hourlyUsageRetentionDatetime' => $this->hourlyUsageRetentionDatetime, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: !empty($data['project']) ? new Document($data['project']) : null, + type: $data['type'] ?? '', + document: !empty($data['document']) ? new Document($data['document']) : null, + resource: $data['resource'] ?? null, + resourceType: $data['resourceType'] ?? null, + datetime: $data['datetime'] ?? null, + hourlyUsageRetentionDatetime: $data['hourlyUsageRetentionDatetime'] ?? null, + ); + } +} diff --git a/src/Appwrite/Event/Publisher/Delete.php b/src/Appwrite/Event/Publisher/Delete.php new file mode 100644 index 0000000000..fb3b46c647 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Delete.php @@ -0,0 +1,27 @@ +publish($queue ?? $this->queue, $message); + } + + public function getSize(bool $failed = false, ?Queue $queue = null): int + { + return $this->getQueueSize($queue ?? $this->queue, $failed); + } +} diff --git a/src/Appwrite/Platform/Modules/Advisor/Http/Reports/Delete.php b/src/Appwrite/Platform/Modules/Advisor/Http/Reports/Delete.php index 6b1dfba31b..1efc029c17 100644 --- a/src/Appwrite/Platform/Modules/Advisor/Http/Reports/Delete.php +++ b/src/Appwrite/Platform/Modules/Advisor/Http/Reports/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Advisor\Http\Reports; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Platform\Action; use Appwrite\SDK\AuthType; @@ -58,7 +59,7 @@ class Delete extends Action ->inject('response') ->inject('project') ->inject('dbForPlatform') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->callback($this->action(...)); } @@ -68,7 +69,7 @@ class Delete extends Action Response $response, Document $project, Database $dbForPlatform, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents ): void { $report = $dbForPlatform->skipFilters( @@ -84,9 +85,11 @@ class Delete extends Action throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove report from DB'); } - $queueForDeletes - ->setType(DELETE_TYPE_REPORT) - ->setDocument($report); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_REPORT, + document: $report, + )); $queueForEvents ->setParam('reportId', $report->getId()) diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Delete.php index d57cebbe4a..072cb21bbc 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Delete.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions; -use Appwrite\Event\Delete as DeleteEvent; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -10,6 +11,7 @@ use Appwrite\SDK\Method; use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response as UtopiaResponse; use Utopia\Database\Database; +use Utopia\Database\Document; use Utopia\Database\Validator\UID; use Utopia\Http\Adapter\Swoole\Response as SwooleResponse; @@ -51,11 +53,12 @@ class Delete extends Action ->param('transactionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Transaction ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') + ->inject('project') ->callback($this->action(...)); } - public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject, DeleteEvent $queueForDeletes): void + public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject, DeletePublisher $publisherForDeletes, Document $project): void { $transaction = $dbForProject->getDocument('transactions', $transactionId); @@ -65,9 +68,11 @@ class Delete extends Action $dbForProject->deleteDocument('transactions', $transactionId); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($transaction); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $transaction, + )); $response->noContent(); } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index 4f91ba3f94..fe2ad8dbae 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -3,9 +3,10 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions; use Appwrite\Databases\TransactionState; -use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; @@ -75,7 +76,7 @@ class Update extends Action ->inject('getDatabasesDB') ->inject('user') ->inject('transactionState') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') @@ -95,7 +96,7 @@ class Update extends Action * @param callable $getDatabasesDB * @param User $user * @param TransactionState $transactionState - * @param Delete $queueForDeletes + * @param DeletePublisher $publisherForDeletes * @param Event $queueForEvents * @param Context $usage * @param Event $queueForRealtime @@ -110,7 +111,7 @@ class Update extends Action * @throws StructureException * @throws \Utopia\Http\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void + public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, DeletePublisher $publisherForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -156,9 +157,11 @@ class Update extends Action new Document(['status' => 'committed']) )); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($transaction); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $transaction, + )); $response ->setStatusCode(SwooleResponse::STATUS_CODE_OK) @@ -295,9 +298,11 @@ class Update extends Action new Document(['status' => 'committed']) )); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($transaction); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $transaction, + )); } catch (NotFoundException $e) { $authorization->skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', @@ -501,9 +506,11 @@ class Update extends Action new Document(['status' => 'failed']) )); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($transaction); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $transaction, + )); } $response diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Delete.php index 036f2e9600..94ff3fa214 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Delete.php @@ -49,7 +49,8 @@ class Delete extends TransactionsDelete ->param('transactionId', '', new UID(), 'Transaction ID.') ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') + ->inject('project') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php index 97eff24508..1b9cdee137 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/DocumentsDB/Transactions/Update.php @@ -56,7 +56,7 @@ class Update extends TransactionsUpdate ->inject('getDatabasesDB') ->inject('user') ->inject('transactionState') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Delete.php index 9ee85ff153..988bfc3d1d 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Delete.php @@ -50,7 +50,8 @@ class Delete extends TransactionsDelete ->param('transactionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Transaction ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') + ->inject('project') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php index c41186b5c3..bd06f475b2 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php @@ -57,7 +57,7 @@ class Update extends TransactionsUpdate ->inject('getDatabasesDB') ->inject('user') ->inject('transactionState') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Delete.php index 0ac2caecba..2de71fc904 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Delete.php @@ -49,7 +49,8 @@ class Delete extends TransactionsDelete ->param('transactionId', '', new UID(), 'Transaction ID.') ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') + ->inject('project') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php index d6399e6bc0..cebfcb42e8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/VectorsDB/Transactions/Update.php @@ -56,7 +56,7 @@ class Update extends TransactionsUpdate ->inject('getDatabasesDB') ->inject('user') ->inject('transactionState') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('usage') ->inject('queueForRealtime') diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Delete.php b/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Delete.php index 3d75919eb8..be4437ffe3 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Delete.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Functions\Http\Deployments; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -59,7 +60,7 @@ class Delete extends Action ->param('deploymentId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Deployment ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('deviceForFunctions') ->callback($this->action(...)); @@ -70,7 +71,7 @@ class Delete extends Action string $deploymentId, Response $response, Database $dbForProject, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents, Device $deviceForFunctions ) { @@ -128,9 +129,11 @@ class Delete extends Action ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($deployment); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $deployment, + )); $response->noContent(); } diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php index f2f51a90e6..35264730f8 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php @@ -3,9 +3,10 @@ namespace Appwrite\Platform\Modules\Functions\Http\Executions; use Ahc\Jwt\JWT; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Message\Func as FunctionMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; @@ -103,7 +104,7 @@ class Create extends Base ->inject('executor') ->inject('platform') ->inject('authorization') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('executionsRetentionCount') ->callback($this->action(...)); } @@ -131,7 +132,7 @@ class Create extends Base Executor $executor, array $platform, Authorization $authorization, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, int $executionsRetentionCount, ) { $async = \strval($async) === 'true' || \strval($async) === '1'; @@ -338,12 +339,12 @@ class Create extends Base } if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) { - $queueForDeletes - ->setProject($project) - ->setResource($function->getSequence()) - ->setResourceType(RESOURCE_TYPE_FUNCTIONS) - ->setType(DELETE_TYPE_EXECUTIONS_LIMIT) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_EXECUTIONS_LIMIT, + resource: (string) $function->getSequence(), + resourceType: RESOURCE_TYPE_FUNCTIONS, + )); } $response->setStatusCode(Response::STATUS_CODE_ACCEPTED); @@ -529,12 +530,12 @@ class Create extends Base } if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) { - $queueForDeletes - ->setProject($project) - ->setResource($function->getSequence()) - ->setResourceType(RESOURCE_TYPE_FUNCTIONS) - ->setType(DELETE_TYPE_EXECUTIONS_LIMIT) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_EXECUTIONS_LIMIT, + resource: (string) $function->getSequence(), + resourceType: RESOURCE_TYPE_FUNCTIONS, + )); } $response diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php index fb45cee82f..1517ee7793 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Functions\Http\Functions; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Compute\Base; use Appwrite\SDK\AuthType; @@ -59,7 +60,7 @@ class Delete extends Base ->param('functionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Function ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('dbForPlatform') ->inject('authorization') @@ -70,7 +71,7 @@ class Delete extends Base string $functionId, Response $response, Database $dbForProject, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents, Database $dbForPlatform, Authorization $authorization @@ -97,9 +98,11 @@ class Delete extends Base ]))); } - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($function); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $function, + )); $queueForEvents->setParam('functionId', $function->getId()); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Deletes/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Deletes/Get.php index 816583fc47..c1bcc900e0 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Deletes/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Deletes/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Deletes; -use Appwrite\Event\Delete; +use Appwrite\Event\Publisher\Delete; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Delete $queueForDeletes, Response $response): void + public function action(int|string $threshold, Delete $publisherForDeletes, Response $response): void { $threshold = (int) $threshold; - $size = $queueForDeletes->getSize(); + $size = $publisherForDeletes->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php index 0429118e41..5aa29fcaba 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php @@ -3,11 +3,11 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed; use Appwrite\Event\Database; -use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Publisher\Audit; use Appwrite\Event\Publisher\Build as BuildPublisher; use Appwrite\Event\Publisher\Certificate; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Publisher\Mail as MailPublisher; use Appwrite\Event\Publisher\Messaging as MessagingPublisher; @@ -75,7 +75,7 @@ class Get extends Base ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('response') ->inject('queueForDatabase') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('publisherForAudits') ->inject('publisherForMails') ->inject('publisherForFunctions') @@ -95,7 +95,7 @@ class Get extends Base int|string $threshold, Response $response, Database $queueForDatabase, - Delete $queueForDeletes, + DeletePublisher $publisherForDeletes, Audit $publisherForAudits, MailPublisher $publisherForMails, FunctionPublisher $publisherForFunctions, @@ -112,7 +112,7 @@ class Get extends Base $queue = match ($name) { System::getEnv('_APP_DATABASE_QUEUE_NAME', Event::DATABASE_QUEUE_NAME) => $queueForDatabase, - System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes, + System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $publisherForDeletes, System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits, System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $publisherForMails, System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $publisherForFunctions, diff --git a/src/Appwrite/Platform/Modules/Project/Http/Project/Delete.php b/src/Appwrite/Platform/Modules/Project/Http/Project/Delete.php index 4b26557ca9..201061dd62 100644 --- a/src/Appwrite/Platform/Modules/Project/Http/Project/Delete.php +++ b/src/Appwrite/Platform/Modules/Project/Http/Project/Delete.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Modules\Project\Http\Project; -use Appwrite\Event\Delete as DeleteQueue; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -53,7 +54,7 @@ class Delete extends Action )) ->inject('response') ->inject('dbForPlatform') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('authorization') ->inject('project') ->callback($this->action(...)); @@ -62,19 +63,20 @@ class Delete extends Action public function action( Response $response, Database $dbForPlatform, - DeleteQueue $queueForDeletes, + DeletePublisher $publisherForDeletes, Authorization $authorization, Document $project, ) { - $queueForDeletes - ->setProject($project) - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($project); - if (!$authorization->skip(fn () => $dbForPlatform->deleteDocument('projects', $project->getId()))) { throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove project from DB'); } + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $project, + )); + $response->noContent(); } } diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Delete.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Delete.php index 29751ff20a..991b8eb006 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Delete.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -57,7 +58,7 @@ class Delete extends Action ->inject('response') ->inject('project') ->inject('dbForPlatform') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('authorization') ->callback($this->action(...)); @@ -68,7 +69,7 @@ class Delete extends Action Response $response, Document $project, Database $dbForPlatform, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents, Authorization $authorization, ) { @@ -80,9 +81,11 @@ class Delete extends Action $authorization->skip(fn () => $dbForPlatform->deleteDocument('rules', $rule->getId())); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($rule); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $rule, + )); $queueForEvents->setParam('ruleId', $rule->getId()); diff --git a/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Delete.php b/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Delete.php index efea79395f..b50e9b54f4 100644 --- a/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Delete.php +++ b/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Sites\Http\Deployments; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -59,7 +60,7 @@ class Delete extends Action ->param('deploymentId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Deployment ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('deviceForSites') ->callback($this->action(...)); @@ -70,7 +71,7 @@ class Delete extends Action string $deploymentId, Response $response, Database $dbForProject, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents, Device $deviceForSites ) { @@ -130,9 +131,11 @@ class Delete extends Action ->setParam('siteId', $site->getId()) ->setParam('deploymentId', $deployment->getId()); - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($deployment); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $deployment, + )); $response->noContent(); } diff --git a/src/Appwrite/Platform/Modules/Sites/Http/Sites/Delete.php b/src/Appwrite/Platform/Modules/Sites/Http/Sites/Delete.php index ebc192b6e6..50b070d098 100644 --- a/src/Appwrite/Platform/Modules/Sites/Http/Sites/Delete.php +++ b/src/Appwrite/Platform/Modules/Sites/Http/Sites/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Sites\Http\Sites; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Compute\Base; use Appwrite\SDK\AuthType; @@ -56,7 +57,7 @@ class Delete extends Base ->param('siteId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Site ID.', false, ['dbForProject']) ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->callback($this->action(...)); } @@ -65,7 +66,7 @@ class Delete extends Base string $siteId, Response $response, Database $dbForProject, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents ) { $site = $dbForProject->getDocument('sites', $siteId); @@ -78,9 +79,11 @@ class Delete extends Base throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove site from DB'); } - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($site); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $site, + )); $queueForEvents->setParam('siteId', $site->getId()); diff --git a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Delete.php b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Delete.php index 9523f55e12..2581a2163d 100644 --- a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Delete.php +++ b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Storage\Http\Buckets; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -53,7 +54,7 @@ class Delete extends Action ->param('bucketId', '', new UID(), 'Bucket unique ID.') ->inject('response') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->callback($this->action(...)); } @@ -62,7 +63,7 @@ class Delete extends Action string $bucketId, Response $response, Database $dbForProject, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Event $queueForEvents ) { $bucket = $dbForProject->getDocument('buckets', $bucketId); @@ -75,9 +76,11 @@ class Delete extends Action throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove bucket from DB'); } - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($bucket); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_DOCUMENT, + document: $bucket, + )); $queueForEvents ->setParam('bucketId', $bucket->getId()) diff --git a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Delete.php b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Delete.php index 5b44c61d18..6d8781d484 100644 --- a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Delete.php +++ b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Storage\Http\Buckets\Files; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -64,7 +65,7 @@ class Delete extends Action ->inject('dbForProject') ->inject('queueForEvents') ->inject('deviceForFiles') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('authorization') ->inject('user') ->callback($this->action(...)); @@ -77,7 +78,7 @@ class Delete extends Action Database $dbForProject, Event $queueForEvents, Device $deviceForFiles, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, Authorization $authorization, User $user, ) { @@ -126,11 +127,12 @@ class Delete extends Action } if ($deviceDeleted) { - $queueForDeletes - ->setType(DELETE_TYPE_CACHE_BY_RESOURCE) - ->setResourceType('bucket/' . $bucket->getId()) - ->setResource('file/' . $fileId) - ; + $publisherForDeletes->enqueue(new DeleteMessage( + project: $queueForEvents->getProject(), + type: DELETE_TYPE_CACHE_BY_RESOURCE, + resource: 'file/' . $fileId, + resourceType: 'bucket/' . $bucket->getId(), + )); try { if ($fileSecurity && !$valid) { diff --git a/src/Appwrite/Platform/Modules/Teams/Http/Teams/Delete.php b/src/Appwrite/Platform/Modules/Teams/Http/Teams/Delete.php index 0cb7c54a26..3bae031e06 100644 --- a/src/Appwrite/Platform/Modules/Teams/Http/Teams/Delete.php +++ b/src/Appwrite/Platform/Modules/Teams/Http/Teams/Delete.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Modules\Teams\Http\Teams; -use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Platform\Action; use Appwrite\Platform\Workers\Deletes; @@ -55,13 +56,13 @@ class Delete extends Action ->inject('response') ->inject('getProjectDB') ->inject('dbForProject') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('queueForEvents') ->inject('project') ->callback($this->action(...)); } - public function action(string $teamId, Response $response, callable $getProjectDB, Database $dbForProject, DeleteEvent $queueForDeletes, Event $queueForEvents, Document $project) + public function action(string $teamId, Response $response, callable $getProjectDB, Database $dbForProject, DeletePublisher $publisherForDeletes, Event $queueForEvents, Document $project) { $team = $dbForProject->getDocument('teams', $teamId); @@ -79,15 +80,18 @@ class Delete extends Action // Async delete if ($project->getId() === 'console') { - $queueForDeletes - ->setType(DELETE_TYPE_TEAM_PROJECTS) - ->setDocument($team) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_TEAM_PROJECTS, + document: $team, + )); } - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($team); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $team, + )); $queueForEvents ->setParam('teamId', $team->getId()) diff --git a/src/Appwrite/Platform/Modules/VCS/Http/Installations/Delete.php b/src/Appwrite/Platform/Modules/VCS/Http/Installations/Delete.php index 26a9476941..5d90d6d231 100644 --- a/src/Appwrite/Platform/Modules/VCS/Http/Installations/Delete.php +++ b/src/Appwrite/Platform/Modules/VCS/Http/Installations/Delete.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Modules\VCS\Http\Installations; -use Appwrite\Event\Delete as DeleteEvent; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Appwrite\Platform\Action; use Appwrite\SDK\AuthType; @@ -11,6 +12,7 @@ use Appwrite\SDK\Method; use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\Database\Database; +use Utopia\Database\Document; use Utopia\Platform\Scope\HTTP; use Utopia\Validator\Text; @@ -49,7 +51,8 @@ class Delete extends Action ->param('installationId', '', new Text(256), 'Installation Id') ->inject('response') ->inject('dbForPlatform') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') + ->inject('project') ->callback($this->action(...)); } @@ -57,7 +60,8 @@ class Delete extends Action string $installationId, Response $response, Database $dbForPlatform, - DeleteEvent $queueForDeletes + DeletePublisher $publisherForDeletes, + Document $project, ) { $installation = $dbForPlatform->getDocument('installations', $installationId); @@ -69,9 +73,11 @@ class Delete extends Action throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove installation from DB'); } - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($installation); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $installation, + )); $response->noContent(); } diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index fe803f1292..e43281545a 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -2,8 +2,9 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Delete; +use Appwrite\Event\Message\Delete as DeleteMessage; use Appwrite\Event\Publisher\Certificate; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use DateInterval; use DateTime; use Utopia\Console; @@ -30,11 +31,11 @@ class Maintenance extends Action ->inject('dbForPlatform') ->inject('console') ->inject('publisherForCertificates') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->callback($this->action(...)); } - public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, Delete $queueForDeletes): void + public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, DeletePublisher $publisherForDeletes): void { Console::title('Maintenance V1'); Console::success(APP_NAME . ' maintenance process v1 has started'); @@ -59,7 +60,7 @@ class Maintenance extends Action $delay = $next->getTimestamp() - $now->getTimestamp(); } - $action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $publisherForCertificates) { + $action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $publisherForDeletes, $publisherForCertificates) { $time = DatabaseDateTime::now(); Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds"); @@ -70,12 +71,12 @@ class Maintenance extends Action $dbForPlatform->foreach( 'projects', - function (Document $project) use ($queueForDeletes, $usageStatsRetentionHourly) { - $queueForDeletes - ->setType(DELETE_TYPE_MAINTENANCE) - ->setProject($project) - ->setUsageRetentionHourlyDateTime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) - ->trigger(); + function (Document $project) use ($publisherForDeletes, $usageStatsRetentionHourly) { + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_MAINTENANCE, + hourlyUsageRetentionDatetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly), + )); }, [ Query::equal('region', [System::getEnv('_APP_REGION', 'default')]), @@ -85,17 +86,17 @@ class Maintenance extends Action ] ); - $queueForDeletes - ->setType(DELETE_TYPE_MAINTENANCE) - ->setProject($console) - ->setUsageRetentionHourlyDateTime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + project: $console, + type: DELETE_TYPE_MAINTENANCE, + hourlyUsageRetentionDatetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly), + )); - $this->notifyDeleteConnections($queueForDeletes); + $this->notifyDeleteConnections($publisherForDeletes); $this->renewCertificates($dbForPlatform, $publisherForCertificates); - $this->notifyDeleteCache($cacheRetention, $queueForDeletes); - $this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes); - $this->notifyDeleteCSVExports($queueForDeletes); + $this->notifyDeleteCache($cacheRetention, $publisherForDeletes); + $this->notifyDeleteSchedules($schedulesDeletionRetention, $publisherForDeletes); + $this->notifyDeleteCSVExports($publisherForDeletes); }; if ($type === 'loop') { @@ -109,19 +110,17 @@ class Maintenance extends Action } } - private function notifyDeleteConnections(Delete $queueForDeletes): void + private function notifyDeleteConnections(DeletePublisher $publisherForDeletes): void { - $queueForDeletes - ->setType(DELETE_TYPE_REALTIME) - ->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -60)) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + type: DELETE_TYPE_REALTIME, + datetime: DatabaseDateTime::addSeconds(new \DateTime(), -60), + )); } - private function notifyDeleteCSVExports(Delete $queueForDeletes): void + private function notifyDeleteCSVExports(DeletePublisher $publisherForDeletes): void { - $queueForDeletes - ->setType(DELETE_TYPE_CSV_EXPORTS) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage(type: DELETE_TYPE_CSV_EXPORTS)); } private function renewCertificates(Database $dbForPlatform, Certificate $publisherForCertificate): void @@ -172,19 +171,19 @@ class Maintenance extends Action } } - private function notifyDeleteCache($interval, Delete $queueForDeletes): void + private function notifyDeleteCache($interval, DeletePublisher $publisherForDeletes): void { - $queueForDeletes - ->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP) - ->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + type: DELETE_TYPE_CACHE_BY_TIMESTAMP, + datetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval), + )); } - private function notifyDeleteSchedules($interval, Delete $queueForDeletes): void + private function notifyDeleteSchedules($interval, DeletePublisher $publisherForDeletes): void { - $queueForDeletes - ->setType(DELETE_TYPE_SCHEDULES) - ->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval)) - ->trigger(); + $publisherForDeletes->enqueue(new DeleteMessage( + type: DELETE_TYPE_SCHEDULES, + datetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval), + )); } } diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 49bbb2cf9e..a58fc48098 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -5,7 +5,8 @@ namespace Appwrite\Platform\Workers; use Appwrite\Certificates\Adapter as CertificatesAdapter; use Appwrite\Deletes\Identities; use Appwrite\Deletes\Targets; -use Appwrite\Event\Delete as DeleteEvent; +use Appwrite\Event\Message\Delete as DeleteMessage; +use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Extend\Exception; use Executor\Executor; use Throwable; @@ -66,7 +67,7 @@ class Deletes extends Action ->inject('executionsRetentionCount') ->inject('auditRetention') ->inject('log') - ->inject('queueForDeletes') + ->inject('publisherForDeletes') ->inject('getAudit') ->callback($this->action(...)); } @@ -93,7 +94,7 @@ class Deletes extends Action int $executionsRetentionCount, string $auditRetention, Log $log, - DeleteEvent $queueForDeletes, + DeletePublisher $publisherForDeletes, callable $getAudit, ): void { $payload = $message->getPayload(); @@ -102,12 +103,13 @@ class Deletes extends Action throw new Exception('Missing payload'); } - $type = $payload['type'] ?? ''; - $datetime = $payload['datetime'] ?? null; - $hourlyUsageRetentionDatetime = $payload['hourlyUsageRetentionDatetime'] ?? null; - $resource = $payload['resource'] ?? null; - $resourceType = $payload['resourceType'] ?? null; - $document = new Document($payload['document'] ?? []); + $deleteMessage = DeleteMessage::fromArray($payload); + $type = $deleteMessage->type; + $datetime = $deleteMessage->datetime; + $hourlyUsageRetentionDatetime = $deleteMessage->hourlyUsageRetentionDatetime; + $resource = $deleteMessage->resource; + $resourceType = $deleteMessage->resourceType; + $document = $deleteMessage->document ?? new Document(); $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); @@ -214,7 +216,7 @@ class Deletes extends Action $this->deleteUsageStats($project, $getProjectDB, $getLogsDB, $hourlyUsageRetentionDatetime); $this->deleteExpiredSessions($project, $getProjectDB); $this->deleteExpiredTransactions($project, $getProjectDB); - $this->deleteOldDeployments($queueForDeletes, $project, $getProjectDB); + $this->deleteOldDeployments($publisherForDeletes, $project, $getProjectDB); break; case DELETE_TYPE_REPORT: $this->deleteReport($dbForPlatform, $project, $document); @@ -390,12 +392,12 @@ class Deletes extends Action Targets::delete($getProjectDB($project), Query::equal('sessionInternalId', [$session->getSequence()])); } - private function deleteOldDeployments(DeleteEvent $queueForDeletes, Document $project, callable $getProjectDB): void + private function deleteOldDeployments(DeletePublisher $publisherForDeletes, Document $project, callable $getProjectDB): void { /** @var Database $dbForProject */ $dbForProject = $getProjectDB($project); - $removalCallback = function (Document $resource) use ($dbForProject, $queueForDeletes, $project) { + $removalCallback = function (Document $resource) use ($dbForProject, $publisherForDeletes, $project) { $retention = $resource->getAttribute('deploymentRetention', 0); // 0 means unlimited - never delete @@ -420,12 +422,12 @@ class Deletes extends Action 'deployments', $queries, $dbForProject, - function (Document $deployment) use ($queueForDeletes, $project) { - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($deployment) - ->setProject($project) - ->trigger(); + function (Document $deployment) use ($publisherForDeletes, $project) { + $publisherForDeletes->enqueue(new DeleteMessage( + project: $project, + type: DELETE_TYPE_DOCUMENT, + document: $deployment, + )); } ); }; From 8f481d6f2cf4bd1df5ca565e3d6389ccc3b2c560 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 14 May 2026 15:37:04 +0530 Subject: [PATCH 2/4] refactor: keep delete publisher in shared container --- app/init/resources/request.php | 5 ----- app/init/worker/message.php | 6 ------ 2 files changed, 11 deletions(-) diff --git a/app/init/resources/request.php b/app/init/resources/request.php index a6ce80b316..2e303dcfaa 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -8,7 +8,6 @@ use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Event; use Appwrite\Event\Message\Func as FunctionMessage; -use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -118,10 +117,6 @@ return function (Container $context): void { $publisher, new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) ), ['publisher']); - $context->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( - $publisher, - new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) - ), ['publisher']); $context->set('eventProcessor', fn () => new EventProcessor(), []); $context->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); diff --git a/app/init/worker/message.php b/app/init/worker/message.php index 16f180b2c2..2862d3ccdf 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -2,7 +2,6 @@ use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Event; -use Appwrite\Event\Publisher\Delete as DeletePublisher; use Appwrite\Event\Publisher\Func as FunctionPublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -344,11 +343,6 @@ return function (Container $container): void { $publisher, new Queue(System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME), 'utopia-queue', Event::FUNCTIONS_QUEUE_TTL) ), ['publisher']); - $container->set('publisherForDeletes', fn (Publisher $publisher) => new DeletePublisher( - $publisher, - new Queue(System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME)) - ), ['publisher']); - $container->set('queueForRealtime', function () { return new Realtime(); }, []); From 6d0a76cf5156258d9c142f21da87a0ea0d7dbbd9 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 14 May 2026 16:35:08 +0530 Subject: [PATCH 3/4] Fix text response schemas in API specs --- .../SDK/Specification/Format/OpenAPI3.php | 16 ++++++++++++++++ .../SDK/Specification/Format/Swagger2.php | 12 ++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/Appwrite/SDK/Specification/Format/OpenAPI3.php b/src/Appwrite/SDK/Specification/Format/OpenAPI3.php index 3be3fe7115..99538bc8f2 100644 --- a/src/Appwrite/SDK/Specification/Format/OpenAPI3.php +++ b/src/Appwrite/SDK/Specification/Format/OpenAPI3.php @@ -4,6 +4,7 @@ namespace Appwrite\SDK\Specification\Format; use Appwrite\Platform\Tasks\Specs; use Appwrite\SDK\AuthType; +use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; use Appwrite\SDK\MethodType; use Appwrite\SDK\Response; @@ -291,6 +292,21 @@ class OpenAPI3 extends Format } if (!(\is_array($model)) && $model->isNone()) { + if ($produces === ContentType::TEXT->value && $response->getCode() !== 204) { + $temp['responses'][(string)$response->getCode()] = [ + 'description' => 'Text', + 'content' => [ + $produces => [ + 'schema' => [ + 'type' => 'string', + ], + ], + ], + ]; + + continue; + } + $temp['responses'][(string)$response->getCode()] = [ 'description' => in_array($produces, [ 'image/*', diff --git a/src/Appwrite/SDK/Specification/Format/Swagger2.php b/src/Appwrite/SDK/Specification/Format/Swagger2.php index 9a16bc8bbe..fb1ef66eca 100644 --- a/src/Appwrite/SDK/Specification/Format/Swagger2.php +++ b/src/Appwrite/SDK/Specification/Format/Swagger2.php @@ -4,6 +4,7 @@ namespace Appwrite\SDK\Specification\Format; use Appwrite\Platform\Tasks\Specs; use Appwrite\SDK\AuthType; +use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; use Appwrite\SDK\MethodType; use Appwrite\SDK\Response; @@ -298,6 +299,17 @@ class Swagger2 extends Format } if (!(\is_array($model)) && $model->isNone()) { + if ($produces === ContentType::TEXT->value && !\in_array($response->getCode(), [204, 301, 302, 308], true)) { + $temp['responses'][(string)$response->getCode()] = [ + 'description' => 'Text', + 'schema' => [ + 'type' => 'string', + ], + ]; + + continue; + } + $temp['responses'][(string)$response->getCode()] = [ 'description' => in_array($produces, [ 'image/*', From e1547023f63e5ea9f7a99cd00bbcb24485f7dd38 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 14 May 2026 16:48:58 +0530 Subject: [PATCH 4/4] Fix OpenAPI text redirect response schemas --- src/Appwrite/SDK/Specification/Format/OpenAPI3.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Appwrite/SDK/Specification/Format/OpenAPI3.php b/src/Appwrite/SDK/Specification/Format/OpenAPI3.php index 99538bc8f2..68ab7a0986 100644 --- a/src/Appwrite/SDK/Specification/Format/OpenAPI3.php +++ b/src/Appwrite/SDK/Specification/Format/OpenAPI3.php @@ -292,7 +292,7 @@ class OpenAPI3 extends Format } if (!(\is_array($model)) && $model->isNone()) { - if ($produces === ContentType::TEXT->value && $response->getCode() !== 204) { + if ($produces === ContentType::TEXT->value && !\in_array($response->getCode(), [204, 301, 302, 308], true)) { $temp['responses'][(string)$response->getCode()] = [ 'description' => 'Text', 'content' => [