mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
adding realtime region sync
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
namespace Appwrite\Platform\Tasks;
|
||||
|
||||
use Utopia\App;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\DateTime;
|
||||
@@ -33,40 +34,56 @@ class EdgeSync extends Action
|
||||
Console::title('Edge-sync V1');
|
||||
Console::success(APP_NAME . ' Edge-sync v1 has started');
|
||||
|
||||
$interval = (int) App::getEnv('_APP_SYNC_EDGE_INTERVAL', '180');
|
||||
Console::loop(function () use ($interval, $dbForConsole, $queueForCacheSyncOut) {
|
||||
$regions = array_filter(
|
||||
Config::getParam('regions', []),
|
||||
fn ($region) => App::getEnv('_APP_REGION') !== $region
|
||||
&& $region !== 'default',
|
||||
ARRAY_FILTER_USE_KEY
|
||||
);
|
||||
|
||||
$interval = (int) App::getEnv('_APP_SYNC_EDGE_INTERVAL', '180');
|
||||
Console::loop(function () use ($interval, $dbForConsole, $queueForCacheSyncOut, $regions) {
|
||||
$time = DateTime::now();
|
||||
$count = 0;
|
||||
$chunk = 0;
|
||||
$limit = 50;
|
||||
$sum = $limit;
|
||||
|
||||
Console::success("[{$time}] New task every {$interval} seconds");
|
||||
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
foreach ($regions as $target) {
|
||||
$count = 0;
|
||||
$chunk = 0;
|
||||
$limit = 50;
|
||||
$sum = $limit;
|
||||
$keys = [];
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
|
||||
$results = $dbForConsole->find('syncs', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::limit($limit)
|
||||
]);
|
||||
$results = $dbForConsole->find('syncs', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('target', [$target]),
|
||||
Query::limit($limit)
|
||||
]);
|
||||
|
||||
$sum = count($results);
|
||||
if ($sum > 0) {
|
||||
foreach ($results as $document) {
|
||||
Console::info("[{$time}] Enqueueing keys chunk {$count} to {$document->getAttribute('target')}");
|
||||
$queueForCacheSyncOut
|
||||
->enqueue([
|
||||
'region' => $document->getAttribute('target'),
|
||||
'keys' => $document->getAttribute('keys')
|
||||
]);
|
||||
|
||||
$dbForConsole->deleteDocument('syncs', $document->getId());
|
||||
$count++;
|
||||
$sum = count($results);
|
||||
if ($sum > 0) {
|
||||
foreach ($results as $document) {
|
||||
$keys[] = [
|
||||
'type' => $document->getAttribute('type'),
|
||||
'key' => $document->getAttribute('key')
|
||||
];
|
||||
$dbForConsole->deleteDocument('syncs', $document->getId());
|
||||
$count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($keys)) {
|
||||
Console::info("[{$time}] Enqueueing keys chunk {$count} to {$target}");
|
||||
$queueForCacheSyncOut
|
||||
->enqueue([
|
||||
'region' => $target,
|
||||
'keys' => $keys
|
||||
]);
|
||||
} else {
|
||||
Console::info("[{$time}] No cache keys where found.");
|
||||
Console::info("[{$time}] No cache keys where found.");
|
||||
}
|
||||
}
|
||||
}, $interval);
|
||||
|
||||
Reference in New Issue
Block a user