refactor: replace queueForExecutions with Bus event bus

Introduce a generic event bus (Utopia\Bus) with typed events, listener
base class, Span instrumentation, and coroutine dispatch. Replace all
direct queueForExecutions and inline execution usage calls with
ExecutionCompleted event and dedicated listeners (Log, Usage).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
loks0n
2026-02-13 12:28:26 +00:00
parent 598c71fb11
commit b45ff6b646
16 changed files with 321 additions and 103 deletions
+4
View File
@@ -318,6 +318,10 @@ $setResource('logError', function (Registry $register) {
$setResource('executor', fn () => new Executor(), []);
$setResource('bus', function (Registry $register) use ($cli) {
return $register->get('bus')->setResolver(fn (string $name) => $cli->getResource($name));
}, ['register']);
$setResource('telemetry', fn () => new NoTelemetry(), []);
$cli
+22 -51
View File
@@ -5,10 +5,10 @@ require_once __DIR__ . '/../init.php';
use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Auth\Key;
use Appwrite\Bus\ExecutionCompleted;
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Execution;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Network\Cors;
@@ -35,6 +35,7 @@ use Executor\Executor;
use MaxMind\Db\Reader;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Table;
use Utopia\Bus\Bus;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Database;
@@ -62,7 +63,7 @@ Config::setParam('domainVerification', false);
Config::setParam('cookieDomain', 'localhost');
Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE);
function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount)
function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Authorization $authorization, ?Key $apiKey, DeleteEvent $queueForDeletes, int $executionsRetentionCount)
{
$host = $request->getHostname() ?? '';
if (!empty($previewHostname)) {
@@ -706,10 +707,12 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S
}
} finally {
if ($type === 'function' || $type === 'site') {
$queueForExecutions
->setExecution($execution)
->setProject($project)
->trigger();
$bus->dispatch(new ExecutionCompleted(
execution: $execution->getArrayCopy(),
project: $project->getArrayCopy(),
spec: $spec,
resource: $resource->getArrayCopy(),
));
}
}
@@ -766,31 +769,11 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S
}
}
$metricTypeExecutions = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS);
$metricTypeIdExecutions = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS);
$metricTypeExecutionsCompute = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE);
$metricTypeIdExecutionsCompute = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE);
$metricTypeExecutionsMbSeconds = str_replace(['{resourceType}'], [$deployment->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS);
$metricTypeIdExecutionsMBSeconds = str_replace(['{resourceType}', '{resourceInternalId}'], [$deployment->getAttribute('resourceType'), $resource->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS);
if ($deployment->getAttribute('resourceType') === 'sites') {
$queueForStatsUsage
->disableMetric(METRIC_NETWORK_REQUESTS)
->disableMetric(METRIC_NETWORK_INBOUND)
->disableMetric(METRIC_NETWORK_OUTBOUND);
if ($resource->getAttribute('adapter') !== 'ssr') {
$queueForStatsUsage
->disableMetric(METRIC_EXECUTIONS)
->disableMetric(METRIC_EXECUTIONS_COMPUTE)
->disableMetric(METRIC_EXECUTIONS_MB_SECONDS)
->disableMetric($metricTypeExecutions)
->disableMetric($metricTypeIdExecutions)
->disableMetric($metricTypeExecutionsCompute)
->disableMetric($metricTypeIdExecutionsCompute)
->disableMetric($metricTypeExecutionsMbSeconds)
->disableMetric($metricTypeIdExecutionsMBSeconds);
}
$queueForStatsUsage
->disableMetric(METRIC_NETWORK_OUTBOUND)
->addMetric(METRIC_SITES_REQUESTS, 1)
->addMetric(METRIC_SITES_INBOUND, $request->getSize() + $fileSize)
->addMetric(METRIC_SITES_OUTBOUND, $response->getSize())
@@ -800,22 +783,10 @@ function router(Http $utopia, Database $dbForPlatform, callable $getProjectDB, S
;
}
$compute = (int)($execution->getAttribute('duration') * 1000);
$mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT));
$queueForStatsUsage
->addMetric(METRIC_NETWORK_REQUESTS, 1)
->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize)
->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize())
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric($metricTypeExecutions, 1)
->addMetric($metricTypeIdExecutions, 1)
->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute) // per project
->addMetric($metricTypeExecutionsCompute, $compute) // per function
->addMetric($metricTypeIdExecutionsCompute, $compute) // per function
->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds)
->addMetric($metricTypeExecutionsMbSeconds, $mbSeconds)
->addMetric($metricTypeIdExecutionsMBSeconds, $mbSeconds)
->setProject($project)
->trigger();
@@ -883,7 +854,7 @@ Http::init()
->inject('geodb')
->inject('queueForStatsUsage')
->inject('queueForEvents')
->inject('queueForExecutions')
->inject('bus')
->inject('executor')
->inject('platform')
->inject('isResourceBlocked')
@@ -894,7 +865,7 @@ Http::init()
->inject('authorization')
->inject('queueForDeletes')
->inject('executionsRetentionCount')
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Execution $queueForExecutions, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Bus $bus, Executor $executor, array $platform, callable $isResourceBlocked, string $previewHostname, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
/*
* Appwrite Router
*/
@@ -902,7 +873,7 @@ Http::init()
$platformHostnames = $platform['hostnames'] ?? [];
// Only run Router when external domain
if (!\in_array($hostname, $platformHostnames) || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
$utopia->getRoute()?->label('router', true);
}
}
@@ -1179,7 +1150,7 @@ Http::options()
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForExecutions')
->inject('bus')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
@@ -1192,14 +1163,14 @@ Http::options()
->inject('authorization')
->inject('queueForDeletes')
->inject('executionsRetentionCount')
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, Document $project, Document $devKey, ?Key $apiKey, Cors $cors, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
/*
* Appwrite Router
*/
$platformHostnames = $platform['hostnames'] ?? [];
// Only run Router when external domain
if (!in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
$utopia->getRoute()?->label('router', true);
}
}
@@ -1569,7 +1540,7 @@ Http::get('/robots.txt')
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForExecutions')
->inject('bus')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
@@ -1579,13 +1550,13 @@ Http::get('/robots.txt')
->inject('authorization')
->inject('queueForDeletes')
->inject('executionsRetentionCount')
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
$platformHostnames = $platform['hostnames'] ?? [];
if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) {
$template = new View(__DIR__ . '/../views/general/robots.phtml');
$response->text($template->render(false));
} else {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
$utopia->getRoute()?->label('router', true);
}
}
@@ -1604,7 +1575,7 @@ Http::get('/humans.txt')
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForExecutions')
->inject('bus')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
@@ -1614,13 +1585,13 @@ Http::get('/humans.txt')
->inject('authorization')
->inject('queueForDeletes')
->inject('executionsRetentionCount')
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Execution $queueForExecutions, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
->action(function (Http $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Log $log, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Bus $bus, Executor $executor, Reader $geodb, callable $isResourceBlocked, array $platform, string $previewHostname, ?Key $apiKey, Authorization $authorization, DeleteEvent $queueForDeletes, int $executionsRetentionCount) {
$platformHostnames = $platform['hostnames'] ?? [];
if (in_array($request->getHostname(), $platformHostnames) || !empty($previewHostname)) {
$template = new View(__DIR__ . '/../views/general/humans.phtml');
$response->text($template->render(false));
} else {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $queueForExecutions, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $log, $queueForEvents, $queueForStatsUsage, $bus, $executor, $geodb, $isResourceBlocked, $platform, $previewHostname, $authorization, $apiKey, $queueForDeletes, $executionsRetentionCount)) {
$utopia->getRoute()?->label('router', true);
}
}
+4
View File
@@ -188,6 +188,10 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server) {
Console::success('Reload completed...');
});
Http::setResource('bus', function ($register, $utopia) {
return $register->get('bus')->setResolver(fn (string $name) => $utopia->getResource($name));
}, ['register', 'utopia']);
include __DIR__ . '/controllers/general.php';
function createDatabase(Http $app, string $resourceKey, string $dbName, array $collections, mixed $pools, ?callable $extraSetup = null): void
+8
View File
@@ -449,3 +449,11 @@ $register->set('promiseAdapter', function () {
$register->set('hooks', function () {
return new Hooks();
});
$listeners = require __DIR__ . '/../listeners.php';
$register->set('bus', function () use ($listeners) {
$bus = new \Utopia\Bus\Bus();
foreach ($listeners as $listener) {
$bus->subscribe($listener);
}
return $bus;
});
-4
View File
@@ -10,7 +10,6 @@ use Appwrite\Event\Certificate;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Execution;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
@@ -160,9 +159,6 @@ Http::setResource('queueForAudits', function (Publisher $publisher) {
Http::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
Http::setResource('queueForExecutions', function (Publisher $publisher) {
return new Execution($publisher);
}, ['publisher']);
Http::setResource('eventProcessor', function () {
return new EventProcessor();
}, []);
+9
View File
@@ -0,0 +1,9 @@
<?php
use Appwrite\Bus\Listeners\Log;
use Appwrite\Bus\Listeners\Usage;
return [
new Log(),
new Usage(),
];
+4 -4
View File
@@ -9,7 +9,6 @@ use Appwrite\Event\Certificate;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Execution;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
@@ -355,9 +354,6 @@ Server::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
Server::setResource('queueForExecutions', function (Publisher $publisher) {
return new Execution($publisher);
}, ['publisher']);
Server::setResource('queueForRealtime', function () {
return new Realtime();
@@ -542,6 +538,10 @@ try {
$worker = $platform->getWorker();
Server::setResource('bus', function ($register) use ($worker) {
return $register->get('bus')->setResolver(fn (string $name) => $worker->getResource($name));
}, ['register']);
$worker
->error()
->inject('error')
+2 -1
View File
@@ -19,7 +19,8 @@
"autoload": {
"psr-4": {
"Appwrite\\": "src/Appwrite",
"Executor\\": "src/Executor"
"Executor\\": "src/Executor",
"Utopia\\Bus\\": "src/Utopia/Bus"
}
},
"autoload-dev": {
+4
View File
@@ -1,7 +1,11 @@
parameters:
level: 8
paths:
- src/Utopia/Bus
- src/Appwrite/Bus
- src/Appwrite/Transformation
bootstrapFiles:
- app/init/constants.php
scanDirectories:
- vendor/swoole/ide-helper
excludePaths:
+22
View File
@@ -0,0 +1,22 @@
<?php
namespace Appwrite\Bus;
use Utopia\Bus\Event;
class ExecutionCompleted implements Event
{
/**
* @param array<string, mixed> $execution
* @param array<string, mixed> $project
* @param array<string, mixed> $spec
* @param array<string, mixed> $resource
*/
public function __construct(
public readonly array $execution,
public readonly array $project,
public readonly array $spec = [],
public readonly array $resource = [],
) {
}
}
+39
View File
@@ -0,0 +1,39 @@
<?php
namespace Appwrite\Bus\Listeners;
use Appwrite\Bus\ExecutionCompleted;
use Appwrite\Event\Execution;
use Utopia\Bus\Listener;
use Utopia\Database\Document;
use Utopia\Queue\Publisher;
class Log extends Listener
{
public static function getName(): string
{
return 'log';
}
public static function getEvents(): array
{
return [ExecutionCompleted::class];
}
public function __construct()
{
$this
->desc('Persists execution logs to database via queue')
->inject('publisher')
->callback($this->handle(...));
}
public function handle(ExecutionCompleted $event, Publisher $publisher): void
{
$queueForExecutions = new Execution($publisher);
$queueForExecutions
->setExecution(new Document($event->execution))
->setProject(new Document($event->project))
->trigger();
}
}
+64
View File
@@ -0,0 +1,64 @@
<?php
namespace Appwrite\Bus\Listeners;
use Appwrite\Bus\ExecutionCompleted;
use Appwrite\Event\StatsUsage;
use Utopia\Bus\Listener;
use Utopia\Database\Document;
use Utopia\Queue\Publisher;
class Usage extends Listener
{
public static function getName(): string
{
return 'usage';
}
public static function getEvents(): array
{
return [ExecutionCompleted::class];
}
public function __construct()
{
$this
->desc('Records execution usage metrics')
->inject('publisher')
->callback($this->handle(...));
}
public function handle(ExecutionCompleted $event, Publisher $publisher): void
{
$execution = new Document($event->execution);
$resource = new Document($event->resource);
// Non-SSR sites don't record execution metrics
if ($execution->getAttribute('resourceType') === 'sites' && $resource->getAttribute('adapter') !== 'ssr') {
return;
}
$project = new Document($event->project);
$spec = $event->spec;
$resourceType = $execution->getAttribute('resourceType', '');
$resourceInternalId = $execution->getAttribute('resourceInternalId', '');
$duration = $execution->getAttribute('duration', 0);
$compute = (int)($duration * 1000);
$mbSeconds = (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $duration * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT));
$queueForStatsUsage = new StatsUsage($publisher);
$queueForStatsUsage
->setProject($project)
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS), 1)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1)
->addMetric(METRIC_EXECUTIONS_COMPUTE, $compute)
->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), $compute)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), $compute)
->addMetric(METRIC_EXECUTIONS_MB_SECONDS, $mbSeconds)
->addMetric(str_replace(['{resourceType}'], [$resourceType], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), $mbSeconds)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$resourceType, $resourceInternalId], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), $mbSeconds)
->trigger();
}
}
+22 -43
View File
@@ -3,15 +3,15 @@
namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Bus\ExecutionCompleted;
use Appwrite\Event\Event;
use Appwrite\Event\Execution as ExecutionEvent;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Utopia\Response\Model\Execution;
use Executor\Executor;
use Utopia\Bus\Bus;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Database;
@@ -47,8 +47,7 @@ class Functions extends Action
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForExecutions')
->inject('bus')
->inject('log')
->inject('executor')
->inject('isResourceBlocked')
@@ -63,8 +62,7 @@ class Functions extends Action
Func $queueForFunctions,
Realtime $queueForRealtime,
Event $queueForEvents,
StatsUsage $queueForStatsUsage,
ExecutionEvent $queueForExecutions,
Bus $bus,
Log $log,
Executor $executor,
callable $isResourceBlocked
@@ -158,9 +156,8 @@ class Functions extends Action
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
queueForExecutions: $queueForExecutions,
bus: $bus,
project: $project,
function: $function,
executor: $executor,
@@ -203,9 +200,8 @@ class Functions extends Action
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
queueForExecutions: $queueForExecutions,
bus: $bus,
project: $project,
function: $function,
executor: $executor,
@@ -230,9 +226,8 @@ class Functions extends Action
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
queueForExecutions: $queueForExecutions,
bus: $bus,
project: $project,
function: $function,
executor: $executor,
@@ -266,7 +261,7 @@ class Functions extends Action
private function fail(
string $message,
Document $project,
ExecutionEvent $queueForExecutions,
Bus $bus,
Document $function,
string $trigger,
string $path,
@@ -309,10 +304,10 @@ class Functions extends Action
'duration' => 0.0,
]);
$queueForExecutions
->setExecution($execution)
->setProject($project)
->trigger();
$bus->dispatch(new ExecutionCompleted(
execution: $execution->getArrayCopy(),
project: $project->getArrayCopy(),
));
}
/**
@@ -320,7 +315,6 @@ class Functions extends Action
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage
* @param Event $queueForEvents
* @param Document $project
* @param Document $function
@@ -343,9 +337,8 @@ class Functions extends Action
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
StatsUsage $queueForStatsUsage,
Event $queueForEvents,
ExecutionEvent $queueForExecutions,
Bus $bus,
Document $project,
Document $function,
Executor $executor,
@@ -373,19 +366,19 @@ class Functions extends Action
if ($deployment->getAttribute('resourceId') !== $functionId) {
$errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.';
$this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event);
$this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event);
return;
}
if ($deployment->isEmpty()) {
$errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.';
$this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event);
$this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event);
return;
}
if ($deployment->getAttribute('status') !== 'ready') {
$errorMessage = 'The execution could not be completed because the build is not ready. Please wait for the build to complete and try again.';
$this->fail($errorMessage, $project, $queueForExecutions, $function, $trigger, $path, $method, $user, $jwt, $event);
$this->fail($errorMessage, $project, $bus, $function, $trigger, $path, $method, $user, $jwt, $event);
return;
}
@@ -592,26 +585,12 @@ class Functions extends Action
$error = $th->getMessage();
$errorCode = $th->getCode();
} finally {
/** Persist final execution status */
$queueForExecutions
->setExecution($execution)
->setProject($project)
->trigger();
/** Trigger usage queue */
$queueForStatsUsage
->setProject($project)
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS), 1)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS), 1)
->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project
->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000))
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000))
->addMetric(METRIC_EXECUTIONS_MB_SECONDS, (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)))
->addMetric(str_replace(['{resourceType}'], [RESOURCE_TYPE_FUNCTIONS], METRIC_RESOURCE_TYPE_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)))
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS, $function->getSequence()], METRIC_RESOURCE_TYPE_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_COMPUTE_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_COMPUTE_CPUS_DEFAULT)))
->trigger()
;
/** Persist final execution status and record usage */
$bus->dispatch(new ExecutionCompleted(
execution: $execution->getArrayCopy(),
project: $project->getArrayCopy(),
spec: $spec,
));
}
$executionModel = new Execution();
+59
View File
@@ -0,0 +1,59 @@
<?php
namespace Utopia\Bus;
use Utopia\Span\Span;
class Bus
{
/** @var array<class-string<Event>, Listener[]> */
private array $listeners = [];
/** @var ?\Closure(string): mixed */
private ?\Closure $resolver = null;
public function setResolver(callable $resolver): self
{
$this->resolver = $resolver(...);
return $this;
}
public function subscribe(Listener $listener): self
{
foreach ($listener::getEvents() as $event) {
$this->listeners[$event][] = $listener;
}
return $this;
}
public function dispatch(Event $event): void
{
if ($this->resolver === null) {
throw new \LogicException('Bus resolver must be set via setResolver() before dispatching events');
}
$resolver = $this->resolver;
$listeners = $this->listeners[$event::class] ?? [];
/** @var array<array{Listener, array<mixed>}> $resolved */
$resolved = [];
foreach ($listeners as $listener) {
$deps = array_map($resolver, $listener->getInjections());
$resolved[] = [$listener, $deps];
}
go(function () use ($resolved, $event) {
foreach ($resolved as [$listener, $deps]) {
$action = 'listener.' . $listener::getName();
Span::init($action);
try {
($listener->getCallback())($event, ...$deps);
} catch (\Throwable $e) {
Span::error($e);
} finally {
Span::current()?->finish();
}
}
});
}
}
+7
View File
@@ -0,0 +1,7 @@
<?php
namespace Utopia\Bus;
interface Event
{
}
+51
View File
@@ -0,0 +1,51 @@
<?php
namespace Utopia\Bus;
abstract class Listener
{
protected ?string $desc = null;
/** @var array<string> */
protected array $injections = [];
protected ?\Closure $callback = null;
abstract public static function getName(): string;
/**
* @return array<class-string<Event>>
*/
abstract public static function getEvents(): array;
protected function desc(string $desc): self
{
$this->desc = $desc;
return $this;
}
protected function inject(string $injection): self
{
$this->injections[] = $injection;
return $this;
}
protected function callback(callable $callback): self
{
$this->callback = $callback(...);
return $this;
}
/** @return array<string> */
public function getInjections(): array
{
return $this->injections;
}
public function getCallback(): callable
{
if ($this->callback === null) {
throw new \LogicException(static::class . ' must set a callback via $this->callback()');
}
return $this->callback;
}
}