Files
appwrite/app/realtime.php
eldadfux 9c597623ff fix: handle null authorization in realtime connections
Added a check to initialize the authorization object with an empty string if it is null when processing messages in the realtime server. This ensures that the database authorization is always set, preventing potential errors during message handling.
2026-02-19 19:50:48 +01:00

887 lines
31 KiB
PHP

<?php
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Appwrite\Utopia\Database\Documents\User;
use Appwrite\Utopia\Request;
use Appwrite\Utopia\Response;
use Swoole\Coroutine;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Auth\Hashes\Sha;
use Utopia\Auth\Proofs\Token;
use Utopia\Auth\Store;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Query as QueryException;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\DSN\DSN;
use Utopia\Http\Http;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\WebSocket\Adapter;
use Utopia\WebSocket\Server;
// Load the shared bootstrap file first; the annotation below documents that
// $register is expected to be a Registry once it has been included.
/**
* @var Registry $register
*/
require_once __DIR__ . '/init.php';
// Turn on Swoole's runtime coroutine hooks using the SWOOLE_HOOK_ALL flag.
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
// Allows overriding: the function is only declared when no definition exists yet.
if (!function_exists('getConsoleDB')) {
    /**
     * Return the platform ("console") database for the current coroutine.
     *
     * The instance is stored in the coroutine context under 'dbForPlatform',
     * so later calls that see the same context get the same object back.
     *
     * @return Database
     */
    function getConsoleDB(): Database
    {
        $context = Coroutine::getContext();

        $existing = $context['dbForPlatform'] ?? null;
        if ($existing !== null) {
            return $existing;
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        $platformDb = new Database(new DatabasePool($pools->get('console')), getCache());
        $platformDb
            ->setDatabase(APP_DATABASE)
            ->setNamespace('_console')
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', '_console');
        $platformDb->setDocumentType('users', User::class);

        $context['dbForPlatform'] = $platformDb;

        return $platformDb;
    }
}
// Allows overriding: the function is only declared when no definition exists yet.
if (!function_exists('getProjectDB')) {
    /**
     * Return the database for the given project, cached per coroutine.
     *
     * Instances are stored in the coroutine context under
     * 'dbForProject', keyed by the project's sequence. An empty project or
     * the 'console' project resolves to the platform database instead.
     *
     * @param Document $project Project document; its 'database' attribute is parsed as a DSN.
     * @return Database
     */
    function getProjectDB(Document $project): Database
    {
        $ctx = Coroutine::getContext();
        if (!isset($ctx['dbForProject'])) {
            $ctx['dbForProject'] = [];
        }

        $sequence = $project->getSequence();
        if (isset($ctx['dbForProject'][$sequence])) {
            return $ctx['dbForProject'][$sequence];
        }

        // Console / empty projects never need the pool registry, so bail out
        // before touching it.
        if ($project->isEmpty() || $project->getId() === 'console') {
            return getConsoleDB();
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        try {
            $dsn = new DSN($project->getAttribute('database'));
        } catch (\InvalidArgumentException) {
            // TODO: Temporary until all projects are using shared tables
            $dsn = new DSN('mysql://' . $project->getAttribute('database'));
        }

        $adapter = new DatabasePool($pools->get($dsn->getHost()));
        $database = new Database($adapter, getCache());

        $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));

        // Strict comparison: host names are strings and must match exactly,
        // not by PHP's loose numeric-string equality.
        if (\in_array($dsn->getHost(), $sharedTables, true)) {
            $database
                ->setSharedTables(true)
                ->setTenant((int)$sequence)
                ->setNamespace($dsn->getParam('namespace'));
        } else {
            $database
                ->setSharedTables(false)
                ->setTenant(null)
                ->setNamespace('_' . $sequence);
        }

        $database
            ->setDatabase(APP_DATABASE)
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', $project->getId());
        $database->setDocumentType('users', User::class);

        return $ctx['dbForProject'][$sequence] = $database;
    }
}
// Allows overriding: the function is only declared when no definition exists yet.
if (!function_exists('getCache')) {
    /**
     * Return the cache for the current coroutine.
     *
     * One pooled adapter is created per entry of the 'pools-cache' config
     * list, wrapped in a Sharding adapter, and the resulting Cache is stored
     * in the coroutine context under 'cache'.
     *
     * @return Cache
     */
    function getCache(): Cache
    {
        $context = Coroutine::getContext();

        $existing = $context['cache'] ?? null;
        if ($existing !== null) {
            return $existing;
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        $shards = [];
        foreach (Config::getParam('pools-cache', []) as $poolName) {
            $shards[] = new CachePool($pools->get($poolName));
        }

        $cache = new Cache(new Sharding($shards));
        $context['cache'] = $cache;

        return $cache;
    }
}
// Allows overriding: the function is only declared when no definition exists yet.
if (!function_exists('getRedis')) {
    /**
     * Return the Redis client for the current coroutine.
     *
     * Connection settings come from _APP_REDIS_HOST, _APP_REDIS_PORT and
     * _APP_REDIS_PASS. The client is stored in the coroutine context under
     * 'redis'.
     *
     * @return \Redis
     */
    function getRedis(): \Redis
    {
        $context = Coroutine::getContext();

        if (!isset($context['redis'])) {
            $host = System::getEnv('_APP_REDIS_HOST', 'localhost');
            $port = System::getEnv('_APP_REDIS_PORT', 6379);
            $pass = System::getEnv('_APP_REDIS_PASS', '');

            $client = new \Redis();
            // Warnings from the persistent connect are suppressed here.
            @$client->pconnect($host, (int)$port);

            // Authenticate only when a non-empty password is configured.
            if ($pass) {
                $client->auth($pass);
            }

            $client->setOption(\Redis::OPT_READ_TIMEOUT, -1);

            $context['redis'] = $client;
        }

        return $context['redis'];
    }
}
if (!function_exists('getTimelimit')) {
    /**
     * Return the rate limiter for the current coroutine.
     *
     * The limiter is created once per coroutine context (stored under
     * 'timelimit'); when one already exists it is returned as-is and the
     * arguments of the current call are not applied.
     *
     * @param string $key     Key template passed to the limiter on creation.
     * @param int    $limit   Limit passed to the limiter on creation.
     * @param int    $seconds Window length passed to the limiter on creation.
     * @return TimeLimitRedis
     */
    function getTimelimit(string $key = "", int $limit = 0, int $seconds = 1): TimeLimitRedis
    {
        $context = Coroutine::getContext();

        $context['timelimit'] ??= new TimeLimitRedis($key, $limit, $seconds, getRedis());

        return $context['timelimit'];
    }
}
if (!function_exists('getRealtime')) {
    /**
     * Return the Realtime adapter stored in the coroutine context under
     * 'realtime', creating it on first use.
     *
     * @return Realtime
     */
    function getRealtime(): Realtime
    {
        $context = Coroutine::getContext();

        if (!isset($context['realtime'])) {
            $context['realtime'] = new Realtime();
        }

        return $context['realtime'];
    }
}
if (!function_exists('getTelemetry')) {
    /**
     * Return the telemetry adapter stored in the coroutine context under
     * 'telemetry', creating a no-op adapter on first use.
     *
     * @param int $workerId Not read by this implementation.
     * @return Utopia\Telemetry\Adapter
     */
    function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
    {
        $context = Coroutine::getContext();

        $telemetry = $context['telemetry'] ?? null;
        if ($telemetry === null) {
            $telemetry = new NoTelemetry();
            $context['telemetry'] = $telemetry;
        }

        return $telemetry;
    }
}
$realtime = getRealtime();

/**
 * Table for statistics across all workers.
 * One row per project: identifiers plus connection and message counters.
 */
$stats = new Table(4096, 1);
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);
$stats->create();

// Identifier written into this process's stats document, and the document itself
// (filled in later by the onStart handler).
$containerId = uniqid();
$statsDocument = null;

// Worker count = CPU count x workers per core, both overridable through env.
$workerNumber = (int) System::getEnv('_APP_CPU_NUM', swoole_cpu_num())
    * (int) System::getEnv('_APP_WORKER_PER_CORE', 6);

$adapter = new Adapter\Swoole(port: System::getEnv('PORT', 80));
$adapter
    ->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
    ->setWorkerNumber($workerNumber);

$server = new Server($adapter);
/**
 * Report a throwable.
 *
 * When a 'realtimeLogger' is registered and the error is not an Appwrite
 * Exception, a Log entry is built and pushed to that logger. In every case
 * the error's class, message, file and line are printed to the console.
 *
 * @param Throwable $error  The error to report.
 * @param string    $action Action label stored on the log entry.
 */
$logError = function (Throwable $error, string $action) use ($register) {
    $logger = $register->get('realtimeLogger');
    $errorClass = get_class($error);
    $errorMessage = $error->getMessage();
    $errorFile = $error->getFile();
    $errorLine = $error->getLine();

    if ($logger && !($error instanceof Exception)) {
        $version = System::getEnv('_APP_VERSION', 'UNKNOWN');

        $log = new Log();
        $log->setNamespace("realtime");
        $log->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
        $log->setVersion($version);
        $log->setType(Log::TYPE_ERROR);
        $log->setMessage($errorMessage);

        $log->addTag('code', $error->getCode());
        $log->addTag('verboseType', $errorClass);

        $log->addExtra('file', $errorFile);
        $log->addExtra('line', $errorLine);
        $log->addExtra('trace', $error->getTraceAsString());

        $log->setAction($action);

        if (System::getEnv('_APP_ENV', 'development') === 'production') {
            $log->setEnvironment(Log::ENVIRONMENT_PRODUCTION);
        } else {
            $log->setEnvironment(Log::ENVIRONMENT_STAGING);
        }

        // A failure to push the log is reported on the console and otherwise ignored.
        try {
            $responseCode = $logger->addLog($log);
            Console::info('Error log pushed with status code: ' . $responseCode);
        } catch (Throwable $pushError) {
            Console::error('Error pushing log: ' . $pushError->getMessage());
        }
    }

    Console::error('[Error] Type: ' . $errorClass);
    Console::error('[Error] Message: ' . $errorMessage);
    Console::error('[Error] File: ' . $errorFile);
    Console::error('[Error] Line: ' . $errorLine);
};

$server->error($logError);
/**
 * Server start handler: creates this container's stats document and, on
 * self-hosted editions, schedules a 5-second timer that persists the
 * per-project connection totals into it.
 */
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
    sleep(5); // wait for the initial database schema to be ready

    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     * Retries indefinitely until the document has been created.
     */
    go(function () use ($register, $containerId, &$statsDocument) {
        $attempts = 0;
        $database = getConsoleDB();

        while (true) {
            $attempts++;

            try {
                $document = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = $database->getAuthorization()->skip(
                    fn () => $database->createDocument('realtime', $document)
                );

                break;
            } catch (Throwable) {
                Console::warning("Collection not ready. Retrying connection ({$attempts})...");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        }
    });

    // TODO: Remove this if check once it doesn't cause issues for cloud
    if (System::getEnv('_APP_EDITION', 'self-hosted') !== 'self-hosted') {
        return;
    }

    /**
     * Save current connections to the Database every 5 seconds.
     */
    Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
        $totals = [];
        foreach ($stats as $projectId => $row) {
            $totals[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        // Nothing to persist yet, or the stats document has not been created.
        if (empty($totals) || empty($statsDocument)) {
            return;
        }

        try {
            $database = getConsoleDB();

            $statsDocument
                ->setAttribute('timestamp', DateTime::now())
                ->setAttribute('value', json_encode($totals));

            $database->getAuthorization()->skip(
                fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument)
            );
        } catch (Throwable $th) {
            $logError($th, "updateWorkerDocument");
        }
    });
});
/**
 * Worker start handler.
 *
 * Registers the worker's telemetry adapter and counters, schedules a 5-second
 * timer (console stats broadcast on self-hosted editions, plus a test event),
 * and then runs the pub/sub subscribe loop that forwards 'realtime' channel
 * events to matching connections.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
Console::success('Worker ' . $workerId . ' started successfully');
$telemetry = getTelemetry($workerId);
$register->set('telemetry', fn () => $telemetry);
$register->set('telemetry.connectionCounter', fn () => $telemetry->createUpDownCounter('realtime.server.open_connections'));
$register->set('telemetry.connectionCreatedCounter', fn () => $telemetry->createCounter('realtime.server.connection.created'));
$register->set('telemetry.messageSentCounter', fn () => $telemetry->createCounter('realtime.server.message.sent'));
// $attempts counts consecutive pub/sub failures; $start marks when the current connection attempt began.
$attempts = 0;
$start = time();
// NOTE(review): $logError is captured by this timer callback but never called inside it,
// so exceptions thrown here are not routed through $logError - confirm this is intended.
Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
// TODO: Remove this if check once it doesn't cause issues for cloud
if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
$database = getConsoleDB();
$payload = [];
// Only stats documents whose timestamp is within the last 15 seconds are considered.
$list = $database->getAuthorization()->skip(fn () => $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]));
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
}
// Send one event per project that is present both in the local stats table and in the aggregate.
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
$event = [
'project' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
]
]
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
}
}
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
$payload = ['response' => 'WS:/v1/realtime:passed'];
$event = [
'project' => 'console',
'roles' => [Role::guests()->toString()],
'data' => [
'events' => ['test.event'],
'channels' => ['tests'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
];
$subscribers = $realtime->getSubscribers($event);
// Group connection ids by their set of matched subscription keys so each group gets a single send.
$groups = [];
foreach ($subscribers as $id => $matched) {
$key = implode(',', array_keys($matched));
$groups[$key]['ids'][] = $id;
$groups[$key]['subscriptions'] = array_keys($matched);
}
foreach ($groups as $group) {
$data = $event['data'];
$data['subscriptions'] = $group['subscriptions'];
$server->send($group['ids'], json_encode([
'type' => 'event',
'data' => $data
]));
}
}
});
// Pub/sub loop: exits once $attempts reaches 300. $attempts is reset to 0 after a
// successful ping and incremented in the catch block below.
while ($attempts < 300) {
try {
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
$pubsub = new PubSubPool($register->get('pools')->get('pubsub'));
if ($pubsub->ping(true)) {
$attempts = 0;
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
// The callback below runs for each payload received on the 'realtime' channel.
$pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
$event = json_decode($payload, true);
// On a permissions change for a user, re-subscribe with roles freshly read from the project database.
if ($event['permissionsChanged'] && isset($event['userId'])) {
$projectId = $event['project'];
$userId = $event['userId'];
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
// Takes the first key of the first entry stored under the user's role; only that connection is re-subscribed.
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = getConsoleDB();
$project = $consoleDatabase->getAuthorization()->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
$database = getProjectDB($project);
/** @var Appwrite\Utopia\Database\Documents\User $user */
$user = $database->getDocument('users', $userId);
$roles = $user->getRoles($database->getAuthorization());
// Remember the stored authorization and subscription metadata before unsubscribing.
$authorization = $realtime->connections[$connection]['authorization'] ?? null;
$meta = $realtime->getSubscriptionMetadata($connection);
$realtime->unsubscribe($connection);
foreach ($meta as $subscriptionId => $subscription) {
$queries = Query::parseQueries($subscription['queries'] ?? []);
$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$subscription['channels'] ?? [],
$queries
);
}
// Restore authorization after subscribe
if ($authorization !== null) {
$realtime->connections[$connection]['authorization'] = $authorization;
}
}
}
$receivers = $realtime->getSubscribers($event);
if (Http::isDevelopment() && !empty($receivers)) {
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
Console::log("[Debug][Worker {$workerId}] Connection IDs: " . json_encode(array_keys($receivers)));
Console::log("[Debug][Worker {$workerId}] Matched: " . json_encode(array_values($receivers)));
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
}
// Group connections by matched subscription IDs for batch sending
$groups = [];
foreach ($receivers as $id => $matched) {
$key = implode(',', array_keys($matched));
$groups[$key]['ids'][] = $id;
$groups[$key]['subscriptions'] = array_keys($matched);
}
// $total counts the connection ids the event was sent to, across all groups.
$total = 0;
foreach ($groups as $group) {
$data = $event['data'];
$data['subscriptions'] = $group['subscriptions'];
$server->send($group['ids'], json_encode([
'type' => 'event',
'data' => $data
]));
$total += count($group['ids']);
}
if ($total > 0) {
$register->get('telemetry.messageSentCounter')->add($total);
$stats->incr($event['project'], 'messages', $total);
}
});
} catch (Throwable $th) {
// Log the failure, count the attempt, wait, and retry from the top of the loop.
$logError($th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage());
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
}
}
Console::error('Failed to restart pub/sub...');
});
/**
 * Connection open handler.
 *
 * Resolves the project, authorization and user resources for the request,
 * runs the project, abuse, origin and channel checks, registers one
 * subscription per constructed entry, and replies with a 'connected' message.
 * Any throwable is logged, reported to the client as an 'error' message, and
 * the connection is closed.
 */
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
$app = new Http('UTC');
$request = new Request($request);
$response = new Response(new SwooleResponse());
Console::info("Connection open (user: {$connection})");
// Expose the pools and the wrapped request/response as Http resources for the lookups below.
Http::setResource('pools', fn () => $register->get('pools'));
Http::setResource('request', fn () => $request);
Http::setResource('response', fn () => $response);
try {
/** @var Document $project */
$project = $app->getResource('project');
$authorization = $app->getResource('authorization');
/*
* Project Check
*/
if (empty($project->getId())) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
}
// Reject when the project's 'apis' attribute explicitly disables 'realtime', unless the roles are privileged or app roles.
// Exception and AppwriteException are two import aliases of the same class.
if (
array_key_exists('realtime', $project->getAttribute('apis', []))
&& !$project->getAttribute('apis', [])['realtime']
&& !(User::isPrivileged($authorization->getRoles()) || User::isApp($authorization->getRoles()))
) {
throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
}
$timelimit = $app->getResource('timelimit');
$user = $app->getResource('user'); /** @var User $user */
/*
* Abuse Check
*
* Abuse limits are connecting 128 times per minute and ip address.
*/
// The 'timelimit' resource is a callable; invoking it yields the limiter configured below.
$timelimit = $timelimit('url:{url},ip:{ip}', 128, 60);
$timelimit
->setParam('{ip}', $request->getIP())
->setParam('{url}', $request->getURI());
$abuse = new Abuse($timelimit);
if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many requests');
}
/*
* Validate Client Domain - Check to avoid CSRF attack.
* Adding Appwrite API domains to allow XDOMAIN communication.
* Skip this check for non-web platforms which are not required to send an origin header.
*/
$origin = $request->getOrigin();
$originValidator = $app->getResource('originValidator');
if (!empty($origin) && !$originValidator->isValid($origin) && $project->getId() !== 'console') {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $originValidator->getDescription());
}
$roles = $user->getRoles($authorization);
$channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
/**
* Channels Check
*/
if (empty($channels)) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels');
}
$names = array_keys($channels);
// Query parsing errors are surfaced to the client as a policy violation.
try {
$subscriptions = Realtime::constructSubscriptions(
$names,
fn ($channel) => $request->getQuery($channel, null)
);
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}
// $mapping records, per constructed subscription index, the generated subscription id sent back to the client.
$mapping = [];
foreach ($subscriptions as $index => $subscription) {
$subscriptionId = ID::unique();
$realtime->subscribe(
$project->getId(),
$connection,
$subscriptionId,
$roles,
$subscription['channels'],
$subscription['queries']
);
$mapping[$index] = $subscriptionId;
}
// Keep the authorization on the connection entry; the message handler reads it back from there.
$realtime->connections[$connection]['authorization'] = $authorization;
// Users without an id are reported to the client as null.
$user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT);
$server->send([$connection], json_encode([
'type' => 'connected',
'data' => [
'channels' => $names,
'subscriptions' => $mapping,
'user' => $user
]
]));
$register->get('telemetry.connectionCounter')->add(1);
$register->get('telemetry.connectionCreatedCounter')->add(1);
$stats->set($project->getId(), [
'projectId' => $project->getId(),
'teamId' => $project->getAttribute('teamId')
]);
$stats->incr($project->getId(), 'connections');
$stats->incr($project->getId(), 'connectionsTotal');
} catch (Throwable $th) {
$logError($th, "initServer");
// Handle SQL error code is 'HY000'
$code = $th->getCode();
if (!is_int($code)) {
$code = 500;
}
$message = $th->getMessage();
// sanitize 0 && 5xx errors
// Realtime policy violations keep their original message even outside development.
$realtimeViolation = $th instanceof AppwriteException && $th->getType() === AppwriteException::REALTIME_POLICY_VIOLATION;
if (($code === 0 || $code >= 500) && !$realtimeViolation && !Http::isDevelopment()) {
$message = 'Error: Server Error';
}
$response = [
'type' => 'error',
'data' => [
'code' => $code,
'message' => $message
]
];
$server->send([$connection], json_encode($response));
$server->close($connection, $code);
if (Http::isDevelopment()) {
Console::error('[Error] Connection Error');
Console::error('[Error] Code: ' . $response['data']['code']);
Console::error('[Error] Message: ' . $response['data']['message']);
}
}
});
/**
 * Incoming message handler.
 *
 * Supports two message types:
 *  - 'ping': answered with a 'pong' message.
 *  - 'authentication': verifies the session in `data.session`, re-subscribes
 *    the connection with the authenticated user's roles and answers with a
 *    'response' message containing the account.
 *
 * Any throwable is logged and reported to the client as an 'error' message;
 * the connection is closed only when the error code is 1008.
 */
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId, $logError) {
    try {
        $response = new Response(new SwooleResponse());
        $projectId = $realtime->connections[$connection]['projectId'] ?? null;

        // Get authorization from connection (stored during onOpen)
        $authorization = $realtime->connections[$connection]['authorization'] ?? null;
        if ($authorization === null) {
            $authorization = new Authorization('');
        }

        $database = getConsoleDB();
        $database->setAuthorization($authorization);

        // Non-console connections operate on their own project database.
        if ($projectId !== 'console') {
            $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
            $database = getProjectDB($project);
            $database->setAuthorization($authorization);
        }

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         */
        // NOTE(review): the key template references {url}, which is never set here, while
        // {container} is set but does not appear in the template - confirm the intended key.
        $timeLimit = getTimelimit('url:{url},connection:{connection}', 32, 60);
        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);

        $abuse = new Abuse($timeLimit);

        // Test the env switch first so the limiter is not consulted when abuse
        // protection is disabled (same order as the connection-open handler).
        if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many messages.');
        }

        $message = json_decode($message, true);

        // A valid message is a JSON object carrying a 'type'. 'data' is only
        // required by message types that read it (checked per type below), so
        // a bare {"type":"ping"} stays valid.
        if (!is_array($message) || !array_key_exists('type', $message)) {
            throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message format is not valid.');
        }

        switch ($message['type']) {
            case 'ping':
                $server->send([$connection], json_encode([
                    'type' => 'pong'
                ]));
                break;

            case 'authentication':
                $data = $message['data'] ?? null;

                // Guard against a missing/non-object 'data' or a non-string session
                // before they reach array or decode calls.
                if (!is_array($data) || !is_string($data['session'] ?? null)) {
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Payload is not valid.');
                }

                $store = new Store();
                $store->decode($data['session']);

                /** @var User $user */
                $user = $database->getDocument('users', $store->getProperty('id', ''));

                /**
                 * TODO:
                 * Moving forward, we should try to use our dependency injection container
                 * to inject the proof for token.
                 * This way we will have one source of truth for the proof for token.
                 */
                $proofForToken = new Token();
                $proofForToken->setHash(new Sha());

                if (
                    empty($user->getId()) // Check a document has been found in the DB
                    || !$user->sessionVerify($store->getProperty('secret', ''), $proofForToken) // Validate user has valid login token
                ) {
                    // cookie not valid
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Session is not valid.');
                }

                $roles = $user->getRoles($database->getAuthorization());

                // Capture connection state before unsubscribing so it can be re-applied.
                $authorization = $realtime->connections[$connection]['authorization'] ?? null;
                $projectId = $realtime->connections[$connection]['projectId'] ?? null;
                $meta = $realtime->getSubscriptionMetadata($connection);

                $realtime->unsubscribe($connection);

                // Re-create every previous subscription with the authenticated user's roles.
                if (!empty($projectId)) {
                    foreach ($meta as $subscriptionId => $subscription) {
                        $queries = Query::parseQueries($subscription['queries'] ?? []);
                        $realtime->subscribe(
                            $projectId,
                            $connection,
                            $subscriptionId,
                            $roles,
                            $subscription['channels'] ?? [],
                            $queries
                        );
                    }
                }

                // Restore the stored authorization on the connection entry.
                if ($authorization !== null) {
                    $realtime->connections[$connection]['authorization'] = $authorization;
                }

                $user = $response->output($user, Response::MODEL_ACCOUNT);

                $server->send([$connection], json_encode([
                    'type' => 'response',
                    'data' => [
                        'to' => 'authentication',
                        'success' => true,
                        'user' => $user
                    ]
                ]));
                break;

            default:
                throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
        }
    } catch (Throwable $th) {
        $logError($th, "realtimeMessage");

        // Non-integer codes (e.g. SQLSTATE strings) are reported as 500.
        $code = $th->getCode();
        if (!is_int($code)) {
            $code = 500;
        }

        $message = $th->getMessage();

        // sanitize 0 && 5xx errors
        if (($code === 0 || $code >= 500) && !Http::isDevelopment()) {
            $message = 'Error: Server Error';
        }

        $response = [
            'type' => 'error',
            'data' => [
                'code' => $code,
                'message' => $message
            ]
        ];

        $server->send([$connection], json_encode($response));

        // Only code 1008 terminates the connection; other errors leave it open.
        if ($th->getCode() === 1008) {
            $server->close($connection, $th->getCode());
        }
    }
});
/**
 * Connection close handler: for connections known to the realtime adapter,
 * decrements the project's 'connectionsTotal' stat and the open-connections
 * telemetry counter, then removes the connection's subscriptions.
 */
$server->onClose(function (int $connection) use ($realtime, $stats, $register) {
    $isTracked = array_key_exists($connection, $realtime->connections);

    if ($isTracked) {
        $projectId = $realtime->connections[$connection]['projectId'];
        $stats->decr($projectId, 'connectionsTotal');

        $openConnections = $register->get('telemetry.connectionCounter');
        $openConnections->add(-1);
    }

    $realtime->unsubscribe($connection);

    Console::info('Connection close: ' . $connection);
});

// All handlers are registered; start the server.
$server->start();