Merge pull request #11272 from appwrite/realtime-query-reserved-channels

This commit is contained in:
Jake Barnby
2026-02-11 07:09:28 +00:00
committed by GitHub
5 changed files with 244 additions and 7 deletions
+4
View File
@@ -524,6 +524,10 @@ Http::setResource('project', function ($dbForPlatform, $request, $console, $auth
/** @var Utopia\Database\Document $console */
$projectId = $request->getParam('project', $request->getHeader('x-appwrite-project', ''));
// Realtime channel "project" can send project=Query array
if (!\is_string($projectId)) {
$projectId = $request->getHeader('x-appwrite-project', '');
}
if (empty($projectId) || $projectId === 'console') {
return $console;
+4 -3
View File
@@ -690,11 +690,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$code = 500;
}
$message = $th->getMessage();
// sanitize 0 && 5xx errors
if (($code === 0 || $code >= 500) && !Http::isDevelopment()) {
$realtimeViolation = $th instanceof AppwriteException && $th->getType() === AppwriteException::REALTIME_POLICY_VIOLATION;
if (($code === 0 || $code >= 500) && !$realtimeViolation && !Http::isDevelopment()) {
$message = 'Error: Server Error';
}
@@ -717,7 +717,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
}
});
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId, $logError) {
try {
$response = new Response(new SwooleResponse());
$projectId = $realtime->connections[$connection]['projectId'] ?? null;
@@ -840,6 +840,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
}
} catch (Throwable $th) {
$logError($th, "realtimeMessage");
$code = $th->getCode();
if (!is_int($code)) {
$code = 500;
+27 -4
View File
@@ -344,8 +344,31 @@ class Realtime extends MessagingAdapter
{
$subscriptions = [];
/**
* Reserved channel params with expected type
* If matched the expected type then skip the query parsing like in project
*/
$reservedParamExpectedTypes = [
'project' => 'string',
];
foreach ($channelNames as $channel) {
$params = $getQueryParam(\str_replace('.', '_', $channel));
$paramKey = \str_replace('.', '_', $channel);
$params = $getQueryParam($paramKey);
if (\array_key_exists($paramKey, $reservedParamExpectedTypes) && $params !== null) {
$expectedType = $reservedParamExpectedTypes[$paramKey];
$isExpectedType = match ($expectedType) {
'array' => \is_array($params),
'string' => \is_string($params),
default => false,
};
// If the value matches the expected type dont use it the queries
if ($isExpectedType) {
$params = null;
}
}
if ($params === null) {
if (!isset($subscriptions[0])) {
@@ -358,7 +381,7 @@ class Realtime extends MessagingAdapter
continue;
}
if (!is_array($params)) {
if (!\is_array($params)) {
$params = [$params];
}
@@ -367,12 +390,12 @@ class Realtime extends MessagingAdapter
$subscriptions[$index] = ['channels' => [], 'queries' => []];
}
if (!in_array($channel, $subscriptions[$index]['channels'])) {
if (!\in_array($channel, $subscriptions[$index]['channels'], true)) {
$subscriptions[$index]['channels'][] = $channel;
}
if (empty($subscriptions[$index]['queries'])) {
$raw = is_array($slot) ? $slot : [$slot];
$raw = \is_array($slot) ? $slot : [$slot];
$subscriptions[$index]['queries'] = self::convertQueries($raw);
}
}
@@ -68,6 +68,27 @@ trait RealtimeBase
);
}
/**
* Build WebSocket client with custom query parameters.
* Useful for testing edge cases like project in header only, or project as Query array.
*
* @param array $queryParams Custom query parameters (e.g., ['channels' => ['project'], 'project' => [...]])
* @param array $headers HTTP headers
* @return WebSocketClient
*/
private function getWebsocketWithCustomQuery(array $queryParams, array $headers = []): WebSocketClient
{
$queryString = http_build_query($queryParams);
return new WebSocketClient(
"ws://appwrite.test/v1/realtime?" . $queryString,
[
"headers" => $headers,
"timeout" => 30,
]
);
}
public function testConnection(): void
{
/**
@@ -2262,4 +2262,192 @@ class RealtimeCustomClientQueryTest extends Scope
$client->close();
}
public function testProjectChannelWithQuery()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
// Test OLD SDK behavior: project=projectId (string) in query param
// For reserved \"project\" param, string is treated as routing-only (project ID),
// and is not used as queries for the project channel. We should fall back to select(*).
$clientOldSdk = $this->getWebsocket(['project'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], $projectId, null);
$response = json_decode($clientOldSdk->receive(), true);
$this->assertEquals('connected', $response['type']);
$this->assertContains('project', $response['data']['channels']);
// Should have default select(['*']) subscription since project param was treated as project ID, not queries
$this->assertArrayHasKey('subscriptions', $response['data']);
$this->assertIsArray($response['data']['subscriptions']);
$this->assertNotEmpty($response['data']['subscriptions']);
$clientOldSdk->close();
// Test NEW SDK behavior: project=Query array in query param, project ID in header
// The reserved param logic should use Query array as subscription queries for project channel
$queryArray = [Query::select(['*'])->toString()];
$clientNewSdk = $this->getWebsocketWithCustomQuery(
[
'channels' => ['project'],
'project' => [
0 => [
0 => $queryArray[0]
]
]
],
[
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
'x-appwrite-project' => $projectId,
]
);
$response = json_decode($clientNewSdk->receive(), true);
$this->assertEquals('connected', $response['type']);
$this->assertContains('project', $response['data']['channels']);
// Should have subscription with the provided query
$this->assertArrayHasKey('subscriptions', $response['data']);
$this->assertIsArray($response['data']['subscriptions']);
$this->assertNotEmpty($response['data']['subscriptions']);
$clientNewSdk->close();
// Test edge case: project param is array but not a valid Query array
// This should now fail with an invalid query error rather than silently falling back.
$clientEdgeCase = $this->getWebsocketWithCustomQuery(
[
'channels' => ['project'],
'project' => ['invalid', 'array']
],
[
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
'x-appwrite-project' => $projectId,
]
);
$response = json_decode($clientEdgeCase->receive(), true);
$this->assertEquals('error', $response['type']);
$this->assertStringContainsString('Invalid query', $response['data']['message']);
}
public function testProjectChannelWithHeaderOnly()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
// Test: project ID only in header, no project query param
// This simulates a client that only uses x-appwrite-project header
$client = $this->getWebsocketWithCustomQuery(
[
'channels' => ['project']
],
[
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
'x-appwrite-project' => $projectId,
]
);
$response = json_decode($client->receive(), true);
$this->assertEquals('connected', $response['type']);
$this->assertContains('project', $response['data']['channels']);
// Should have default select(['*']) subscription since no project query param
$this->assertArrayHasKey('subscriptions', $response['data']);
$this->assertIsArray($response['data']['subscriptions']);
$this->assertNotEmpty($response['data']['subscriptions']);
$client->close();
// Test: project channel with queries, project ID only in header
$queryArray = [Query::select(['*'])->toString()];
$clientWithQuery = $this->getWebsocketWithCustomQuery(
[
'channels' => ['project'],
'project' => [
0 => [
0 => $queryArray[0]
]
]
],
[
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
'x-appwrite-project' => $projectId,
]
);
$response = json_decode($clientWithQuery->receive(), true);
$this->assertEquals('connected', $response['type']);
$this->assertContains('project', $response['data']['channels']);
$this->assertArrayHasKey('subscriptions', $response['data']);
$this->assertIsArray($response['data']['subscriptions']);
$this->assertNotEmpty($response['data']['subscriptions']);
$clientWithQuery->close();
}
public function testTestsChannelWithQueries()
{
$projectId = 'console';
// Subscribe without queries - should receive all events
$clientNoQuery = $this->getWebsocket(['tests'], [
'origin' => 'http://localhost',
], $projectId);
$response = json_decode($clientNoQuery->receive(), true);
$this->assertEquals('connected', $response['type']);
// Subscribe with matching query - should receive events
$clientWithMatchingQuery = $this->getWebsocket(['tests'], [
'origin' => 'http://localhost',
], $projectId, [
Query::equal('response', ['WS:/v1/realtime:passed'])->toString(),
]);
$response = json_decode($clientWithMatchingQuery->receive(), true);
$this->assertEquals('connected', $response['type']);
// Subscribe with non-matching query - should NOT receive events
$clientWithNonMatchingQuery = $this->getWebsocket(['tests'], [
'origin' => 'http://localhost',
], $projectId, [
Query::equal('response', ['failed'])->toString(),
]);
$response = json_decode($clientWithNonMatchingQuery->receive(), true);
$this->assertEquals('connected', $response['type']);
sleep(6);
// Client without query should receive event
$eventNoQuery = json_decode($clientNoQuery->receive(), true);
$this->assertEquals('event', $eventNoQuery['type']);
$this->assertEquals('test.event', $eventNoQuery['data']['events'][0]);
$this->assertEquals('WS:/v1/realtime:passed', $eventNoQuery['data']['payload']['response']);
// Client with matching query should receive event
$eventMatching = json_decode($clientWithMatchingQuery->receive(), true);
$this->assertEquals('event', $eventMatching['type']);
$this->assertEquals('test.event', $eventMatching['data']['events'][0]);
$this->assertEquals('WS:/v1/realtime:passed', $eventMatching['data']['payload']['response']);
// Client with non-matching query should NOT receive event
try {
$clientWithNonMatchingQuery->receive();
$this->fail('Expected TimeoutException - client with non-matching query should not receive event');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
$clientNoQuery->close();
$clientWithMatchingQuery->close();
$clientWithNonMatchingQuery->close();
}
}