App::getEnv('_APP_REGION') !== $region && $region !== 'default', ARRAY_FILTER_USE_KEY ); $stack = [ 'regions' => $regions, 'keys' => [], ]; $failures = []; const CHUNK_MAX_KEYS = 500; const MAX_CURL_SEND_ATTEMPTS = 4; /** * @param string $url * @param string $token * @param array $stack * @return array */ function call(string $url, string $token, array $stack): array { $payload = []; $ch = curl_init($url); curl_setopt($ch, CURLOPT_HTTPHEADER, [ 'Authorization: Bearer ' . $token, 'Content-Type: application/json' ]); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_TIMEOUT, 5); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($stack)); for ($attempts = 0; $attempts < MAX_CURL_SEND_ATTEMPTS; $attempts++) { $response = curl_exec($ch); $status = curl_getinfo($ch, CURLINFO_HTTP_CODE); $payload = [ 'status' => $status, 'payload' => json_decode($response, true) ]; if ($status === 200) { return $payload; } sleep(1); } curl_close($ch); return $payload; } /** * @throws Authorization * @throws Structure * @throws Exception|\Exception */ function handle($dbForConsole, $regions, $stack): void { $jwt = new JWT(App::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', 600, 10); $token = $jwt->encode([]); foreach ($regions as $code => $region) { $time = DateTime::now(); $response = call($region['domain'] . '/v1/edge/sync', $token, ['keys' => $stack]); if ($response['status'] !== Response::STATUS_CODE_OK) { Console::error("[{$time}] Request to {$code} has failed"); $dbForConsole->createDocument('sync', new Document([ 'region' => App::getEnv('_APP_REGION'), 'target' => $code, 'keys' => $stack, 'status' => $response['status'], 'payload' => $response['payload'], ])); } } } $server->job() ->inject('message') ->action(function (Message $message) use (&$stack, &$failures) { $payload = $message->getPayload() ?? []; if (!empty($payload['keys'])) { $regions = array_filter( Config::getParam('regions', []), fn ($region) => $payload['region'] === $region, ARRAY_FILTER_USE_KEY ); $failures[] = [ 'regions' => $regions, 'keys' => $payload['keys'] ]; } if (!empty($payload['key'])) { if (!in_array($payload['key'], $stack['keys'] ?? [])) { $stack['keys'][] = $payload['key']; } } }); $server ->workerStart() ->inject('dbForConsole') ->action(function ($dbForConsole) use (&$stack, &$failures) { Timer::tick(5000, function () use ($dbForConsole, &$stack, &$failures) { $time = DateTime::now(); if (empty($stack['keys']) && count($failures) === 0) { Console::info("[{$time}] Stack is empty"); return; } if (count($failures) > 0) { $i = 0; while ($i < count($failures)) { $failure = array_shift($failures); Console::info("[{$time}] ReSending " . count($failure['keys']) . " to " . key($failure['regions'])); handle($dbForConsole, $failure['regions'], $failure['keys']); $i++; } return; } $chunk = array_slice($stack['keys'], 0, CHUNK_MAX_KEYS); array_splice($stack['keys'], 0, CHUNK_MAX_KEYS); Console::log("[{$time}] Sending " . count($chunk) . " remains " . count($stack['keys'])); handle($dbForConsole, $stack['regions'], $chunk); $chunk = []; }); Console::success("Out [" . App::getEnv('_APP_REGION') . "] edge cache purging worker Started"); }); $server->start();