Merge pull request #11850 from appwrite/chore-migrate-selected-queues-to-publishers

Migrate executions, migrations, and stats resources to publishers
This commit is contained in:
Chirag Aggarwal
2026-04-10 15:13:15 +05:30
committed by GitHub
20 changed files with 308 additions and 160 deletions
+8 -9
View File
@@ -3,10 +3,10 @@
namespace Appwrite\Bus\Listeners;
use Appwrite\Bus\Events\ExecutionCompleted;
use Appwrite\Event\Execution;
use Appwrite\Event\Message\Execution as ExecutionMessage;
use Appwrite\Event\Publisher\Execution as ExecutionPublisher;
use Utopia\Bus\Listener;
use Utopia\Database\Document;
use Utopia\Queue\Publisher;
class Log extends Listener
{
@@ -24,16 +24,15 @@ class Log extends Listener
{
$this
->desc('Persists execution logs to database via queue')
->inject('publisher')
->inject('publisherForExecutions')
->callback($this->handle(...));
}
public function handle(ExecutionCompleted $event, Publisher $publisher): void
public function handle(ExecutionCompleted $event, ExecutionPublisher $publisherForExecutions): void
{
$queueForExecutions = new Execution($publisher);
$queueForExecutions
->setExecution(new Document($event->execution))
->setProject(new Document($event->project))
->trigger();
$publisherForExecutions->enqueue(new ExecutionMessage(
project: new Document($event->project),
execution: new Document($event->execution),
));
}
}
+30
View File
@@ -0,0 +1,30 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class Execution extends Base
{
public function __construct(
public readonly Document $project,
public readonly Document $execution,
) {
}
public function toArray(): array
{
return [
'project' => $this->project->getArrayCopy(),
'execution' => $this->execution->getArrayCopy(),
];
}
public static function fromArray(array $data): static
{
return new self(
project: new Document($data['project'] ?? []),
execution: new Document($data['execution'] ?? []),
);
}
}
+33
View File
@@ -0,0 +1,33 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class Migration extends Base
{
public function __construct(
public readonly Document $project,
public readonly Document $migration,
public readonly array $platform = [],
) {
}
public function toArray(): array
{
return [
'project' => $this->project->getArrayCopy(),
'migration' => $this->migration->getArrayCopy(),
'platform' => $this->platform,
];
}
public static function fromArray(array $data): static
{
return new self(
project: new Document($data['project'] ?? []),
migration: new Document($data['migration'] ?? []),
platform: $data['platform'] ?? [],
);
}
}
@@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class StatsResources extends Base
{
public function __construct(
public readonly Document $project,
) {
}
public function toArray(): array
{
return [
'project' => $this->project->getArrayCopy(),
];
}
public static function fromArray(array $data): static
{
return new self(
project: new Document($data['project'] ?? []),
);
}
}
@@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Execution as ExecutionMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Execution extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(ExecutionMessage $message): string|bool
{
return $this->publish($this->queue, $message);
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}
@@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Migration as MigrationMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Migration extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(MigrationMessage $message): string|bool
{
return $this->publish($this->queue, $message);
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}
@@ -0,0 +1,34 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\StatsResources as StatsResourcesMessage;
use Utopia\Console;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class StatsResources extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(StatsResourcesMessage $message): string|bool
{
// Resource stats are best-effort; publishing failures should not interrupt the scheduler loop.
try {
return $this->publish($this->queue, $message);
} catch (\Throwable $th) {
Console::error('[StatsResources] Failed to publish stats resources message: ' . $th->getMessage());
return false;
}
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}
@@ -305,9 +305,7 @@ class Create extends Base
if ($async) {
if (is_null($scheduledAt)) {
if ($project->getId() != '6862e6a6000cce69f9da') {
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
}
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
$queueForFunctions
->setType('http')
->setExecution($execution)
@@ -348,9 +346,7 @@ class Create extends Base
->setAttribute('scheduleInternalId', $schedule->getSequence())
->setAttribute('scheduledAt', $scheduledAt);
if ($project->getId() != '6862e6a6000cce69f9da') {
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
}
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
}
if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) {
@@ -516,9 +512,7 @@ class Create extends Base
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)))
;
if ($project->getId() != '6862e6a6000cce69f9da') {
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
}
$execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution));
}
$executionResponse['headers']['x-appwrite-execution-id'] = $execution->getId();
@@ -11,10 +11,10 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Screenshot;
use Appwrite\Event\StatsResources;
use Appwrite\Event\Webhook;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
@@ -78,13 +78,13 @@ class Get extends Base
->inject('queueForAudits')
->inject('queueForMails')
->inject('queueForFunctions')
->inject('queueForStatsResources')
->inject('publisherForStatsResources')
->inject('publisherForUsage')
->inject('queueForWebhooks')
->inject('queueForCertificates')
->inject('queueForBuilds')
->inject('queueForMessaging')
->inject('queueForMigrations')
->inject('publisherForMigrations')
->inject('queueForScreenshots')
->callback($this->action(...));
}
@@ -98,13 +98,13 @@ class Get extends Base
Audit $queueForAudits,
Mail $queueForMails,
Func $queueForFunctions,
StatsResources $queueForStatsResources,
StatsResourcesPublisher $publisherForStatsResources,
UsagePublisher $publisherForUsage,
Webhook $queueForWebhooks,
Certificate $queueForCertificates,
Build $queueForBuilds,
Messaging $queueForMessaging,
Migration $queueForMigrations,
MigrationPublisher $publisherForMigrations,
Screenshot $queueForScreenshots,
): void {
$threshold = (int) $threshold;
@@ -115,14 +115,14 @@ class Get extends Base
System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $queueForAudits,
System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $queueForMails,
System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions,
System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $queueForStatsResources,
System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources,
System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage,
System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks,
System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $queueForCertificates,
System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME) => $queueForBuilds,
System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $queueForScreenshots,
System::getEnv('_APP_MESSAGING_QUEUE_NAME', Event::MESSAGING_QUEUE_NAME) => $queueForMessaging,
System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $queueForMigrations,
System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $publisherForMigrations,
};
$failed = $queue->getSize(failed: true);
@@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Migrations;
use Appwrite\Event\Migration;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForMigrations')
->inject('publisherForMigrations')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Migration $queueForMigrations, Response $response): void
public function action(int|string $threshold, MigrationPublisher $publisherForMigrations, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForMigrations->getSize();
$size = $publisherForMigrations->getSize();
$this->assertQueueThreshold($size, $threshold);
@@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\StatsResources;
use Appwrite\Event\StatsResources;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForStatsResources')
->inject('publisherForStatsResources')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, StatsResources $queueForStatsResources, Response $response): void
public function action(int|string $threshold, StatsResourcesPublisher $publisherForStatsResources, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForStatsResources->getSize();
$size = $publisherForStatsResources->getSize();
$this->assertQueueThreshold($size, $threshold);
@@ -2,7 +2,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\StatsResources as EventStatsResources;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Platform\Action;
use Utopia\Console;
use Utopia\Database\Database;
@@ -43,11 +43,11 @@ class StatsResources extends Action
->desc('Schedules projects for usage count')
->inject('dbForPlatform')
->inject('logError')
->inject('queueForStatsResources')
->inject('publisherForStatsResources')
->callback($this->action(...));
}
public function action(Database $dbForPlatform, callable $logError, EventStatsResources $queueForStatsResources): void
public function action(Database $dbForPlatform, callable $logError, StatsResourcesPublisher $publisherForStatsResources): void
{
$this->logError = $logError;
$this->dbForPlatform = $dbForPlatform;
@@ -60,7 +60,7 @@ class StatsResources extends Action
$interval = (int) System::getEnv('_APP_STATS_RESOURCES_INTERVAL', '3600');
Console::loop(function () use ($queueForStatsResources) {
Console::loop(function () use ($publisherForStatsResources) {
$last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours'));
/**
@@ -69,10 +69,10 @@ class StatsResources extends Action
$this->foreachDocument($this->dbForPlatform, 'projects', [
Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours)),
Query::equal('region', [System::getEnv('_APP_REGION', 'default')])
], function ($project) use ($queueForStatsResources) {
$queueForStatsResources
->setProject($project)
->trigger();
], function ($project) use ($publisherForStatsResources) {
$publisherForStatsResources->enqueue(new \Appwrite\Event\Message\StatsResources(
project: $project,
));
Console::success('project: ' . $project->getId() . '(' . $project->getSequence() . ')' . ' queued');
});
}, $interval);
+4 -12
View File
@@ -2,9 +2,9 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Message\Execution;
use Exception;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
@@ -32,21 +32,13 @@ class Executions extends Action
Message $message,
Database $dbForProject,
): void {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$execution = new Document($payload['execution'] ?? []);
$executionMessage = Execution::fromArray($message->getPayload() ?? []);
$execution = $executionMessage->execution;
if ($execution->isEmpty()) {
throw new Exception('Missing execution');
}
$project = new Document($payload['project'] ?? []);
if ($project->getId() != '6862e6a6000cce69f9da') {
$dbForProject->upsertDocument('executions', $execution);
}
$dbForProject->upsertDocument('executions', $execution);
}
}
+4 -12
View File
@@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Mail;
use Appwrite\Event\Message\Migration;
use Appwrite\Event\Message\Usage as UsageMessage;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Realtime;
@@ -129,7 +130,7 @@ class Migrations extends Action
array $plan,
Authorization $authorization,
): void {
$payload = $message->getPayload() ?? [];
$migrationMessage = Migration::fromArray($message->getPayload() ?? []);
$this->getDatabasesDB = $getDatabasesDB;
$this->getProjectDB = $getProjectDB;
@@ -137,12 +138,7 @@ class Migrations extends Action
$this->deviceForFiles = $deviceForFiles;
$this->plan = $plan;
if (empty($payload)) {
throw new Exception('Missing payload');
}
$events = $payload['events'] ?? [];
$migration = new Document($payload['migration'] ?? []);
$migration = $migrationMessage->migration;
if ($migration->isEmpty()) {
throw new \Exception('Migration not found');
@@ -161,11 +157,7 @@ class Migrations extends Action
$this->project = $project;
$this->logError = $logError;
$platform = $payload['platform'] ?? Config::getParam('platform', []);
if (!empty($events)) {
return;
}
$platform = $migrationMessage->platform ?: Config::getParam('platform', []);
try {
$this->processMigration(
@@ -2,6 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Message\StatsResources as StatsResourcesMessage;
use Appwrite\Platform\Action;
use Exception;
use Throwable;
@@ -67,8 +68,8 @@ class StatsResources extends Action
{
$this->logError = $logError;
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
$statsResources = StatsResourcesMessage::fromArray($message->getPayload() ?? []);
if ($statsResources->project->isEmpty()) {
throw new Exception('Missing payload');
}