diff --git a/app/cli.php b/app/cli.php index 544148c8d5..13709b9b57 100644 --- a/app/cli.php +++ b/app/cli.php @@ -110,7 +110,7 @@ CLI::setResource('influxdb', function (Registry $register) { return $database; }, ['register']); -CLI::setResource('functions', function (Group $pools) { +CLI::setResource('queueForFunctions', function (Group $pools) { return new Func($pools->get('queue')->pop()->getResource()); }, ['pools']); diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index d64f302636..86981b0071 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -469,13 +469,13 @@ App::put('/v1/functions/:functionId') 'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]), ]))); - $schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); /** * In case we want to clear the schedule */ if (!empty($function->getAttribute('deployment'))) { - $schedule->setAttribute('resourceUpdatedAt', $function['scheduleUpdatedAt']); + $schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt')); } $schedule @@ -537,7 +537,7 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId') 'deployment' => $deployment->getId() ]))); - $schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); $active = !empty($function->getAttribute('schedule')); @@ -588,7 +588,7 @@ App::delete('/v1/functions/:functionId') throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB'); } - $schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); $schedule ->setAttribute('resourceUpdatedAt', DateTime::now()) @@ -790,7 +790,7 @@ App::post('/v1/functions/:functionId/deployments') * TODO Should we update also the function collection with the scheduleUpdatedAt attr? */ - $schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); $active = !empty($function->getAttribute('schedule')); @@ -1059,8 +1059,8 @@ App::post('/v1/functions/:functionId/executions') ->inject('events') ->inject('usage') ->inject('mode') - ->inject('functions') - ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Func $functions) { + ->inject('queueForFunctions') + ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Func $queueForFunctions) { $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1148,7 +1148,7 @@ App::post('/v1/functions/:functionId/executions') ->setContext('function', $function); if ($async) { - $functions + $queueForFunctions ->setType('http') ->setExecution($execution) ->setFunction($function) @@ -1162,7 +1162,7 @@ App::post('/v1/functions/:functionId/executions') ->dynamic($execution, Response::MODEL_EXECUTION); } - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value') ?? ''; return $carry; }, []); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index a12016606f..5624486755 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -252,8 +252,8 @@ App::shutdown() ->inject('database') ->inject('mode') ->inject('dbForProject') - ->inject('functions') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $functions) use ($parseLabel) { + ->inject('queueForFunctions') + ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $queueForFunctions) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -264,10 +264,8 @@ App::shutdown() /** * Trigger functions. */ - $functions + $queueForFunctions ->from($events) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) - ->setClass(Event::FUNCTIONS_CLASS_NAME) ->trigger(); /** diff --git a/app/init.php b/app/init.php index a6c4b89c45..af3b05965d 100644 --- a/app/init.php +++ b/app/init.php @@ -844,7 +844,7 @@ App::setResource('mails', fn() => new Mail()); App::setResource('deletes', fn() => new Delete()); App::setResource('database', fn() => new EventDatabase()); App::setResource('messaging', fn() => new Phone()); -App::setResource('functions', function (Group $pools) { +App::setResource('queueForFunctions', function (Group $pools) { return new Func($pools->get('queue')->pop()->getResource()); }, ['pools']); App::setResource('usage', function ($register) { diff --git a/app/workers/builds.php b/app/workers/builds.php index 315b13a179..478c5c5423 100644 --- a/app/workers/builds.php +++ b/app/workers/builds.php @@ -15,6 +15,7 @@ use Utopia\Database\ID; use Utopia\Storage\Storage; use Utopia\Database\Document; use Utopia\Config\Config; +use Utopia\Database\Validator\Authorization; require_once __DIR__ . '/../init.php'; @@ -156,7 +157,7 @@ class BuildsV1 extends Worker $source = $deployment->getAttribute('path'); - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value'); return $carry; }, []); @@ -192,6 +193,8 @@ class BuildsV1 extends Worker Console::success("Build id: $buildId created"); + $function->setAttribute('scheduleUpdatedAt', DateTime::now()); + /** Set auto deploy */ if ($deployment->getAttribute('activate') === true) { $function->setAttribute('deployment', $deployment->getId()); @@ -199,14 +202,15 @@ class BuildsV1 extends Worker } /** Update function schedule */ - // TODO: @Meldiron refactor scheduler here - /* - $schedule = $function->getAttribute('schedule', ''); - $cron = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? new CronExpression($schedule) : null; - $next = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? DateTime::format($cron->getNextRunDate()) : null; - $function->setAttribute('scheduleNext', $next); - $function = $dbForProject->updateDocument('functions', $function->getId(), $function); - */ + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + $schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt')); + + $schedule + ->setAttribute('schedule', $function->getAttribute('schedule')) + ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment'))); + + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); } catch (\Throwable $th) { $endTime = DateTime::now(); $interval = (new \DateTime($endTime))->diff(new \DateTime($startTime)); diff --git a/app/workers/functions.php b/app/workers/functions.php index bfa4421449..68b9c21bf6 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -29,6 +29,7 @@ Authorization::setDefaultStatus(false); global $connection; global $workerNumber; + $adapter = new Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); $server = new Server($adapter); @@ -145,7 +146,7 @@ Server::setResource('execute', function () { $execution->setAttribute('status', 'processing'); $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value'); return $carry; }, []); diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php index 88fd0a9a80..dc86f9293e 100644 --- a/src/Appwrite/Platform/Tasks/Schedule.php +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -3,17 +3,15 @@ namespace Appwrite\Platform\Tasks; use Cron\CronExpression; +use Swoole\Timer; use Utopia\App; use Utopia\Platform\Action; use Utopia\CLI\Console; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Query; -use Swoole\Timer; use Utopia\Database\Database; use Utopia\Pools\Group; -use Utopia\Queue\Client as Worker; -use Appwrite\Event\Event; use Appwrite\Event\Func; use function Swoole\Coroutine\run;