diff --git a/app/executor.php b/app/executor.php index 657ea7ff96..f586ad523a 100644 --- a/app/executor.php +++ b/app/executor.php @@ -29,6 +29,7 @@ use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; use Swoole\Coroutine as Co; use Utopia\Cache\Cache; +use Utopia\Database\Query; use Utopia\Orchestration\Adapter\DockerCLI; require_once __DIR__ . '/init.php'; @@ -132,10 +133,12 @@ App::post('/v1/cleanup/function') global $orchestration; try { + // Get function document $function = Authorization::skip(function () use ($dbForInternal, $functionId) { return $dbForInternal->getDocument('functions', $functionId); }); + // Check if function exists if ($function->isEmpty()) { throw new Exception('Function not found', 404); } @@ -188,15 +191,18 @@ App::post('/v1/cleanup/tag') global $orchestration; try { + // Get tag document $tag = Authorization::skip(function () use ($dbForInternal, $tagId) { return $dbForInternal->getDocument('tags', $tagId); }); + // Check if tag exists if ($tag->isEmpty()) { throw new Exception('Tag not found', 404); } try { + // Remove the container of the tag $orchestration->remove('appwrite-function-' . $tag['$id'], true); Console::info('Removed container for tag ' . $tag['$id']); } catch (Exception $e) { @@ -217,14 +223,17 @@ App::post('/v1/tag') ->inject('dbForInternal') ->inject('projectID') ->action(function ($functionId, $tagId, $response, $dbForInternal, $projectID) { - $function = Authorization::skip(function() use ($functionId, $dbForInternal) { + // Get function document + $function = Authorization::skip(function () use ($functionId, $dbForInternal) { return $dbForInternal->getDocument('functions', $functionId); }); - $tag = Authorization::skip(function() use ($tagId, $dbForInternal) { + // Get tag document + $tag = Authorization::skip(function () use ($tagId, $dbForInternal) { return $dbForInternal->getDocument('tags', $tagId); }); + // Check if both documents exist if ($function->isEmpty()) { throw new Exception('Function not found', 404); } @@ -233,11 +242,13 @@ App::post('/v1/tag') throw new Exception('Tag not found', 404); } + // Update the schedule $schedule = $function->getAttribute('schedule', ''); $cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? new CronExpression($schedule) : null; $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : 0; - $function = Authorization::skip(function() use ($function, $dbForInternal, $tag, $next) { + // Update the function document setting the tag as the active one + $function = Authorization::skip(function () use ($function, $dbForInternal, $tag, $next) { return $function = $dbForInternal->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ 'tag' => $tag->getId(), 'scheduleNext' => (int)$next, @@ -318,6 +329,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat $tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME); $container = 'build-stage-' . $tag->getId(); + // Perform various checks if (!\is_readable($tagPath)) { throw new Exception('Code is not readable: ' . $tag->getAttribute('path', '')); } @@ -334,6 +346,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat } } + // Set build container's environment variables $vars = \array_merge($function->getAttribute('vars', []), [ 'APPWRITE_FUNCTION_ID' => $function->getId(), 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''), @@ -344,6 +357,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat 'APPWRITE_ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint') ]); + // Start tracking time $buildStart = \microtime(true); $buildTime = \time(); @@ -361,6 +375,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat } }; + // Launch build container $id = $orchestration->run( image: $runtime['base'], name: $container, @@ -370,6 +385,8 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat 'appwrite-type' => 'function', 'appwrite-created' => strval($buildTime), 'appwrite-runtime' => $function->getAttribute('runtime', ''), + 'appwrite-project' => $projectID, + 'appwrite-tag' => $tagID ], command: [ 'tail', @@ -383,6 +400,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat ] ); + // Extract user code into build container $untarStdout = ''; $untarStderr = ''; @@ -459,9 +477,9 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat } $tag->setAttribute('buildPath', $path) - ->setAttribute('status', 'ready') - ->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096))) - ->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096))); + ->setAttribute('status', 'ready') + ->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096))) + ->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096))); // Update tag with built code attribute $tag = Authorization::skip(function () use ($tag, $tagID, $database) { @@ -475,8 +493,8 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat Console::error('Tag build failed: ' . $e->getMessage()); $tag->setAttribute('status', 'failed') - ->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096))) - ->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096))); + ->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096))) + ->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096))); Authorization::skip(function () use ($tag, $tagID, $database) { return $database->updateDocument('tags', $tagID, $tag); @@ -493,7 +511,6 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat function createRuntimeServer(string $functionId, string $projectId, Document $tag, Database $database) { - global $register; global $orchestration; global $runtimes; global $activeFunctions; @@ -604,6 +621,7 @@ function createRuntimeServer(string $functionId, string $projectId, Document $ta $vars[$key] = strval($value); } + // Launch runtime server $id = $orchestration->run( image: $runtime['image'], name: $container, @@ -612,6 +630,8 @@ function createRuntimeServer(string $functionId, string $projectId, Document $ta 'appwrite-type' => 'function', 'appwrite-created' => strval($executionTime), 'appwrite-runtime' => $function->getAttribute('runtime', ''), + 'appwrite-project' => $projectId, + 'appwrite-tag' => $tag->getId(), ], hostname: $container, mountFolder: $tagPathTargetDir, @@ -910,7 +930,7 @@ function execute(string $trigger, string $projectId, string $executionId, string roles: $target['roles'] ); - if(App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') { + if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') { $statsd = $register->get('statsd'); $usage = new Stats($statsd); @@ -923,8 +943,7 @@ function execute(string $trigger, string $projectId, string $executionId, string ->setParam('functionExecutionTime', $executionTime * 1000) // ms ->setParam('networkRequestSize', 0) ->setParam('networkResponseSize', 0) - ->submit() - ; + ->submit(); $usage->submit(); } @@ -998,11 +1017,11 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo return $swooleResponse->end('401: Authentication Error'); } - App::setResource('dbForInternal', function($db, $cache) use ($projectId) { + App::setResource('dbForInternal', function ($db, $cache) use ($projectId) { $cache = new Cache(new RedisCache($cache)); - + $database = new Database(new MariaDB($db), $cache); - $database->setNamespace('project_'.$projectId.'_internal'); + $database->setNamespace('project_' . $projectId . '_internal'); return $database; }, ['db', 'cache']); @@ -1073,7 +1092,7 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo /** @var PDOPool $dbPool */ $dbPool = $register->get('dbPool'); $dbPool->put($db); - + /** @var RedisPool $redisPool */ $redisPool = $register->get('redisPool'); $redisPool->put($redis); @@ -1089,11 +1108,42 @@ function handleShutdown() // Remove all containers. global $orchestration; + global $register; + $functionsToRemove = $orchestration->list(['label' => 'appwrite-type=function']); foreach ($functionsToRemove as $container) { try { $orchestration->remove($container->getId(), true); + + // Get a database instance + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $cache = new Cache(new RedisCache($cache)); + + $database = new Database(new MariaDB($db), $cache); + $database->setNamespace('project_'.$container->getLabels()["appwrite-project"].'_internal'); + + // Get list of all processing executions + $executions = Authorization::skip(function () use ($database, $container) { + return $database->find('executions', [ + new Query('tagId', Query::TYPE_EQUAL, [$container->getLabels()["appwrite-tag"]]), + new Query('status', Query::TYPE_EQUAL, ['waiting']) + ]); + }); + + // Mark all processing executions as failed + foreach ($executions as $execution) { + $execution->setAttribute('status', 'failed') + ->setAttribute('exitCode', 1) + ->setAttribute('stderr', 'Appwrite was shutdown during execution'); + + Authorization::skip(function () use ($database, $execution) { + $database->updateDocument('executions', $execution->getId(), $execution); + }); + } + Console::info('Removed container ' . $container->getName()); } catch (Exception $e) { Console::error('Failed to remove container: ' . $container->getName());