diff --git a/app/realtime.php b/app/realtime.php index 4c542c9672..b4c76c6b6f 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -773,6 +773,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } } + // Strip deleted presences from in-memory connection state so onClose doesn't + // re-fire delete events for rows already removed via HTTP DELETE. + $deletedPresenceId = Realtime::extractDeletedPresenceId($event); + if ($deletedPresenceId !== null) { + $realtime->removePresenceFromConnections( + (string) ($event['project'] ?? ''), + $deletedPresenceId, + ); + } + $receivers = $realtime->getSubscribers($event); if (System::getEnv('_APP_ENV', 'production') === 'development' && !empty($receivers)) { diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 5015aa5465..29ee048977 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -204,6 +204,74 @@ class Realtime extends MessagingAdapter return $subscriptions; } + /** + * Dedup delete presence triggers. + * Scenario: when client is connected to realtime and a delete call is made throught rest. + * If not dedupe then two delete events will get triggered. So remove the presenceIds + * + * @param string $projectId + * @param string $presenceId + * @return int Number of connections whose presences map was updated. + */ + public function removePresenceFromConnections(string $projectId, string $presenceId): int + { + if ($projectId === '' || $presenceId === '') { + return 0; + } + + $removed = 0; + foreach ($this->connections as $connectionId => $connection) { + if (($connection['projectId'] ?? null) !== $projectId) { + continue; + } + if (!isset($connection['presences'][$presenceId])) { + continue; + } + unset($this->connections[$connectionId]['presences'][$presenceId]); + $removed++; + } + + return $removed; + } + + /** + * Returns the presence ID carried by a `presences.{id}.delete` event payload, + * or null when the event is not a presence delete. + * + * @param array $event Decoded pubsub payload produced by self::send(). + * @return string|null + */ + public static function extractDeletedPresenceId(array $event): ?string + { + $events = $event['data']['events'] ?? []; + if (!\is_array($events)) { + return null; + } + + $isPresenceDelete = false; + foreach ($events as $eventName) { + if ( + \is_string($eventName) + && \str_starts_with($eventName, 'presences.') + && \str_ends_with($eventName, '.delete') + ) { + $isPresenceDelete = true; + break; + } + } + + if (!$isPresenceDelete) { + return null; + } + + $presenceId = $event['data']['payload']['$id'] ?? null; + if (!\is_string($presenceId) || $presenceId === '') { + return null; + } + + return $presenceId; + } + /** * Removes all subscriptions for a connection. * diff --git a/src/Appwrite/Realtime/Message/Handlers/Presence.php b/src/Appwrite/Realtime/Message/Handlers/Presence.php index 4a764da5c1..76ed48f47e 100644 --- a/src/Appwrite/Realtime/Message/Handlers/Presence.php +++ b/src/Appwrite/Realtime/Message/Handlers/Presence.php @@ -98,11 +98,11 @@ class Presence extends Action }, ); + $presence->removeAttribute('$collection'); + $presence->removeAttribute('$tenant'); $presence->removeAttribute('hostname'); + $presence->removeAttribute('perms_md5'); - // Stash the Document keyed by ID so onClose can build delete-event payloads without - // re-reading the row from the DB. hostname is already stripped above so it won't leak - // into the realtime payload sent to subscribers. $realtime->connections[$connectionId]['presences'][$presence->getId()] = $presence; $triggerPresenceEvent($project, $user, 'presences.[presenceId].upsert', $presence); diff --git a/tests/e2e/Services/Presences/PresenceRealtimeClientTest.php b/tests/e2e/Services/Presences/PresenceRealtimeClientTest.php index f3e4aac1a7..bf2baf9ede 100644 --- a/tests/e2e/Services/Presences/PresenceRealtimeClientTest.php +++ b/tests/e2e/Services/Presences/PresenceRealtimeClientTest.php @@ -2,6 +2,7 @@ namespace Tests\E2E\Services\Presences; +use Appwrite\Tests\Async\Exceptions\Critical; use Tests\E2E\Client; use Tests\E2E\Scopes\ProjectCustom; use Tests\E2E\Scopes\Scope; @@ -589,4 +590,94 @@ class PresenceRealtimeClientTest extends Scope $listener->close(); } } + + public function testHttpDeleteThenCloseDoesNotDuplicateDeleteEvent(): void + { + [$project, $user, $headers] = $this->bootstrapIsolatedProject(); + $presenceId = ID::unique(); + $metadata = ['testRunId' => ID::unique(), 'source' => 'http-delete-then-close']; + + $publisher = $this->connectRealtimeAndSubscribe($project, $headers, ['presences', 'presences.' . $presenceId], timeout: 1); + $listener = $this->connectRealtimeAndSubscribe($project, $headers, ['presences', 'presences.' . $presenceId], timeout: 1); + + try { + // Publish a presence over WebSocket so the realtime worker tracks it in + // its in-memory connection map under the publisher connection. + $this->sendPresenceMessage( + $publisher, + $presenceId, + 'online', + $metadata, + $this->getPresencePermissions(Role::any()) + ); + $this->collectPresenceOutcome($publisher, $presenceId, 'online', $metadata, $user['$id']); + $this->receivePresenceEvent($listener, $presenceId, 'upsert', 'online', $metadata, $user['$id']); + + // HTTP DELETE removes the row from the DB and emits the delete event via pubsub. + // The realtime worker is expected to strip the presence from the publisher's + // in-memory connection state when it processes the pubsub message. + $delete = $this->client->call( + Client::METHOD_DELETE, + '/presences/' . $presenceId, + $this->getServerHeaders($project) + ); + $this->assertSame(204, $delete['headers']['status-code']); + + // Synchronization point: wait for the listener to receive the legitimate + // delete event before closing the publisher. Redis pubsub broadcasts to + // every realtime worker simultaneously, so the listener's worker observing + // the event implies the publisher's worker has also processed it (and run + // the in-memory cleanup) by the time onClose fires. + $deleteEvents = []; + $deleteEvents[] = $this->receivePresenceEvent($listener, $presenceId, 'delete', 'online', $metadata, $user['$id']); + + $publisher->close(); + + // Watch for any additional presences.{id}.delete frame. A second one would + // be the regression: onClose re-firing the event for a presence already + // removed via HTTP DELETE. + $deadline = \microtime(true) + 2.0; + + $this->assertEventually( + function () use ($listener, $presenceId, $deadline, &$deleteEvents): void { + try { + $raw = $listener->receive(); + $frame = \json_decode($raw, true); + if ( + \is_array($frame) + && ($frame['type'] ?? null) === 'event' + && ($frame['data']['payload']['$id'] ?? null) === $presenceId + && \in_array('presences.' . $presenceId . '.delete', $frame['data']['events'] ?? [], true) + ) { + $deleteEvents[] = $frame; + if (\count($deleteEvents) > 1) { + throw new Critical( + 'Duplicate presence delete event after HTTP DELETE + WebSocket close: ' + . \json_encode($frame) + ); + } + } + } catch (TimeoutException) { + // No frame this poll; fall through to deadline check. + } + + if (\microtime(true) < $deadline) { + // Throw a non-Critical exception so assertEventually retries. + throw new \RuntimeException('still watching for duplicate delete event'); + } + }, + timeoutMs: 3000, + waitMs: 0 + ); + + $this->assertCount( + 1, + $deleteEvents, + 'Expected exactly one presences.' . $presenceId . '.delete event; got ' . \count($deleteEvents) + ); + $this->assertPresenceRealtimeEvent($deleteEvents[0], $presenceId, 'delete', 'online', $metadata, $user['$id']); + } finally { + $listener->close(); + } + } } diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index d2b125251a..12c99a83ef 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -1046,4 +1046,84 @@ class MessagingTest extends TestCase $this->assertContains('presences.' . $presenceId, $result['channels']); $this->assertContains(Role::any()->toString(), $result['roles']); } + + public function testExtractDeletedPresenceIdReturnsIdForDeleteEvent(): void + { + $event = [ + 'project' => 'proj', + 'data' => [ + 'events' => [ + 'presences.abc.delete', + 'presences.*.delete', + 'presences.abc', + ], + 'payload' => ['$id' => 'abc'], + ], + ]; + + $this->assertSame('abc', Realtime::extractDeletedPresenceId($event)); + } + + public function testExtractDeletedPresenceIdRejectsNonDeleteEvents(): void + { + $this->assertNull(Realtime::extractDeletedPresenceId([ + 'data' => [ + 'events' => ['presences.abc.upsert'], + 'payload' => ['$id' => 'abc'], + ], + ])); + + // Unrelated resource that happens to end with `.delete` must not trigger. + $this->assertNull(Realtime::extractDeletedPresenceId([ + 'data' => [ + 'events' => ['documents.abc.delete'], + 'payload' => ['$id' => 'abc'], + ], + ])); + + // Missing payload ID — the event names look right but we have nothing to remove. + $this->assertNull(Realtime::extractDeletedPresenceId([ + 'data' => [ + 'events' => ['presences.abc.delete'], + 'payload' => [], + ], + ])); + } + + public function testRemovePresenceFromConnectionsScopedToProject(): void + { + $realtime = new Realtime(); + + // Two connections in different projects both holding the same presence ID; only + // the matching project should be touched. + $realtime->connections[1] = [ + 'projectId' => 'proj-a', + 'presences' => ['p1' => new Document(['$id' => 'p1']), 'p2' => new Document(['$id' => 'p2'])], + ]; + $realtime->connections[2] = [ + 'projectId' => 'proj-b', + 'presences' => ['p1' => new Document(['$id' => 'p1'])], + ]; + + $removed = $realtime->removePresenceFromConnections('proj-a', 'p1'); + + $this->assertSame(1, $removed); + $this->assertArrayNotHasKey('p1', $realtime->connections[1]['presences']); + $this->assertArrayHasKey('p2', $realtime->connections[1]['presences']); + $this->assertArrayHasKey('p1', $realtime->connections[2]['presences']); + } + + public function testRemovePresenceFromConnectionsNoMatchIsNoOp(): void + { + $realtime = new Realtime(); + $realtime->connections[1] = [ + 'projectId' => 'proj-a', + 'presences' => ['p1' => new Document(['$id' => 'p1'])], + ]; + + $this->assertSame(0, $realtime->removePresenceFromConnections('proj-a', 'missing')); + $this->assertSame(0, $realtime->removePresenceFromConnections('', 'p1')); + $this->assertSame(0, $realtime->removePresenceFromConnections('proj-a', '')); + $this->assertArrayHasKey('p1', $realtime->connections[1]['presences']); + } }