From 6bf61426675c4e0446e7ebe5a5acc926bef91a2a Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Fri, 10 Apr 2026 13:02:00 +0530 Subject: [PATCH 1/4] refactor: migrate selected queues to publishers --- app/cli.php | 9 +- app/controllers/api/migrations.php | 138 +++++++++--------- app/init/resources.php | 15 ++ app/init/resources/request.php | 9 -- app/init/worker/message.php | 5 - src/Appwrite/Bus/Listeners/Log.php | 17 +-- src/Appwrite/Event/Message/Execution.php | 31 ++++ src/Appwrite/Event/Message/Migration.php | 37 +++++ src/Appwrite/Event/Message/StatsResources.php | 28 ++++ src/Appwrite/Event/Publisher/Execution.php | 27 ++++ src/Appwrite/Event/Publisher/Migration.php | 27 ++++ .../Event/Publisher/StatsResources.php | 33 +++++ .../Health/Http/Health/Queue/Failed/Get.php | 16 +- .../Http/Health/Queue/Migrations/Get.php | 8 +- .../Http/Health/Queue/StatsResources/Get.php | 8 +- .../Platform/Tasks/StatsResources.php | 17 ++- src/Appwrite/Platform/Workers/Executions.php | 13 +- src/Appwrite/Platform/Workers/Migrations.php | 16 +- .../Platform/Workers/StatsResources.php | 5 +- 19 files changed, 316 insertions(+), 143 deletions(-) create mode 100644 src/Appwrite/Event/Message/Execution.php create mode 100644 src/Appwrite/Event/Message/Migration.php create mode 100644 src/Appwrite/Event/Message/StatsResources.php create mode 100644 src/Appwrite/Event/Publisher/Execution.php create mode 100644 src/Appwrite/Event/Publisher/Migration.php create mode 100644 src/Appwrite/Event/Publisher/StatsResources.php diff --git a/app/cli.php b/app/cli.php index 458df2d642..73908510d9 100644 --- a/app/cli.php +++ b/app/cli.php @@ -6,8 +6,8 @@ use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; -use Appwrite\Event\StatsResources; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Appwrite\Usage\Context as UsageContext; @@ -253,9 +253,10 @@ $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) ), ['publisher']); -$container->set('queueForStatsResources', function (Publisher $publisher) { - return new StatsResources($publisher); -}, ['publisher']); +$container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher( + $publisher, + new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) +), ['publisher']); $container->set('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); diff --git a/app/controllers/api/migrations.php b/app/controllers/api/migrations.php index 45a663fb56..1ca4cafdfe 100644 --- a/app/controllers/api/migrations.php +++ b/app/controllers/api/migrations.php @@ -1,7 +1,8 @@ inject('platform') ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -114,12 +115,12 @@ Http::post('/v1/migrations/appwrite') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + user: $user, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -153,8 +154,8 @@ Http::post('/v1/migrations/firebase') ->inject('platform') ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $serviceAccountData = json_decode($serviceAccount, true); if (empty($serviceAccountData)) { @@ -183,12 +184,12 @@ Http::post('/v1/migrations/firebase') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + user: $user, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -227,8 +228,8 @@ Http::post('/v1/migrations/supabase') ->inject('platform') ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -252,12 +253,12 @@ Http::post('/v1/migrations/supabase') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + user: $user, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -297,8 +298,8 @@ Http::post('/v1/migrations/nhost') ->inject('platform') ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -323,12 +324,12 @@ Http::post('/v1/migrations/nhost') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + user: $user, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -368,7 +369,7 @@ Http::post('/v1/migrations/csv/imports') ->inject('deviceForFiles') ->inject('deviceForMigrations') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $bucketId, string $fileId, @@ -383,7 +384,7 @@ Http::post('/v1/migrations/csv/imports') Device $deviceForFiles, Device $deviceForMigrations, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { $bucket = $authorization->skip(function () use ($internalFile, $dbForPlatform, $dbForProject, $bucketId) { if ($internalFile) { @@ -479,11 +480,10 @@ Http::post('/v1/migrations/csv/imports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setProject($project) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -526,7 +526,7 @@ Http::post('/v1/migrations/csv/exports') ->inject('project') ->inject('platform') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $resourceId, string $filename, @@ -545,7 +545,7 @@ Http::post('/v1/migrations/csv/exports') Document $project, array $platform, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { try { $parsedQueries = Query::parseQueries($queries); @@ -630,11 +630,11 @@ Http::post('/v1/migrations/csv/exports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -673,7 +673,7 @@ Http::post('/v1/migrations/json/imports') ->inject('deviceForFiles') ->inject('deviceForMigrations') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $bucketId, string $fileId, @@ -688,7 +688,7 @@ Http::post('/v1/migrations/json/imports') Device $deviceForFiles, Device $deviceForMigrations, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { $bucket = $authorization->skip(function () use ($internalFile, $dbForPlatform, $dbForProject, $bucketId) { if ($internalFile) { @@ -783,11 +783,11 @@ Http::post('/v1/migrations/json/imports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -826,7 +826,7 @@ Http::post('/v1/migrations/json/exports') ->inject('project') ->inject('platform') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $resourceId, string $filename, @@ -841,7 +841,7 @@ Http::post('/v1/migrations/json/exports') Document $project, array $platform, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { try { $parsedQueries = Query::parseQueries($queries); @@ -915,11 +915,11 @@ Http::post('/v1/migrations/json/exports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -1217,8 +1217,8 @@ Http::patch('/v1/migrations/:migrationId') ->inject('project') ->inject('platform') ->inject('user') - ->inject('queueForMigrations') - ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->getDocument('migrations', $migrationId); if ($migration->isEmpty()) { @@ -1234,12 +1234,12 @@ Http::patch('/v1/migrations/:migrationId') ->setAttribute('dateUpdated', \time()); // Trigger Migration - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + user: $user, + )); $response->noContent(); }); diff --git a/app/init/resources.php b/app/init/resources.php index fdca88c30e..8787f9759f 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -1,6 +1,9 @@ set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) ), ['publisher']); +$container->set('publisherForExecutions', fn (Publisher $publisher) => new ExecutionPublisher( + $publisher, + new Queue(Event::EXECUTIONS_QUEUE_NAME) +), ['publisher']); +$container->set('publisherForMigrations', fn (Publisher $publisher) => new MigrationPublisher( + $publisher, + new Queue(System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME)) +), ['publisher']); +$container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher( + $publisher, + new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) +), ['publisher']); /** * Platform configuration diff --git a/app/init/resources/request.php b/app/init/resources/request.php index 156e151501..63e58e92f7 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -13,10 +13,8 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\Screenshot; -use Appwrite\Event\StatsResources; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; @@ -163,13 +161,6 @@ return function (Container $container): void { $container->set('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); - $container->set('queueForMigrations', function (Publisher $publisher) { - return new Migration($publisher); - }, ['publisher']); - $container->set('queueForStatsResources', function (Publisher $publisher) { - return new StatsResources($publisher); - }, ['publisher']); - $container->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); $database = new Database($adapter, $cache); diff --git a/app/init/worker/message.php b/app/init/worker/message.php index 95477088ce..f893c84858 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -9,7 +9,6 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\Screenshot; use Appwrite\Event\Webhook; @@ -344,10 +343,6 @@ return function (Container $container): void { return new Certificate($publisher); }, ['publisher']); - $container->set('queueForMigrations', function (Publisher $publisher) { - return new Migration($publisher); - }, ['publisher']); - $container->set('deviceForSites', function (Document $project, Telemetry $telemetry) { return new TelemetryDevice($telemetry, getDevice(APP_STORAGE_SITES . '/app-' . $project->getId())); }, ['project', 'telemetry']); diff --git a/src/Appwrite/Bus/Listeners/Log.php b/src/Appwrite/Bus/Listeners/Log.php index 9bd539d5fe..076ed5c02d 100644 --- a/src/Appwrite/Bus/Listeners/Log.php +++ b/src/Appwrite/Bus/Listeners/Log.php @@ -3,10 +3,10 @@ namespace Appwrite\Bus\Listeners; use Appwrite\Bus\Events\ExecutionCompleted; -use Appwrite\Event\Execution; +use Appwrite\Event\Message\Execution as ExecutionMessage; +use Appwrite\Event\Publisher\Execution as ExecutionPublisher; use Utopia\Bus\Listener; use Utopia\Database\Document; -use Utopia\Queue\Publisher; class Log extends Listener { @@ -24,16 +24,15 @@ class Log extends Listener { $this ->desc('Persists execution logs to database via queue') - ->inject('publisher') + ->inject('publisherForExecutions') ->callback($this->handle(...)); } - public function handle(ExecutionCompleted $event, Publisher $publisher): void + public function handle(ExecutionCompleted $event, ExecutionPublisher $publisherForExecutions): void { - $queueForExecutions = new Execution($publisher); - $queueForExecutions - ->setExecution(new Document($event->execution)) - ->setProject(new Document($event->project)) - ->trigger(); + $publisherForExecutions->enqueue(new ExecutionMessage( + project: new Document($event->project), + execution: new Document($event->execution), + )); } } diff --git a/src/Appwrite/Event/Message/Execution.php b/src/Appwrite/Event/Message/Execution.php new file mode 100644 index 0000000000..d957c184e4 --- /dev/null +++ b/src/Appwrite/Event/Message/Execution.php @@ -0,0 +1,31 @@ + $this->project->getArrayCopy(), + 'execution' => $this->execution->getArrayCopy(), + ]; + } + + public static function fromArray(array $data): static + { + /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ + return new static( + project: new Document($data['project'] ?? []), + execution: new Document($data['execution'] ?? []), + ); + } +} diff --git a/src/Appwrite/Event/Message/Migration.php b/src/Appwrite/Event/Message/Migration.php new file mode 100644 index 0000000000..d884cc8192 --- /dev/null +++ b/src/Appwrite/Event/Message/Migration.php @@ -0,0 +1,37 @@ + $this->project->getArrayCopy(), + 'migration' => $this->migration->getArrayCopy(), + 'platform' => $this->platform, + 'user' => $this->user?->getArrayCopy() ?? [], + ]; + } + + public static function fromArray(array $data): static + { + /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ + return new static( + project: new Document($data['project'] ?? []), + migration: new Document($data['migration'] ?? []), + platform: $data['platform'] ?? [], + user: new Document($data['user'] ?? []), + ); + } +} diff --git a/src/Appwrite/Event/Message/StatsResources.php b/src/Appwrite/Event/Message/StatsResources.php new file mode 100644 index 0000000000..0370b3c616 --- /dev/null +++ b/src/Appwrite/Event/Message/StatsResources.php @@ -0,0 +1,28 @@ + $this->project->getArrayCopy(), + ]; + } + + public static function fromArray(array $data): static + { + /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ + return new static( + project: new Document($data['project'] ?? []), + ); + } +} diff --git a/src/Appwrite/Event/Publisher/Execution.php b/src/Appwrite/Event/Publisher/Execution.php new file mode 100644 index 0000000000..05ea28d540 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Execution.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/Migration.php b/src/Appwrite/Event/Publisher/Migration.php new file mode 100644 index 0000000000..fc455a7e95 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Migration.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/StatsResources.php b/src/Appwrite/Event/Publisher/StatsResources.php new file mode 100644 index 0000000000..a0e38483f0 --- /dev/null +++ b/src/Appwrite/Event/Publisher/StatsResources.php @@ -0,0 +1,33 @@ +publish($this->queue, $message); + } catch (\Throwable $th) { + Console::error('[StatsResources] Failed to publish stats resources message: ' . $th->getMessage()); + return false; + } + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php index cb3640746f..1f7cc0bf33 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php @@ -11,10 +11,10 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; +use Appwrite\Event\Publisher\Migration as MigrationPublisher; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Screenshot; -use Appwrite\Event\StatsResources; use Appwrite\Event\Webhook; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; @@ -78,13 +78,13 @@ class Get extends Base ->inject('queueForAudits') ->inject('queueForMails') ->inject('queueForFunctions') - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->inject('publisherForUsage') ->inject('queueForWebhooks') ->inject('queueForCertificates') ->inject('queueForBuilds') ->inject('queueForMessaging') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->inject('queueForScreenshots') ->callback($this->action(...)); } @@ -98,13 +98,13 @@ class Get extends Base Audit $queueForAudits, Mail $queueForMails, Func $queueForFunctions, - StatsResources $queueForStatsResources, + StatsResourcesPublisher $publisherForStatsResources, UsagePublisher $publisherForUsage, Webhook $queueForWebhooks, Certificate $queueForCertificates, Build $queueForBuilds, Messaging $queueForMessaging, - Migration $queueForMigrations, + MigrationPublisher $publisherForMigrations, Screenshot $queueForScreenshots, ): void { $threshold = (int) $threshold; @@ -115,14 +115,14 @@ class Get extends Base System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $queueForAudits, System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $queueForMails, System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions, - System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $queueForStatsResources, + System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources, System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage, System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks, System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $queueForCertificates, System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME) => $queueForBuilds, System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $queueForScreenshots, System::getEnv('_APP_MESSAGING_QUEUE_NAME', Event::MESSAGING_QUEUE_NAME) => $queueForMessaging, - System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $queueForMigrations, + System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $publisherForMigrations, }; $failed = $queue->getSize(failed: true); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php index 4faca7d8a4..70bef3562b 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Migrations; -use Appwrite\Event\Migration; +use Appwrite\Event\Publisher\Migration as MigrationPublisher; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Migration $queueForMigrations, Response $response): void + public function action(int|string $threshold, MigrationPublisher $publisherForMigrations, Response $response): void { $threshold = (int) $threshold; - $size = $queueForMigrations->getSize(); + $size = $publisherForMigrations->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php index 57605298fd..5ab0aa2532 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\StatsResources; -use Appwrite\Event\StatsResources; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, StatsResources $queueForStatsResources, Response $response): void + public function action(int|string $threshold, StatsResourcesPublisher $publisherForStatsResources, Response $response): void { $threshold = (int) $threshold; - $size = $queueForStatsResources->getSize(); + $size = $publisherForStatsResources->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Tasks/StatsResources.php b/src/Appwrite/Platform/Tasks/StatsResources.php index 220e377619..d6236f07d6 100644 --- a/src/Appwrite/Platform/Tasks/StatsResources.php +++ b/src/Appwrite/Platform/Tasks/StatsResources.php @@ -2,7 +2,8 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\StatsResources as EventStatsResources; +use Appwrite\Event\Message\StatsResources as StatsResourcesMessage; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Platform\Action; use Utopia\Console; use Utopia\Database\Database; @@ -43,11 +44,11 @@ class StatsResources extends Action ->desc('Schedules projects for usage count') ->inject('dbForPlatform') ->inject('logError') - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->callback($this->action(...)); } - public function action(Database $dbForPlatform, callable $logError, EventStatsResources $queueForStatsResources): void + public function action(Database $dbForPlatform, callable $logError, StatsResourcesPublisher $publisherForStatsResources): void { $this->logError = $logError; $this->dbForPlatform = $dbForPlatform; @@ -60,7 +61,7 @@ class StatsResources extends Action $interval = (int) System::getEnv('_APP_STATS_RESOURCES_INTERVAL', '3600'); - Console::loop(function () use ($queueForStatsResources) { + Console::loop(function () use ($publisherForStatsResources) { $last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours')); /** @@ -69,10 +70,10 @@ class StatsResources extends Action $this->foreachDocument($this->dbForPlatform, 'projects', [ Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours)), Query::equal('region', [System::getEnv('_APP_REGION', 'default')]) - ], function ($project) use ($queueForStatsResources) { - $queueForStatsResources - ->setProject($project) - ->trigger(); + ], function ($project) use ($publisherForStatsResources) { + $publisherForStatsResources->enqueue(new StatsResourcesMessage( + project: $project, + )); Console::success('project: ' . $project->getId() . '(' . $project->getSequence() . ')' . ' queued'); }); }, $interval); diff --git a/src/Appwrite/Platform/Workers/Executions.php b/src/Appwrite/Platform/Workers/Executions.php index d874e26267..9a1645e55d 100644 --- a/src/Appwrite/Platform/Workers/Executions.php +++ b/src/Appwrite/Platform/Workers/Executions.php @@ -2,9 +2,9 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Message\Execution as ExecutionMessage; use Exception; use Utopia\Database\Database; -use Utopia\Database\Document; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -32,19 +32,14 @@ class Executions extends Action Message $message, Database $dbForProject, ): void { - $payload = $message->getPayload() ?? []; - - if (empty($payload)) { - throw new Exception('Missing payload'); - } - - $execution = new Document($payload['execution'] ?? []); + $executionMessage = ExecutionMessage::fromArray($message->getPayload() ?? []); + $execution = $executionMessage->execution; if ($execution->isEmpty()) { throw new Exception('Missing execution'); } - $project = new Document($payload['project'] ?? []); + $project = $executionMessage->project; if ($project->getId() != '6862e6a6000cce69f9da') { $dbForProject->upsertDocument('executions', $execution); } diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 43f5c97ba6..198e3f568d 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Mail; +use Appwrite\Event\Message\Migration as MigrationMessage; use Appwrite\Event\Message\Usage as UsageMessage; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; @@ -129,7 +130,7 @@ class Migrations extends Action array $plan, Authorization $authorization, ): void { - $payload = $message->getPayload() ?? []; + $migrationMessage = MigrationMessage::fromArray($message->getPayload() ?? []); $this->getDatabasesDB = $getDatabasesDB; $this->getProjectDB = $getProjectDB; @@ -137,12 +138,7 @@ class Migrations extends Action $this->deviceForFiles = $deviceForFiles; $this->plan = $plan; - if (empty($payload)) { - throw new Exception('Missing payload'); - } - - $events = $payload['events'] ?? []; - $migration = new Document($payload['migration'] ?? []); + $migration = $migrationMessage->migration; if ($migration->isEmpty()) { throw new \Exception('Migration not found'); @@ -161,11 +157,7 @@ class Migrations extends Action $this->project = $project; $this->logError = $logError; - $platform = $payload['platform'] ?? Config::getParam('platform', []); - - if (!empty($events)) { - return; - } + $platform = $migrationMessage->platform ?: Config::getParam('platform', []); try { $this->processMigration( diff --git a/src/Appwrite/Platform/Workers/StatsResources.php b/src/Appwrite/Platform/Workers/StatsResources.php index b2823d3722..db214f5d32 100644 --- a/src/Appwrite/Platform/Workers/StatsResources.php +++ b/src/Appwrite/Platform/Workers/StatsResources.php @@ -2,6 +2,7 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Message\StatsResources as StatsResourcesMessage; use Appwrite\Platform\Action; use Exception; use Throwable; @@ -67,8 +68,8 @@ class StatsResources extends Action { $this->logError = $logError; - $payload = $message->getPayload() ?? []; - if (empty($payload)) { + $statsResources = StatsResourcesMessage::fromArray($message->getPayload() ?? []); + if ($statsResources->project->isEmpty()) { throw new Exception('Missing payload'); } From 82ec75d58258ba3f5e2334375078023c9b981cc2 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Fri, 10 Apr 2026 13:12:08 +0530 Subject: [PATCH 2/4] chore: address PR review feedback --- app/controllers/api/migrations.php | 20 +++++-------------- app/init/resources.php | 2 +- src/Appwrite/Event/Message/Migration.php | 3 --- .../Event/Publisher/StatsResources.php | 1 + src/Appwrite/Platform/Workers/Executions.php | 8 +++++++- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/app/controllers/api/migrations.php b/app/controllers/api/migrations.php index 1ca4cafdfe..4c541d2817 100644 --- a/app/controllers/api/migrations.php +++ b/app/controllers/api/migrations.php @@ -91,10 +91,9 @@ Http::post('/v1/migrations/appwrite') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') ->inject('publisherForMigrations') - ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { + ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -119,7 +118,6 @@ Http::post('/v1/migrations/appwrite') project: $project, migration: $migration, platform: $platform, - user: $user, )); $response @@ -152,10 +150,9 @@ Http::post('/v1/migrations/firebase') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') ->inject('publisherForMigrations') - ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { + ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $serviceAccountData = json_decode($serviceAccount, true); if (empty($serviceAccountData)) { @@ -188,7 +185,6 @@ Http::post('/v1/migrations/firebase') project: $project, migration: $migration, platform: $platform, - user: $user, )); $response @@ -226,10 +222,9 @@ Http::post('/v1/migrations/supabase') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') ->inject('publisherForMigrations') - ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { + ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -257,7 +252,6 @@ Http::post('/v1/migrations/supabase') project: $project, migration: $migration, platform: $platform, - user: $user, )); $response @@ -296,10 +290,9 @@ Http::post('/v1/migrations/nhost') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') ->inject('publisherForMigrations') - ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { + ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -328,7 +321,6 @@ Http::post('/v1/migrations/nhost') project: $project, migration: $migration, platform: $platform, - user: $user, )); $response @@ -1216,9 +1208,8 @@ Http::patch('/v1/migrations/:migrationId') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('publisherForMigrations') - ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, MigrationPublisher $publisherForMigrations) { + ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->getDocument('migrations', $migrationId); if ($migration->isEmpty()) { @@ -1238,7 +1229,6 @@ Http::patch('/v1/migrations/:migrationId') project: $project, migration: $migration, platform: $platform, - user: $user, )); $response->noContent(); diff --git a/app/init/resources.php b/app/init/resources.php index 8787f9759f..32d6e0a45f 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -87,7 +87,7 @@ $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli ), ['publisher']); $container->set('publisherForExecutions', fn (Publisher $publisher) => new ExecutionPublisher( $publisher, - new Queue(Event::EXECUTIONS_QUEUE_NAME) + new Queue(System::getEnv('_APP_EXECUTIONS_QUEUE_NAME', Event::EXECUTIONS_QUEUE_NAME)) ), ['publisher']); $container->set('publisherForMigrations', fn (Publisher $publisher) => new MigrationPublisher( $publisher, diff --git a/src/Appwrite/Event/Message/Migration.php b/src/Appwrite/Event/Message/Migration.php index d884cc8192..ed040670e6 100644 --- a/src/Appwrite/Event/Message/Migration.php +++ b/src/Appwrite/Event/Message/Migration.php @@ -10,7 +10,6 @@ class Migration extends Base public Document $project, public Document $migration, public array $platform = [], - public ?Document $user = null, ) { } @@ -20,7 +19,6 @@ class Migration extends Base 'project' => $this->project->getArrayCopy(), 'migration' => $this->migration->getArrayCopy(), 'platform' => $this->platform, - 'user' => $this->user?->getArrayCopy() ?? [], ]; } @@ -31,7 +29,6 @@ class Migration extends Base project: new Document($data['project'] ?? []), migration: new Document($data['migration'] ?? []), platform: $data['platform'] ?? [], - user: new Document($data['user'] ?? []), ); } } diff --git a/src/Appwrite/Event/Publisher/StatsResources.php b/src/Appwrite/Event/Publisher/StatsResources.php index a0e38483f0..4c04583b15 100644 --- a/src/Appwrite/Event/Publisher/StatsResources.php +++ b/src/Appwrite/Event/Publisher/StatsResources.php @@ -18,6 +18,7 @@ readonly class StatsResources extends Base public function enqueue(StatsResourcesMessage $message): string|bool { + // Resource stats are best-effort; publishing failures should not interrupt the scheduler loop. try { return $this->publish($this->queue, $message); } catch (\Throwable $th) { diff --git a/src/Appwrite/Platform/Workers/Executions.php b/src/Appwrite/Platform/Workers/Executions.php index 9a1645e55d..f440429356 100644 --- a/src/Appwrite/Platform/Workers/Executions.php +++ b/src/Appwrite/Platform/Workers/Executions.php @@ -10,6 +10,12 @@ use Utopia\Queue\Message; class Executions extends Action { + /** + * Keep skipping execution upserts for this internal project. + * The HTTP execution flow applies the same exclusion separately. + */ + private const string EXCLUDED_PROJECT_ID = '6862e6a6000cce69f9da'; + public static function getName(): string { return 'executions'; @@ -40,7 +46,7 @@ class Executions extends Action } $project = $executionMessage->project; - if ($project->getId() != '6862e6a6000cce69f9da') { + if ($project->getId() !== self::EXCLUDED_PROJECT_ID) { $dbForProject->upsertDocument('executions', $execution); } } From 7282c5d51f66a42eac1c23109b10d9d3ff39866f Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Fri, 10 Apr 2026 13:25:32 +0530 Subject: [PATCH 3/4] chore: remove unused execution exclusion --- .../Modules/Functions/Http/Executions/Create.php | 12 +++--------- src/Appwrite/Platform/Workers/Executions.php | 10 +--------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php index 37292ce984..72474b03f9 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php @@ -305,9 +305,7 @@ class Create extends Base if ($async) { if (is_null($scheduledAt)) { - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); $queueForFunctions ->setType('http') ->setExecution($execution) @@ -348,9 +346,7 @@ class Create extends Base ->setAttribute('scheduleInternalId', $schedule->getSequence()) ->setAttribute('scheduledAt', $scheduledAt); - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); } if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) { @@ -516,9 +512,7 @@ class Create extends Base ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) ; - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); } $executionResponse['headers']['x-appwrite-execution-id'] = $execution->getId(); diff --git a/src/Appwrite/Platform/Workers/Executions.php b/src/Appwrite/Platform/Workers/Executions.php index f440429356..b218e1e598 100644 --- a/src/Appwrite/Platform/Workers/Executions.php +++ b/src/Appwrite/Platform/Workers/Executions.php @@ -10,12 +10,6 @@ use Utopia\Queue\Message; class Executions extends Action { - /** - * Keep skipping execution upserts for this internal project. - * The HTTP execution flow applies the same exclusion separately. - */ - private const string EXCLUDED_PROJECT_ID = '6862e6a6000cce69f9da'; - public static function getName(): string { return 'executions'; @@ -46,8 +40,6 @@ class Executions extends Action } $project = $executionMessage->project; - if ($project->getId() !== self::EXCLUDED_PROJECT_ID) { - $dbForProject->upsertDocument('executions', $execution); - } + $dbForProject->upsertDocument('executions', $execution); } } From f77a64bff9c92bca4d5ece55199ed078860318d6 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Fri, 10 Apr 2026 14:00:57 +0530 Subject: [PATCH 4/4] chore: address publisher PR nits --- src/Appwrite/Event/Message/Execution.php | 9 ++++----- src/Appwrite/Event/Message/Migration.php | 11 +++++------ src/Appwrite/Event/Message/StatsResources.php | 7 +++---- src/Appwrite/Platform/Tasks/StatsResources.php | 3 +-- src/Appwrite/Platform/Workers/Executions.php | 5 ++--- src/Appwrite/Platform/Workers/Migrations.php | 4 ++-- 6 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/Appwrite/Event/Message/Execution.php b/src/Appwrite/Event/Message/Execution.php index d957c184e4..0943c82e4a 100644 --- a/src/Appwrite/Event/Message/Execution.php +++ b/src/Appwrite/Event/Message/Execution.php @@ -4,11 +4,11 @@ namespace Appwrite\Event\Message; use Utopia\Database\Document; -class Execution extends Base +final class Execution extends Base { public function __construct( - public Document $project, - public Document $execution, + public readonly Document $project, + public readonly Document $execution, ) { } @@ -22,8 +22,7 @@ class Execution extends Base public static function fromArray(array $data): static { - /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ - return new static( + return new self( project: new Document($data['project'] ?? []), execution: new Document($data['execution'] ?? []), ); diff --git a/src/Appwrite/Event/Message/Migration.php b/src/Appwrite/Event/Message/Migration.php index ed040670e6..ceeec45461 100644 --- a/src/Appwrite/Event/Message/Migration.php +++ b/src/Appwrite/Event/Message/Migration.php @@ -4,12 +4,12 @@ namespace Appwrite\Event\Message; use Utopia\Database\Document; -class Migration extends Base +final class Migration extends Base { public function __construct( - public Document $project, - public Document $migration, - public array $platform = [], + public readonly Document $project, + public readonly Document $migration, + public readonly array $platform = [], ) { } @@ -24,8 +24,7 @@ class Migration extends Base public static function fromArray(array $data): static { - /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ - return new static( + return new self( project: new Document($data['project'] ?? []), migration: new Document($data['migration'] ?? []), platform: $data['platform'] ?? [], diff --git a/src/Appwrite/Event/Message/StatsResources.php b/src/Appwrite/Event/Message/StatsResources.php index 0370b3c616..584cbc137a 100644 --- a/src/Appwrite/Event/Message/StatsResources.php +++ b/src/Appwrite/Event/Message/StatsResources.php @@ -4,10 +4,10 @@ namespace Appwrite\Event\Message; use Utopia\Database\Document; -class StatsResources extends Base +final class StatsResources extends Base { public function __construct( - public Document $project, + public readonly Document $project, ) { } @@ -20,8 +20,7 @@ class StatsResources extends Base public static function fromArray(array $data): static { - /** @phpstan-ignore new.static (subclass constructors are backwards-compatible via optional params) */ - return new static( + return new self( project: new Document($data['project'] ?? []), ); } diff --git a/src/Appwrite/Platform/Tasks/StatsResources.php b/src/Appwrite/Platform/Tasks/StatsResources.php index d6236f07d6..8699d73bbb 100644 --- a/src/Appwrite/Platform/Tasks/StatsResources.php +++ b/src/Appwrite/Platform/Tasks/StatsResources.php @@ -2,7 +2,6 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Message\StatsResources as StatsResourcesMessage; use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Platform\Action; use Utopia\Console; @@ -71,7 +70,7 @@ class StatsResources extends Action Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours)), Query::equal('region', [System::getEnv('_APP_REGION', 'default')]) ], function ($project) use ($publisherForStatsResources) { - $publisherForStatsResources->enqueue(new StatsResourcesMessage( + $publisherForStatsResources->enqueue(new \Appwrite\Event\Message\StatsResources( project: $project, )); Console::success('project: ' . $project->getId() . '(' . $project->getSequence() . ')' . ' queued'); diff --git a/src/Appwrite/Platform/Workers/Executions.php b/src/Appwrite/Platform/Workers/Executions.php index b218e1e598..673e9de791 100644 --- a/src/Appwrite/Platform/Workers/Executions.php +++ b/src/Appwrite/Platform/Workers/Executions.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Workers; -use Appwrite\Event\Message\Execution as ExecutionMessage; +use Appwrite\Event\Message\Execution; use Exception; use Utopia\Database\Database; use Utopia\Platform\Action; @@ -32,14 +32,13 @@ class Executions extends Action Message $message, Database $dbForProject, ): void { - $executionMessage = ExecutionMessage::fromArray($message->getPayload() ?? []); + $executionMessage = Execution::fromArray($message->getPayload() ?? []); $execution = $executionMessage->execution; if ($execution->isEmpty()) { throw new Exception('Missing execution'); } - $project = $executionMessage->project; $dbForProject->upsertDocument('executions', $execution); } } diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 198e3f568d..118ff7acf9 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -4,7 +4,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Mail; -use Appwrite\Event\Message\Migration as MigrationMessage; +use Appwrite\Event\Message\Migration; use Appwrite\Event\Message\Usage as UsageMessage; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; @@ -130,7 +130,7 @@ class Migrations extends Action array $plan, Authorization $authorization, ): void { - $migrationMessage = MigrationMessage::fromArray($message->getPayload() ?? []); + $migrationMessage = Migration::fromArray($message->getPayload() ?? []); $this->getDatabasesDB = $getDatabasesDB; $this->getProjectDB = $getProjectDB;