Files
appwrite/app/realtime.php
T
Chirag Aggarwal 8b026d3459 perf: optimize updateDocument() calls to use sparse documents
Optimize updateDocument() calls across the codebase to pass only changed
attributes as sparse Document objects rather than full documents. This is
more efficient because updateDocument() internally performs array_merge().

Changes:
- Updated 58 files to use sparse Document objects
- Added Performance Patterns section to AGENTS.md with optimization guidelines
- Applied pattern to Workers, Functions, Sites, Teams, VCS modules
- Updated app/controllers/api files (account, users, messaging)
- Updated app infrastructure files (realtime, general, init/resources, shared/api)

Exceptions maintained:
- Migration files (need full document updates by design)
- Cases with 6+ attributes (marginal benefit)
- Complex nested relationship logic
2026-03-06 17:05:19 +05:30

918 lines
32 KiB
PHP

<?php
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Appwrite\Utopia\Database\Documents\User;
use Appwrite\Utopia\Request;
use Appwrite\Utopia\Response;
use Swoole\Coroutine;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Auth\Hashes\Sha;
use Utopia\Auth\Proofs\Token;
use Utopia\Auth\Store;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Query as QueryException;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\DSN\DSN;
use Utopia\Http\Http;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\WebSocket\Adapter;
use Utopia\WebSocket\Server;
/**
* @var Registry $register
*/
require_once __DIR__ . '/init.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
// Allows overriding
if (!function_exists('getConsoleDB')) {
    /**
     * Provide the platform ("console") database for the current coroutine.
     *
     * The instance is memoised in the coroutine context under 'dbForPlatform',
     * so repeated calls from the same coroutine reuse one Database object.
     *
     * @return Database
     */
    function getConsoleDB(): Database
    {
        $context = Coroutine::getContext();

        $memoised = $context['dbForPlatform'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        $platformDb = new Database(
            new DatabasePool($pools->get('console')),
            getCache()
        );

        $platformDb
            ->setDatabase(APP_DATABASE)
            ->setNamespace('_console')
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', '_console');

        // User documents are hydrated into the dedicated User class.
        $platformDb->setDocumentType('users', User::class);

        $context['dbForPlatform'] = $platformDb;

        return $platformDb;
    }
}
// Allows overriding
if (!function_exists('getProjectDB')) {
    /**
     * Provide the database of a given project for the current coroutine.
     *
     * Instances are memoised in the coroutine context under 'dbForProject',
     * keyed by the project's sequence. An empty project, or the project whose
     * ID is 'console', resolves to the platform database from getConsoleDB().
     *
     * @param Document $project Project document; its 'database' attribute holds the DSN (or a bare host name).
     *
     * @return Database
     */
    function getProjectDB(Document $project): Database
    {
        $ctx = Coroutine::getContext();

        if (!isset($ctx['dbForProject'])) {
            $ctx['dbForProject'] = [];
        }

        if (isset($ctx['dbForProject'][$project->getSequence()])) {
            return $ctx['dbForProject'][$project->getSequence()];
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        if ($project->isEmpty() || $project->getId() === 'console') {
            return getConsoleDB();
        }

        try {
            $dsn = new DSN($project->getAttribute('database'));
        } catch (\InvalidArgumentException) {
            // TODO: Temporary until all projects are using shared tables
            $dsn = new DSN('mysql://' . $project->getAttribute('database'));
        }

        $adapter = new DatabasePool($pools->get($dsn->getHost()));
        $database = new Database($adapter, getCache());

        $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));

        // Strict comparison: host names are strings and must match exactly,
        // without PHP's numeric-string juggling.
        if (\in_array($dsn->getHost(), $sharedTables, true)) {
            // Shared tables: the project is isolated by tenant inside the DSN's namespace.
            $database
                ->setSharedTables(true)
                ->setTenant((int)$project->getSequence())
                ->setNamespace($dsn->getParam('namespace'));
        } else {
            // Dedicated tables: the project gets its own namespace derived from its sequence.
            $database
                ->setSharedTables(false)
                ->setTenant(null)
                ->setNamespace('_' . $project->getSequence());
        }

        $database
            ->setDatabase(APP_DATABASE)
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', $project->getId());

        $database->setDocumentType('users', User::class);

        return $ctx['dbForProject'][$project->getSequence()] = $database;
    }
}
// Allows overriding
if (!function_exists('getCache')) {
    /**
     * Provide the cache for the current coroutine.
     *
     * Builds one pooled adapter per entry of the 'pools-cache' config list,
     * combines them through the Sharding adapter and memoises the resulting
     * Cache in the coroutine context under 'cache'.
     *
     * @return Cache
     */
    function getCache(): Cache
    {
        $context = Coroutine::getContext();

        $memoised = $context['cache'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        $shards = [];
        foreach (Config::getParam('pools-cache', []) as $poolName) {
            $shards[] = new CachePool($pools->get($poolName));
        }

        $cache = new Cache(new Sharding($shards));
        $context['cache'] = $cache;

        return $cache;
    }
}
// Allows overriding
if (!function_exists('getRedis')) {
    /**
     * Provide a persistent Redis connection for the current coroutine.
     *
     * Connection settings come from _APP_REDIS_HOST, _APP_REDIS_PORT and
     * _APP_REDIS_PASS. The client is memoised in the coroutine context under
     * 'redis'.
     *
     * @return \Redis
     */
    function getRedis(): \Redis
    {
        $context = Coroutine::getContext();

        $memoised = $context['redis'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        $redisHost = System::getEnv('_APP_REDIS_HOST', 'localhost');
        $redisPort = System::getEnv('_APP_REDIS_PORT', 6379);
        $redisPass = System::getEnv('_APP_REDIS_PASS', '');

        $client = new \Redis();
        @$client->pconnect($redisHost, (int)$redisPort);

        // Authenticate only when a password is configured.
        if ($redisPass) {
            $client->auth($redisPass);
        }

        // -1 disables the read timeout.
        $client->setOption(\Redis::OPT_READ_TIMEOUT, -1);

        $context['redis'] = $client;

        return $client;
    }
}
if (!function_exists('getTimelimit')) {
    /**
     * Provide the Redis-backed time limiter for the current coroutine.
     *
     * The limiter is memoised in the coroutine context under 'timelimit'; once
     * one exists for the coroutine it is returned as-is and the arguments of
     * later calls are not applied.
     *
     * @param string $key     Key template for the limiter.
     * @param int    $limit   Limit passed to the limiter.
     * @param int    $seconds Window length passed to the limiter.
     *
     * @return TimeLimitRedis
     */
    function getTimelimit(string $key = "", int $limit = 0, int $seconds = 1): TimeLimitRedis
    {
        $context = Coroutine::getContext();

        $memoised = $context['timelimit'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        $limiter = new TimeLimitRedis($key, $limit, $seconds, getRedis());
        $context['timelimit'] = $limiter;

        return $limiter;
    }
}
if (!function_exists('getRealtime')) {
    /**
     * Provide the Realtime messaging adapter, memoised in the coroutine
     * context under 'realtime'.
     *
     * @return Realtime
     */
    function getRealtime(): Realtime
    {
        $context = Coroutine::getContext();

        $memoised = $context['realtime'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        $adapter = new Realtime();
        $context['realtime'] = $adapter;

        return $adapter;
    }
}
if (!function_exists('getTelemetry')) {
    /**
     * Provide the telemetry adapter, memoised in the coroutine context under
     * 'telemetry'. This default implementation always creates the no-op
     * adapter and does not use the worker ID.
     *
     * @param int $workerId ID of the calling worker (unused here).
     *
     * @return Utopia\Telemetry\Adapter
     */
    function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
    {
        $context = Coroutine::getContext();

        $memoised = $context['telemetry'] ?? null;
        if ($memoised !== null) {
            return $memoised;
        }

        $adapter = new NoTelemetry();
        $context['telemetry'] = $adapter;

        return $adapter;
    }
}
// Realtime adapter instance captured by the server callbacks registered below.
$realtime = getRealtime();
/**
* Table for statistics across all workers.
*/
// One row per project (rows are written with the project ID as key in onOpen).
// NOTE(review): 4096 and 1 are presumably the row capacity and conflict proportion of Swoole\Table - confirm.
$stats = new Table(4096, 1);
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);
// 'connections' is only ever incremented (onOpen); 'connectionsTotal' is incremented on open and decremented on close.
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);
$stats->create();
// Identifier stored in the 'container' attribute of this process's stats document (see onStart).
$containerId = uniqid();
// Set by onStart once the stats document has been created; shared by reference with its callbacks.
$statsDocument = null;
// Worker count = CPU count (_APP_CPU_NUM, defaulting to swoole_cpu_num()) multiplied by _APP_WORKER_PER_CORE (default 6).
$workerNumber = intval(System::getEnv('_APP_CPU_NUM', swoole_cpu_num())) * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
// The listening port is read from the PORT environment variable, defaulting to 80.
$adapter = new Adapter\Swoole(port: System::getEnv('PORT', 80));
$adapter
->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
->setWorkerNumber($workerNumber);
$server = new Server($adapter);
// Allows overriding
if (!function_exists('logError')) {
    /**
     * Report an error to the configured realtime logger and to the console.
     *
     * The error is pushed to the 'realtimeLogger' only when a logger is
     * registered and the error is not an Appwrite exception; the console
     * summary is always printed.
     *
     * @param Throwable          $error         Error to report.
     * @param string             $action        Action name recorded on the log entry.
     * @param array              $tags          Extra tags; falsy values are recorded as 'n/a'.
     * @param Document|null      $project       Project the error relates to, if known.
     * @param Document|null      $user          User the error relates to, if known.
     * @param Authorization|null $authorization Authorization whose roles are attached as extra data.
     *
     * @return void
     */
    function logError(Throwable $error, string $action, array $tags = [], ?Document $project = null, ?Document $user = null, ?Authorization $authorization = null): void
    {
        global $register;

        $logger = $register->get('realtimeLogger');
        $shouldReport = $logger && !($error instanceof Exception);

        if ($shouldReport) {
            $log = new Log();
            $log->setNamespace("realtime");
            $log->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
            $log->setVersion(System::getEnv('_APP_VERSION', 'UNKNOWN'));
            $log->setType(Log::TYPE_ERROR);
            $log->setMessage($error->getMessage());

            $log->addTag('code', $error->getCode());
            $log->addTag('verboseType', get_class($error));
            $log->addTag('projectId', $project?->getId() ?: 'n/a');
            $log->addTag('userId', $user?->getId() ?: 'n/a');
            foreach ($tags as $tagName => $tagValue) {
                $log->addTag($tagName, $tagValue ?: 'n/a');
            }

            $log->addExtra('file', $error->getFile());
            $log->addExtra('line', $error->getLine());
            $log->addExtra('trace', $error->getTraceAsString());
            $log->addExtra('detailedTrace', $error->getTrace());
            $log->addExtra('roles', $authorization?->getRoles() ?? []);

            $log->setAction($action);

            if (System::getEnv('_APP_ENV', 'development') === 'production') {
                $log->setEnvironment(Log::ENVIRONMENT_PRODUCTION);
            } else {
                $log->setEnvironment(Log::ENVIRONMENT_STAGING);
            }

            // Pushing the log is best-effort: a failing logger must not mask the original error.
            try {
                $status = $logger->addLog($log);
                Console::info('Error log pushed with status code: ' . $status);
            } catch (Throwable $pushFailure) {
                Console::error('Error pushing log: ' . $pushFailure->getMessage());
            }
        }

        $summary = [
            '[Error] Type: ' . get_class($error),
            '[Error] Message: ' . $error->getMessage(),
            '[Error] File: ' . $error->getFile(),
            '[Error] Line: ' . $error->getLine(),
        ];
        foreach ($summary as $line) {
            Console::error($line);
        }
    }
}
// Hand logError() to the server's error hook as a first-class callable.
$server->error(logError(...));
/**
 * Server start hook.
 *
 * Creates this container's document in the 'realtime' collection (retrying
 * until it succeeds) and, on self-hosted editions, refreshes that document
 * with the per-project connection totals every 5 seconds.
 */
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) {
    sleep(5); // wait for the initial database schema to be ready
    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     */
    go(function () use ($register, $containerId, &$statsDocument) {
        $consoleDb = getConsoleDB();
        $attempts = 0;

        while (true) {
            $attempts++;

            try {
                $seed = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = $consoleDb->getAuthorization()->skip(
                    fn () => $consoleDb->createDocument('realtime', $seed)
                );

                return;
            } catch (Throwable) {
                Console::warning("Collection not ready. Retrying connection ({$attempts})...");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        }
    });

    /**
     * Save current connections to the Database every 5 seconds.
     */
    // TODO: Remove this if check once it doesn't cause issues for cloud
    if (System::getEnv('_APP_EDITION', 'self-hosted') !== 'self-hosted') {
        return;
    }

    Timer::tick(5000, function () use ($register, $stats, &$statsDocument) {
        $totals = [];
        foreach ($stats as $projectId => $row) {
            $totals[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        // Nothing to persist yet, or the stats document has not been created.
        if (empty($totals) || empty($statsDocument)) {
            return;
        }

        try {
            $consoleDb = getConsoleDB();

            $statsDocument->setAttribute('timestamp', DateTime::now());
            $statsDocument->setAttribute('value', json_encode($totals));

            // Sparse update: only the two changed attributes are sent.
            $changes = new Document([
                'timestamp' => $statsDocument->getAttribute('timestamp'),
                'value' => $statsDocument->getAttribute('value')
            ]);

            $consoleDb->getAuthorization()->skip(
                fn () => $consoleDb->updateDocument('realtime', $statsDocument->getId(), $changes)
            );
        } catch (Throwable $failure) {
            logError($failure, "updateWorkerDocument");
        }
    });
});
/**
 * Worker start hook.
 *
 * Registers this worker's telemetry adapter and counters, schedules a 5 second
 * timer that sends connection statistics (self-hosted only) and the SDK test
 * event to console subscribers, and then runs the pub/sub loop that forwards
 * messages from the 'realtime' channel to the matching WebSocket connections.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
Console::success('Worker ' . $workerId . ' started successfully');
$telemetry = getTelemetry($workerId);
$register->set('telemetry', fn () => $telemetry);
$register->set('telemetry.connectionCounter', fn () => $telemetry->createUpDownCounter('realtime.server.open_connections'));
$register->set('telemetry.connectionCreatedCounter', fn () => $telemetry->createCounter('realtime.server.connection.created'));
$register->set('telemetry.messageSentCounter', fn () => $telemetry->createCounter('realtime.server.message.sent'));
// Bookkeeping for the pub/sub reconnect loop further down.
$attempts = 0;
$start = time();
Timer::tick(5000, function () use ($server, $register, $realtime, $stats) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
// TODO: Remove this if check once it doesn't cause issues for cloud
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
// Skip the database query unless a console user is subscribed to the 'project' channel.
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
$database = getConsoleDB();
$payload = [];
// Only stats documents with a timestamp newer than 15 seconds ago are considered.
$list = $database->getAuthorization()->skip(fn () => $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]));
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
// NOTE(review): json_decode() is called without the associative flag, so this iterates an object;
// an invalid 'value' would decode to null and make foreach emit a warning - confirm it is always valid JSON.
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
}
// One event per project that is present both in this process's stats table and in the aggregated payload.
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
$event = [
'project' => 'console',
// Addressed to the team role built from the teamId stored in the stats table.
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
]
]
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
}
}
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
$payload = ['response' => 'WS:/v1/realtime:passed'];
$event = [
'project' => 'console',
'roles' => [Role::guests()->toString()],
'data' => [
'events' => ['test.event'],
'channels' => ['tests'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
];
$subscribers = $realtime->getSubscribers($event);
// Group connection IDs by their set of matched subscription IDs so each group is sent one encoded message.
$groups = [];
foreach ($subscribers as $id => $matched) {
$key = implode(',', array_keys($matched));
$groups[$key]['ids'][] = $id;
$groups[$key]['subscriptions'] = array_keys($matched);
}
foreach ($groups as $group) {
$data = $event['data'];
$data['subscriptions'] = $group['subscriptions'];
$server->send($group['ids'], json_encode([
'type' => 'event',
'data' => $data
]));
}
}
});
// Pub/sub loop: stops after 300 failed attempts; a successful ping resets the counter.
while ($attempts < 300) {
try {
// Not the first try: report how long the previous connection lasted, then wait before reconnecting.
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
$pubsub = new PubSubPool($register->get('pools')->get('pubsub'));
if ($pubsub->ping(true)) {
$attempts = 0;
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
$event = json_decode($payload, true);
// A user's permissions changed: rebuild subscriptions with freshly computed roles.
if ($event['permissionsChanged'] && isset($event['userId'])) {
$projectId = $event['project'];
$userId = $event['userId'];
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
// NOTE(review): this picks a single connection ID (first key of the first entry); other connections
// of the same user do not appear to be refreshed here - confirm this is intended.
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = getConsoleDB();
$project = $consoleDatabase->getAuthorization()->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
$database = getProjectDB($project);
/** @var Appwrite\Utopia\Database\Documents\User $user */
$user = $database->getDocument('users', $userId);
$roles = $user->getRoles($database->getAuthorization());
// Capture the stored authorization and subscription metadata before unsubscribe() is called.
$authorization = $realtime->connections[$connection]['authorization'] ?? null;
$meta = $realtime->getSubscriptionMetadata($connection);
$realtime->unsubscribe($connection);
// Re-create every saved subscription under its original ID with the new roles.
foreach ($meta as $subscriptionId => $subscription) {
$queries = Query::parseQueries($subscription['queries'] ?? []);
$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$subscription['channels'] ?? [],
$queries
);
}
// Restore authorization after subscribe
if ($authorization !== null) {
$realtime->connections[$connection]['authorization'] = $authorization;
}
}
}
$receivers = $realtime->getSubscribers($event);
if (Http::isDevelopment() && !empty($receivers)) {
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
Console::log("[Debug][Worker {$workerId}] Connection IDs: " . json_encode(array_keys($receivers)));
Console::log("[Debug][Worker {$workerId}] Matched: " . json_encode(array_values($receivers)));
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
}
// Group connections by matched subscription IDs for batch sending
$groups = [];
foreach ($receivers as $id => $matched) {
$key = implode(',', array_keys($matched));
$groups[$key]['ids'][] = $id;
$groups[$key]['subscriptions'] = array_keys($matched);
}
// Counts the connection IDs the event was sent to.
$total = 0;
foreach ($groups as $group) {
$data = $event['data'];
$data['subscriptions'] = $group['subscriptions'];
$server->send($group['ids'], json_encode([
'type' => 'event',
'data' => $data
]));
$total += count($group['ids']);
}
if ($total > 0) {
$register->get('telemetry.messageSentCounter')->add($total);
$stats->incr($event['project'], 'messages', $total);
}
});
} catch (Throwable $th) {
// Every failure counts as one attempt and is followed by a pause before the loop retries.
logError($th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage());
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
}
}
Console::error('Failed to restart pub/sub...');
});
/**
 * Connection open hook.
 *
 * Resolves the project, authorization and user resources for the incoming
 * request, runs the project / API-enabled / region / abuse / origin checks,
 * registers one subscription per constructed channel group and answers with a
 * 'connected' message. On any failure the client receives an 'error' message
 * and the connection is closed with the error code.
 */
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) {
$app = new Http('UTC');
$request = new Request($request);
$response = new Response(new SwooleResponse());
Console::info("Connection open (user: {$connection})");
// Make the pools, request and response available to the resources resolved below.
Http::setResource('pools', fn () => $register->get('pools'));
Http::setResource('request', fn () => $request);
Http::setResource('response', fn () => $response);
// Declared before the try block so the catch handler can pass them to logError().
$project = null;
$logUser = null;
$authorization = null;
try {
/** @var Document $project */
$project = $app->getResource('project');
$authorization = $app->getResource('authorization');
/*
* Project Check
*/
if (empty($project->getId())) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
}
// Reject when the project explicitly disables the realtime API, unless the roles are privileged or app roles.
if (
array_key_exists('realtime', $project->getAttribute('apis', []))
&& !$project->getAttribute('apis', [])['realtime']
&& !(User::isPrivileged($authorization->getRoles()) || User::isApp($authorization->getRoles()))
) {
throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
}
// A project with a non-empty region is only served when it matches _APP_REGION (default 'default').
$projectRegion = $project->getAttribute('region', '');
$currentRegion = System::getEnv('_APP_REGION', 'default');
if (!empty($projectRegion) && $projectRegion !== $currentRegion) {
throw new AppwriteException(AppwriteException::GENERAL_ACCESS_FORBIDDEN, 'Project is not accessible in this region. Please make sure you are using the correct endpoint');
}
$timelimit = $app->getResource('timelimit');
$user = $app->getResource('user'); /** @var User $user */
$logUser = $user;
/*
* Abuse Check
*
* Abuse limits are connecting 128 times per minute and ip address.
*/
// The 'timelimit' resource is invoked as a callable to obtain the limiter for this key, limit and window.
$timelimit = $timelimit('url:{url},ip:{ip}', 128, 60);
$timelimit
->setParam('{ip}', $request->getIP())
->setParam('{url}', $request->getURI());
$abuse = new Abuse($timelimit);
// The environment switch is tested first, so the limiter is not consulted when abuse protection is disabled.
if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many requests');
}
/*
* Validate Client Domain - Check to avoid CSRF attack.
* Adding Appwrite API domains to allow XDOMAIN communication.
* Skip this check for non-web platforms which are not required to send an origin header.
*/
$origin = $request->getOrigin();
$originValidator = $app->getResource('originValidator');
if (!empty($origin) && !$originValidator->isValid($origin) && $project->getId() !== 'console') {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $originValidator->getDescription());
}
$roles = $user->getRoles($authorization);
$channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
/**
* Channels Check
*/
if (empty($channels)) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels');
}
$names = array_keys($channels);
// The second argument looks up the query-string value for a given channel name.
try {
$subscriptions = Realtime::constructSubscriptions(
$names,
fn ($channel) => $request->getQuery($channel, null)
);
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}
// Maps each subscription index to its generated subscription ID; returned to the client below.
$mapping = [];
foreach ($subscriptions as $index => $subscription) {
$subscriptionId = ID::unique();
$realtime->subscribe(
$project->getId(),
$connection,
$subscriptionId,
$roles,
$subscription['channels'],
$subscription['queries']
);
$mapping[$index] = $subscriptionId;
}
// Keep the authorization on the connection; onMessage reads it back.
$realtime->connections[$connection]['authorization'] = $authorization;
// A user without an ID is reported as null; otherwise the account model output is sent.
$user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT);
$server->send([$connection], json_encode([
'type' => 'connected',
'data' => [
'channels' => $names,
'subscriptions' => $mapping,
'user' => $user
]
]));
$register->get('telemetry.connectionCounter')->add(1);
$register->get('telemetry.connectionCreatedCounter')->add(1);
// Write the project's row in the shared stats table, then bump both connection counters.
$stats->set($project->getId(), [
'projectId' => $project->getId(),
'teamId' => $project->getAttribute('teamId')
]);
$stats->incr($project->getId(), 'connections');
$stats->incr($project->getId(), 'connectionsTotal');
} catch (Throwable $th) {
logError($th, 'realtime', project: $project, user: $logUser, authorization: $authorization);
// Handle SQL error code is 'HY000'
$code = $th->getCode();
if (!\is_int($code)) {
$code = 500;
}
$message = $th->getMessage();
// sanitize 0 && 5xx errors
// Realtime policy violations keep their original message even outside development.
$realtimeViolation = $th instanceof AppwriteException && $th->getType() === AppwriteException::REALTIME_POLICY_VIOLATION;
if (($code === 0 || $code >= 500) && !$realtimeViolation && !Http::isDevelopment()) {
$message = 'Error: Server Error';
}
// From here on $response holds the plain error payload, not the Response object.
$response = [
'type' => 'error',
'data' => [
'code' => $code,
'message' => $message
]
];
$server->send([$connection], json_encode($response));
$server->close($connection, $code);
if (Http::isDevelopment()) {
Console::error('[Error] Connection Error');
Console::error('[Error] Code: ' . $response['data']['code']);
Console::error('[Error] Message: ' . $response['data']['message']);
}
}
});
/**
 * Incoming message hook.
 *
 * Accepts JSON messages of type 'ping' (answered with 'pong') and
 * 'authentication' (verifies the supplied session and re-creates the
 * connection's subscriptions with the authenticated user's roles). Anything
 * else, including malformed JSON, is answered with an 'error' message; the
 * connection is closed only when the error code is 1008.
 */
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
    // Declared before the try block so the catch handler can pass them to logError().
    $project = null;
    $authorization = null;

    try {
        $response = new Response(new SwooleResponse());
        $projectId = $realtime->connections[$connection]['projectId'] ?? null;

        // Get authorization from connection (stored during onOpen)
        $authorization = $realtime->connections[$connection]['authorization'] ?? null;
        if ($authorization === null) {
            $authorization = new Authorization('');
        }

        $database = getConsoleDB();
        $database->setAuthorization($authorization);

        // For non-console projects, switch to the project's own database.
        if ($projectId !== 'console') {
            $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
            $database = getProjectDB($project);
            $database->setAuthorization($authorization);
        }

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         */
        // NOTE(review): the key template contains '{url}', which is never set here, while '{container}'
        // is set but does not appear in the template - confirm whether the key was meant to include the container.
        $timeLimit = getTimelimit('url:{url},connection:{connection}', 32, 60);
        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);

        $abuse = new Abuse($timeLimit);

        // Test the environment switch first (as onOpen does) so the limiter is
        // not consulted at all when abuse protection is disabled.
        if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many messages.');
        }

        $payload = json_decode($message, true);

        // Valid JSON can still be a scalar (e.g. `5` or `"text"`); only arrays can carry 'type'/'data'.
        if (!\is_array($payload) || (!array_key_exists('type', $payload) && !array_key_exists('data', $payload))) {
            throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message format is not valid.');
        }

        // Only string types can match a case; anything else (missing, true, numbers) falls to default.
        $type = $payload['type'] ?? null;

        switch (\is_string($type) ? $type : null) {
            case 'ping':
                $server->send([$connection], json_encode([
                    'type' => 'pong'
                ]));
                break;

            case 'authentication':
                $data = $payload['data'] ?? null;

                // 'data' must be an array carrying a string 'session'; otherwise the payload is rejected
                // explicitly instead of failing later with a TypeError.
                if (!\is_array($data) || !array_key_exists('session', $data) || !\is_string($data['session'])) {
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Payload is not valid.');
                }

                $store = new Store();
                $store->decode($data['session']);

                /** @var User $user */
                $user = $database->getDocument('users', $store->getProperty('id', ''));

                /**
                 * TODO:
                 * Moving forward, we should try to use our dependency injection container
                 * to inject the proof for token.
                 * This way we will have one source of truth for the proof for token.
                 */
                $proofForToken = new Token();
                $proofForToken->setHash(new Sha());

                if (
                    empty($user->getId()) // Check a document has been found in the DB
                    || !$user->sessionVerify($store->getProperty('secret', ''), $proofForToken) // Validate user has valid login token
                ) {
                    // cookie not valid
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Session is not valid.');
                }

                $roles = $user->getRoles($database->getAuthorization());

                // Capture the stored authorization and subscription metadata before unsubscribe() is called.
                $authorization = $realtime->connections[$connection]['authorization'] ?? null;
                $projectId = $realtime->connections[$connection]['projectId'] ?? null;
                $meta = $realtime->getSubscriptionMetadata($connection);

                $realtime->unsubscribe($connection);

                // Re-create every saved subscription under its original ID with the authenticated roles.
                if (!empty($projectId)) {
                    foreach ($meta as $subscriptionId => $subscription) {
                        $queries = Query::parseQueries($subscription['queries'] ?? []);
                        $realtime->subscribe(
                            $projectId,
                            $connection,
                            $subscriptionId,
                            $roles,
                            $subscription['channels'] ?? [],
                            $queries
                        );
                    }
                }

                if ($authorization !== null) {
                    $realtime->connections[$connection]['authorization'] = $authorization;
                }

                $user = $response->output($user, Response::MODEL_ACCOUNT);

                $server->send([$connection], json_encode([
                    'type' => 'response',
                    'data' => [
                        'to' => 'authentication',
                        'success' => true,
                        'user' => $user
                    ]
                ]));
                break;

            default:
                throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
        }
    } catch (Throwable $th) {
        logError($th, 'realtimeMessage', project: $project, authorization: $authorization);

        // Non-integer codes (e.g. SQLSTATE strings) are reported as 500.
        $code = $th->getCode();
        if (!is_int($code)) {
            $code = 500;
        }

        $errorMessage = $th->getMessage();

        // sanitize 0 && 5xx errors
        if (($code === 0 || $code >= 500) && !Http::isDevelopment()) {
            $errorMessage = 'Error: Server Error';
        }

        $response = [
            'type' => 'error',
            'data' => [
                'code' => $code,
                'message' => $errorMessage
            ]
        ];

        $server->send([$connection], json_encode($response));

        // Only errors with code 1008 terminate the connection.
        if ($th->getCode() === 1008) {
            $server->close($connection, $th->getCode());
        }
    }
});
/**
 * Connection close hook.
 *
 * For a connection known to the realtime adapter, lowers the project's
 * 'connectionsTotal' stat and the open-connections telemetry counter; the
 * connection is then unsubscribed in every case.
 */
$server->onClose(function (int $connection) use ($realtime, $stats, $register) {
    $isTracked = array_key_exists($connection, $realtime->connections);

    if ($isTracked) {
        $owningProject = $realtime->connections[$connection]['projectId'];

        $stats->decr($owningProject, 'connectionsTotal');
        $register->get('telemetry.connectionCounter')->add(-1);
    }

    $realtime->unsubscribe($connection);

    Console::info('Connection close: ' . $connection);
});
// All hooks are registered above; start the WebSocket server.
$server->start();