From 3e155cbe3ffb87bc2a29a9a614ddc69284ec731c Mon Sep 17 00:00:00 2001 From: shimon Date: Mon, 2 Jan 2023 14:08:53 +0200 Subject: [PATCH] adding realtime region sync --- src/Appwrite/Platform/Tasks/EdgeSync.php | 67 +++++++++++++++--------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/EdgeSync.php b/src/Appwrite/Platform/Tasks/EdgeSync.php index db2ba03924..cdc99136b1 100644 --- a/src/Appwrite/Platform/Tasks/EdgeSync.php +++ b/src/Appwrite/Platform/Tasks/EdgeSync.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Tasks; use Utopia\App; +use Utopia\Config\Config; use Utopia\Platform\Action; use Utopia\CLI\Console; use Utopia\Database\DateTime; @@ -33,40 +34,56 @@ class EdgeSync extends Action Console::title('Edge-sync V1'); Console::success(APP_NAME . ' Edge-sync v1 has started'); - $interval = (int) App::getEnv('_APP_SYNC_EDGE_INTERVAL', '180'); - Console::loop(function () use ($interval, $dbForConsole, $queueForCacheSyncOut) { + $regions = array_filter( + Config::getParam('regions', []), + fn ($region) => App::getEnv('_APP_REGION') !== $region + && $region !== 'default', + ARRAY_FILTER_USE_KEY + ); + $interval = (int) App::getEnv('_APP_SYNC_EDGE_INTERVAL', '180'); + Console::loop(function () use ($interval, $dbForConsole, $queueForCacheSyncOut, $regions) { $time = DateTime::now(); - $count = 0; - $chunk = 0; - $limit = 50; - $sum = $limit; Console::success("[{$time}] New task every {$interval} seconds"); - while ($sum === $limit) { - $chunk++; + foreach ($regions as $target) { + $count = 0; + $chunk = 0; + $limit = 50; + $sum = $limit; + $keys = []; + while ($sum === $limit) { + $chunk++; - $results = $dbForConsole->find('syncs', [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::limit($limit) - ]); + $results = $dbForConsole->find('syncs', [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('target', [$target]), + Query::limit($limit) + ]); - $sum = count($results); - if ($sum > 0) { - foreach ($results as $document) { - Console::info("[{$time}] Enqueueing keys chunk {$count} to {$document->getAttribute('target')}"); - $queueForCacheSyncOut - ->enqueue([ - 'region' => $document->getAttribute('target'), - 'keys' => $document->getAttribute('keys') - ]); - - $dbForConsole->deleteDocument('syncs', $document->getId()); - $count++; + $sum = count($results); + if ($sum > 0) { + foreach ($results as $document) { + $keys[] = [ + 'type' => $document->getAttribute('type'), + 'key' => $document->getAttribute('key') + ]; + $dbForConsole->deleteDocument('syncs', $document->getId()); + $count++; + } } + } + + if (!empty($keys)) { + Console::info("[{$time}] Enqueueing keys chunk {$count} to {$target}"); + $queueForCacheSyncOut + ->enqueue([ + 'region' => $target, + 'keys' => $keys + ]); } else { - Console::info("[{$time}] No cache keys where found."); + Console::info("[{$time}] No cache keys where found."); } } }, $interval);