diff --git a/app/cli.php b/app/cli.php index 0f8426afd9..052643f004 100644 --- a/app/cli.php +++ b/app/cli.php @@ -318,6 +318,10 @@ $setResource('logError', function (Registry $register) { $setResource('executor', fn () => new Executor(), []); +$setResource('bus', function (Registry $register) use ($cli) { + return $register->get('bus')->setResolver(fn (string $name) => $cli->getResource($name)); +}, ['register']); + $setResource('telemetry', fn () => new NoTelemetry(), []); $cli diff --git a/app/controllers/general.php b/app/controllers/general.php index 2ac03368df..5f1a5e5d9d 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -5,10 +5,10 @@ require_once __DIR__ . '/../init.php'; use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; use Appwrite\Auth\Key; +use Appwrite\Bus\ExecutionCompleted; use Appwrite\Event\Certificate; use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; -use Appwrite\Event\Execution; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Network\Cors; @@ -35,6 +35,7 @@ use Executor\Executor; use MaxMind\Db\Reader; use Swoole\Http\Request as SwooleRequest; use Swoole\Table; +use Utopia\Bus\Bus; use Utopia\Config\Config; use Utopia\Console; use Utopia\Database\Database; @@ -62,7 +63,7 @@ Config::setParam('domainVerification', false); Config::setParam('cookieDomain', 'localhost'); Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE); -function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount) +function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $host = $request->getHostname() ?? ''; if (!empty($previewHostname)) { @@ -706,10 +707,12 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S } } finally { if ($type === 'function' || $type === 'site') { - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + spec: $spec, + resource: $resource->getArrayCopy(), + )); } } @@ -766,31 +769,11 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S } } - $metricTypeExecutions = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS); - $metricTypeIdExecutions = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS); - $metricTypeExecutionsCompute = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE); - $metricTypeIdExecutionsCompute = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE); - $metricTypeExecutionsMbSeconds = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS); - $metricTypeIdExecutionsMBSeconds = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS); if ($deployment->getAttribute('resourceType') === 'sites') { $queueForStatsUsage ->disableMetric(METRIC_NETWORK_REQUESTS) ->disableMetric(METRIC_NETWORK_INBOUND) - ->disableMetric(METRIC_NETWORK_OUTBOUND); - if ($resource->getAttribute('adapter') !== 'ssr') { - $queueForStatsUsage - ->disableMetric(METRIC_EXECUTIONS) - ->disableMetric(METRIC_EXECUTIONS_COMPUTE) - ->disableMetric(METRIC_EXECUTIONS_MB_SECONDS) - ->disableMetric($metricTypeExecutions) - ->disableMetric($metricTypeIdExecutions) - ->disableMetric($metricTypeExecutionsCompute) - ->disableMetric($metricTypeIdExecutionsCompute) - ->disableMetric($metricTypeExecutionsMbSeconds) - ->disableMetric($metricTypeIdExecutionsMBSeconds); - } - - $queueForStatsUsage + ->disableMetric(METRIC_NETWORK_OUTBOUND) ->addMetric(METRIC_SITES_REQUESTS, 1) ->addMetric(METRIC_SITES_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_SITES_OUTBOUND, $response->getSize()) @@ -800,22 +783,10 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S ; } - - $compute = (int)($execution->getAttribute('duration') * 1000); - $mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)); $queueForStatsUsage ->addMetric(METRIC_NETWORK_REQUESTS, 1) ->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize()) - ->addMetric(METRIC_EXECUTIONS, 1) - ->addMetric($metricTypeExecutions, 1) - ->addMetric($metricTypeIdExecutions, 1) - ->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute) // per project - ->addMetric($metricTypeExecutionsCompute, $compute) // per function - ->addMetric($metricTypeIdExecutionsCompute, $compute) // per function - ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds) - ->addMetric($metricTypeExecutionsMbSeconds, $mbSeconds) - ->addMetric($metricTypeIdExecutionsMBSeconds, $mbSeconds) ->setProject($project) ->trigger(); @@ -883,7 +854,7 @@ Http::init() ->inject('geodb') ->inject('queueForStatsUsage') ->inject('queueForEvents') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('platform') ->inject('isResourceBlocked') @@ -894,7 +865,7 @@ Http::init() ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Execution $queueForExecutions, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Bus $bus, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ @@ -902,7 +873,7 @@ Http::init() $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!\in_array($hostname, $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1179,7 +1150,7 @@ Http::options() ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1192,14 +1163,14 @@ Http::options() ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { /* * Appwrite Router */ $platformHostnames = $platform['hostnames'] ?? []; // Only run Router when external domain if (!in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1569,7 +1540,7 @@ Http::get('/robots.txt') ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1579,13 +1550,13 @@ Http::get('/robots.txt') ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/robots.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } @@ -1604,7 +1575,7 @@ Http::get('/humans.txt') ->inject('getProjectDB') ->inject('queueForEvents') ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') @@ -1614,13 +1585,13 @@ Http::get('/humans.txt') ->inject('authorization') ->inject('queueForDeletes') ->inject('executionsRetentionCount') - ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { + ->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) { $platformHostnames = $platform['hostnames'] ?? []; if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) { $template = new View(__DIR__ . '/../views/general/humans.phtml'); $response->text($template->render(false)); } else { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) { $utopia->getRoute()?->label('router', true); } } diff --git a/app/http.php b/app/http.php index 30c4a8317b..7f771de130 100644 --- a/app/http.php +++ b/app/http.php @@ -188,6 +188,10 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server) { Console::success('Reload completed...'); }); +Http::setResource('bus', function ($register, $utopia) { + return $register->get('bus')->setResolver(fn (string $name) => $utopia->getResource($name)); +}, ['register', 'utopia']); + include __DIR__ . '/controllers/general.php'; function createDatabase(Http $app, string $resourceKey, string $dbName, array $collections, mixed $pools, ?callable $extraSetup = null): void diff --git a/app/init/registers.php b/app/init/registers.php index 411fd4c69d..26a9329270 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -449,3 +449,11 @@ $register->set('promiseAdapter', function () { $register->set('hooks', function () { return new Hooks(); }); +$listeners = require __DIR__ . '/../listeners.php'; +$register->set('bus', function () use ($listeners) { + $bus = new \Utopia\Bus\Bus(); + foreach ($listeners as $listener) { + $bus->subscribe($listener); + } + return $bus; +}); diff --git a/app/init/resources.php b/app/init/resources.php index f3e66c388a..36aadc9707 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -10,7 +10,6 @@ use Appwrite\Event\Certificate; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Execution; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; @@ -160,9 +159,6 @@ Http::setResource('queueForAudits', function (Publisher $publisher) { Http::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); -Http::setResource('queueForExecutions', function (Publisher $publisher) { - return new Execution($publisher); -}, ['publisher']); Http::setResource('eventProcessor', function () { return new EventProcessor(); }, []); diff --git a/app/listeners.php b/app/listeners.php new file mode 100644 index 0000000000..714c255974 --- /dev/null +++ b/app/listeners.php @@ -0,0 +1,9 @@ +getWorker(); +Server::setResource('bus', function ($register) use ($worker) { + return $register->get('bus')->setResolver(fn (string $name) => $worker->getResource($name)); +}, ['register']); + $worker ->error() ->inject('error') diff --git a/composer.json b/composer.json index 3dd24aa29f..1422bd5d0a 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,8 @@ "autoload": { "psr-4": { "Appwrite\\": "src/Appwrite", - "Executor\\": "src/Executor" + "Executor\\": "src/Executor", + "Utopia\\Bus\\": "src/Utopia/Bus" } }, "autoload-dev": { diff --git a/phpstan.neon b/phpstan.neon index 153b3be21c..90f28e7539 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,7 +1,11 @@ parameters: level: 8 paths: + - src/Utopia/Bus + - src/Appwrite/Bus - src/Appwrite/Transformation + bootstrapFiles: + - app/init/constants.php scanDirectories: - vendor/swoole/ide-helper excludePaths: diff --git a/src/Appwrite/Bus/ExecutionCompleted.php b/src/Appwrite/Bus/ExecutionCompleted.php new file mode 100644 index 0000000000..1508266ea0 --- /dev/null +++ b/src/Appwrite/Bus/ExecutionCompleted.php @@ -0,0 +1,22 @@ + $execution + * @param array $project + * @param array $spec + * @param array $resource + */ + public function __construct( + public readonly array $execution, + public readonly array $project, + public readonly array $spec = [], + public readonly array $resource = [], + ) { + } +} diff --git a/src/Appwrite/Bus/Listeners/Log.php b/src/Appwrite/Bus/Listeners/Log.php new file mode 100644 index 0000000000..12ca7303c8 --- /dev/null +++ b/src/Appwrite/Bus/Listeners/Log.php @@ -0,0 +1,39 @@ +desc('Persists execution logs to database via queue') + ->inject('publisher') + ->callback($this->handle(...)); + } + + public function handle(ExecutionCompleted $event, Publisher $publisher): void + { + $queueForExecutions = new Execution($publisher); + $queueForExecutions + ->setExecution(new Document($event->execution)) + ->setProject(new Document($event->project)) + ->trigger(); + } +} diff --git a/src/Appwrite/Bus/Listeners/Usage.php b/src/Appwrite/Bus/Listeners/Usage.php new file mode 100644 index 0000000000..e91bd910f5 --- /dev/null +++ b/src/Appwrite/Bus/Listeners/Usage.php @@ -0,0 +1,64 @@ +desc('Records execution usage metrics') + ->inject('publisher') + ->callback($this->handle(...)); + } + + public function handle(ExecutionCompleted $event, Publisher $publisher): void + { + $execution = new Document($event->execution); + $resource = new Document($event->resource); + + // Non-SSR sites don't record execution metrics + if ($execution->getAttribute('resourceType') === 'sites' && $resource->getAttribute('adapter') !== 'ssr') { + return; + } + $project = new Document($event->project); + $spec = $event->spec; + + $resourceType = $execution->getAttribute('resourceType', ''); + $resourceInternalId = $execution->getAttribute('resourceInternalId', ''); + $duration = $execution->getAttribute('duration', 0); + + $compute = (int)($duration * 1000); + $mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $duration * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)); + + $queueForStatsUsage = new StatsUsage($publisher); + $queueForStatsUsage + ->setProject($project) + ->addMetric(METRIC_EXECUTIONS, 1) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS), 1) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1) + ->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), $compute) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), $compute) + ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds) + ->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), $mbSeconds) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), $mbSeconds) + ->trigger(); + } +} diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 0932aea335..6c59bc8da0 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -3,15 +3,15 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; +use Appwrite\Bus\ExecutionCompleted; use Appwrite\Event\Event; -use Appwrite\Event\Execution as ExecutionEvent; use Appwrite\Event\Func; use Appwrite\Event\Realtime; -use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Utopia\Response\Model\Execution; use Executor\Executor; +use Utopia\Bus\Bus; use Utopia\Config\Config; use Utopia\Console; use Utopia\Database\Database; @@ -47,8 +47,7 @@ class Functions extends Action ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') - ->inject('queueForStatsUsage') - ->inject('queueForExecutions') + ->inject('bus') ->inject('log') ->inject('executor') ->inject('isResourceBlocked') @@ -63,8 +62,7 @@ class Functions extends Action Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, - StatsUsage $queueForStatsUsage, - ExecutionEvent $queueForExecutions, + Bus $bus, Log $log, Executor $executor, callable $isResourceBlocked @@ -158,9 +156,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -203,9 +200,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -230,9 +226,8 @@ class Functions extends Action queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, - queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, - queueForExecutions: $queueForExecutions, + bus: $bus, project: $project, function: $function, executor: $executor, @@ -266,7 +261,7 @@ class Functions extends Action private function fail( string $message, Document $project, - ExecutionEvent $queueForExecutions, + Bus $bus, Document $function, string $trigger, string $path, @@ -309,10 +304,10 @@ class Functions extends Action 'duration' => 0.0, ]); - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + )); } /** @@ -320,7 +315,6 @@ class Functions extends Action * @param Database $dbForProject * @param Func $queueForFunctions * @param Realtime $queueForRealtime - * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project * @param Document $function @@ -343,9 +337,8 @@ class Functions extends Action Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, - StatsUsage $queueForStatsUsage, Event $queueForEvents, - ExecutionEvent $queueForExecutions, + Bus $bus, Document $project, Document $function, Executor $executor, @@ -373,19 +366,19 @@ class Functions extends Action if ($deployment->getAttribute('resourceId') !== $functionId) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($deployment->isEmpty()) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($deployment->getAttribute('status') !== 'ready') { $errorMessage = 'The execution could not be completed because the build is not ready. Please wait for the build to complete and try again.'; - $this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event); + $this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event); return; } @@ -592,26 +585,12 @@ class Functions extends Action $error = $th->getMessage(); $errorCode = $th->getCode(); } finally { - /** Persist final execution status */ - $queueForExecutions - ->setExecution($execution) - ->setProject($project) - ->trigger(); - - /** Trigger usage queue */ - $queueForStatsUsage - ->setProject($project) - ->addMetric(METRIC_EXECUTIONS, 1) - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS), 1) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1) - ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) - ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT))) - ->trigger() - ; + /** Persist final execution status and record usage */ + $bus->dispatch(new ExecutionCompleted( + execution: $execution->getArrayCopy(), + project: $project->getArrayCopy(), + spec: $spec, + )); } $executionModel = new Execution(); diff --git a/src/Utopia/Bus/Bus.php b/src/Utopia/Bus/Bus.php new file mode 100644 index 0000000000..8c4d76958d --- /dev/null +++ b/src/Utopia/Bus/Bus.php @@ -0,0 +1,59 @@ +, Listener[]> */ + private array $listeners = []; + + /** @var ?\Closure(string): mixed */ + private ?\Closure $resolver = null; + + public function setResolver(callable $resolver): self + { + $this->resolver = $resolver(...); + return $this; + } + + public function subscribe(Listener $listener): self + { + foreach ($listener::getEvents() as $event) { + $this->listeners[$event][] = $listener; + } + return $this; + } + + public function dispatch(Event $event): void + { + if ($this->resolver === null) { + throw new \LogicException('Bus resolver must be set via setResolver() before dispatching events'); + } + + $resolver = $this->resolver; + $listeners = $this->listeners[$event::class] ?? []; + + /** @var array}> $resolved */ + $resolved = []; + foreach ($listeners as $listener) { + $deps = array_map($resolver, $listener->getInjections()); + $resolved[] = [$listener, $deps]; + } + + go(function () use ($resolved, $event) { + foreach ($resolved as [$listener, $deps]) { + $action = 'listener.' . $listener::getName(); + Span::init($action); + try { + ($listener->getCallback())($event, ...$deps); + } catch (\Throwable $e) { + Span::error($e); + } finally { + Span::current()?->finish(); + } + } + }); + } +} diff --git a/src/Utopia/Bus/Event.php b/src/Utopia/Bus/Event.php new file mode 100644 index 0000000000..1423f1198d --- /dev/null +++ b/src/Utopia/Bus/Event.php @@ -0,0 +1,7 @@ + */ + protected array $injections = []; + protected ?\Closure $callback = null; + + abstract public static function getName(): string; + + /** + * @return array> + */ + abstract public static function getEvents(): array; + + protected function desc(string $desc): self + { + $this->desc = $desc; + return $this; + } + + protected function inject(string $injection): self + { + $this->injections[] = $injection; + return $this; + } + + protected function callback(callable $callback): self + { + $this->callback = $callback(...); + return $this; + } + + /** @return array */ + public function getInjections(): array + { + return $this->injections; + } + + public function getCallback(): callable + { + if ($this->callback === null) { + throw new \LogicException(static::class . ' must set a callback via $this->callback()'); + } + + return $this->callback; + } +}