diff --git a/app/http.php b/app/http.php index 67da67376d..e061e2d549 100644 --- a/app/http.php +++ b/app/http.php @@ -7,10 +7,7 @@ $registerRequestResources = require __DIR__ . '/init/resources/request.php'; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; -use Swoole\Constant; -use Swoole\Process; use Swoole\Table; -use Swoole\Timer; use Utopia\Audit\Adapter\Database as AdapterDatabase; use Utopia\Audit\Adapter\SQL as AuditAdapterSQL; use Utopia\Audit\Audit; @@ -19,14 +16,12 @@ use Utopia\Config\Config; use Utopia\Console; use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; -use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; -use Utopia\Database\Query; -use Utopia\Http\Adapter\Swoole\Server; +use Utopia\Http\Adapter\SwooleCoroutine\Server; use Utopia\Http\Files; use Utopia\Http\Http; use Utopia\Logger\Log; @@ -34,164 +29,34 @@ use Utopia\Logger\Log\User; use Utopia\Span\Span; use Utopia\System\System; -const DOMAIN_SYNC_TIMER = 30; // 30 seconds - $files = new Files(); $files->load(__DIR__ . '/../public'); -$riskyDomains = new Table(100_000); -$riskyDomains->column('value', Table::TYPE_INT, 1); -$riskyDomains->create(); - $certifiedDomains = new Table(100_000); $certifiedDomains->column('value', Table::TYPE_INT, 1); $certifiedDomains->create(); global $container; -$container->set('riskyDomains', fn () => $riskyDomains); $container->set('certifiedDomains', fn () => $certifiedDomains); $container->set('pools', function ($register) { return $register->get('pools'); }, ['register']); $payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing -$totalWorkers = intval(System::getEnv('_APP_CPU_NUM', swoole_cpu_num())) * intval(System::getEnv('_APP_WORKER_PER_CORE', 6)); $swooleAdapter = new Server( host: "0.0.0.0", port: System::getEnv('PORT', 80), settings: [ - Constant::OPTION_WORKER_NUM => $totalWorkers, - Constant::OPTION_DISPATCH_FUNC => dispatch(...), - Constant::OPTION_DISPATCH_MODE => SWOOLE_DISPATCH_UIDMOD, - Constant::OPTION_HTTP_COMPRESSION => false, - Constant::OPTION_PACKAGE_MAX_LENGTH => $payloadSize, - Constant::OPTION_OUTPUT_BUFFER_SIZE => $payloadSize, - Constant::OPTION_TASK_WORKER_NUM => 1, // required for the task to fetch domains background + 'http_compression' => false, + 'package_max_length' => $payloadSize, + 'output_buffer_size' => $payloadSize, ], container: $container, ); $container->set('container', fn () => fn () => $swooleAdapter->getContainer()); -$http = $swooleAdapter->getServer(); - -/** - * Assigns HTTP requests to worker threads by analyzing its payload/content. - * - * Routes requests as 'safe' or 'risky' based on specific content patterns (like POST actions or certain domains) - * to optimize load distribution between the workers. Utilizes `$safeThreadsPercent` to manage risk by assigning - * riskier tasks to a dedicated worker subset. Prefers idle workers, with fallback to random selection if necessary. - * doc: https://openswoole.com/docs/modules/swoole-server/configuration#dispatch_func - * - * @param \Swoole\Http\Server $server Swoole server instance. - * @param int $fd client ID - * @param int $type the type of data and its current state - * @param string|null $data Request content for categorization. - * @global int $totalThreads Total number of workers. - * @return int Chosen worker ID for the request. - */ -function dispatch(\Swoole\Http\Server $server, int $fd, int $type, $data = null): int -{ - $resolveWorkerId = function (\Swoole\Http\Server $server, $data = null) { - global $totalWorkers, $riskyDomains; - - // If data is not set we can send request to any worker - // first we try to pick idle worker, if not we randomly pick a worker - if ($data === null) { - for ($i = 0; $i < $totalWorkers; $i++) { - if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { - return $i; - } - } - return rand(0, $totalWorkers - 1); - } - - $riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1 - - // Each worker has numeric ID, starting from 0 and incrementing - // From 0 to riskyWorkers, we consider safe workers - // From riskyWorkers to totalWorkers, we consider risky workers - $riskyWorkers = (int)floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers - - $domain = ''; - // max up to 3 as first line has request details and second line has host - $lines = explode("\n", $data, 3); - $request = $lines[0]; - if (count($lines) > 1) { - $domain = trim(explode('Host: ', $lines[1])[1] ?? ''); - } - - // Sync executions are considered risky - $risky = false; - if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) { - $risky = true; - } elseif ($riskyDomains->get(md5($domain), 'value') === 1) { - // executions request coming from custom domain - $risky = true; - } else { - foreach (\explode(',', System::getEnv('_APP_DOMAIN_FUNCTIONS')) as $riskyDomain) { - if (empty($riskyDomain)) { - continue; - } - if (str_ends_with($domain, $riskyDomain)) { - $risky = true; - break; - } - } - } - - if ($risky) { - // If risky request, only consider risky workers - for ($j = $riskyWorkers; $j < $totalWorkers; $j++) { - /** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */ - if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) { - // If idle worker found, give to him - return $j; - } - } - - // If no idle workers, give to random risky worker - $worker = rand($riskyWorkers, $totalWorkers - 1); - Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}"); - return $worker; - } - - // If safe request, give to any idle worker - // Its fine to pick risky worker here, because it's idle. Idle is never actually risky - for ($i = 0; $i < $totalWorkers; $i++) { - if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { - return $i; - } - } - - // If no idle worker found, give to random safe worker - // We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky. - $worker = rand(0, $riskyWorkers - 1); - Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}"); - return $worker; - }; - $workerId = $resolveWorkerId($server, $data); - $server->bind($fd, $workerId); - return $workerId; -} - -$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) { -}); - -$http->on(Constant::EVENT_WORKER_STOP, function ($server, $workerId) { - Timer::clearAll(); - Console::success('Worker ' . ++$workerId . ' stopped successfully'); -}); - -$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server) { - Console::success('Starting reload...'); -}); - -$http->on(Constant::EVENT_AFTER_RELOAD, function ($server) { - Console::success('Reload completed...'); -}); - $container->set('bus', function ($register) use ($swooleAdapter) { return $register->get('bus')->setResolver(fn (string $name) => $swooleAdapter->getContainer()->get($name)); }, ['register']); @@ -290,7 +155,9 @@ function createDatabase(Http $app, string $resourceKey, string $dbName, array $c Span::current()?->finish(); } -$http->on(Constant::EVENT_START, function ($http) use ($payloadSize, $totalWorkers, $swooleAdapter) { +// The coroutine adapter does not expose process-worker hooks, so startup work stays in +// a single onStart callback and request routing falls back to coroutine scheduling. +$swooleAdapter->onStart(function () use ($payloadSize, $swooleAdapter) { $app = new Http($swooleAdapter, 'UTC'); /** @var \Utopia\Pools\Group $pools */ @@ -496,20 +363,9 @@ $http->on(Constant::EVENT_START, function ($http) use ($payloadSize, $totalWorke }); Span::init('http.server.start'); - Span::add('server.workers', $totalWorkers); + Span::add('server.adapter', 'swoole-coroutine'); Span::add('server.payload_size', $payloadSize); - Span::add('server.master_pid', $http->master_pid); - Span::add('server.manager_pid', $http->manager_pid); Span::current()?->finish(); - - // Start the task that starts fetching custom domains - $http->task([], 0); - - // listen ctrl + c - Process::signal(2, function () use ($http) { - Console::log('Stop by Ctrl+C'); - $http->shutdown(); - }); }); $swooleAdapter->onRequest(function ($utopiaRequest, $utopiaResponse) use ($files, $swooleAdapter, $registerRequestResources) { @@ -650,83 +506,4 @@ $swooleAdapter->onRequest(function ($utopiaRequest, $utopiaResponse) use ($files } }); -// Fetch domains every `DOMAIN_SYNC_TIMER` seconds and update in the memory -$http->on(Constant::EVENT_TASK, function () use ($swooleAdapter) { - $lastSyncUpdate = null; - - $app = new Http($swooleAdapter, 'UTC'); - - /** @var Utopia\Database\Database $dbForPlatform */ - $dbForPlatform = $app->getResource('dbForPlatform'); - - /** @var \Swoole\Table $riskyDomains */ - $riskyDomains = $app->getResource('riskyDomains'); - - Timer::tick(DOMAIN_SYNC_TIMER * 1000, function () use ($dbForPlatform, $riskyDomains, &$lastSyncUpdate, $app) { - try { - $time = DateTime::now(); - $limit = 1000; - $sum = $limit; - $latestDocument = null; - - while ($sum === $limit) { - $queries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $queries[] = Query::cursorAfter($latestDocument); - } - if ($lastSyncUpdate !== null) { - $queries[] = Query::greaterThanEqual('$updatedAt', $lastSyncUpdate); - } - $results = []; - try { - $authorization = $app->getResource('authorization'); - $results = $authorization->skip(fn () => $dbForPlatform->find('rules', $queries)); - } catch (Throwable $th) { - Console::error('rules ' . $th->getMessage()); - } - - $sum = count($results); - foreach ($results as $document) { - $domain = $document->getAttribute('domain'); - - $denyDomains = []; - $denyEnvVars = [ - System::getEnv('_APP_DOMAIN_FUNCTIONS_FALLBACK', ''), - System::getEnv('_APP_DOMAIN_FUNCTIONS', ''), - System::getEnv('_APP_DOMAIN_SITES', ''), - ]; - foreach ($denyEnvVars as $denyEnvVar) { - foreach (\explode(',', $denyEnvVar) as $denyDomain) { - if (empty($denyDomain)) { - continue; - } - $denyDomains[] = $denyDomain; - } - } - - $isDenyDomain = false; - foreach ($denyDomains as $denyDomain) { - if (str_ends_with($domain, $denyDomain)) { - $isDenyDomain = true; - } - } - - if ($isDenyDomain) { - continue; - } - - $riskyDomains->set(md5($domain), ['value' => 1]); - } - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - $lastSyncUpdate = $time; - if ($sum > 0) { - Console::log("Sync domains tick: {$sum} domains were updated"); - } - } catch (Throwable $th) { - Console::error($th->getMessage()); - } - }); -}); - $swooleAdapter->start();