From 09edebd6a3d2bc0d09ade6cf1e085d2cdae06226 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Fri, 23 Jan 2026 00:51:14 +0530 Subject: [PATCH] feat: enhance hasSubscriber method to support query filtering --- app/realtime.php | 18 +++++++++++ src/Appwrite/Messaging/Adapter/Realtime.php | 36 +++++++++++++++++++-- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index eded4d79bc..2914ee4d98 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -417,6 +417,24 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, /** * Sending test message for SDK E2E tests every 5 seconds. */ + if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests', [Query::equal('type', ['tests'])->toString()])) { + $payload = ['response' => 'WS:/v1/realtime:passed', 'type' => 'tests']; + + $event = [ + 'project' => 'console', + 'roles' => [Role::guests()->toString()], + 'data' => [ + 'events' => ['test.event'], + 'channels' => ['tests'], + 'timestamp' => DateTime::formatTz(DateTime::now()), + 'payload' => $payload, + ] + ]; + $server->send($realtime->getSubscribers($event), json_encode([ + 'type' => 'event', + 'data' => $event['data'] + ])); + } if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) { $payload = ['response' => 'WS:/v1/realtime:passed']; diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 9e03a7aaf7..a991638ac6 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -118,9 +118,10 @@ class Realtime extends MessagingAdapter * @param string $projectId * @param string $role * @param string $channel + * @param array $queries * @return bool */ - public function hasSubscriber(string $projectId, string $role, string $channel = ''): bool + public function hasSubscriber(string $projectId, string $role, string $channel = '', array $queries = []): bool { //TODO: look into moving it to an abstract class in the parent class if (empty($channel)) { @@ -128,9 +129,40 @@ class Realtime extends MessagingAdapter && array_key_exists($role, $this->subscriptions[$projectId]); } - return array_key_exists($projectId, $this->subscriptions) + $hasBasicSubscription = array_key_exists($projectId, $this->subscriptions) && array_key_exists($role, $this->subscriptions[$projectId]) && array_key_exists($channel, $this->subscriptions[$projectId][$role]); + + if (!$hasBasicSubscription) { + return false; + } + + if (empty($queries)) { + return true; + } + + $queries = Realtime::convertQueries($queries); + + $queryStrings = array_map(fn ($query) => $query->toString(), $queries); + + foreach (array_keys($this->subscriptions[$projectId][$role][$channel]) as $connectionId) { + if (!isset($this->connections[$connectionId])) { + continue; + } + $connection = $this->connections[$connectionId]; + if ($connection['projectId'] === $projectId && isset($connection['channels'][$channel])) { + $connectionQueries = $connection['queries'] ?? []; + if (empty($connectionQueries)) { + continue; + } + $connectionQueryStrings = array_map(fn ($query) => $query->toString(), $connectionQueries); + if (empty(array_diff($queryStrings, $connectionQueryStrings))) { + return true; + } + } + } + + return false; } /**