feat: implement deduplication for presence delete events and enhance presence management

This commit is contained in:
ArnabChatterjee20k
2026-05-13 12:20:12 +05:30
parent efadf1d74b
commit e799589d11
5 changed files with 252 additions and 3 deletions
+10
View File
@@ -773,6 +773,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
}
// Strip deleted presences from in-memory connection state so onClose doesn't
// re-fire delete events for rows already removed via HTTP DELETE.
$deletedPresenceId = Realtime::extractDeletedPresenceId($event);
if ($deletedPresenceId !== null) {
$realtime->removePresenceFromConnections(
(string) ($event['project'] ?? ''),
$deletedPresenceId,
);
}
$receivers = $realtime->getSubscribers($event);
if (System::getEnv('_APP_ENV', 'production') === 'development' && !empty($receivers)) {
@@ -204,6 +204,74 @@ class Realtime extends MessagingAdapter
return $subscriptions;
}
/**
* Dedup delete presence triggers.
* Scenario: when client is connected to realtime and a delete call is made throught rest.
* If not dedupe then two delete events will get triggered. So remove the presenceIds
*
* @param string $projectId
* @param string $presenceId
* @return int Number of connections whose presences map was updated.
*/
public function removePresenceFromConnections(string $projectId, string $presenceId): int
{
if ($projectId === '' || $presenceId === '') {
return 0;
}
$removed = 0;
foreach ($this->connections as $connectionId => $connection) {
if (($connection['projectId'] ?? null) !== $projectId) {
continue;
}
if (!isset($connection['presences'][$presenceId])) {
continue;
}
unset($this->connections[$connectionId]['presences'][$presenceId]);
$removed++;
}
return $removed;
}
/**
* Returns the presence ID carried by a `presences.{id}.delete` event payload,
* or null when the event is not a presence delete.
*
* @param array $event Decoded pubsub payload produced by self::send().
* @return string|null
*/
public static function extractDeletedPresenceId(array $event): ?string
{
$events = $event['data']['events'] ?? [];
if (!\is_array($events)) {
return null;
}
$isPresenceDelete = false;
foreach ($events as $eventName) {
if (
\is_string($eventName)
&& \str_starts_with($eventName, 'presences.')
&& \str_ends_with($eventName, '.delete')
) {
$isPresenceDelete = true;
break;
}
}
if (!$isPresenceDelete) {
return null;
}
$presenceId = $event['data']['payload']['$id'] ?? null;
if (!\is_string($presenceId) || $presenceId === '') {
return null;
}
return $presenceId;
}
/**
* Removes all subscriptions for a connection.
*
@@ -98,11 +98,11 @@ class Presence extends Action
},
);
$presence->removeAttribute('$collection');
$presence->removeAttribute('$tenant');
$presence->removeAttribute('hostname');
$presence->removeAttribute('perms_md5');
// Stash the Document keyed by ID so onClose can build delete-event payloads without
// re-reading the row from the DB. hostname is already stripped above so it won't leak
// into the realtime payload sent to subscribers.
$realtime->connections[$connectionId]['presences'][$presence->getId()] = $presence;
$triggerPresenceEvent($project, $user, 'presences.[presenceId].upsert', $presence);
@@ -2,6 +2,7 @@
namespace Tests\E2E\Services\Presences;
use Appwrite\Tests\Async\Exceptions\Critical;
use Tests\E2E\Client;
use Tests\E2E\Scopes\ProjectCustom;
use Tests\E2E\Scopes\Scope;
@@ -589,4 +590,94 @@ class PresenceRealtimeClientTest extends Scope
$listener->close();
}
}
public function testHttpDeleteThenCloseDoesNotDuplicateDeleteEvent(): void
{
[$project, $user, $headers] = $this->bootstrapIsolatedProject();
$presenceId = ID::unique();
$metadata = ['testRunId' => ID::unique(), 'source' => 'http-delete-then-close'];
$publisher = $this->connectRealtimeAndSubscribe($project, $headers, ['presences', 'presences.' . $presenceId], timeout: 1);
$listener = $this->connectRealtimeAndSubscribe($project, $headers, ['presences', 'presences.' . $presenceId], timeout: 1);
try {
// Publish a presence over WebSocket so the realtime worker tracks it in
// its in-memory connection map under the publisher connection.
$this->sendPresenceMessage(
$publisher,
$presenceId,
'online',
$metadata,
$this->getPresencePermissions(Role::any())
);
$this->collectPresenceOutcome($publisher, $presenceId, 'online', $metadata, $user['$id']);
$this->receivePresenceEvent($listener, $presenceId, 'upsert', 'online', $metadata, $user['$id']);
// HTTP DELETE removes the row from the DB and emits the delete event via pubsub.
// The realtime worker is expected to strip the presence from the publisher's
// in-memory connection state when it processes the pubsub message.
$delete = $this->client->call(
Client::METHOD_DELETE,
'/presences/' . $presenceId,
$this->getServerHeaders($project)
);
$this->assertSame(204, $delete['headers']['status-code']);
// Synchronization point: wait for the listener to receive the legitimate
// delete event before closing the publisher. Redis pubsub broadcasts to
// every realtime worker simultaneously, so the listener's worker observing
// the event implies the publisher's worker has also processed it (and run
// the in-memory cleanup) by the time onClose fires.
$deleteEvents = [];
$deleteEvents[] = $this->receivePresenceEvent($listener, $presenceId, 'delete', 'online', $metadata, $user['$id']);
$publisher->close();
// Watch for any additional presences.{id}.delete frame. A second one would
// be the regression: onClose re-firing the event for a presence already
// removed via HTTP DELETE.
$deadline = \microtime(true) + 2.0;
$this->assertEventually(
function () use ($listener, $presenceId, $deadline, &$deleteEvents): void {
try {
$raw = $listener->receive();
$frame = \json_decode($raw, true);
if (
\is_array($frame)
&& ($frame['type'] ?? null) === 'event'
&& ($frame['data']['payload']['$id'] ?? null) === $presenceId
&& \in_array('presences.' . $presenceId . '.delete', $frame['data']['events'] ?? [], true)
) {
$deleteEvents[] = $frame;
if (\count($deleteEvents) > 1) {
throw new Critical(
'Duplicate presence delete event after HTTP DELETE + WebSocket close: '
. \json_encode($frame)
);
}
}
} catch (TimeoutException) {
// No frame this poll; fall through to deadline check.
}
if (\microtime(true) < $deadline) {
// Throw a non-Critical exception so assertEventually retries.
throw new \RuntimeException('still watching for duplicate delete event');
}
},
timeoutMs: 3000,
waitMs: 0
);
$this->assertCount(
1,
$deleteEvents,
'Expected exactly one presences.' . $presenceId . '.delete event; got ' . \count($deleteEvents)
);
$this->assertPresenceRealtimeEvent($deleteEvents[0], $presenceId, 'delete', 'online', $metadata, $user['$id']);
} finally {
$listener->close();
}
}
}
+80
View File
@@ -1046,4 +1046,84 @@ class MessagingTest extends TestCase
$this->assertContains('presences.' . $presenceId, $result['channels']);
$this->assertContains(Role::any()->toString(), $result['roles']);
}
public function testExtractDeletedPresenceIdReturnsIdForDeleteEvent(): void
{
$event = [
'project' => 'proj',
'data' => [
'events' => [
'presences.abc.delete',
'presences.*.delete',
'presences.abc',
],
'payload' => ['$id' => 'abc'],
],
];
$this->assertSame('abc', Realtime::extractDeletedPresenceId($event));
}
public function testExtractDeletedPresenceIdRejectsNonDeleteEvents(): void
{
$this->assertNull(Realtime::extractDeletedPresenceId([
'data' => [
'events' => ['presences.abc.upsert'],
'payload' => ['$id' => 'abc'],
],
]));
// Unrelated resource that happens to end with `.delete` must not trigger.
$this->assertNull(Realtime::extractDeletedPresenceId([
'data' => [
'events' => ['documents.abc.delete'],
'payload' => ['$id' => 'abc'],
],
]));
// Missing payload ID — the event names look right but we have nothing to remove.
$this->assertNull(Realtime::extractDeletedPresenceId([
'data' => [
'events' => ['presences.abc.delete'],
'payload' => [],
],
]));
}
public function testRemovePresenceFromConnectionsScopedToProject(): void
{
$realtime = new Realtime();
// Two connections in different projects both holding the same presence ID; only
// the matching project should be touched.
$realtime->connections[1] = [
'projectId' => 'proj-a',
'presences' => ['p1' => new Document(['$id' => 'p1']), 'p2' => new Document(['$id' => 'p2'])],
];
$realtime->connections[2] = [
'projectId' => 'proj-b',
'presences' => ['p1' => new Document(['$id' => 'p1'])],
];
$removed = $realtime->removePresenceFromConnections('proj-a', 'p1');
$this->assertSame(1, $removed);
$this->assertArrayNotHasKey('p1', $realtime->connections[1]['presences']);
$this->assertArrayHasKey('p2', $realtime->connections[1]['presences']);
$this->assertArrayHasKey('p1', $realtime->connections[2]['presences']);
}
public function testRemovePresenceFromConnectionsNoMatchIsNoOp(): void
{
$realtime = new Realtime();
$realtime->connections[1] = [
'projectId' => 'proj-a',
'presences' => ['p1' => new Document(['$id' => 'p1'])],
];
$this->assertSame(0, $realtime->removePresenceFromConnections('proj-a', 'missing'));
$this->assertSame(0, $realtime->removePresenceFromConnections('', 'p1'));
$this->assertSame(0, $realtime->removePresenceFromConnections('proj-a', ''));
$this->assertArrayHasKey('p1', $realtime->connections[1]['presences']);
}
}