From 1b0a02e20d654bfcb9a83babecbb0df2cf92009e Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Sat, 5 Feb 2022 23:49:57 +0400 Subject: [PATCH] feat: fix executor issues --- app/controllers/api/functions.php | 35 ++-- app/executor.php | 52 +----- app/workers/functions.php | 295 ++++++++++++++++++++---------- src/Executor/Executor.php | 2 + 4 files changed, 219 insertions(+), 165 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 8776a3d1d8..b899f9ed5d 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -332,7 +332,7 @@ App::put('/v1/functions/:functionId') ]))); if ($next && $schedule !== $original) { - ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + ResqueScheduler::enqueueAt($next, Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [ 'projectId' => $project->getId(), 'webhooks' => $project->getAttribute('webhooks', []), 'functionId' => $function->getId(), @@ -761,13 +761,14 @@ App::post('/v1/functions/:functionId/executions') } $runtimes = Config::getParam('runtimes', []); - $key = $function->getAttribute('runtime', ''); - $runtime = isset($runtimes[$key]) ? $runtimes[$key] : null; + + $runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null; + if (\is_null($runtime)) { throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); } - $deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $function->getAttribute('deployment'))); + $deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $function->getAttribute('deployment', ''))); if ($deployment->getAttribute('resourceId') !== $function->getId()) { throw new Exception('Deployment not found. Deploy deployment before trying to execute a function', 404); @@ -784,7 +785,7 @@ App::post('/v1/functions/:functionId/executions') } if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not completed', 400); + throw new Exception('Build not ready', 400); } $validator = new Authorization('execute'); @@ -798,7 +799,7 @@ App::post('/v1/functions/:functionId/executions') $execution = Authorization::skip(fn() => $dbForProject->createDocument('executions', new Document([ '$id' => $executionId, '$read' => (!$user->isEmpty()) ? ['user:' . $user->getId()] : [], - '$write' => ['role:all'], + '$write' => [], 'dateCreated' => time(), 'functionId' => $function->getId(), 'deploymentId' => $deployment->getId(), @@ -833,32 +834,23 @@ App::post('/v1/functions/:functionId/executions') } if ($async) { - Resque::enqueue('v1-functions', 'FunctionsV1', [ + Resque::enqueue(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [ 'projectId' => $project->getId(), - 'deploymentId' => $deployment->getId(), - 'buildId' => $deployment->getAttribute('buildId', ''), - 'path' => $build->getAttribute('outputPath', ''), - 'vars' => $function->getAttribute('vars', []), - 'data' => $data, - 'runtime' => $function->getAttribute('runtime', ''), - 'timeout' => $function->getAttribute('timeout', 0), - 'baseImage' => $runtime['image'], - 'webhooks' => $project->getAttribute('webhooks', []), - 'userId' => $user->getId(), 'functionId' => $function->getId(), + 'webhooks' => $project->getAttribute('webhooks', []), 'executionId' => $execution->getId(), 'trigger' => 'http', + 'data' => $data, + 'userId' => $user->getId(), 'jwt' => $jwt, ]); $response->setStatusCode(Response::STATUS_CODE_CREATED); $response->dynamic($execution, Response::MODEL_EXECUTION); - return $response; } /** Send variables */ $vars = \array_merge($function->getAttribute('vars', []), [ - 'ENTRYPOINT_NAME' => $deployment->getAttribute('entrypoint', ''), 'APPWRITE_FUNCTION_ID' => $function->getId(), 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''), 'APPWRITE_FUNCTION_DEPLOYMENT' => $deployment->getId(), @@ -871,9 +863,8 @@ App::post('/v1/functions/:functionId/executions') 'APPWRITE_FUNCTION_JWT' => $jwt, ]); - // Directly execute function. + /** Execute function */ $executor = new Executor(); - $responseExecute = $executor->createExecution( projectId: $project->getId(), functionId: $function->getId(), @@ -882,6 +873,7 @@ App::post('/v1/functions/:functionId/executions') path: $build->getAttribute('outputPath', ''), vars: $vars, data: $data, + entrypoint: $deployment->getAttribute('entrypoint', ''), runtime: $function->getAttribute('runtime', ''), timeout: $function->getAttribute('timeout', 0), baseImage: $runtime['image'], @@ -889,6 +881,7 @@ App::post('/v1/functions/:functionId/executions') userId: $user->getId(), ); + /** Update execution status */ $execution->setAttribute('status', $responseExecute['status']); $execution->setAttribute('statusCode', $responseExecute['statusCode']); $execution->setAttribute('stdout', $responseExecute['stdout']); diff --git a/app/executor.php b/app/executor.php index eafc634e4b..050800b5c8 100644 --- a/app/executor.php +++ b/app/executor.php @@ -1,11 +1,7 @@ '/usr/code', - 'file' => $vars['ENTRYPOINT_NAME'], + 'file' => $entrypoint, 'env' => $vars, 'payload' => $data, 'timeout' => $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900) @@ -348,8 +344,6 @@ function execute(string $projectId, string $functionId, string $deploymentId, ar throw new Exception('An internal curl error has occurred within the executor! Error Msg: ' . $error, 500); } - var_dump($executorResponse); - $executionData = []; if (!empty($executorResponse)) { @@ -390,44 +384,7 @@ function execute(string $projectId, string $functionId, string $deploymentId, ar 'time' => $executionTime, ]; - /** Trigger event */ - $executionModel = new Execution(); - $executionUpdate = new Event('v1-webhooks', 'WebhooksV1'); - $executionUpdate - ->setParam('projectId', $projectId) - ->setParam('userId', $userId) - ->setParam('webhooks', $webhooks) - ->setParam('event', 'functions.executions.update') - ->setParam('eventData', (new Document($execution))->getArrayCopy(array_keys($executionModel->getRules()))); - $executionUpdate->trigger(); - - /** Trigger realtime event */ - $target = Realtime::fromPayload('functions.executions.update', new Document($execution)); - Realtime::send( - projectId: $projectId, - payload: $execution, - event: 'functions.executions.update', - channels: $target['channels'], - roles: $target['roles'] - ); - - if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { - $statsd = $register->get('statsd'); - $usage = new Stats($statsd); - $usage - ->setParam('projectId', $projectId) - ->setParam('functionId', $functionId) - ->setParam('functionExecution', 1) - ->setParam('functionStatus', $functionStatus) - ->setParam('functionExecutionTime', $executionTime * 1000) // ms - ->setParam('networkRequestSize', 0) - ->setParam('networkResponseSize', 0) - ->submit(); - $usage->submit(); - } - return $execution; - }; function runBuildStage(string $buildId, string $projectID, string $path, array $vars, string $baseImage, string $runtime): array @@ -651,6 +608,7 @@ App::post('/v1/functions/:functionId/executions') ->param('vars', '', new Assoc(), 'Environment Variables required for the build', false) ->param('data', '', new Text(8192), 'Data to be forwarded to the function, this is user specified.', true) ->param('runtime', '', new Text(128), 'Runtime for the cloud function', false) + ->param('entrypoint', '', new Text(256), 'Entrypoint of the code file') ->param('timeout', 15, new ValidatorRange(1, 900), 'Function maximum execution time in seconds.', true) ->param('baseImage', '', new Text(128), 'Base image name of the runtime', false) ->param('webhooks', [], new ArrayList(new JSON()), 'Any webhooks that need to be triggered after this execution', true) @@ -658,7 +616,7 @@ App::post('/v1/functions/:functionId/executions') ->inject('projectId') ->inject('response') ->action( - function (string $functionId, string $deploymentId, string $buildId, string $path, array $vars, string $data, string $runtime, $timeout, string $baseImage, array $webhooks, string $userId, string $projectId, Response $response) { + function (string $functionId, string $deploymentId, string $buildId, string $path, array $vars, string $data, string $runtime, string $entrypoint, $timeout, string $baseImage, array $webhooks, string $userId, string $projectId, Response $response) { $build = [ '$id' => $buildId, @@ -666,7 +624,7 @@ App::post('/v1/functions/:functionId/executions') ]; // Send both data and vars from the caller - $execution = execute($projectId, $functionId, $deploymentId, $build, $vars, $data, $userId, $baseImage, $runtime, $timeout, $webhooks); + $execution = execute($projectId, $functionId, $deploymentId, $build, $vars, $data, $userId, $baseImage, $runtime, $entrypoint, $timeout, $webhooks); $response ->setStatusCode(Response::STATUS_CODE_OK) diff --git a/app/workers/functions.php b/app/workers/functions.php index bf5779f1e6..52c3df4fb1 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -1,7 +1,12 @@ executor = new Executor(); } public function run(): void @@ -70,13 +81,7 @@ class FunctionsV1 extends Worker /** @var Document[] $functions */ while ($sum >= $limit) { - - Authorization::disable(); - - $functions = $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]); - - Authorization::reset(); - + $functions = Authorization::skip(fn() => $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC])); $sum = \count($functions); $offset = $offset + $limit; @@ -84,31 +89,31 @@ class FunctionsV1 extends Worker foreach ($functions as $function) { $events = $function->getAttribute('events', []); - $deployment = $function->getAttribute('deployment', []); - Console::success('Itterating function: ' . $function->getAttribute('name')); - - if (!\in_array($event, $events) || empty($deployment)) { + if (!\in_array($event, $events)) { continue; } - Console::success('Triggered function: ' . $event); + Console::success('Iterating function: ' . $function->getAttribute('name')); $this->execute( - trigger: 'event', projectId: $projectId, - executionId: '', - database: $database, function: $function, + dbForProject: $database, + executionId: $executionId, + webhooks: $webhooks, + trigger: $trigger, event: $event, eventData: $eventData, data: $data, - webhooks: $webhooks, userId: $userId, jwt: $jwt ); + + Console::success('Triggered function: ' . $event); } } + break; case 'schedule': @@ -126,9 +131,7 @@ class FunctionsV1 extends Worker */ // Reschedule - Authorization::disable(); - $function = $database->getDocument('functions', $functionId); - Authorization::reset(); + $function = Authorization::skip(fn() => $database->getDocument('functions', $functionId)); if (empty($function->getId())) { throw new Exception('Function not found ('.$functionId.')'); @@ -145,19 +148,18 @@ class FunctionsV1 extends Worker ->setAttribute('scheduleNext', $next) ->setAttribute('schedulePrevious', \time()); - Authorization::disable(); + $function = Authorization::skip(function() use ($database, $function, $next, $functionId) { + $function = $database->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ + 'scheduleNext' => (int)$next, + ]))); + + if ($function === false) { + throw new Exception('Function update failed (' . $functionId . ')'); + } + return $function; + }); - $function = $database->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ - 'scheduleNext' => (int)$next, - ]))); - - if ($function === false) { - throw new Exception('Function update failed (' . $functionId . ')'); - } - - Authorization::reset(); - - ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + ResqueScheduler::enqueueAt($next, Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [ 'projectId' => $projectId, 'webhooks' => $webhooks, 'functionId' => $function->getId(), @@ -168,101 +170,200 @@ class FunctionsV1 extends Worker ]); // Async task reschedule $this->execute( - trigger: $trigger, projectId: $projectId, - executionId: $executionId, - database: $database, function: $function, - data: $data, + dbForProject: $database, + executionId: $executionId, webhooks: $webhooks, + trigger: $trigger, + event: $event, + eventData: $eventData, + data: $data, userId: $userId, jwt: $jwt ); break; case 'http': - Authorization::disable(); - $function = $database->getDocument('functions', $functionId); - Authorization::reset(); + $function = Authorization::skip(fn() => $database->getDocument('functions', $functionId)); if (empty($function->getId())) { throw new Exception('Function not found ('.$functionId.')'); } - $deployment = Authorization::skip(fn() => $database->getDocument('deployments', $function->getAttribute('deployment'))); - - $build = Authorization::skip(fn() => $database->getDocument('builds', $deployment->getAttribute('build'))); - $path = $build->getAttribute('path', ''); - $this->execute( - trigger: $trigger, projectId: $projectId, - executionId: $executionId, - path: $path, - buildId: $deployment->getAttribute('buildId', ''), - deploymentId: $deployment->getId(), - database: $database, function: $function, - data: $data, + dbForProject: $database, + executionId: $executionId, webhooks: $webhooks, + trigger: $trigger, + event: $event, + eventData: $eventData, + data: $data, userId: $userId, jwt: $jwt ); + break; } } - /** - * Execute function deployment - * - * @param string $trigger - * @param string $projectId - * @param string $executionId - * @param Database $database - * @param Document $function - * @param string $event - * @param string $eventData - * @param string $data - * @param array $webhooks - * @param string $userId - * @param string $jwt - * - * @return void - */ - public function execute(string $trigger, string $path, string $projectId, string $deploymentId, string $buildId, string $executionId, Database $database, Document $function, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = ''): void - { - $ch = \curl_init(); - \curl_setopt($ch, CURLOPT_URL, "http://appwrite-executor/v1/functions/{$function->getId()}/executions"); - \curl_setopt($ch, CURLOPT_POST, true); - \curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode([ - 'deploymentId' => $deploymentId, - 'buildId' => $buildId, - 'path' => $path, - 'vars' => $function->getAttribute('vars', []), - 'data' => $data, - 'runtime' => $function->getAttribute('runtime', ''), - 'timeout' => $function->getAttribute('timeout', 0), - 'baseImage' => '', - 'webhooks' => $webhooks, - 'userId' => $userId, - ])); - \curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - \curl_setopt($ch, CURLOPT_TIMEOUT, App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900) + 200); // + 200 for safety margin - \curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10); - \curl_setopt($ch, CURLOPT_HTTPHEADER, [ - 'Content-Type: application/json', - 'x-appwrite-project: '.$projectId, - 'x-appwrite-executor-key: '. App::getEnv('_APP_EXECUTOR_SECRET', '') - ]); + private function execute( + string $projectId, + Document $function, + Database $dbForProject, + string $executionId, + array $webhooks, + string $trigger, + string $event, + string $eventData, + string $data, + string $userId, + string $jwt + ) { - \curl_exec($ch); + $functionId = $function->getId(); + $deploymentId = $function->getAttribute('deployment', ''); - $error = \curl_error($ch); - if (!empty($error)) { - Console::error('Curl error: '.$error); + /** Check if deployment exists */ + $deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $deploymentId)); + + if ($deployment->getAttribute('resourceId') !== $functionId) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); } - \curl_close($ch); + if ($deployment->isEmpty()) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + } + + /** Check if build has exists */ + $build = Authorization::skip(fn() => $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''))); + if ($build->isEmpty()) { + throw new Exception('Build not found', 404); + } + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready', 400); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + $runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null; + + if (\is_null($runtime)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + } + + /** Create execution or update execution status */ + $execution = Authorization::skip(function() use ($dbForProject, &$executionId, $functionId, $deploymentId, $trigger, $userId) { + $execution = $dbForProject->getDocument('executions', $executionId); + if ($execution->isEmpty()) { + $executionId = $dbForProject->getId(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$read' => $userId ? ['user:' . $userId] : [], + '$write' => [], + 'dateCreated' => time(), + 'functionId' => $functionId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'stdout' => '', + 'stderr' => '', + 'time' => 0.0, + 'search' => implode(' ', [$functionId, $executionId]), + ])); + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + } + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + return $execution; + }); + + /** Collect environment variables */ + $vars = [ + 'APPWRITE_FUNCTION_ID' => $functionId, + 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''), + 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, + 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'], + 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'], + 'APPWRITE_FUNCTION_TRIGGER' => $trigger, + 'APPWRITE_FUNCTION_EVENT' => $event, + 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData, + 'APPWRITE_FUNCTION_DATA' => $data, + 'APPWRITE_FUNCTION_PROJECT_ID' => $projectId, + 'APPWRITE_FUNCTION_USER_ID' => $userId, + 'APPWRITE_FUNCTION_JWT' => $jwt, + ]; + $vars = \array_merge($function->getAttribute('vars', []), $vars); + + /** Execute function */ + $executionResponse = $this->executor->createExecution( + projectId: $projectId, + functionId: $functionId, + deploymentId: $deploymentId, + buildId: $deployment->getAttribute('buildId', ''), + path: $build->getAttribute('outputPath', ''), + vars: $vars, + entrypoint: $deployment->getAttribute('entrypoint', ''), + data: $vars['APPWRITE_FUNCTION_DATA'], + runtime: $function->getAttribute('runtime', ''), + timeout: $function->getAttribute('timeout', 0), + baseImage: $runtime['image'], + webhooks: $webhooks, + userId: $userId, + ); + + /** Update execution status */ + $execution->setAttribute('status', $executionResponse['status']); + $execution->setAttribute('statusCode', $executionResponse['statusCode']); + $execution->setAttribute('stdout', $executionResponse['stdout']); + $execution->setAttribute('stderr', $executionResponse['stderr']); + $execution->setAttribute('time', $executionResponse['time']); + $execution = Authorization::skip(fn() => $dbForProject->updateDocument('executions', $executionId, $execution)); + + /** Trigger Webhook */ + $executionModel = new Execution(); + $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $executionUpdate + ->setParam('projectId', $projectId) + ->setParam('userId', $userId) + ->setParam('webhooks', $webhooks) + ->setParam('event', 'functions.executions.update') + ->setParam('eventData', $execution->getArrayCopy(array_keys($executionModel->getRules()))); + $executionUpdate->trigger(); + + /** Trigger realtime event */ + $target = Realtime::fromPayload('functions.executions.update', $execution); + Realtime::send( + projectId: $projectId, + payload: $execution->getArrayCopy(), + event: 'functions.executions.update', + channels: $target['channels'], + roles: $target['roles'] + ); + + /** Update usage stats */ + global $register; + if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { + $statsd = $register->get('statsd'); + $usage = new Stats($statsd); + $usage + ->setParam('projectId', $projectId) + ->setParam('functionId', $function->getId()) + ->setParam('functionExecution', 1) + ->setParam('functionStatus', $execution->getAttribute('status', '')) + ->setParam('functionExecutionTime', $execution->getAttribute('time') * 1000) // ms + ->setParam('networkRequestSize', 0) + ->setParam('networkResponseSize', 0) + ->submit(); + $usage->submit(); + } } public function shutdown(): void diff --git a/src/Executor/Executor.php b/src/Executor/Executor.php index 8cbb1f17fb..e38ef0585f 100644 --- a/src/Executor/Executor.php +++ b/src/Executor/Executor.php @@ -92,6 +92,7 @@ class Executor string $buildId, string $path, array $vars, + string $entrypoint, string $data, string $runtime, string $baseImage, @@ -117,6 +118,7 @@ class Executor 'baseImage' => $baseImage, 'webhooks' => $webhooks, 'userId' => $userId, + 'entrypoint' => $entrypoint ]; $response = $this->call(self::METHOD_POST, $route, $headers, $params, true, 30);