diff --git a/app/controllers/api/edge.php b/app/controllers/api/edge.php index 61dc5f2916..fd124726c6 100644 --- a/app/controllers/api/edge.php +++ b/app/controllers/api/edge.php @@ -2,15 +2,16 @@ use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; -use Appwrite\Event\Delete; use Appwrite\Event\SyncIn; use Appwrite\Extend\Exception; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Utopia\App; +use Utopia\Registry\Registry; use Utopia\Validator\ArrayList; use Utopia\Validator\Text; -use Utopia\Validator\WhiteList; +use Utopia\Queue\Client; +use Utopia\Queue\Connection\Redis; App::post('/v1/edge') ->desc('Purge cache keys') @@ -18,11 +19,12 @@ App::post('/v1/edge') ->param('keys', '', new ArrayList(new Text(100), 1000), 'Cache keys') ->inject('request') ->inject('response') - ->action(function (array $keys, Request $request, Response $response) { + ->inject('register') + ->action(function (array $keys, Request $request, Response $response, Registry $register) { - if (empty($keys)) { + //if (empty($keys)) { throw new Exception(Exception::KEY_NOT_FOUND); - } + //} $token = $request->getHeader('authorization'); $token = str_replace(["Bearer"," "], "", $token); @@ -33,40 +35,22 @@ App::post('/v1/edge') throw new Exception(Exception::USER_JWT_INVALID, 'Failed to verify JWT. ' . $error->getMessage()); } - $syncIn = new SyncIn(); + $connection = $register + ->get('workerRedisConnection'); + + $client = new Client('syncIn', $connection); + $client->resetStats(); + foreach ($keys as $key) { - $syncIn - ->addKey($key) - ->trigger(); + $client->enqueue([ + 'type' => 'from endpoint', + 'value' => [ + 'key' => $key + ] + ]); } $response ->setStatusCode(Response::STATUS_CODE_OK) ->send(); }); - -App::post('/v1/edge/notify') - ->desc('Flush notification') - ->label('scope', 'public') - ->param('region', '', new WhiteList(['nyc1', 'blr1', 'fra1']), 'Cloud regions') - ->inject('request') - ->inject('response') - ->action(function (string $region, Request $request, Response $response) { - - $token = $request->getHeader('authorization'); - $token = str_replace(["Bearer"," "], "", $token); - $jwt = new JWT(App::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', 600, 10); - try { - $payload = $jwt->decode($token); - } catch (JWTException $error) { - throw new Exception(Exception::USER_JWT_INVALID, 'Failed to verify JWT. ' . $error->getMessage()); - } - - (new Delete()) - ->setRegion($region) - ->trigger(); - - $response - ->setStatusCode(Response::STATUS_CODE_OK) - ->send(); - }); diff --git a/app/init.php b/app/init.php index d0763c6295..470552fc74 100644 --- a/app/init.php +++ b/app/init.php @@ -48,6 +48,8 @@ use Utopia\Database\ID; use Utopia\Logger\Logger; use Utopia\Config\Config; use Utopia\Locale\Locale; +use Utopia\Queue\Client as clientQueue; +use Utopia\Queue\Connection\Redis as redisQueue; use Utopia\Registry\Registry; use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; @@ -931,24 +933,47 @@ $register->set('syncOut', function () { return new SyncOut(); }); +$register->set('workerRedisConnection', function () { + return new redisQueue('redis', 6379); +}); + +$register->set('workerSyncOut', function () use ($register) { + return new clientQueue('syncOut', $register->get('workerRedisConnection')); +}); + + App::setResource('dbForProject', function ($db, $cache, Document $project, $register) { $cache = new Cache(new RedisCache($cache)); $cache->on(cache::EVENT_SAVE, function ($key) use ($register) { + $register - ->get('syncOut') - ->addKey($key) - ->trigger(); + ->get('workerSyncOut') + ->resetStats(); + $register + ->get('workerSyncOut') + ->enqueue([ + 'type' => 'saved from init', + 'value' => [ + 'key' => $key + ] + ]); }); $cache->on(cache::EVENT_PURGE, function ($key) use ($register) { $register - ->get('syncOut') - ->addKey($key) - ->trigger(); + ->get('workerSyncOut') + ->resetStats(); + $register + ->get('workerSyncOut') + ->enqueue([ + 'type' => 'purge from init', + 'value' => [ + 'key' => $key + ] + ]); }); - $database = new Database(new MariaDB($db), $cache); $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); $database->setNamespace("_{$project->getInternalId()}"); diff --git a/app/preload.php b/app/preload.php index fb63a381b6..e890f8fc83 100644 --- a/app/preload.php +++ b/app/preload.php @@ -36,6 +36,7 @@ foreach ( realpath(__DIR__ . '/../vendor/mongodb'), realpath(__DIR__ . '/../vendor/utopia-php/websocket'), // TODO: remove workerman autoload realpath(__DIR__ . '/../vendor/utopia-php/cache'), // TODO: Remove when memcached ext issue get fixed + realpath(__DIR__ . '/../vendor/utopia-php/queue'), // TODO: Remove when memcached ext issue get fixed ] as $key => $value ) { if ($value !== false) { diff --git a/app/tasks/syncsCloud.php b/app/tasks/syncsCloud.php index 276e20930f..522a867a35 100644 --- a/app/tasks/syncsCloud.php +++ b/app/tasks/syncsCloud.php @@ -3,7 +3,6 @@ global $cli; global $register; -use Appwrite\Event\SyncOut; use Utopia\App; use Utopia\Cache\Cache; use Utopia\CLI\Console; @@ -13,6 +12,7 @@ use Utopia\Database\DateTime; use Utopia\Cache\Adapter\Redis as RedisCache; use Utopia\Database\Query; + function getConsoleDatabase(): Database { global $register; @@ -44,6 +44,42 @@ function getConsoleDatabase(): Database return $database; } +function syncRegionalCache($dbForConsole, $regionOrg): void +{ + global $register; + + $time = DateTime::now(); + $chunks = $dbForConsole->find('syncs', [ + Query::equal('regionOrg', [$regionOrg]), + Query::limit(500) + ]); + + if (count($chunks) > 0) { + Console::info("[{$time}] Found " . \count($chunks) . " cache key chunks to purge."); + foreach ($chunks as $chunk) { + $keys = $chunk->getAttribute('keys'); + foreach ($keys ?? [] as $key) { + $register + ->get('workerSyncOut') + ->resetStats(); + + $register + ->get('workerSyncOut') + ->enqueue([ + 'type' => 'from cloud maintenance', + 'value' => [ + 'region' => $chunk->getAttribute('regionDest'), + 'key' => $key + ] + ]); + } + $dbForConsole->deleteDocument('syncs', $chunk->getId()); + } + } else { + Console::info("[{$time}] No cache key chunks where found."); + } +} + $cli ->task('syncsCloud') ->desc('Schedules cloud sync tasks') @@ -51,31 +87,6 @@ $cli Console::title('Syncs cloud V1'); Console::success(APP_NAME . ' Syncs cloud process v1 has started'); - function syncRegionalCache($dbForConsole, $regionOrg): void - { - $time = DateTime::now(); - $chunks = $dbForConsole->find('syncs', [ - Query::equal('regionOrg', [$regionOrg]), - Query::limit(500) - ]); - - if (count($chunks) > 0) { - Console::info("[{$time}] Found " . \count($chunks) . " cache key chunks to purge."); - foreach ($chunks as $chunk) { - $keys = $chunk->getAttribute('keys'); - foreach ($keys['keys'] ?? [] as $key) { - (new SyncOut()) - ->setRegion($chunk->getAttribute('region')) - ->addKey($key) - ->trigger(); - } - $dbForConsole->deleteDocument('syncs', $chunk->getId()); - } - } else { - Console::info("[{$time}] No cache key chunks where found."); - } - } - $interval = (int) App::getEnv('_APP_SYNCS_CLOUD_INTERVAL', '180'); Console::loop(function () use ($interval) { diff --git a/app/workers/syncsIn.php b/app/workers/syncsIn.php index 00c426ff10..40031ca20e 100644 --- a/app/workers/syncsIn.php +++ b/app/workers/syncsIn.php @@ -1,47 +1,49 @@ args['key'])) { - $this->getCache()->purge($this->args['key']); - } - } - - /** - * Get cache - * @return RedisCache - * @throws Exception - */ - private function getCache(): RedisCache - { - global $register; - - return new RedisCache($register->get('cache')); - } - - - - public function shutdown(): void - { - } + global $register; + return new RedisCache($register->get('cache')); } + +$connection = new Queue\Connection\Redis('redis'); +$adapter = new Queue\Adapter\Swoole($connection, 1, 'syncIn'); +$server = new Queue\Server($adapter); + +$server->job() + ->inject('message') + ->action(function (Message $message) use (&$keys, &$counter) { + + $payload = $message->getPayload()['value']; + if (!empty($payload['key'])) { + var_dump('purging ' . $payload['key']); + getCache()->purge($payload['key']); + } + }); + +$server + ->error() + ->inject('error') + ->action(function ($error) { + echo $error->getMessage() . PHP_EOL; + }); + +$server + ->workerStart(function () { + echo "In region [" . CURRENT_REGION . "] cache purging worker Started" . PHP_EOL; + }) + ->start(); diff --git a/app/workers/syncsOut.php b/app/workers/syncsOut.php index a622941768..46e924aa0b 100644 --- a/app/workers/syncsOut.php +++ b/app/workers/syncsOut.php @@ -1,95 +1,187 @@ regions = Config::getParam('regions', []); - } + $attempts = 0; - public function run(): void - { + do { + try { + $attempts++; + $cache = new Cache(new RedisCache($register->get('cache'))); + $database = new Database(new MariaDB($register->get('db')), $cache); + $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); + $database->setNamespace($namespace); // Main DB + + if (!empty($projectId) && !$database->getDocument('projects', $projectId)->isEmpty()) { + throw new \Exception("Project does not exist: {$projectId}"); + } + + if ($type === DATABASE_CONSOLE && !$database->exists($database->getDefaultDatabase(), Database::METADATA)) { + throw new \Exception('Console project not ready'); + } + + break; // leave loop if successful + } catch (\Exception $e) { + Console::warning("Database not ready. Retrying connection ({$attempts})..."); + if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) { + throw new \Exception('Failed to connect to database: ' . $e->getMessage()); + } + sleep($sleep); + } + } while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS); + + return $database; +} + +function send($url, $token, $keys): int +{ + + $ch = curl_init($url); + curl_setopt($ch, CURLOPT_HTTPHEADER, [ + 'Authorization: Bearer ' . $token, + 'Content-Type: application/json' + ]); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_TIMEOUT, 5); + curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); + curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($keys)); + + for ($attempts = 0; $attempts < 5; $attempts++) { + curl_exec($ch); + $responseStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE); + + if ($responseStatus === 200) { + return $responseStatus; + } + + sleep(2); + } + curl_close($ch); + return $responseStatus; +} + +$connection = new Queue\Connection\Redis('redis'); +$adapter = new Queue\Adapter\Swoole($connection, 1, 'syncOut'); +$server = new Queue\Server($adapter); + +$server->job() + ->inject('message') + ->action(function (Message $message) use (&$keys, &$counter) { + + $payload = $message->getPayload()['value']; + $regions = Config::getParam('regions', true); + + if (!empty($payload['region'])) { + $tmp = $regions[$payload['region']]; + $regions = []; + $regions[$payload['region']] = $tmp; + } $currentRegion = App::getEnv('_APP_REGION', 'nyc1'); + $keys[$payload['key']] = null; - $data[] = $this->args['key']; - $jwt = new JWT(App::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', 600, 10); - $token = $jwt->encode($data); + if (count($keys) > MAX_KEY_COUNT || ($counter + SUBMITION_INTERVAL) < time()) { - if (!empty($this->args['region'])) { - $this->regions = $this->regions[$this->args['region']]; - } + $jwt = new JWT(App::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', 600, 10); + $token = $jwt->encode([]); - foreach ($this->regions as $code => $region) { - if ($currentRegion === $code) { - continue; + foreach ($regions as $code => $region) { + if (CURRENT_REGION === $code) { + continue; + } + + $status = send($region['domain'] . '/v1/edge', $token, ['keys' => array_keys($keys)]); + +// var_dump([ +// 'keyscount' => count($keys), +// 'keys' => array_keys($keys), +// 'timestamp' => date('m/d/Y H:i:s', $counter) +// ]); + + if ($status !== Response::STATUS_CODE_OK) { + getDB(DATABASE_CONSOLE)->createDocument('syncs', new Document([ + 'requestedAt' => DateTime::now(), + 'regionOrg' => $currentRegion, + 'regionDest' => $code, + 'keys' => array_keys($keys), + 'status' => $status, + ])); + } } - $status = $this->send($region['domain'] . '/v1/edge', $token, ['keys' => $data]); - - if ($status !== Response::STATUS_CODE_OK) { - $this->getConsoleDB()->createDocument('syncs', new Document([ - 'requestedAt' => DateTime::now(), - 'regionOrg' => $currentRegion, - 'regionDest' => $code, - 'keys' => $data, - 'status' => $status, - ])); - } + $counter = time(); + $keys = []; + //var_dump('new time set -> ' . $counter); + //var_dump($keys); } - } + }); - private function send($url, $token, $data): int - { +$server + ->error() + ->inject('error') + ->action(function ($error) { + echo $error->getMessage() . PHP_EOL; + echo $error->getLine() . PHP_EOL; + }); - $ch = curl_init($url); - curl_setopt($ch, CURLOPT_HTTPHEADER, [ - 'Authorization: Bearer ' . $token, - 'Content-Type: application/json' - ]); - curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - curl_setopt($ch, CURLOPT_TIMEOUT, 5); - curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); - curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data)); - - for ($attempts = 0; $attempts < 3; $attempts++) { - curl_exec($ch); - $responseStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE); - - if ($responseStatus === 200) { - return $responseStatus; - } - - sleep(2); - } - curl_close($ch); - return $responseStatus; - } - - - public function shutdown(): void - { - } -} +$server + ->workerStart(function () { + echo "Out region [" . CURRENT_REGION . "] cache purging worker Started" . PHP_EOL; + }) + ->start(); diff --git a/bin/worker-syncs-in b/bin/worker-syncs-in index f82d859e1f..6b973c44fe 100644 --- a/bin/worker-syncs-in +++ b/bin/worker-syncs-in @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=0.1 QUEUE='v1-syncs-in' APP_INCLUDE='/usr/src/code/app/workers/syncsIn.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/workers/syncsIn.php $@ \ No newline at end of file diff --git a/bin/worker-syncs-out b/bin/worker-syncs-out index 523ecd5270..8117e84b34 100644 --- a/bin/worker-syncs-out +++ b/bin/worker-syncs-out @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=0.1 QUEUE='v1-syncs-out' APP_INCLUDE='/usr/src/code/app/workers/syncsOut.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +php /usr/src/code/app/workers/syncsOut.php $@ \ No newline at end of file diff --git a/composer.json b/composer.json index f302008567..58f3070f7e 100644 --- a/composer.json +++ b/composer.json @@ -61,6 +61,7 @@ "utopia-php/websocket": "0.1.0", "utopia-php/image": "0.5.*", "utopia-php/orchestration": "0.6.*", + "utopia-php/queue": "0.3.*", "resque/php-resque": "1.3.6", "matomo/device-detector": "6.0.0", "dragonmantank/cron-expression": "3.3.1", diff --git a/composer.lock b/composer.lock index db9ee209a7..6f7efe70a3 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1ceb54089b20b863e8685e1644ba51f9", + "content-hash": "73ea3ea466d02c3c6427d235c11554ff", "packages": [ { "name": "adhocore/jwt", @@ -2492,6 +2492,67 @@ }, "time": "2020-10-24T07:04:59+00:00" }, + { + "name": "utopia-php/queue", + "version": "0.3.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/queue.git", + "reference": "42b132c6f2431b726c2bc629c386921e4934b863" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/42b132c6f2431b726c2bc629c386921e4934b863", + "reference": "42b132c6f2431b726c2bc629c386921e4934b863", + "shasum": "" + }, + "require": { + "php": ">=8.0", + "utopia-php/cli": "0.13.*", + "utopia-php/framework": "0.*.*" + }, + "require-dev": { + "laravel/pint": "^0.2.3", + "phpstan/phpstan": "^1.8", + "phpunit/phpunit": "^9.5.5", + "swoole/ide-helper": "4.8.8", + "workerman/workerman": "^4.0" + }, + "suggest": { + "ext-swoole": "Needed to support Swoole.", + "workerman/workerman": "Needed to support Workerman." + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Queue\\": "src/Queue" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Torsten Dittmann", + "email": "torsten@appwrite.io" + } + ], + "description": "A powerful task queue.", + "keywords": [ + "Tasks", + "framework", + "php", + "queue", + "upf", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/queue/issues", + "source": "https://github.com/utopia-php/queue/tree/0.3.0" + }, + "time": "2022-10-19T13:22:07+00:00" + }, { "name": "utopia-php/registry", "version": "0.5.0", @@ -5388,5 +5449,5 @@ "platform-overrides": { "php": "8.0" }, - "plugin-api-version": "2.3.0" + "plugin-api-version": "2.2.0" }