diff --git a/app/init/resources.php b/app/init/resources.php index a031292ca7..978a51be7e 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -524,6 +524,10 @@ Http::setResource('project', function ($dbForPlatform, $request, $console, $auth /** @var Utopia\Database\Document $console */ $projectId = $request->getParam('project', $request->getHeader('x-appwrite-project', '')); + // Realtime channel "project" can send project=Query array + if (!\is_string($projectId)) { + $projectId = $request->getHeader('x-appwrite-project', ''); + } if (empty($projectId) || $projectId === 'console') { return $console; diff --git a/app/realtime.php b/app/realtime.php index 97e7465683..0c5d041dc9 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -690,11 +690,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $code = 500; } - $message = $th->getMessage(); // sanitize 0 && 5xx errors - if (($code === 0 || $code >= 500) && !Http::isDevelopment()) { + $realtimeViolation = $th instanceof AppwriteException && $th->getType() === AppwriteException::REALTIME_POLICY_VIOLATION; + if (($code === 0 || $code >= 500) && !$realtimeViolation && !Http::isDevelopment()) { $message = 'Error: Server Error'; } @@ -717,7 +717,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, } }); -$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) { +$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId, $logError) { try { $response = new Response(new SwooleResponse()); $projectId = $realtime->connections[$connection]['projectId'] ?? null; @@ -840,6 +840,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.'); } } catch (Throwable $th) { + $logError($th, "realtimeMessage"); $code = $th->getCode(); if (!is_int($code)) { $code = 500; diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 51aad2a635..e61cfdc116 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -344,8 +344,31 @@ class Realtime extends MessagingAdapter { $subscriptions = []; + /** + * Reserved channel params with expected type + * If matched the expected type then skip the query parsing like in project + */ + $reservedParamExpectedTypes = [ + 'project' => 'string', + ]; + foreach ($channelNames as $channel) { - $params = $getQueryParam(\str_replace('.', '_', $channel)); + $paramKey = \str_replace('.', '_', $channel); + $params = $getQueryParam($paramKey); + + if (\array_key_exists($paramKey, $reservedParamExpectedTypes) && $params !== null) { + $expectedType = $reservedParamExpectedTypes[$paramKey]; + $isExpectedType = match ($expectedType) { + 'array' => \is_array($params), + 'string' => \is_string($params), + default => false, + }; + + // If the value matches the expected type dont use it the queries + if ($isExpectedType) { + $params = null; + } + } if ($params === null) { if (!isset($subscriptions[0])) { @@ -358,7 +381,7 @@ class Realtime extends MessagingAdapter continue; } - if (!is_array($params)) { + if (!\is_array($params)) { $params = [$params]; } @@ -367,12 +390,12 @@ class Realtime extends MessagingAdapter $subscriptions[$index] = ['channels' => [], 'queries' => []]; } - if (!in_array($channel, $subscriptions[$index]['channels'])) { + if (!\in_array($channel, $subscriptions[$index]['channels'], true)) { $subscriptions[$index]['channels'][] = $channel; } if (empty($subscriptions[$index]['queries'])) { - $raw = is_array($slot) ? $slot : [$slot]; + $raw = \is_array($slot) ? $slot : [$slot]; $subscriptions[$index]['queries'] = self::convertQueries($raw); } } diff --git a/tests/e2e/Services/Realtime/RealtimeBase.php b/tests/e2e/Services/Realtime/RealtimeBase.php index 1b77c0ad4a..92d29ba3a3 100644 --- a/tests/e2e/Services/Realtime/RealtimeBase.php +++ b/tests/e2e/Services/Realtime/RealtimeBase.php @@ -68,6 +68,27 @@ trait RealtimeBase ); } + /** + * Build WebSocket client with custom query parameters. + * Useful for testing edge cases like project in header only, or project as Query array. + * + * @param array $queryParams Custom query parameters (e.g., ['channels' => ['project'], 'project' => [...]]) + * @param array $headers HTTP headers + * @return WebSocketClient + */ + private function getWebsocketWithCustomQuery(array $queryParams, array $headers = []): WebSocketClient + { + $queryString = http_build_query($queryParams); + + return new WebSocketClient( + "ws://appwrite.test/v1/realtime?" . $queryString, + [ + "headers" => $headers, + "timeout" => 30, + ] + ); + } + public function testConnection(): void { /** diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 37ea1e2e05..b1fbb14bbb 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -2262,4 +2262,192 @@ class RealtimeCustomClientQueryTest extends Scope $client->close(); } + + public function testProjectChannelWithQuery() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Test OLD SDK behavior: project=projectId (string) in query param + // For reserved \"project\" param, string is treated as routing-only (project ID), + // and is not used as queries for the project channel. We should fall back to select(*). + $clientOldSdk = $this->getWebsocket(['project'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], $projectId, null); + + $response = json_decode($clientOldSdk->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertContains('project', $response['data']['channels']); + // Should have default select(['*']) subscription since project param was treated as project ID, not queries + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + $this->assertNotEmpty($response['data']['subscriptions']); + + $clientOldSdk->close(); + + // Test NEW SDK behavior: project=Query array in query param, project ID in header + // The reserved param logic should use Query array as subscription queries for project channel + $queryArray = [Query::select(['*'])->toString()]; + $clientNewSdk = $this->getWebsocketWithCustomQuery( + [ + 'channels' => ['project'], + 'project' => [ + 0 => [ + 0 => $queryArray[0] + ] + ] + ], + [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + 'x-appwrite-project' => $projectId, + ] + ); + + $response = json_decode($clientNewSdk->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertContains('project', $response['data']['channels']); + // Should have subscription with the provided query + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + $this->assertNotEmpty($response['data']['subscriptions']); + + $clientNewSdk->close(); + + // Test edge case: project param is array but not a valid Query array + // This should now fail with an invalid query error rather than silently falling back. + $clientEdgeCase = $this->getWebsocketWithCustomQuery( + [ + 'channels' => ['project'], + 'project' => ['invalid', 'array'] + ], + [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + 'x-appwrite-project' => $projectId, + ] + ); + + $response = json_decode($clientEdgeCase->receive(), true); + $this->assertEquals('error', $response['type']); + $this->assertStringContainsString('Invalid query', $response['data']['message']); + } + + public function testProjectChannelWithHeaderOnly() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Test: project ID only in header, no project query param + // This simulates a client that only uses x-appwrite-project header + $client = $this->getWebsocketWithCustomQuery( + [ + 'channels' => ['project'] + ], + [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + 'x-appwrite-project' => $projectId, + ] + ); + + $response = json_decode($client->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertContains('project', $response['data']['channels']); + // Should have default select(['*']) subscription since no project query param + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + $this->assertNotEmpty($response['data']['subscriptions']); + + $client->close(); + + // Test: project channel with queries, project ID only in header + $queryArray = [Query::select(['*'])->toString()]; + $clientWithQuery = $this->getWebsocketWithCustomQuery( + [ + 'channels' => ['project'], + 'project' => [ + 0 => [ + 0 => $queryArray[0] + ] + ] + ], + [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + 'x-appwrite-project' => $projectId, + ] + ); + + $response = json_decode($clientWithQuery->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertContains('project', $response['data']['channels']); + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + $this->assertNotEmpty($response['data']['subscriptions']); + + $clientWithQuery->close(); + } + + public function testTestsChannelWithQueries() + { + $projectId = 'console'; + + // Subscribe without queries - should receive all events + $clientNoQuery = $this->getWebsocket(['tests'], [ + 'origin' => 'http://localhost', + ], $projectId); + + $response = json_decode($clientNoQuery->receive(), true); + $this->assertEquals('connected', $response['type']); + + // Subscribe with matching query - should receive events + $clientWithMatchingQuery = $this->getWebsocket(['tests'], [ + 'origin' => 'http://localhost', + ], $projectId, [ + Query::equal('response', ['WS:/v1/realtime:passed'])->toString(), + ]); + + $response = json_decode($clientWithMatchingQuery->receive(), true); + $this->assertEquals('connected', $response['type']); + + // Subscribe with non-matching query - should NOT receive events + $clientWithNonMatchingQuery = $this->getWebsocket(['tests'], [ + 'origin' => 'http://localhost', + ], $projectId, [ + Query::equal('response', ['failed'])->toString(), + ]); + + $response = json_decode($clientWithNonMatchingQuery->receive(), true); + $this->assertEquals('connected', $response['type']); + + sleep(6); + + // Client without query should receive event + $eventNoQuery = json_decode($clientNoQuery->receive(), true); + $this->assertEquals('event', $eventNoQuery['type']); + $this->assertEquals('test.event', $eventNoQuery['data']['events'][0]); + $this->assertEquals('WS:/v1/realtime:passed', $eventNoQuery['data']['payload']['response']); + + // Client with matching query should receive event + $eventMatching = json_decode($clientWithMatchingQuery->receive(), true); + $this->assertEquals('event', $eventMatching['type']); + $this->assertEquals('test.event', $eventMatching['data']['events'][0]); + $this->assertEquals('WS:/v1/realtime:passed', $eventMatching['data']['payload']['response']); + + // Client with non-matching query should NOT receive event + try { + $clientWithNonMatchingQuery->receive(); + $this->fail('Expected TimeoutException - client with non-matching query should not receive event'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + $clientNoQuery->close(); + $clientWithMatchingQuery->close(); + $clientWithNonMatchingQuery->close(); + } }