mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
feat: add coroutine HTTP server experiment
This commit is contained in:
+8
-231
@@ -7,10 +7,7 @@ $registerRequestResources = require __DIR__ . '/init/resources/request.php';
|
||||
|
||||
use Appwrite\Utopia\Request;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Swoole\Constant;
|
||||
use Swoole\Process;
|
||||
use Swoole\Table;
|
||||
use Swoole\Timer;
|
||||
use Utopia\Audit\Adapter\Database as AdapterDatabase;
|
||||
use Utopia\Audit\Adapter\SQL as AuditAdapterSQL;
|
||||
use Utopia\Audit\Audit;
|
||||
@@ -19,14 +16,12 @@ use Utopia\Config\Config;
|
||||
use Utopia\Console;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception\Duplicate as DuplicateException;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\Database\Helpers\Permission;
|
||||
use Utopia\Database\Helpers\Role;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Http\Adapter\Swoole\Server;
|
||||
use Utopia\Http\Adapter\SwooleCoroutine\Server;
|
||||
use Utopia\Http\Files;
|
||||
use Utopia\Http\Http;
|
||||
use Utopia\Logger\Log;
|
||||
@@ -34,164 +29,34 @@ use Utopia\Logger\Log\User;
|
||||
use Utopia\Span\Span;
|
||||
use Utopia\System\System;
|
||||
|
||||
const DOMAIN_SYNC_TIMER = 30; // 30 seconds
|
||||
|
||||
$files = new Files();
|
||||
$files->load(__DIR__ . '/../public');
|
||||
|
||||
$riskyDomains = new Table(100_000);
|
||||
$riskyDomains->column('value', Table::TYPE_INT, 1);
|
||||
$riskyDomains->create();
|
||||
|
||||
$certifiedDomains = new Table(100_000);
|
||||
$certifiedDomains->column('value', Table::TYPE_INT, 1);
|
||||
$certifiedDomains->create();
|
||||
|
||||
global $container;
|
||||
$container->set('riskyDomains', fn () => $riskyDomains);
|
||||
$container->set('certifiedDomains', fn () => $certifiedDomains);
|
||||
$container->set('pools', function ($register) {
|
||||
return $register->get('pools');
|
||||
}, ['register']);
|
||||
|
||||
$payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing
|
||||
$totalWorkers = intval(System::getEnv('_APP_CPU_NUM', swoole_cpu_num())) * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
|
||||
|
||||
$swooleAdapter = new Server(
|
||||
host: "0.0.0.0",
|
||||
port: System::getEnv('PORT', 80),
|
||||
settings: [
|
||||
Constant::OPTION_WORKER_NUM => $totalWorkers,
|
||||
Constant::OPTION_DISPATCH_FUNC => dispatch(...),
|
||||
Constant::OPTION_DISPATCH_MODE => SWOOLE_DISPATCH_UIDMOD,
|
||||
Constant::OPTION_HTTP_COMPRESSION => false,
|
||||
Constant::OPTION_PACKAGE_MAX_LENGTH => $payloadSize,
|
||||
Constant::OPTION_OUTPUT_BUFFER_SIZE => $payloadSize,
|
||||
Constant::OPTION_TASK_WORKER_NUM => 1, // required for the task to fetch domains background
|
||||
'http_compression' => false,
|
||||
'package_max_length' => $payloadSize,
|
||||
'output_buffer_size' => $payloadSize,
|
||||
],
|
||||
container: $container,
|
||||
);
|
||||
|
||||
$container->set('container', fn () => fn () => $swooleAdapter->getContainer());
|
||||
|
||||
$http = $swooleAdapter->getServer();
|
||||
|
||||
/**
|
||||
* Assigns HTTP requests to worker threads by analyzing its payload/content.
|
||||
*
|
||||
* Routes requests as 'safe' or 'risky' based on specific content patterns (like POST actions or certain domains)
|
||||
* to optimize load distribution between the workers. Utilizes `$safeThreadsPercent` to manage risk by assigning
|
||||
* riskier tasks to a dedicated worker subset. Prefers idle workers, with fallback to random selection if necessary.
|
||||
* doc: https://openswoole.com/docs/modules/swoole-server/configuration#dispatch_func
|
||||
*
|
||||
* @param \Swoole\Http\Server $server Swoole server instance.
|
||||
* @param int $fd client ID
|
||||
* @param int $type the type of data and its current state
|
||||
* @param string|null $data Request content for categorization.
|
||||
* @global int $totalThreads Total number of workers.
|
||||
* @return int Chosen worker ID for the request.
|
||||
*/
|
||||
function dispatch(\Swoole\Http\Server $server, int $fd, int $type, $data = null): int
|
||||
{
|
||||
$resolveWorkerId = function (\Swoole\Http\Server $server, $data = null) {
|
||||
global $totalWorkers, $riskyDomains;
|
||||
|
||||
// If data is not set we can send request to any worker
|
||||
// first we try to pick idle worker, if not we randomly pick a worker
|
||||
if ($data === null) {
|
||||
for ($i = 0; $i < $totalWorkers; $i++) {
|
||||
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
|
||||
return $i;
|
||||
}
|
||||
}
|
||||
return rand(0, $totalWorkers - 1);
|
||||
}
|
||||
|
||||
$riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1
|
||||
|
||||
// Each worker has numeric ID, starting from 0 and incrementing
|
||||
// From 0 to riskyWorkers, we consider safe workers
|
||||
// From riskyWorkers to totalWorkers, we consider risky workers
|
||||
$riskyWorkers = (int)floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers
|
||||
|
||||
$domain = '';
|
||||
// max up to 3 as first line has request details and second line has host
|
||||
$lines = explode("\n", $data, 3);
|
||||
$request = $lines[0];
|
||||
if (count($lines) > 1) {
|
||||
$domain = trim(explode('Host: ', $lines[1])[1] ?? '');
|
||||
}
|
||||
|
||||
// Sync executions are considered risky
|
||||
$risky = false;
|
||||
if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) {
|
||||
$risky = true;
|
||||
} elseif ($riskyDomains->get(md5($domain), 'value') === 1) {
|
||||
// executions request coming from custom domain
|
||||
$risky = true;
|
||||
} else {
|
||||
foreach (\explode(',', System::getEnv('_APP_DOMAIN_FUNCTIONS')) as $riskyDomain) {
|
||||
if (empty($riskyDomain)) {
|
||||
continue;
|
||||
}
|
||||
if (str_ends_with($domain, $riskyDomain)) {
|
||||
$risky = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ($risky) {
|
||||
// If risky request, only consider risky workers
|
||||
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
|
||||
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
|
||||
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
|
||||
// If idle worker found, give to him
|
||||
return $j;
|
||||
}
|
||||
}
|
||||
|
||||
// If no idle workers, give to random risky worker
|
||||
$worker = rand($riskyWorkers, $totalWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
return $worker;
|
||||
}
|
||||
|
||||
// If safe request, give to any idle worker
|
||||
// Its fine to pick risky worker here, because it's idle. Idle is never actually risky
|
||||
for ($i = 0; $i < $totalWorkers; $i++) {
|
||||
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
|
||||
return $i;
|
||||
}
|
||||
}
|
||||
|
||||
// If no idle worker found, give to random safe worker
|
||||
// We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky.
|
||||
$worker = rand(0, $riskyWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
return $worker;
|
||||
};
|
||||
$workerId = $resolveWorkerId($server, $data);
|
||||
$server->bind($fd, $workerId);
|
||||
return $workerId;
|
||||
}
|
||||
|
||||
$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) {
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_WORKER_STOP, function ($server, $workerId) {
|
||||
Timer::clearAll();
|
||||
Console::success('Worker ' . ++$workerId . ' stopped successfully');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server) {
|
||||
Console::success('Starting reload...');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_AFTER_RELOAD, function ($server) {
|
||||
Console::success('Reload completed...');
|
||||
});
|
||||
|
||||
$container->set('bus', function ($register) use ($swooleAdapter) {
|
||||
return $register->get('bus')->setResolver(fn (string $name) => $swooleAdapter->getContainer()->get($name));
|
||||
}, ['register']);
|
||||
@@ -290,7 +155,9 @@ function createDatabase(Http $app, string $resourceKey, string $dbName, array $c
|
||||
Span::current()?->finish();
|
||||
}
|
||||
|
||||
$http->on(Constant::EVENT_START, function ($http) use ($payloadSize, $totalWorkers, $swooleAdapter) {
|
||||
// The coroutine adapter does not expose process-worker hooks, so startup work stays in
|
||||
// a single onStart callback and request routing falls back to coroutine scheduling.
|
||||
$swooleAdapter->onStart(function () use ($payloadSize, $swooleAdapter) {
|
||||
$app = new Http($swooleAdapter, 'UTC');
|
||||
|
||||
/** @var \Utopia\Pools\Group $pools */
|
||||
@@ -496,20 +363,9 @@ $http->on(Constant::EVENT_START, function ($http) use ($payloadSize, $totalWorke
|
||||
});
|
||||
|
||||
Span::init('http.server.start');
|
||||
Span::add('server.workers', $totalWorkers);
|
||||
Span::add('server.adapter', 'swoole-coroutine');
|
||||
Span::add('server.payload_size', $payloadSize);
|
||||
Span::add('server.master_pid', $http->master_pid);
|
||||
Span::add('server.manager_pid', $http->manager_pid);
|
||||
Span::current()?->finish();
|
||||
|
||||
// Start the task that starts fetching custom domains
|
||||
$http->task([], 0);
|
||||
|
||||
// listen ctrl + c
|
||||
Process::signal(2, function () use ($http) {
|
||||
Console::log('Stop by Ctrl+C');
|
||||
$http->shutdown();
|
||||
});
|
||||
});
|
||||
|
||||
$swooleAdapter->onRequest(function ($utopiaRequest, $utopiaResponse) use ($files, $swooleAdapter, $registerRequestResources) {
|
||||
@@ -650,83 +506,4 @@ $swooleAdapter->onRequest(function ($utopiaRequest, $utopiaResponse) use ($files
|
||||
}
|
||||
});
|
||||
|
||||
// Fetch domains every `DOMAIN_SYNC_TIMER` seconds and update in the memory
|
||||
$http->on(Constant::EVENT_TASK, function () use ($swooleAdapter) {
|
||||
$lastSyncUpdate = null;
|
||||
|
||||
$app = new Http($swooleAdapter, 'UTC');
|
||||
|
||||
/** @var Utopia\Database\Database $dbForPlatform */
|
||||
$dbForPlatform = $app->getResource('dbForPlatform');
|
||||
|
||||
/** @var \Swoole\Table $riskyDomains */
|
||||
$riskyDomains = $app->getResource('riskyDomains');
|
||||
|
||||
Timer::tick(DOMAIN_SYNC_TIMER * 1000, function () use ($dbForPlatform, $riskyDomains, &$lastSyncUpdate, $app) {
|
||||
try {
|
||||
$time = DateTime::now();
|
||||
$limit = 1000;
|
||||
$sum = $limit;
|
||||
$latestDocument = null;
|
||||
|
||||
while ($sum === $limit) {
|
||||
$queries = [Query::limit($limit)];
|
||||
if ($latestDocument !== null) {
|
||||
$queries[] = Query::cursorAfter($latestDocument);
|
||||
}
|
||||
if ($lastSyncUpdate !== null) {
|
||||
$queries[] = Query::greaterThanEqual('$updatedAt', $lastSyncUpdate);
|
||||
}
|
||||
$results = [];
|
||||
try {
|
||||
$authorization = $app->getResource('authorization');
|
||||
$results = $authorization->skip(fn () => $dbForPlatform->find('rules', $queries));
|
||||
} catch (Throwable $th) {
|
||||
Console::error('rules ' . $th->getMessage());
|
||||
}
|
||||
|
||||
$sum = count($results);
|
||||
foreach ($results as $document) {
|
||||
$domain = $document->getAttribute('domain');
|
||||
|
||||
$denyDomains = [];
|
||||
$denyEnvVars = [
|
||||
System::getEnv('_APP_DOMAIN_FUNCTIONS_FALLBACK', ''),
|
||||
System::getEnv('_APP_DOMAIN_FUNCTIONS', ''),
|
||||
System::getEnv('_APP_DOMAIN_SITES', ''),
|
||||
];
|
||||
foreach ($denyEnvVars as $denyEnvVar) {
|
||||
foreach (\explode(',', $denyEnvVar) as $denyDomain) {
|
||||
if (empty($denyDomain)) {
|
||||
continue;
|
||||
}
|
||||
$denyDomains[] = $denyDomain;
|
||||
}
|
||||
}
|
||||
|
||||
$isDenyDomain = false;
|
||||
foreach ($denyDomains as $denyDomain) {
|
||||
if (str_ends_with($domain, $denyDomain)) {
|
||||
$isDenyDomain = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ($isDenyDomain) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$riskyDomains->set(md5($domain), ['value' => 1]);
|
||||
}
|
||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||
}
|
||||
$lastSyncUpdate = $time;
|
||||
if ($sum > 0) {
|
||||
Console::log("Sync domains tick: {$sum} domains were updated");
|
||||
}
|
||||
} catch (Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
$swooleAdapter->start();
|
||||
|
||||
Reference in New Issue
Block a user