diff --git a/app/cli.php b/app/cli.php index 458df2d642..73908510d9 100644 --- a/app/cli.php +++ b/app/cli.php @@ -6,8 +6,8 @@ use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; -use Appwrite\Event\StatsResources; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Appwrite\Usage\Context as UsageContext; @@ -253,9 +253,10 @@ $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) ), ['publisher']); -$container->set('queueForStatsResources', function (Publisher $publisher) { - return new StatsResources($publisher); -}, ['publisher']); +$container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher( + $publisher, + new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) +), ['publisher']); $container->set('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); diff --git a/app/controllers/api/migrations.php b/app/controllers/api/migrations.php index 45a663fb56..4c541d2817 100644 --- a/app/controllers/api/migrations.php +++ b/app/controllers/api/migrations.php @@ -1,7 +1,8 @@ inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -114,12 +114,11 @@ Http::post('/v1/migrations/appwrite') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -151,10 +150,9 @@ Http::post('/v1/migrations/firebase') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $serviceAccountData = json_decode($serviceAccount, true); if (empty($serviceAccountData)) { @@ -183,12 +181,11 @@ Http::post('/v1/migrations/firebase') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -225,10 +222,9 @@ Http::post('/v1/migrations/supabase') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -252,12 +248,11 @@ Http::post('/v1/migrations/supabase') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -295,10 +290,9 @@ Http::post('/v1/migrations/nhost') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') ->inject('queueForEvents') - ->inject('queueForMigrations') - ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Event $queueForEvents, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, array $platform, Event $queueForEvents, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->createDocument('migrations', new Document([ '$id' => ID::unique(), 'status' => 'pending', @@ -323,12 +317,11 @@ Http::post('/v1/migrations/nhost') $queueForEvents->setParam('migrationId', $migration->getId()); // Trigger Transfer - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -368,7 +361,7 @@ Http::post('/v1/migrations/csv/imports') ->inject('deviceForFiles') ->inject('deviceForMigrations') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $bucketId, string $fileId, @@ -383,7 +376,7 @@ Http::post('/v1/migrations/csv/imports') Device $deviceForFiles, Device $deviceForMigrations, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { $bucket = $authorization->skip(function () use ($internalFile, $dbForPlatform, $dbForProject, $bucketId) { if ($internalFile) { @@ -479,11 +472,10 @@ Http::post('/v1/migrations/csv/imports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setProject($project) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -526,7 +518,7 @@ Http::post('/v1/migrations/csv/exports') ->inject('project') ->inject('platform') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $resourceId, string $filename, @@ -545,7 +537,7 @@ Http::post('/v1/migrations/csv/exports') Document $project, array $platform, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { try { $parsedQueries = Query::parseQueries($queries); @@ -630,11 +622,11 @@ Http::post('/v1/migrations/csv/exports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -673,7 +665,7 @@ Http::post('/v1/migrations/json/imports') ->inject('deviceForFiles') ->inject('deviceForMigrations') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $bucketId, string $fileId, @@ -688,7 +680,7 @@ Http::post('/v1/migrations/json/imports') Device $deviceForFiles, Device $deviceForMigrations, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { $bucket = $authorization->skip(function () use ($internalFile, $dbForPlatform, $dbForProject, $bucketId) { if ($internalFile) { @@ -783,11 +775,11 @@ Http::post('/v1/migrations/json/imports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -826,7 +818,7 @@ Http::post('/v1/migrations/json/exports') ->inject('project') ->inject('platform') ->inject('queueForEvents') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->action(function ( string $resourceId, string $filename, @@ -841,7 +833,7 @@ Http::post('/v1/migrations/json/exports') Document $project, array $platform, Event $queueForEvents, - Migration $queueForMigrations + MigrationPublisher $publisherForMigrations ) { try { $parsedQueries = Query::parseQueries($queries); @@ -915,11 +907,11 @@ Http::post('/v1/migrations/json/exports') $queueForEvents->setParam('migrationId', $migration->getId()); - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -1216,9 +1208,8 @@ Http::patch('/v1/migrations/:migrationId') ->inject('dbForProject') ->inject('project') ->inject('platform') - ->inject('user') - ->inject('queueForMigrations') - ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, Document $user, Migration $queueForMigrations) { + ->inject('publisherForMigrations') + ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, array $platform, MigrationPublisher $publisherForMigrations) { $migration = $dbForProject->getDocument('migrations', $migrationId); if ($migration->isEmpty()) { @@ -1234,12 +1225,11 @@ Http::patch('/v1/migrations/:migrationId') ->setAttribute('dateUpdated', \time()); // Trigger Migration - $queueForMigrations - ->setMigration($migration) - ->setProject($project) - ->setPlatform($platform) - ->setUser($user) - ->trigger(); + $publisherForMigrations->enqueue(new MigrationMessage( + project: $project, + migration: $migration, + platform: $platform, + )); $response->noContent(); }); diff --git a/app/init/resources.php b/app/init/resources.php index fdca88c30e..32d6e0a45f 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -1,6 +1,9 @@ set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) ), ['publisher']); +$container->set('publisherForExecutions', fn (Publisher $publisher) => new ExecutionPublisher( + $publisher, + new Queue(System::getEnv('_APP_EXECUTIONS_QUEUE_NAME', Event::EXECUTIONS_QUEUE_NAME)) +), ['publisher']); +$container->set('publisherForMigrations', fn (Publisher $publisher) => new MigrationPublisher( + $publisher, + new Queue(System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME)) +), ['publisher']); +$container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher( + $publisher, + new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) +), ['publisher']); /** * Platform configuration diff --git a/app/init/resources/request.php b/app/init/resources/request.php index 156e151501..63e58e92f7 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -13,10 +13,8 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\Screenshot; -use Appwrite\Event\StatsResources; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; @@ -163,13 +161,6 @@ return function (Container $container): void { $container->set('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); - $container->set('queueForMigrations', function (Publisher $publisher) { - return new Migration($publisher); - }, ['publisher']); - $container->set('queueForStatsResources', function (Publisher $publisher) { - return new StatsResources($publisher); - }, ['publisher']); - $container->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); $database = new Database($adapter, $cache); diff --git a/app/init/worker/message.php b/app/init/worker/message.php index 95477088ce..f893c84858 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -9,7 +9,6 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\Screenshot; use Appwrite\Event\Webhook; @@ -344,10 +343,6 @@ return function (Container $container): void { return new Certificate($publisher); }, ['publisher']); - $container->set('queueForMigrations', function (Publisher $publisher) { - return new Migration($publisher); - }, ['publisher']); - $container->set('deviceForSites', function (Document $project, Telemetry $telemetry) { return new TelemetryDevice($telemetry, getDevice(APP_STORAGE_SITES . '/app-' . $project->getId())); }, ['project', 'telemetry']); diff --git a/src/Appwrite/Bus/Listeners/Log.php b/src/Appwrite/Bus/Listeners/Log.php index 9bd539d5fe..076ed5c02d 100644 --- a/src/Appwrite/Bus/Listeners/Log.php +++ b/src/Appwrite/Bus/Listeners/Log.php @@ -3,10 +3,10 @@ namespace Appwrite\Bus\Listeners; use Appwrite\Bus\Events\ExecutionCompleted; -use Appwrite\Event\Execution; +use Appwrite\Event\Message\Execution as ExecutionMessage; +use Appwrite\Event\Publisher\Execution as ExecutionPublisher; use Utopia\Bus\Listener; use Utopia\Database\Document; -use Utopia\Queue\Publisher; class Log extends Listener { @@ -24,16 +24,15 @@ class Log extends Listener { $this ->desc('Persists execution logs to database via queue') - ->inject('publisher') + ->inject('publisherForExecutions') ->callback($this->handle(...)); } - public function handle(ExecutionCompleted $event, Publisher $publisher): void + public function handle(ExecutionCompleted $event, ExecutionPublisher $publisherForExecutions): void { - $queueForExecutions = new Execution($publisher); - $queueForExecutions - ->setExecution(new Document($event->execution)) - ->setProject(new Document($event->project)) - ->trigger(); + $publisherForExecutions->enqueue(new ExecutionMessage( + project: new Document($event->project), + execution: new Document($event->execution), + )); } } diff --git a/src/Appwrite/Event/Message/Execution.php b/src/Appwrite/Event/Message/Execution.php new file mode 100644 index 0000000000..0943c82e4a --- /dev/null +++ b/src/Appwrite/Event/Message/Execution.php @@ -0,0 +1,30 @@ + $this->project->getArrayCopy(), + 'execution' => $this->execution->getArrayCopy(), + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: new Document($data['project'] ?? []), + execution: new Document($data['execution'] ?? []), + ); + } +} diff --git a/src/Appwrite/Event/Message/Migration.php b/src/Appwrite/Event/Message/Migration.php new file mode 100644 index 0000000000..ceeec45461 --- /dev/null +++ b/src/Appwrite/Event/Message/Migration.php @@ -0,0 +1,33 @@ + $this->project->getArrayCopy(), + 'migration' => $this->migration->getArrayCopy(), + 'platform' => $this->platform, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: new Document($data['project'] ?? []), + migration: new Document($data['migration'] ?? []), + platform: $data['platform'] ?? [], + ); + } +} diff --git a/src/Appwrite/Event/Message/StatsResources.php b/src/Appwrite/Event/Message/StatsResources.php new file mode 100644 index 0000000000..584cbc137a --- /dev/null +++ b/src/Appwrite/Event/Message/StatsResources.php @@ -0,0 +1,27 @@ + $this->project->getArrayCopy(), + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: new Document($data['project'] ?? []), + ); + } +} diff --git a/src/Appwrite/Event/Publisher/Execution.php b/src/Appwrite/Event/Publisher/Execution.php new file mode 100644 index 0000000000..05ea28d540 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Execution.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/Migration.php b/src/Appwrite/Event/Publisher/Migration.php new file mode 100644 index 0000000000..fc455a7e95 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Migration.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/StatsResources.php b/src/Appwrite/Event/Publisher/StatsResources.php new file mode 100644 index 0000000000..4c04583b15 --- /dev/null +++ b/src/Appwrite/Event/Publisher/StatsResources.php @@ -0,0 +1,34 @@ +publish($this->queue, $message); + } catch (\Throwable $th) { + Console::error('[StatsResources] Failed to publish stats resources message: ' . $th->getMessage()); + return false; + } + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php index 37292ce984..72474b03f9 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Executions/Create.php @@ -305,9 +305,7 @@ class Create extends Base if ($async) { if (is_null($scheduledAt)) { - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); $queueForFunctions ->setType('http') ->setExecution($execution) @@ -348,9 +346,7 @@ class Create extends Base ->setAttribute('scheduleInternalId', $schedule->getSequence()) ->setAttribute('scheduledAt', $scheduledAt); - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); } if ($executionsRetentionCount > 0 && ENABLE_EXECUTIONS_LIMIT_ON_ROUTE) { @@ -516,9 +512,7 @@ class Create extends Base ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) ; - if ($project->getId() != '6862e6a6000cce69f9da') { - $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); - } + $execution = $authorization->skip(fn () => $dbForProject->createDocument('executions', $execution)); } $executionResponse['headers']['x-appwrite-execution-id'] = $execution->getId(); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php index cb3640746f..1f7cc0bf33 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php @@ -11,10 +11,10 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; +use Appwrite\Event\Publisher\Migration as MigrationPublisher; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Screenshot; -use Appwrite\Event\StatsResources; use Appwrite\Event\Webhook; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; @@ -78,13 +78,13 @@ class Get extends Base ->inject('queueForAudits') ->inject('queueForMails') ->inject('queueForFunctions') - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->inject('publisherForUsage') ->inject('queueForWebhooks') ->inject('queueForCertificates') ->inject('queueForBuilds') ->inject('queueForMessaging') - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->inject('queueForScreenshots') ->callback($this->action(...)); } @@ -98,13 +98,13 @@ class Get extends Base Audit $queueForAudits, Mail $queueForMails, Func $queueForFunctions, - StatsResources $queueForStatsResources, + StatsResourcesPublisher $publisherForStatsResources, UsagePublisher $publisherForUsage, Webhook $queueForWebhooks, Certificate $queueForCertificates, Build $queueForBuilds, Messaging $queueForMessaging, - Migration $queueForMigrations, + MigrationPublisher $publisherForMigrations, Screenshot $queueForScreenshots, ): void { $threshold = (int) $threshold; @@ -115,14 +115,14 @@ class Get extends Base System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $queueForAudits, System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $queueForMails, System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions, - System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $queueForStatsResources, + System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources, System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage, System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks, System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $queueForCertificates, System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME) => $queueForBuilds, System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $queueForScreenshots, System::getEnv('_APP_MESSAGING_QUEUE_NAME', Event::MESSAGING_QUEUE_NAME) => $queueForMessaging, - System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $queueForMigrations, + System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $publisherForMigrations, }; $failed = $queue->getSize(failed: true); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php index 4faca7d8a4..70bef3562b 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Migrations/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Migrations; -use Appwrite\Event\Migration; +use Appwrite\Event\Publisher\Migration as MigrationPublisher; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForMigrations') + ->inject('publisherForMigrations') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Migration $queueForMigrations, Response $response): void + public function action(int|string $threshold, MigrationPublisher $publisherForMigrations, Response $response): void { $threshold = (int) $threshold; - $size = $queueForMigrations->getSize(); + $size = $publisherForMigrations->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php index 57605298fd..5ab0aa2532 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/StatsResources/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\StatsResources; -use Appwrite\Event\StatsResources; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, StatsResources $queueForStatsResources, Response $response): void + public function action(int|string $threshold, StatsResourcesPublisher $publisherForStatsResources, Response $response): void { $threshold = (int) $threshold; - $size = $queueForStatsResources->getSize(); + $size = $publisherForStatsResources->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Tasks/StatsResources.php b/src/Appwrite/Platform/Tasks/StatsResources.php index 220e377619..8699d73bbb 100644 --- a/src/Appwrite/Platform/Tasks/StatsResources.php +++ b/src/Appwrite/Platform/Tasks/StatsResources.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\StatsResources as EventStatsResources; +use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Platform\Action; use Utopia\Console; use Utopia\Database\Database; @@ -43,11 +43,11 @@ class StatsResources extends Action ->desc('Schedules projects for usage count') ->inject('dbForPlatform') ->inject('logError') - ->inject('queueForStatsResources') + ->inject('publisherForStatsResources') ->callback($this->action(...)); } - public function action(Database $dbForPlatform, callable $logError, EventStatsResources $queueForStatsResources): void + public function action(Database $dbForPlatform, callable $logError, StatsResourcesPublisher $publisherForStatsResources): void { $this->logError = $logError; $this->dbForPlatform = $dbForPlatform; @@ -60,7 +60,7 @@ class StatsResources extends Action $interval = (int) System::getEnv('_APP_STATS_RESOURCES_INTERVAL', '3600'); - Console::loop(function () use ($queueForStatsResources) { + Console::loop(function () use ($publisherForStatsResources) { $last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours')); /** @@ -69,10 +69,10 @@ class StatsResources extends Action $this->foreachDocument($this->dbForPlatform, 'projects', [ Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours)), Query::equal('region', [System::getEnv('_APP_REGION', 'default')]) - ], function ($project) use ($queueForStatsResources) { - $queueForStatsResources - ->setProject($project) - ->trigger(); + ], function ($project) use ($publisherForStatsResources) { + $publisherForStatsResources->enqueue(new \Appwrite\Event\Message\StatsResources( + project: $project, + )); Console::success('project: ' . $project->getId() . '(' . $project->getSequence() . ')' . ' queued'); }); }, $interval); diff --git a/src/Appwrite/Platform/Workers/Executions.php b/src/Appwrite/Platform/Workers/Executions.php index d874e26267..673e9de791 100644 --- a/src/Appwrite/Platform/Workers/Executions.php +++ b/src/Appwrite/Platform/Workers/Executions.php @@ -2,9 +2,9 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Message\Execution; use Exception; use Utopia\Database\Database; -use Utopia\Database\Document; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -32,21 +32,13 @@ class Executions extends Action Message $message, Database $dbForProject, ): void { - $payload = $message->getPayload() ?? []; - - if (empty($payload)) { - throw new Exception('Missing payload'); - } - - $execution = new Document($payload['execution'] ?? []); + $executionMessage = Execution::fromArray($message->getPayload() ?? []); + $execution = $executionMessage->execution; if ($execution->isEmpty()) { throw new Exception('Missing execution'); } - $project = new Document($payload['project'] ?? []); - if ($project->getId() != '6862e6a6000cce69f9da') { - $dbForProject->upsertDocument('executions', $execution); - } + $dbForProject->upsertDocument('executions', $execution); } } diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 43f5c97ba6..118ff7acf9 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Mail; +use Appwrite\Event\Message\Migration; use Appwrite\Event\Message\Usage as UsageMessage; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; @@ -129,7 +130,7 @@ class Migrations extends Action array $plan, Authorization $authorization, ): void { - $payload = $message->getPayload() ?? []; + $migrationMessage = Migration::fromArray($message->getPayload() ?? []); $this->getDatabasesDB = $getDatabasesDB; $this->getProjectDB = $getProjectDB; @@ -137,12 +138,7 @@ class Migrations extends Action $this->deviceForFiles = $deviceForFiles; $this->plan = $plan; - if (empty($payload)) { - throw new Exception('Missing payload'); - } - - $events = $payload['events'] ?? []; - $migration = new Document($payload['migration'] ?? []); + $migration = $migrationMessage->migration; if ($migration->isEmpty()) { throw new \Exception('Migration not found'); @@ -161,11 +157,7 @@ class Migrations extends Action $this->project = $project; $this->logError = $logError; - $platform = $payload['platform'] ?? Config::getParam('platform', []); - - if (!empty($events)) { - return; - } + $platform = $migrationMessage->platform ?: Config::getParam('platform', []); try { $this->processMigration( diff --git a/src/Appwrite/Platform/Workers/StatsResources.php b/src/Appwrite/Platform/Workers/StatsResources.php index b2823d3722..db214f5d32 100644 --- a/src/Appwrite/Platform/Workers/StatsResources.php +++ b/src/Appwrite/Platform/Workers/StatsResources.php @@ -2,6 +2,7 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Message\StatsResources as StatsResourcesMessage; use Appwrite\Platform\Action; use Exception; use Throwable; @@ -67,8 +68,8 @@ class StatsResources extends Action { $this->logError = $logError; - $payload = $message->getPayload() ?? []; - if (empty($payload)) { + $statsResources = StatsResourcesMessage::fromArray($message->getPayload() ?? []); + if ($statsResources->project->isEmpty()) { throw new Exception('Missing payload'); }