Merge pull request #12305 from appwrite/chore/refactor-delete-queue-publisher

refactor: Migrate queueForDeletes to DeletePublisher and DeleteMessage
This commit is contained in:
Chirag Aggarwal
2026-05-14 16:31:03 +05:30
committed by GitHub
35 changed files with 390 additions and 260 deletions
+45
View File
@@ -0,0 +1,45 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class Delete extends Base
{
public function __construct(
public readonly ?Document $project = null,
public readonly string $type = '',
public readonly ?Document $document = null,
public readonly ?string $resource = null,
public readonly ?string $resourceType = null,
public readonly ?string $datetime = null,
public readonly ?string $hourlyUsageRetentionDatetime = null,
) {
}
public function toArray(): array
{
return [
'project' => $this->project?->getArrayCopy(),
'type' => $this->type,
'document' => $this->document?->getArrayCopy(),
'resource' => $this->resource,
'resourceType' => $this->resourceType,
'datetime' => $this->datetime,
'hourlyUsageRetentionDatetime' => $this->hourlyUsageRetentionDatetime,
];
}
public static function fromArray(array $data): static
{
return new self(
project: !empty($data['project']) ? new Document($data['project']) : null,
type: $data['type'] ?? '',
document: !empty($data['document']) ? new Document($data['document']) : null,
resource: $data['resource'] ?? null,
resourceType: $data['resourceType'] ?? null,
datetime: $data['datetime'] ?? null,
hourlyUsageRetentionDatetime: $data['hourlyUsageRetentionDatetime'] ?? null,
);
}
}
+27
View File
@@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Delete extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue,
) {
parent::__construct($publisher);
}
public function enqueue(DeleteMessage $message, ?Queue $queue = null): string|bool
{
return $this->publish($queue ?? $this->queue, $message);
}
public function getSize(bool $failed = false, ?Queue $queue = null): int
{
return $this->getQueueSize($queue ?? $this->queue, $failed);
}
}
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Advisor\Http\Reports;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
@@ -58,7 +59,7 @@ class Delete extends Action
->inject('response')
->inject('project')
->inject('dbForPlatform')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->callback($this->action(...));
}
@@ -68,7 +69,7 @@ class Delete extends Action
Response $response,
Document $project,
Database $dbForPlatform,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents
): void {
$report = $dbForPlatform->skipFilters(
@@ -84,9 +85,11 @@ class Delete extends Action
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove report from DB');
}
$queueForDeletes
->setType(DELETE_TYPE_REPORT)
->setDocument($report);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_REPORT,
document: $report,
));
$queueForEvents
->setParam('reportId', $report->getId())
@@ -2,7 +2,8 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -10,6 +11,7 @@ use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\UID;
use Utopia\Http\Adapter\Swoole\Response as SwooleResponse;
@@ -51,11 +53,12 @@ class Delete extends Action
->param('transactionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Transaction ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('project')
->callback($this->action(...));
}
public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject, DeleteEvent $queueForDeletes): void
public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject, DeletePublisher $publisherForDeletes, Document $project): void
{
$transaction = $dbForProject->getDocument('transactions', $transactionId);
@@ -65,9 +68,11 @@ class Delete extends Action
$dbForProject->deleteDocument('transactions', $transactionId);
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $transaction,
));
$response->noContent();
}
@@ -3,9 +3,10 @@
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions;
use Appwrite\Databases\TransactionState;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
@@ -75,7 +76,7 @@ class Update extends Action
->inject('getDatabasesDB')
->inject('user')
->inject('transactionState')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
@@ -95,7 +96,7 @@ class Update extends Action
* @param callable $getDatabasesDB
* @param User $user
* @param TransactionState $transactionState
* @param Delete $queueForDeletes
* @param DeletePublisher $publisherForDeletes
* @param Event $queueForEvents
* @param Context $usage
* @param Event $queueForRealtime
@@ -110,7 +111,7 @@ class Update extends Action
* @throws StructureException
* @throws \Utopia\Http\Exception
*/
public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void
public function action(string $transactionId, bool $commit, bool $rollback, Document $project, UtopiaResponse $response, Database $dbForProject, callable $getDatabasesDB, User $user, TransactionState $transactionState, DeletePublisher $publisherForDeletes, Event $queueForEvents, Context $usage, Event $queueForRealtime, FunctionPublisher $publisherForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
@@ -156,9 +157,11 @@ class Update extends Action
new Document(['status' => 'committed'])
));
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $transaction,
));
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
@@ -295,9 +298,11 @@ class Update extends Action
new Document(['status' => 'committed'])
));
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $transaction,
));
} catch (NotFoundException $e) {
$authorization->skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
@@ -501,9 +506,11 @@ class Update extends Action
new Document(['status' => 'failed'])
));
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $transaction,
));
}
$response
@@ -49,7 +49,8 @@ class Delete extends TransactionsDelete
->param('transactionId', '', new UID(), 'Transaction ID.')
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('project')
->callback($this->action(...));
}
}
@@ -56,7 +56,7 @@ class Update extends TransactionsUpdate
->inject('getDatabasesDB')
->inject('user')
->inject('transactionState')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
@@ -50,7 +50,8 @@ class Delete extends TransactionsDelete
->param('transactionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Transaction ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('project')
->callback($this->action(...));
}
}
@@ -57,7 +57,7 @@ class Update extends TransactionsUpdate
->inject('getDatabasesDB')
->inject('user')
->inject('transactionState')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
@@ -49,7 +49,8 @@ class Delete extends TransactionsDelete
->param('transactionId', '', new UID(), 'Transaction ID.')
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('project')
->callback($this->action(...));
}
}
@@ -56,7 +56,7 @@ class Update extends TransactionsUpdate
->inject('getDatabasesDB')
->inject('user')
->inject('transactionState')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('usage')
->inject('queueForRealtime')
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Functions\Http\Deployments;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -59,7 +60,7 @@ class Delete extends Action
->param('deploymentId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Deployment ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('deviceForFunctions')
->callback($this->action(...));
@@ -70,7 +71,7 @@ class Delete extends Action
string $deploymentId,
Response $response,
Database $dbForProject,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents,
Device $deviceForFunctions
) {
@@ -128,9 +129,11 @@ class Delete extends Action
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId());
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($deployment);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_DOCUMENT,
document: $deployment,
));
$response->noContent();
}
@@ -3,9 +3,10 @@
namespace Appwrite\Platform\Modules\Functions\Http\Executions;
use Ahc\Jwt\JWT;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Message\Func as FunctionMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
@@ -103,7 +104,7 @@ class Create extends Base
->inject('executor')
->inject('platform')
->inject('authorization')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('executionsRetentionCount')
->callback($this->action(...));
}
@@ -131,7 +132,7 @@ class Create extends Base
Executor $executor,
array $platform,
Authorization $authorization,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
int $executionsRetentionCount,
) {
$async = \strval($async) === 'true' || \strval($async) === '1';
@@ -338,12 +339,12 @@ class Create extends Base
}
if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) {
$queueForDeletes
->setProject($project)
->setResource($function->getSequence())
->setResourceType(RESOURCE_TYPE_FUNCTIONS)
->setType(DELETE_TYPE_EXECUTIONS_LIMIT)
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_EXECUTIONS_LIMIT,
resource: (string) $function->getSequence(),
resourceType: RESOURCE_TYPE_FUNCTIONS,
));
}
$response->setStatusCode(Response::STATUS_CODE_ACCEPTED);
@@ -529,12 +530,12 @@ class Create extends Base
}
if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) {
$queueForDeletes
->setProject($project)
->setResource($function->getSequence())
->setResourceType(RESOURCE_TYPE_FUNCTIONS)
->setType(DELETE_TYPE_EXECUTIONS_LIMIT)
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_EXECUTIONS_LIMIT,
resource: (string) $function->getSequence(),
resourceType: RESOURCE_TYPE_FUNCTIONS,
));
}
$response
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Functions\Http\Functions;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Compute\Base;
use Appwrite\SDK\AuthType;
@@ -59,7 +60,7 @@ class Delete extends Base
->param('functionId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Function ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('dbForPlatform')
->inject('authorization')
@@ -70,7 +71,7 @@ class Delete extends Base
string $functionId,
Response $response,
Database $dbForProject,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents,
Database $dbForPlatform,
Authorization $authorization
@@ -97,9 +98,11 @@ class Delete extends Base
])));
}
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($function);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_DOCUMENT,
document: $function,
));
$queueForEvents->setParam('functionId', $function->getId());
@@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Deletes;
use Appwrite\Event\Delete;
use Appwrite\Event\Publisher\Delete;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Delete $queueForDeletes, Response $response): void
public function action(int|string $threshold, Delete $publisherForDeletes, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForDeletes->getSize();
$size = $publisherForDeletes->getSize();
$this->assertQueueThreshold($size, $threshold);
@@ -3,11 +3,11 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed;
use Appwrite\Event\Database;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Event\Publisher\Build as BuildPublisher;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Event\Publisher\Func as FunctionPublisher;
use Appwrite\Event\Publisher\Mail as MailPublisher;
use Appwrite\Event\Publisher\Messaging as MessagingPublisher;
@@ -75,7 +75,7 @@ class Get extends Base
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('response')
->inject('queueForDatabase')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('publisherForAudits')
->inject('publisherForMails')
->inject('publisherForFunctions')
@@ -95,7 +95,7 @@ class Get extends Base
int|string $threshold,
Response $response,
Database $queueForDatabase,
Delete $queueForDeletes,
DeletePublisher $publisherForDeletes,
Audit $publisherForAudits,
MailPublisher $publisherForMails,
FunctionPublisher $publisherForFunctions,
@@ -112,7 +112,7 @@ class Get extends Base
$queue = match ($name) {
System::getEnv('_APP_DATABASE_QUEUE_NAME', Event::DATABASE_QUEUE_NAME) => $queueForDatabase,
System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes,
System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $publisherForDeletes,
System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits,
System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $publisherForMails,
System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $publisherForFunctions,
@@ -2,7 +2,8 @@
namespace Appwrite\Platform\Modules\Project\Http\Project;
use Appwrite\Event\Delete as DeleteQueue;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -53,7 +54,7 @@ class Delete extends Action
))
->inject('response')
->inject('dbForPlatform')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('authorization')
->inject('project')
->callback($this->action(...));
@@ -62,19 +63,20 @@ class Delete extends Action
public function action(
Response $response,
Database $dbForPlatform,
DeleteQueue $queueForDeletes,
DeletePublisher $publisherForDeletes,
Authorization $authorization,
Document $project,
) {
$queueForDeletes
->setProject($project)
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($project);
if (!$authorization->skip(fn () => $dbForPlatform->deleteDocument('projects', $project->getId()))) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove project from DB');
}
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $project,
));
$response->noContent();
}
}
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -57,7 +58,7 @@ class Delete extends Action
->inject('response')
->inject('project')
->inject('dbForPlatform')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('authorization')
->callback($this->action(...));
@@ -68,7 +69,7 @@ class Delete extends Action
Response $response,
Document $project,
Database $dbForPlatform,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents,
Authorization $authorization,
) {
@@ -80,9 +81,11 @@ class Delete extends Action
$authorization->skip(fn () => $dbForPlatform->deleteDocument('rules', $rule->getId()));
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($rule);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $rule,
));
$queueForEvents->setParam('ruleId', $rule->getId());
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Sites\Http\Deployments;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -59,7 +60,7 @@ class Delete extends Action
->param('deploymentId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Deployment ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('deviceForSites')
->callback($this->action(...));
@@ -70,7 +71,7 @@ class Delete extends Action
string $deploymentId,
Response $response,
Database $dbForProject,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents,
Device $deviceForSites
) {
@@ -130,9 +131,11 @@ class Delete extends Action
->setParam('siteId', $site->getId())
->setParam('deploymentId', $deployment->getId());
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($deployment);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_DOCUMENT,
document: $deployment,
));
$response->noContent();
}
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Sites\Http\Sites;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Compute\Base;
use Appwrite\SDK\AuthType;
@@ -56,7 +57,7 @@ class Delete extends Base
->param('siteId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Site ID.', false, ['dbForProject'])
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->callback($this->action(...));
}
@@ -65,7 +66,7 @@ class Delete extends Base
string $siteId,
Response $response,
Database $dbForProject,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents
) {
$site = $dbForProject->getDocument('sites', $siteId);
@@ -78,9 +79,11 @@ class Delete extends Base
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove site from DB');
}
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($site);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_DOCUMENT,
document: $site,
));
$queueForEvents->setParam('siteId', $site->getId());
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Storage\Http\Buckets;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -53,7 +54,7 @@ class Delete extends Action
->param('bucketId', '', new UID(), 'Bucket unique ID.')
->inject('response')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->callback($this->action(...));
}
@@ -62,7 +63,7 @@ class Delete extends Action
string $bucketId,
Response $response,
Database $dbForProject,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Event $queueForEvents
) {
$bucket = $dbForProject->getDocument('buckets', $bucketId);
@@ -75,9 +76,11 @@ class Delete extends Action
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove bucket from DB');
}
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($bucket);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_DOCUMENT,
document: $bucket,
));
$queueForEvents
->setParam('bucketId', $bucket->getId())
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Storage\Http\Buckets\Files;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -64,7 +65,7 @@ class Delete extends Action
->inject('dbForProject')
->inject('queueForEvents')
->inject('deviceForFiles')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('authorization')
->inject('user')
->callback($this->action(...));
@@ -77,7 +78,7 @@ class Delete extends Action
Database $dbForProject,
Event $queueForEvents,
Device $deviceForFiles,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
Authorization $authorization,
User $user,
) {
@@ -126,11 +127,12 @@ class Delete extends Action
}
if ($deviceDeleted) {
$queueForDeletes
->setType(DELETE_TYPE_CACHE_BY_RESOURCE)
->setResourceType('bucket/' . $bucket->getId())
->setResource('file/' . $fileId)
;
$publisherForDeletes->enqueue(new DeleteMessage(
project: $queueForEvents->getProject(),
type: DELETE_TYPE_CACHE_BY_RESOURCE,
resource: 'file/' . $fileId,
resourceType: 'bucket/' . $bucket->getId(),
));
try {
if ($fileSecurity && !$valid) {
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Modules\Teams\Http\Teams;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\Platform\Workers\Deletes;
@@ -55,13 +56,13 @@ class Delete extends Action
->inject('response')
->inject('getProjectDB')
->inject('dbForProject')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('queueForEvents')
->inject('project')
->callback($this->action(...));
}
public function action(string $teamId, Response $response, callable $getProjectDB, Database $dbForProject, DeleteEvent $queueForDeletes, Event $queueForEvents, Document $project)
public function action(string $teamId, Response $response, callable $getProjectDB, Database $dbForProject, DeletePublisher $publisherForDeletes, Event $queueForEvents, Document $project)
{
$team = $dbForProject->getDocument('teams', $teamId);
@@ -79,15 +80,18 @@ class Delete extends Action
// Async delete
if ($project->getId() === 'console') {
$queueForDeletes
->setType(DELETE_TYPE_TEAM_PROJECTS)
->setDocument($team)
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_TEAM_PROJECTS,
document: $team,
));
}
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($team);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $team,
));
$queueForEvents
->setParam('teamId', $team->getId())
@@ -2,7 +2,8 @@
namespace Appwrite\Platform\Modules\VCS\Http\Installations;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
@@ -11,6 +12,7 @@ use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Platform\Scope\HTTP;
use Utopia\Validator\Text;
@@ -49,7 +51,8 @@ class Delete extends Action
->param('installationId', '', new Text(256), 'Installation Id')
->inject('response')
->inject('dbForPlatform')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('project')
->callback($this->action(...));
}
@@ -57,7 +60,8 @@ class Delete extends Action
string $installationId,
Response $response,
Database $dbForPlatform,
DeleteEvent $queueForDeletes
DeletePublisher $publisherForDeletes,
Document $project,
) {
$installation = $dbForPlatform->getDocument('installations', $installationId);
@@ -69,9 +73,11 @@ class Delete extends Action
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove installation from DB');
}
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($installation);
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $installation,
));
$response->noContent();
}
+37 -38
View File
@@ -2,8 +2,9 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Delete;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use DateInterval;
use DateTime;
use Utopia\Console;
@@ -30,11 +31,11 @@ class Maintenance extends Action
->inject('dbForPlatform')
->inject('console')
->inject('publisherForCertificates')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->callback($this->action(...));
}
public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, Delete $queueForDeletes): void
public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, DeletePublisher $publisherForDeletes): void
{
Console::title('Maintenance V1');
Console::success(APP_NAME . ' maintenance process v1 has started');
@@ -59,7 +60,7 @@ class Maintenance extends Action
$delay = $next->getTimestamp() - $now->getTimestamp();
}
$action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $publisherForCertificates) {
$action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $publisherForDeletes, $publisherForCertificates) {
$time = DatabaseDateTime::now();
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
@@ -70,12 +71,12 @@ class Maintenance extends Action
$dbForPlatform->foreach(
'projects',
function (Document $project) use ($queueForDeletes, $usageStatsRetentionHourly) {
$queueForDeletes
->setType(DELETE_TYPE_MAINTENANCE)
->setProject($project)
->setUsageRetentionHourlyDateTime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
->trigger();
function (Document $project) use ($publisherForDeletes, $usageStatsRetentionHourly) {
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_MAINTENANCE,
hourlyUsageRetentionDatetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly),
));
},
[
Query::equal('region', [System::getEnv('_APP_REGION', 'default')]),
@@ -85,17 +86,17 @@ class Maintenance extends Action
]
);
$queueForDeletes
->setType(DELETE_TYPE_MAINTENANCE)
->setProject($console)
->setUsageRetentionHourlyDateTime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
project: $console,
type: DELETE_TYPE_MAINTENANCE,
hourlyUsageRetentionDatetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly),
));
$this->notifyDeleteConnections($queueForDeletes);
$this->notifyDeleteConnections($publisherForDeletes);
$this->renewCertificates($dbForPlatform, $publisherForCertificates);
$this->notifyDeleteCache($cacheRetention, $queueForDeletes);
$this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes);
$this->notifyDeleteCSVExports($queueForDeletes);
$this->notifyDeleteCache($cacheRetention, $publisherForDeletes);
$this->notifyDeleteSchedules($schedulesDeletionRetention, $publisherForDeletes);
$this->notifyDeleteCSVExports($publisherForDeletes);
};
if ($type === 'loop') {
@@ -109,19 +110,17 @@ class Maintenance extends Action
}
}
private function notifyDeleteConnections(Delete $queueForDeletes): void
private function notifyDeleteConnections(DeletePublisher $publisherForDeletes): void
{
$queueForDeletes
->setType(DELETE_TYPE_REALTIME)
->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -60))
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
type: DELETE_TYPE_REALTIME,
datetime: DatabaseDateTime::addSeconds(new \DateTime(), -60),
));
}
private function notifyDeleteCSVExports(Delete $queueForDeletes): void
private function notifyDeleteCSVExports(DeletePublisher $publisherForDeletes): void
{
$queueForDeletes
->setType(DELETE_TYPE_CSV_EXPORTS)
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(type: DELETE_TYPE_CSV_EXPORTS));
}
private function renewCertificates(Database $dbForPlatform, Certificate $publisherForCertificate): void
@@ -172,19 +171,19 @@ class Maintenance extends Action
}
}
private function notifyDeleteCache($interval, Delete $queueForDeletes): void
private function notifyDeleteCache($interval, DeletePublisher $publisherForDeletes): void
{
$queueForDeletes
->setType(DELETE_TYPE_CACHE_BY_TIMESTAMP)
->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
type: DELETE_TYPE_CACHE_BY_TIMESTAMP,
datetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval),
));
}
private function notifyDeleteSchedules($interval, Delete $queueForDeletes): void
private function notifyDeleteSchedules($interval, DeletePublisher $publisherForDeletes): void
{
$queueForDeletes
->setType(DELETE_TYPE_SCHEDULES)
->setDatetime(DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
$publisherForDeletes->enqueue(new DeleteMessage(
type: DELETE_TYPE_SCHEDULES,
datetime: DatabaseDateTime::addSeconds(new \DateTime(), -1 * $interval),
));
}
}
+20 -18
View File
@@ -5,7 +5,8 @@ namespace Appwrite\Platform\Workers;
use Appwrite\Certificates\Adapter as CertificatesAdapter;
use Appwrite\Deletes\Identities;
use Appwrite\Deletes\Targets;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Message\Delete as DeleteMessage;
use Appwrite\Event\Publisher\Delete as DeletePublisher;
use Appwrite\Extend\Exception;
use Executor\Executor;
use Throwable;
@@ -66,7 +67,7 @@ class Deletes extends Action
->inject('executionsRetentionCount')
->inject('auditRetention')
->inject('log')
->inject('queueForDeletes')
->inject('publisherForDeletes')
->inject('getAudit')
->callback($this->action(...));
}
@@ -93,7 +94,7 @@ class Deletes extends Action
int $executionsRetentionCount,
string $auditRetention,
Log $log,
DeleteEvent $queueForDeletes,
DeletePublisher $publisherForDeletes,
callable $getAudit,
): void {
$payload = $message->getPayload();
@@ -102,12 +103,13 @@ class Deletes extends Action
throw new Exception('Missing payload');
}
$type = $payload['type'] ?? '';
$datetime = $payload['datetime'] ?? null;
$hourlyUsageRetentionDatetime = $payload['hourlyUsageRetentionDatetime'] ?? null;
$resource = $payload['resource'] ?? null;
$resourceType = $payload['resourceType'] ?? null;
$document = new Document($payload['document'] ?? []);
$deleteMessage = DeleteMessage::fromArray($payload);
$type = $deleteMessage->type;
$datetime = $deleteMessage->datetime;
$hourlyUsageRetentionDatetime = $deleteMessage->hourlyUsageRetentionDatetime;
$resource = $deleteMessage->resource;
$resourceType = $deleteMessage->resourceType;
$document = $deleteMessage->document ?? new Document();
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
@@ -214,7 +216,7 @@ class Deletes extends Action
$this->deleteUsageStats($project, $getProjectDB, $getLogsDB, $hourlyUsageRetentionDatetime);
$this->deleteExpiredSessions($project, $getProjectDB);
$this->deleteExpiredTransactions($project, $getProjectDB);
$this->deleteOldDeployments($queueForDeletes, $project, $getProjectDB);
$this->deleteOldDeployments($publisherForDeletes, $project, $getProjectDB);
break;
case DELETE_TYPE_REPORT:
$this->deleteReport($dbForPlatform, $project, $document);
@@ -390,12 +392,12 @@ class Deletes extends Action
Targets::delete($getProjectDB($project), Query::equal('sessionInternalId', [$session->getSequence()]));
}
private function deleteOldDeployments(DeleteEvent $queueForDeletes, Document $project, callable $getProjectDB): void
private function deleteOldDeployments(DeletePublisher $publisherForDeletes, Document $project, callable $getProjectDB): void
{
/** @var Database $dbForProject */
$dbForProject = $getProjectDB($project);
$removalCallback = function (Document $resource) use ($dbForProject, $queueForDeletes, $project) {
$removalCallback = function (Document $resource) use ($dbForProject, $publisherForDeletes, $project) {
$retention = $resource->getAttribute('deploymentRetention', 0);
// 0 means unlimited - never delete
@@ -420,12 +422,12 @@ class Deletes extends Action
'deployments',
$queries,
$dbForProject,
function (Document $deployment) use ($queueForDeletes, $project) {
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($deployment)
->setProject($project)
->trigger();
function (Document $deployment) use ($publisherForDeletes, $project) {
$publisherForDeletes->enqueue(new DeleteMessage(
project: $project,
type: DELETE_TYPE_DOCUMENT,
document: $deployment,
));
}
);
};