From b45ff6b6461219677faed24dde444caa9f50ed26 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Fri, 13 Feb 2026 12:28:26 +0000 Subject: [PATCH] refactor: replace queueForExecutions with Bus event bus Introduce a generic event bus (Utopia\Bus) with typed events, listener base class, Span instrumentation, and coroutine dispatch. Replace all direct queueForExecutions and inline execution usage calls with ExecutionCompleted event and dedicated listeners (Log, Usage). Co-Authored-By: Claude Opus 4.6 --- app/cli.php | 4 ++ app/controllers/general.php | 73 +++++++-------------- app/http.php | 4 ++ app/init/registers.php | 8 +++ app/init/resources.php | 4 -- app/listeners.php | 9 +++ app/worker.php | 8 +-- composer.json | 3 +- phpstan.neon | 4 ++ src/Appwrite/Bus/ExecutionCompleted.php | 22 +++++++ src/Appwrite/Bus/Listeners/Log.php | 39 +++++++++++ src/Appwrite/Bus/Listeners/Usage.php | 64 ++++++++++++++++++ src/Appwrite/Platform/Workers/Functions.php | 65 +++++++----------- src/Utopia/Bus/Bus.php | 59 +++++++++++++++++ src/Utopia/Bus/Event.php | 7 ++ src/Utopia/Bus/Listener.php | 51 ++++++++++++++ 16 files changed, 321 insertions(+), 103 deletions(-) create mode 100644 app/listeners.php create mode 100644 src/Appwrite/Bus/ExecutionCompleted.php create mode 100644 src/Appwrite/Bus/Listeners/Log.php create mode 100644 src/Appwrite/Bus/Listeners/Usage.php create mode 100644 src/Utopia/Bus/Bus.php create mode 100644 src/Utopia/Bus/Event.php create mode 100644 src/Utopia/Bus/Listener.php diff --git a/app/cli.php b/app/cli.php index 0f8426afd9..052643f004 100644 --- a/app/cli.php +++ b/app/cli.php @@ -318,6 +318,10 @@ $setResource('logError', function (Registry $register) { $setResource('executor', fn () => new Executor(), []); +$setResource('bus', function (Registry $register) use ($cli) { + return $register->get('bus')->setResolver(fn (string $name) => $cli->getResource($name)); +}, ['register']); + $setResource('telemetry', fn () => new NoTelemetry(), []); $cli diff --git a/app/controllers/general.php b/app/controllers/general.php index 2ac03368df..5f1a5e5d9d 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -5,10 +5,10 @@ require_once __DIR__ . '/../init.php'; use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; use Appwrite\Auth\Key; +use Appwrite\Bus\ExecutionCompleted; use Appwrite\Event\Certificate; use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; -use Appwrite\Event\Execution; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Network\Cors; @@ -35,6 +35,7 @@ use Executor\Executor; use MaxMind\Db\Reader; use Swoole\Http\Request as SwooleRequest; use Swoole\Table; +use Utopia\Bus\Bus; use Utopia\Config\Config; use Utopia\Console; use Utopia\Database\Database; @@ -62,7 +63,7 @@ Config::setParam('domainVerification', false); Config::setParam('cookieDomain', 'localhost'); Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE); -function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount) +function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $host = $request->getHostname() ?? ''; if (!empty($previewHostname)) { @@ -706,10 +707,12 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S } } finally { if ($type === 'function' || $type === 'site') { - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + spec: $spec, + resource: $resource->getArrayCopy(), + )); } } @@ -766,31 +769,11 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S } } - $metricTypeExecutions = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS); - $metricTypeIdExecutions = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS); - $metricTypeExecutionsCompute = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE); - $metricTypeIdExecutionsCompute = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE); - $metricTypeExecutionsMbSeconds = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS); - $metricTypeIdExecutionsMBSeconds = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS); if ($deployment->getAttribute('resourceType') === 'sites') { $queueForStatsUsage ->disableMetric(METRIC_NETWORK_REQUESTS) ->disableMetric(METRIC_NETWORK_INBOUND) - ->disableMetric(METRIC_NETWORK_OUTBOUND); - if ($resource->getAttribute('adapter') !== 'ssr') { - $queueForStatsUsage - ->disableMetric(METRIC_EXECUTIONS) - ->disableMetric(METRIC_EXECUTIONS_COMPUTE) - ->disableMetric(METRIC_EXECUTIONS_MB_SECONDS) - ->disableMetric($metricTypeExecutions) - ->disableMetric($metricTypeIdExecutions) - ->disableMetric($metricTypeExecutionsCompute) - ->disableMetric($metricTypeIdExecutionsCompute) - ->disableMetric($metricTypeExecutionsMbSeconds) - ->disableMetric($metricTypeIdExecutionsMBSeconds); - } - - $queueForStatsUsage + ->disableMetric(METRIC_NETWORK_OUTBOUND) ->addMetric(METRIC_SITES_REQUESTS, 1) ->addMetric(METRIC_SITES_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_SITES_OUTBOUND, $response->getSize()) @@ -800,22 +783,10 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S ; } - - $compute = (int)($execution->getAttribute('duration') * 1000); - $mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)); $queueForStatsUsage ->addMetric(METRIC_NETWORK_REQUESTS, 1) ->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize()) - ->addMetric(METRIC_EXECUTIONS, 1) - ->addMetric($metricTypeExecutions, 1) - ->addMetric($metricTypeIdExecutions, 1) - ->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute) // per project - ->addMetric($metricTypeExecutionsCompute, $compute) // per function - ->addMetric($metricTypeIdExecutionsCompute, $compute) // per function - ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds) - ->addMetric($metricTypeExecutionsMbSeconds, $mbSeconds) - ->addMetric($metricTypeIdExecutionsMBSeconds, $mbSeconds) ->setProject($project) ->trigger(); @@ -883,7 +854,7 @@ Http::init() ->inject('geodb') ->inject('queueForStatsUsage') ->inject('queueForEvents') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('platform') ->inject('isResourceBlocked') @@ -894,7 +865,7 @@ Http::init() ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Execution $queueForExecutions, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Bus $bus, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ @@ -902,7 +873,7 @@ Http::init() $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!\in_array($hostname, $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1179,7 +1150,7 @@ Http::options() ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1192,14 +1163,14 @@ Http::options() ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1569,7 +1540,7 @@ Http::get('/robots.txt') ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1579,13 +1550,13 @@ Http::get('/robots.txt') ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/robots.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1604,7 +1575,7 @@ Http::get('/humans.txt') ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1614,13 +1585,13 @@ Http::get('/humans.txt') ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/humans.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } diff --git a/app/http.php b/app/http.php index 30c4a8317b..7f771de130 100644 --- a/app/http.php +++ b/app/http.php @@ -188,6 +188,10 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server) { Console::success('Reload completed...'); }); +Http::setResource('bus', function ($register, $utopia) { + return $register->get('bus')->setResolver(fn (string $name) => $utopia->getResource($name)); +}, ['register', 'utopia']); + include __DIR__ . '/controllers/general.php'; function createDatabase(Http $app, string $resourceKey, string $dbName, array $collections, mixed $pools, ?callable $extraSetup = null): void diff --git a/app/init/registers.php b/app/init/registers.php index 411fd4c69d..26a9329270 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -449,3 +449,11 @@ $register->set('promiseAdapter', function () { $register->set('hooks', function () { return new Hooks(); }); +$listeners = require __DIR__ . '/../listeners.php'; +$register->set('bus', function () use ($listeners) { + $bus = new \Utopia\Bus\Bus(); + foreach ($listeners as $listener) { + $bus->subscribe($listener); + } + return $bus; +}); diff --git a/app/init/resources.php b/app/init/resources.php index f3e66c388a..36aadc9707 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -10,7 +10,6 @@ use Appwrite\Event\Certificate; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Execution; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; @@ -160,9 +159,6 @@ Http::setResource('queueForAudits', function (Publisher $publisher) { Http::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); -Http::setResource('queueForExecutions', function (Publisher $publisher) { - return new Execution($publisher); -}, ['publisher']); Http::setResource('eventProcessor', function () { return new EventProcessor(); }, []); diff --git a/app/listeners.php b/app/listeners.php new file mode 100644 index 0000000000..714c255974 --- /dev/null +++ b/app/listeners.php @@ -0,0 +1,9 @@ +getWorker(); +Server::setResource('bus', function ($register) use ($worker) { + return $register->get('bus')->setResolver(fn (string $name) => $worker->getResource($name)); +}, ['register']); + $worker ->error() ->inject('error') diff --git a/composer.json b/composer.json index 3dd24aa29f..1422bd5d0a 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,8 @@ "autoload": { "psr-4": { "Appwrite\\": "src/Appwrite", - "Executor\\": "src/Executor" + "Executor\\": "src/Executor", + "Utopia\\Bus\\": "src/Utopia/Bus" } }, "autoload-dev": { diff --git a/phpstan.neon b/phpstan.neon index 153b3be21c..90f28e7539 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,7 +1,11 @@ parameters: level: 8 paths: + - src/Utopia/Bus + - src/Appwrite/Bus - src/Appwrite/Transformation + bootstrapFiles: + - app/init/constants.php scanDirectories: - vendor/swoole/ide-helper excludePaths: diff --git a/src/Appwrite/Bus/ExecutionCompleted.php b/src/Appwrite/Bus/ExecutionCompleted.php new file mode 100644 index 0000000000..1508266ea0 --- /dev/null +++ b/src/Appwrite/Bus/ExecutionCompleted.php @@ -0,0 +1,22 @@ + $execution + * @param array $project + * @param array $spec + * @param array $resource + */ + public function __construct( + public readonly array $execution, + public readonly array $project, + public readonly array $spec = [], + public readonly array $resource = [], + ) { + } +} diff --git a/src/Appwrite/Bus/Listeners/Log.php b/src/Appwrite/Bus/Listeners/Log.php new file mode 100644 index 0000000000..12ca7303c8 --- /dev/null +++ b/src/Appwrite/Bus/Listeners/Log.php @@ -0,0 +1,39 @@ +desc('Persists execution logs to database via queue') + ->inject('publisher') + ->callback($this->handle(...)); + } + + public function handle(ExecutionCompleted $event, Publisher $publisher): void + { + $queueForExecutions = new Execution($publisher); + $queueForExecutions + ->setExecution(new Document($event->execution)) + ->setProject(new Document($event->project)) + ->trigger(); + } +} diff --git a/src/Appwrite/Bus/Listeners/Usage.php b/src/Appwrite/Bus/Listeners/Usage.php new file mode 100644 index 0000000000..e91bd910f5 --- /dev/null +++ b/src/Appwrite/Bus/Listeners/Usage.php @@ -0,0 +1,64 @@ +desc('Records execution usage metrics') + ->inject('publisher') + ->callback($this->handle(...)); + } + + public function handle(ExecutionCompleted $event, Publisher $publisher): void + { + $execution = new Document($event->execution); + $resource = new Document($event->resource); + + // Non-SSR sites don't record execution metrics + if ($execution->getAttribute('resourceType') === 'sites' && $resource->getAttribute('adapter') !== 'ssr') { + return; + } + $project = new Document($event->project); + $spec = $event->spec; + + $resourceType = $execution->getAttribute('resourceType', ''); + $resourceInternalId = $execution->getAttribute('resourceInternalId', ''); + $duration = $execution->getAttribute('duration', 0); + + $compute = (int)($duration * 1000); + $mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $duration * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)); + + $queueForStatsUsage = new StatsUsage($publisher); + $queueForStatsUsage + ->setProject($project) + ->addMetric(METRIC_EXECUTIONS, 1) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS), 1) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1) + ->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), $compute) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), $compute) + ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), $mbSeconds) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), $mbSeconds) + ->trigger(); + } +} diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 0932aea335..6c59bc8da0 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -3,15 +3,15 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; +use Appwrite\Bus\ExecutionCompleted; use Appwrite\Event\Event; -use Appwrite\Event\Execution as ExecutionEvent; use Appwrite\Event\Func; use Appwrite\Event\Realtime; -use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Utopia\Response\Model\Execution; use Executor\Executor; +use Utopia\Bus\Bus; use Utopia\Config\Config; use Utopia\Console; use Utopia\Database\Database; @@ -47,8 +47,7 @@ class Functions extends Action ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') - ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('log') ->inject('executor') ->inject('isResourceBlocked') @@ -63,8 +62,7 @@ class Functions extends Action Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, - StatsUsage $queueForStatsUsage, - ExecutionEvent $queueForExecutions, + Bus $bus, Log $log, Executor $executor, callable $isResourceBlocked @@ -158,9 +156,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -203,9 +200,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -230,9 +226,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -266,7 +261,7 @@ class Functions extends Action private function fail( string $message, Document $project, - ExecutionEvent $queueForExecutions, + Bus $bus, Document $function, string $trigger, string $path, @@ -309,10 +304,10 @@ class Functions extends Action 'duration' => 0.0, ]); - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + )); } /** @@ -320,7 +315,6 @@ class Functions extends Action * @param Database $dbForProject * @param Func $queueForFunctions * @param Realtime $queueForRealtime - * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project * @param Document $function @@ -343,9 +337,8 @@ class Functions extends Action Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, - StatsUsage $queueForStatsUsage, Event $queueForEvents, - ExecutionEvent $queueForExecutions, + Bus $bus, Document $project, Document $function, Executor $executor, @@ -373,19 +366,19 @@ class Functions extends Action if ($deployment->getAttribute('resourceId') !== $functionId) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($deployment->isEmpty()) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($deployment->getAttribute('status') !== 'ready') { $errorMessage = 'The execution could not be completed because the build is not ready. Please wait for the build to complete and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } @@ -592,26 +585,12 @@ class Functions extends Action $error = $th->getMessage(); $errorCode = $th->getCode(); } finally { - /** Persist final execution status */ - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); - - /** Trigger usage queue */ - $queueForStatsUsage - ->setProject($project) - ->addMetric(METRIC_EXECUTIONS, 1) - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS), 1) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1) - ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) - ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->trigger() - ; + /** Persist final execution status and record usage */ + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + spec: $spec, + )); } $executionModel = new Execution(); diff --git a/src/Utopia/Bus/Bus.php b/src/Utopia/Bus/Bus.php new file mode 100644 index 0000000000..8c4d76958d --- /dev/null +++ b/src/Utopia/Bus/Bus.php @@ -0,0 +1,59 @@ +, Listener[]> */ + private array $listeners = []; + + /** @var ?\Closure(string): mixed */ + private ?\Closure $resolver = null; + + public function setResolver(callable $resolver): self + { + $this->resolver = $resolver(...); + return $this; + } + + public function subscribe(Listener $listener): self + { + foreach ($listener::getEvents() as $event) { + $this->listeners[$event][] = $listener; + } + return $this; + } + + public function dispatch(Event $event): void + { + if ($this->resolver === null) { + throw new \LogicException('Bus resolver must be set via setResolver() before dispatching events'); + } + + $resolver = $this->resolver; + $listeners = $this->listeners[$event::class] ?? []; + + /** @var array}> $resolved */ + $resolved = []; + foreach ($listeners as $listener) { + $deps = array_map($resolver, $listener->getInjections()); + $resolved[] = [$listener, $deps]; + } + + go(function () use ($resolved, $event) { + foreach ($resolved as [$listener, $deps]) { + $action = 'listener.' . $listener::getName(); + Span::init($action); + try { + ($listener->getCallback())($event, ...$deps); + } catch (\Throwable $e) { + Span::error($e); + } finally { + Span::current()?->finish(); + } + } + }); + } +} diff --git a/src/Utopia/Bus/Event.php b/src/Utopia/Bus/Event.php new file mode 100644 index 0000000000..1423f1198d --- /dev/null +++ b/src/Utopia/Bus/Event.php @@ -0,0 +1,7 @@ + */ + protected array $injections = []; + protected ?\Closure $callback = null; + + abstract public static function getName(): string; + + /** + * @return array> + */ + abstract public static function getEvents(): array; + + protected function desc(string $desc): self + { + $this->desc = $desc; + return $this; + } + + protected function inject(string $injection): self + { + $this->injections[] = $injection; + return $this; + } + + protected function callback(callable $callback): self + { + $this->callback = $callback(...); + return $this; + } + + /** @return array */ + public function getInjections(): array + { + return $this->injections; + } + + public function getCallback(): callable + { + if ($this->callback === null) { + throw new \LogicException(static::class . ' must set a callback via $this->callback()'); + } + + return $this->callback; + } +}