Merge branch 'feat-265-realtime' of https://github.com/appwrite/appwrite into feat-265-realtime-usage

This commit is contained in:
Torsten Dittmann
2021-07-01 10:46:24 +02:00
80 changed files with 2377 additions and 8283 deletions
+28
View File
@@ -271,4 +271,32 @@ class Auth
return false;
}
/**
* Returns all roles for a user.
*
* @param Document $user
* @return array
*/
public static function getRoles(Document $user): array
{
if ($user->getId()) {
$roles[] = 'user:'.$user->getId();
$roles[] = 'role:'.Auth::USER_ROLE_MEMBER;
} else {
return ['role:'.Auth::USER_ROLE_GUEST];
}
foreach ($user->getAttribute('memberships', []) as $node) {
if (isset($node['teamId']) && isset($node['roles'])) {
$roles[] = 'team:' . $node['teamId'];
foreach ($node['roles'] as $nodeRole) { // Set all team roles
$roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole;
}
}
}
return $roles;
}
}
+26 -20
View File
@@ -2,13 +2,12 @@
namespace Appwrite\Database\Adapter;
use Utopia\Registry\Registry;
use Appwrite\Database\Adapter;
use Appwrite\Database\Exception\Duplicate;
use Appwrite\Database\Validator\Authorization;
use Exception;
use PDO;
use Redis as Client;
use Redis;
class MySQL extends Adapter
{
@@ -23,11 +22,6 @@ class MySQL extends Adapter
const OPTIONS_LIMIT_ATTRIBUTES = 1000;
/**
* @var Registry
*/
protected $register;
/**
* Last modified.
*
@@ -42,16 +36,28 @@ class MySQL extends Adapter
*/
protected $debug = [];
/**
* @var PDO
*/
protected $pdo;
/**
* @var Redis
*/
protected $redis;
/**
* Constructor.
*
* Set connection and settings
*
* @param Registry $register
* @param PDO $pdo
* @param Redis $redis
*/
public function __construct(Registry $register)
public function __construct($pdo, Redis $redis)
{
$this->register = $register;
$this->pdo = $pdo;
$this->redis = $redis;
}
/**
@@ -87,8 +93,8 @@ class MySQL extends Adapter
ORDER BY `order`
');
$st->bindParam(':documentUid', $document['uid'], PDO::PARAM_STR);
$st->bindParam(':documentRevision', $document['revision'], PDO::PARAM_STR);
$st->bindParam(':documentUid', $document['uid'], PDO::PARAM_STR, 32);
$st->bindParam(':documentRevision', $document['revision'], PDO::PARAM_STR, 32);
$st->execute();
@@ -116,8 +122,8 @@ class MySQL extends Adapter
ORDER BY `order`
');
$st->bindParam(':start', $document['uid'], PDO::PARAM_STR);
$st->bindParam(':revision', $document['revision'], PDO::PARAM_STR);
$st->bindParam(':start', $document['uid'], PDO::PARAM_STR, 32);
$st->bindParam(':revision', $document['revision'], PDO::PARAM_STR, 32);
$st->execute();
@@ -933,18 +939,18 @@ class MySQL extends Adapter
*
* @throws Exception
*/
protected function getPDO(): PDO
protected function getPDO()
{
return $this->register->get('db');
return $this->pdo;
}
/**
* @throws Exception
*
* @return Client
* @return Redis
*/
protected function getRedis(): Client
protected function getRedis(): Redis
{
return $this->register->get('cache');
return $this->redis;
}
}
}
+7 -8
View File
@@ -2,7 +2,6 @@
namespace Appwrite\Database\Adapter;
use Utopia\Registry\Registry;
use Appwrite\Database\Adapter;
use Exception;
use Redis as Client;
@@ -10,9 +9,9 @@ use Redis as Client;
class Redis extends Adapter
{
/**
* @var Registry
* @var Client
*/
protected $register;
protected $redis;
/**
* @var Adapter
@@ -23,11 +22,11 @@ class Redis extends Adapter
* Redis constructor.
*
* @param Adapter $adapter
* @param Registry $register
* @param Client $redis
*/
public function __construct(Adapter $adapter, Registry $register)
public function __construct(Adapter $adapter, Client $redis)
{
$this->register = $register;
$this->redis = $redis;
$this->adapter = $adapter;
}
@@ -261,7 +260,7 @@ class Redis extends Adapter
*/
protected function getRedis(): Client
{
return $this->register->get('cache');
return $this->redis;
}
/**
@@ -281,4 +280,4 @@ class Redis extends Adapter
return parent::setNamespace($namespace);
}
}
}
-19
View File
@@ -1,19 +0,0 @@
<?php
namespace Appwrite\Database;
abstract class Pool
{
protected $available = true;
protected $pool;
protected $size = 5;
abstract public function get();
public function destruct()
{
$this->available = false;
while (!$this->pool->isEmpty()) {
$this->pool->pop();
}
}
}
-49
View File
@@ -1,49 +0,0 @@
<?php
namespace Appwrite\Database\Pool;
use Appwrite\Database\Pool;
use Appwrite\Extend\PDO;
use SplQueue;
class PDOPool extends Pool
{
public function __construct(int $size, string $host = 'localhost', string $schema = 'appwrite', string $user = '', string $pass = '', string $charset = 'utf8mb4')
{
$this->pool = new SplQueue;
$this->size = $size;
for ($i = 0; $i < $this->size; $i++) {
$pdo = new PDO(
"mysql:" .
"host={$host};" .
"dbname={$schema};" .
"charset={$charset}",
$user,
$pass,
[
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
PDO::ATTR_TIMEOUT => 3, // Seconds
PDO::ATTR_PERSISTENT => true,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true
]
);
$this->pool->enqueue($pdo);
}
}
public function put(PDO $pdo)
{
$this->pool->enqueue($pdo);
}
public function get(): PDO
{
if ($this->available && count($this->pool) > 0) {
return $this->pool->dequeue();
}
sleep(0.01);
return $this->get();
}
}
-42
View File
@@ -1,42 +0,0 @@
<?php
namespace Appwrite\Database\Pool;
use Appwrite\Database\Pool;
use SplQueue;
use Redis;
class RedisPool extends Pool
{
public function __construct(int $size, string $host, int $port, array $auth = [])
{
$this->pool = new SplQueue;
$this->size = $size;
for ($i = 0; $i < $this->size; $i++) {
$redis = new Redis();
$redis->pconnect($host, $port);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($auth) {
$redis->auth($auth);
}
$this->pool->enqueue($redis);
}
}
public function put(Redis $redis)
{
$this->pool->enqueue($redis);
}
public function get(): Redis
{
if ($this->available && !$this->pool->isEmpty()) {
return $this->pool->dequeue();
}
sleep(0.1);
return $this->get();
}
}
-228
View File
@@ -1,228 +0,0 @@
<?php
namespace Appwrite\Event;
use Appwrite\Database\Document;
use Utopia\App;
class Realtime
{
/**
* @var string
*/
protected $project = '';
/**
* @var string
*/
protected $event = '';
/**
* @var string
*/
protected $userId = '';
/**
* @var array
*/
protected $channels = [];
/**
* @var array
*/
protected $permissions = [];
/**
* @var false
*/
protected $permissionsChanged = false;
/**
* @var Document
*/
protected $payload;
/**
* Event constructor.
*
* @param string $project
* @param string $event
* @param array $payload
*/
public function __construct(string $project, string $event, array $payload)
{
$this->project = $project;
$this->event = $event;
$this->payload = new Document($payload);
}
/**
* @param string $project
* return $this
*/
public function setProject(string $project): self
{
$this->project = $project;
return $this;
}
/**
* @param string $userId
* return $this
*/
public function setUserId(string $userId): self
{
$this->userId = $userId;
return $this;
}
/**
* @return string
*/
public function getProject(): string
{
return $this->project;
}
/**
* @param string $event
* return $this
*/
public function setEvent(string $event): self
{
$this->event = $event;
return $this;
}
/**
* @return string
*/
public function getEvent(): string
{
return $this->event;
}
/**
* @param array $payload
* @return $this
*/
public function setPayload(array $payload): self
{
$this->payload = new Document($payload);
return $this;
}
/**
* @return Document
*/
public function getPayload(): Document
{
return $this->payload;
}
/**
* Populate channels array based on the event name and payload.
*
* @return void
*/
private function prepareChannels(): void
{
switch (true) {
case strpos($this->event, 'account.recovery.') === 0:
case strpos($this->event, 'account.sessions.') === 0:
case strpos($this->event, 'account.verification.') === 0:
$this->channels[] = 'account.' . $this->payload->getAttribute('userId');
$this->permissions = ['user:' . $this->payload->getAttribute('userId')];
break;
case strpos($this->event, 'account.') === 0:
$this->channels[] = 'account.' . $this->payload->getId();
$this->permissions = ['user:' . $this->payload->getId()];
break;
case strpos($this->event, 'teams.memberships') === 0:
$this->permissionsChanged = in_array($this->event, ['teams.memberships.update', 'teams.memberships.delete', 'teams.memberships.update.status']);
$this->channels[] = 'memberships';
$this->channels[] = 'memberships.' . $this->payload->getId();
$this->permissions = ['team:' . $this->payload->getAttribute('teamId')];
break;
case strpos($this->event, 'teams.') === 0:
$this->permissionsChanged = $this->event === 'teams.create';
$this->channels[] = 'teams';
$this->channels[] = 'teams.' . $this->payload->getId();
$this->permissions = ['team:' . $this->payload->getId()];
break;
case strpos($this->event, 'database.collections.') === 0:
$this->channels[] = 'collections';
$this->channels[] = 'collections.' . $this->payload->getId();
$this->permissions = $this->payload->getAttribute('$permissions.read');
break;
case strpos($this->event, 'database.documents.') === 0:
$this->channels[] = 'documents';
$this->channels[] = 'collections.' . $this->payload->getAttribute('$collection') . '.documents';
$this->channels[] = 'documents.' . $this->payload->getId();
$this->permissions = $this->payload->getAttribute('$permissions.read');
break;
case strpos($this->event, 'storage.') === 0:
$this->channels[] = 'files';
$this->channels[] = 'files.' . $this->payload->getId();
$this->permissions = $this->payload->getAttribute('$permissions.read');
break;
case strpos($this->event, 'functions.executions.') === 0:
if (!empty($this->payload->getAttribute('$permissions.read'))) {
$this->channels[] = 'executions';
$this->channels[] = 'executions.' . $this->payload->getId();
$this->channels[] = 'functions.' . $this->payload->getAttribute('functionId');
$this->permissions = $this->payload->getAttribute('$permissions.read');
}
break;
}
}
/**
* Execute Event.
*
* @return void
*/
public function trigger(): void
{
$this->prepareChannels();
if (empty($this->channels)) return;
$redis = new \Redis();
$redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([
'project' => $this->project,
'permissions' => $this->permissions,
'permissionsChanged' => $this->permissionsChanged,
'userId' => $this->userId,
'data' => [
'event' => $this->event,
'channels' => $this->channels,
'timestamp' => time(),
'payload' => $this->payload->getArrayCopy()
]
]));
$this->reset();
}
/**
* Resets this event and unpopulates all data.
*
* @return $this
*/
public function reset(): self
{
$this->event = '';
$this->payload = $this->channels = [];
return $this;
}
}
+10
View File
@@ -0,0 +1,10 @@
<?php
namespace Appwrite\Messaging;
abstract class Adapter
{
public abstract function subscribe(string $project, mixed $identifier, array $roles, array $channels): void;
public abstract function unsubscribe(mixed $identifier): void;
public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $permissions, array $options): void;
}
+295
View File
@@ -0,0 +1,295 @@
<?php
namespace Appwrite\Messaging\Adapter;
use Appwrite\Database\Document;
use Appwrite\Messaging\Adapter;
use Utopia\App;
class Realtime extends Adapter
{
/**
* Connection Tree
*
* [CONNECTION_ID] ->
* 'projectId' -> [PROJECT_ID]
* 'roles' -> [ROLE_x, ROLE_Y]
* 'channels' -> [CHANNEL_NAME_X, CHANNEL_NAME_Y, CHANNEL_NAME_Z]
*/
public array $connections = [];
/**
* Subscription Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
public array $subscriptions = [];
/**
* Adds a subscribtion.
* @param string $projectId Project ID.
* @param mixed $connection Unique Identifier - Connection ID.
* @param array $roles Roles of the Subscription.
* @param array $channels Subscribed Channels.
* @return void
*/
public function subscribe(string $projectId, mixed $connection, array $roles, array $channels): void
{
if (!isset($this->subscriptions[$projectId])) { // Init Project
$this->subscriptions[$projectId] = [];
}
foreach ($roles as $role) {
if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection
$this->subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel => $list) {
$this->subscriptions[$projectId][$role][$channel][$connection] = true;
}
}
$this->connections[$connection] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels
];
}
/**
* Removes Subscription.
*
* @param mixed $connection
* @return void
*/
public function unsubscribe(mixed $connection): void
{
$projectId = $this->connections[$connection]['projectId'] ?? '';
$roles = $this->connections[$connection]['roles'] ?? [];
foreach ($roles as $role) {
foreach ($this->subscriptions[$projectId][$role] as $channel => $list) {
unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
if (empty($this->subscriptions[$projectId][$role][$channel])) {
unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
}
}
if (empty($this->subscriptions[$projectId][$role])) {
unset($this->subscriptions[$projectId][$role]); // Remove role when no channels
}
}
if (empty($this->subscriptions[$projectId])) { // Remove project when no roles
unset($this->subscriptions[$projectId]);
}
unset($this->connections[$connection]);
}
/**
* Checks if Channel has a subscriber.
* @param string $projectId
* @param string $role
* @param string $channel
* @return bool
*/
public function hasSubscriber(string $projectId, string $role, string $channel = ''): bool
{
if (empty($channel)) {
return array_key_exists($projectId, $this->subscriptions)
&& array_key_exists($role, $this->subscriptions[$projectId]);
}
return array_key_exists($projectId, $this->subscriptions)
&& array_key_exists($role, $this->subscriptions[$projectId])
&& array_key_exists($channel, $this->subscriptions[$projectId][$role]);
}
/**
* Sends an event to the Realtime Server.
* @param string $project
* @param array $payload
* @param string $event
* @param array $channels
* @param array $permissions
* @param array $options
* @return void
*/
public static function send(string $project, array $payload, string $event, array $channels, array $permissions, array $options = []): void
{
if (empty($channels) || empty($permissions) || empty($project)) return;
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
$redis = new \Redis();
$redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([
'project' => $project,
'permissions' => $permissions,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'event' => $event,
'channels' => $channels,
'timestamp' => time(),
'payload' => $payload
]
]));
}
/**
* Identifies the receivers of all subscriptions, based on the permissions and event.
*
* Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels:
* - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions
* - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions
* - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions
* - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions
* - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
*/
public function getReceivers(array $event)
{
$receivers = [];
if (isset($this->subscriptions[$event['project']])) {
foreach ($this->subscriptions[$event['project']] as $role => $subscription) {
foreach ($event['data']['channels'] as $channel) {
if (
\array_key_exists($channel, $this->subscriptions[$event['project']][$role])
&& (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions']))
) {
foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $ids) {
$receivers[$ids] = 0;
}
break;
}
}
}
}
return array_keys($receivers);
}
/**
* Converts the channels from the Query Params into an array.
* Also renames the account channel to account.USER_ID and removes all illegal account channel variations.
* @param array $channels
* @param Document $user
* @return array
*/
public static function convertChannels(array $channels, Document $user): array
{
$channels = array_flip($channels);
foreach ($channels as $key => $value) {
switch (true) {
case strpos($key, 'account.') === 0:
unset($channels[$key]);
break;
case $key === 'account':
if (!empty($user->getId())) {
$channels['account.' . $user->getId()] = $value;
}
unset($channels['account']);
break;
}
}
if (\array_key_exists('account', $channels)) {
if ($user->getId()) {
$channels['account.' . $user->getId()] = $channels['account'];
}
unset($channels['account']);
}
return $channels;
}
/**
* Create channels array based on the event name and payload.
*
* @return void
*/
public static function fromPayload(string $event, Document $payload): array
{
$channels = [];
$permissions = [];
$permissionsChanged = false;
switch (true) {
case strpos($event, 'account.recovery.') === 0:
case strpos($event, 'account.sessions.') === 0:
case strpos($event, 'account.verification.') === 0:
$channels[] = 'account.' . $payload->getAttribute('userId');
$permissions = ['user:' . $payload->getAttribute('userId')];
break;
case strpos($event, 'account.') === 0:
$channels[] = 'account.' . $payload->getId();
$permissions = ['user:' . $payload->getId()];
break;
case strpos($event, 'teams.memberships') === 0:
$permissionsChanged = in_array($event, ['teams.memberships.update', 'teams.memberships.delete', 'teams.memberships.update.status']);
$channels[] = 'memberships';
$channels[] = 'memberships.' . $payload->getId();
$permissions = ['team:' . $payload->getAttribute('teamId')];
break;
case strpos($event, 'teams.') === 0:
$permissionsChanged = $event === 'teams.create';
$channels[] = 'teams';
$channels[] = 'teams.' . $payload->getId();
$permissions = ['team:' . $payload->getId()];
break;
case strpos($event, 'database.collections.') === 0:
$channels[] = 'collections';
$channels[] = 'collections.' . $payload->getId();
$permissions = $payload->getAttribute('$permissions.read');
break;
case strpos($event, 'database.documents.') === 0:
$channels[] = 'documents';
$channels[] = 'collections.' . $payload->getAttribute('$collection') . '.documents';
$channels[] = 'documents.' . $payload->getId();
$permissions = $payload->getAttribute('$permissions.read');
break;
case strpos($event, 'storage.') === 0:
$channels[] = 'files';
$channels[] = 'files.' . $payload->getId();
$permissions = $payload->getAttribute('$permissions.read');
break;
case strpos($event, 'functions.executions.') === 0:
if (!empty($payload->getAttribute('$permissions.read'))) {
$channels[] = 'executions';
$channels[] = 'executions.' . $payload->getId();
$channels[] = 'functions.' . $payload->getAttribute('functionId');
$permissions = $payload->getAttribute('$permissions.read');
}
break;
}
return [
'channels' => $channels,
'permissions' => $permissions,
'permissionsChanged' => $permissionsChanged
];
}
}
-202
View File
@@ -1,202 +0,0 @@
<?php
namespace Appwrite\Realtime;
use Appwrite\Auth\Auth;
use Appwrite\Database\Document;
class Parser
{
/**
* @var Document $user
*/
static $user;
/**
* Sets the current user for the role and channel parsing.
*
* @param Document $user
*/
static function setUser(Document $user)
{
self::$user = $user;
}
/**
* Returns array of roles that the set User has permissions to.
*
* @return array
*/
static function getRoles()
{
if (!isset(self::$user)) {
return [];
}
$roles = ['role:' . ((self::$user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
if (!(self::$user->isEmpty())) {
$roles[] = 'user:' . self::$user->getId();
}
foreach (self::$user->getAttribute('memberships', []) as $node) {
if (isset($node['teamId']) && isset($node['roles'])) {
$roles[] = 'team:' . $node['teamId'];
foreach ($node['roles'] as $nodeRole) { // Set all team roles
$roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole;
}
}
}
return $roles;
}
/**
* Converts the channels from the Query Params into an array.
* Also renames the account channel to account.USER_ID and removes all illegal account channel variations.
*
* @param array $channels
*/
static function parseChannels(array $channels)
{
$channels = array_flip($channels);
foreach ($channels as $key => $value) {
switch (true) {
case strpos($key, 'account.') === 0:
unset($channels[$key]);
break;
case $key === 'account':
if (!empty(self::$user->getId())) {
$channels['account.' . self::$user->getId()] = $value;
}
unset($channels['account']);
break;
}
}
if (\array_key_exists('account', $channels)) {
if (self::$user->getId()) {
$channels['account.' . self::$user->getId()] = $channels['account'];
}
unset($channels['account']);
}
return $channels;
}
/**
* Identifies the receivers of all subscriptions, based on the permissions and event.
*
* Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels:
* - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions
* - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions
* - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions
* - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions
* - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
* @param array $connections
* @param array $subscriptions
*/
static function identifyReceivers(array &$event, array &$subscriptions)
{
$receivers = [];
if (isset($subscriptions[$event['project']])) {
foreach ($subscriptions[$event['project']] as $role => $subscription) {
foreach ($event['data']['channels'] as $channel) {
if (
\array_key_exists($channel, $subscriptions[$event['project']][$role])
&& (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions']))
) {
foreach (array_keys($subscriptions[$event['project']][$role][$channel]) as $ids) {
$receivers[$ids] = 0;
}
break;
}
}
}
}
return array_keys($receivers);
}
/**
* Adds Subscription.
*
* @param string $projectId
* @param mixed $connection
* @param array $subscriptions
* @param array $roles
* @param array $channels
*/
static function subscribe($projectId, $connection, $roles, &$subscriptions, &$connections, &$channels)
{
/**
* Build Subscriptions Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
if (!isset($subscriptions[$projectId])) { // Init Project
$subscriptions[$projectId] = [];
}
foreach ($roles as $role) {
if (!isset($subscriptions[$projectId][$role])) { // Add user first connection
$subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel => $list) {
$subscriptions[$projectId][$role][$channel][$connection] = true;
}
}
$connections[$connection] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels
];
}
/**
* Remove Subscription.
*
* @param mixed $connection
* @param array $subscriptions
* @param array $connections
*/
static function unsubscribe($connection, &$subscriptions, &$connections)
{
$projectId = $connections[$connection]['projectId'] ?? '';
$roles = $connections[$connection]['roles'] ?? [];
foreach ($roles as $role) {
foreach ($subscriptions[$projectId][$role] as $channel => $list) {
unset($subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
if (empty($subscriptions[$projectId][$role][$channel])) {
unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
}
}
if (empty($subscriptions[$projectId][$role])) {
unset($subscriptions[$projectId][$role]); // Remove role when no channels
}
}
if (empty($subscriptions[$projectId])) { // Remove project when no roles
unset($subscriptions[$projectId]);
}
unset($connections[$connection]);
}
}
-553
View File
@@ -1,553 +0,0 @@
<?php
namespace Appwrite\Realtime;
use Appwrite\Database\Database;
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Database\Validator\Authorization;
use Appwrite\Event\Event;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Response;
use Exception;
use Swoole\Coroutine\Redis;
use Swoole\Http\Request;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Process;
use Swoole\Table;
use Swoole\Timer;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server as SwooleServer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Exception as UtopiaException;
use Utopia\Registry\Registry;
use Utopia\Swoole\Request as SwooleRequest;
class Server
{
/**
* Container scoped Registry.
* @var Registry
*/
public Registry $register;
/**
* Container scoped Swoole Server.
* @var SwooleServer
*/
public SwooleServer $server;
/**
* Container scoped Usage Table.
* @var Table
*/
public Table $usage;
/**
* Container scoped Database connection.
* @var Database
*/
public Database $consoleDb;
/**
* Container scoped Database connection.
* @var Database
*/
public Database $projectDb;
/**
* Container scoped Redis connection.
* @var Redis
*/
public Redis $cache;
/**
* Worker scoped subscription.
* @var array
*/
public array $subscriptions;
/**
* Worker scoped connections.
* @var array
*/
public array $connections;
public function __construct(Registry &$register, $host = '0.0.0.0', $port = 80, $config = [])
{
$this->subscriptions = [];
$this->connections = [];
$this->register = $register;
$this->usage = new Table(4096, 1);
$this->usage->column('projectId', Table::TYPE_STRING, 64);
$this->usage->column('connections', Table::TYPE_INT);
$this->usage->column('connectionsTotal', Table::TYPE_INT);
$this->usage->column('messages', Table::TYPE_INT);
$this->usage->create();
$this->consoleDb = new Database();
$this->consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register));
$this->consoleDb->setNamespace('app_console');
$this->consoleDb->setMocks(Config::getParam('collections', []));
$this->projectDb = new Database();
$this->projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register));
$this->projectDb->setMocks(Config::getParam('collections', []));
$this->server = new SwooleServer($host, $port, SWOOLE_PROCESS);
$this->server->set($config);
$this->server->on('start', [$this, 'onStart']);
$this->server->on('workerStart', [$this, 'onWorkerStart']);
$this->server->on('open', [$this, 'onOpen']);
$this->server->on('message', [$this, 'onMessage']);
$this->server->on('close', [$this, 'onClose']);
$this->server->container_id = uniqid();
$this->server->start();
}
/**
* This is executed when the Realtime server starts.
*
* @param SwooleServer $server
* @return void
*/
public function onStart(SwooleServer $server): void
{
Console::success('Server started succefully');
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
try {
go(function() {
$document = [
'$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
'$permissions' => [
'read' => ['*'],
'write' => ['*'],
],
'container' => $this->server->container_id,
'timestamp' => time(),
'value' => '{}'
];
Authorization::disable();
$document = $this->consoleDb->createDocument($document);
Authorization::enable();
$this->server->document_id = $document->getId();
});
} catch (\Throwable $th) {
Console::error('[Error] Type: '.get_class($th));
Console::error('[Error] Message: '.$th->getMessage());
Console::error('[Error] File: '.$th->getFile());
Console::error('[Error] Line: '.$th->getLine());
}
// Run ever 10 seconds
Timer::tick(10000, function () {
/** @var Table $stats */
foreach ($this->usage as $projectId => $value) {
if (empty($value['connections']) && empty($value['messages'])) {
continue;
}
$connections = $value['connections'];
$messages = $value['messages'];
$usage = new Event('v1-usage', 'UsageV1');
$usage
->setParam('projectId', $projectId)
->setParam('realtimeConnections', $connections)
->setParam('realtimeMessages', $messages)
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0);
$this->usage->set($projectId, [
'projectId' => $projectId,
'messages' => 0,
'connections' => 0
]);
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
$usage->trigger();
}
}
});
// Run ever 10 seconds
Timer::tick(10000, function () {
$payload = [];
foreach ($this->usage as $projectId => $value) {
if (!empty($value['connectionsTotal'])) {
$payload[$projectId] = $value['connectionsTotal'];
}
}
if (empty($payload)){
return;
}
$document = [
'$id' => $this->server->document_id,
'$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
'$permissions' => [
'read' => ['*'],
'write' => ['*'],
],
'container' => $this->server->container_id,
'timestamp' => time(),
'value' => json_encode($payload)
];
try {
$document = $this->consoleDb->updateDocument($document);
} catch (\Throwable $th) {
Console::error('[Error] Type: '.get_class($th));
Console::error('[Error] Message: '.$th->getMessage());
Console::error('[Error] File: '.$th->getFile());
Console::error('[Error] Line: '.$th->getLine());
}
});
Process::signal(2, function () use ($server) {
Console::log('Stop by Ctrl+C');
$server->shutdown();
});
}
/**
* This is executed when a WebSocket worker process starts.
*
* @param SwooleServer $server
* @param int $workerId
* @return void
* @throws Exception
*/
public function onWorkerStart(SwooleServer $server, int $workerId): void
{
Console::success('Worker ' . $workerId . ' started succefully');
$attempts = 0;
$start = time();
$redisPool = $this->register->get('redisPool');
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
$server->tick(5000, function () use (&$server) {
$this->tickSendProjectUsage($server);
});
while ($attempts < 300) {
try {
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
/** @var Swoole\Coroutine\Redis $redis */
$redis = $redisPool->get();
if ($redis->ping(true)) {
$attempts = 0;
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId) {
$this->onRedisPublish($payload, $server, $workerId);
});
} catch (\Throwable $th) {
Console::error('Pub/sub error: ' . $th->getMessage());
$redisPool->put($redis);
$attempts++;
continue;
}
$attempts++;
}
Console::error('Failed to restart pub/sub...');
}
/**
* This is executed when a new Realtime connection is established.
* @param SwooleServer $server
* @param Request $request
* @return void
* @throws Exception
* @throws UtopiaException
*/
public function onOpen(SwooleServer $server, Request $request): void
{
$app = new App('UTC');
$connection = $request->fd;
$request = new SwooleRequest($request);
$db = $this->register->get('dbPool')->get();
$redis = $this->register->get('redisPool')->get();
$this->register->set('db', function () use (&$db) {
return $db;
});
$this->register->set('cache', function () use (&$redis) {
return $redis;
});
Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})");
App::setResource('request', function () use ($request) {
return $request;
});
App::setResource('response', function () {
return new Response(new SwooleResponse());
});
try {
/** @var \Appwrite\Database\Document $user */
$user = $app->getResource('user');
/** @var \Appwrite\Database\Document $project */
$project = $app->getResource('project');
/** @var \Appwrite\Database\Document $console */
$console = $app->getResource('console');
/*
* Project Check
*/
if (empty($project->getId())) {
throw new Exception('Missing or unknown project ID', 1008);
}
/*
* Abuse Check
*
* Abuse limits are connecting 128 times per minute and ip address.
*/
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use ($db) {
return $db;
});
$timeLimit
->setNamespace('app_' . $project->getId())
->setParam('{ip}', $request->getIP())
->setParam('{url}', $request->getURI());
$abuse = new Abuse($timeLimit);
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
throw new Exception('Too many requests', 1013);
}
/*
* Validate Client Domain - Check to avoid CSRF attack.
* Adding Appwrite API domains to allow XDOMAIN communication.
* Skip this check for non-web platforms which are not required to send an origin header.
*/
$origin = $request->getOrigin();
$originValidator = new Origin(\array_merge($project->getAttribute('platforms', []), $console->getAttribute('platforms', [])));
if (!$originValidator->isValid($origin) && $project->getId() !== 'console') {
throw new Exception($originValidator->getDescription(), 1008);
}
Parser::setUser($user);
$roles = Parser::getRoles();
$channels = Parser::parseChannels($request->getQuery('channels', []));
/**
* Channels Check
*/
if (empty($channels)) {
throw new Exception('Missing channels', 1008);
}
Parser::subscribe($project->getId(), $connection, $roles, $this->subscriptions, $this->connections, $channels);
$server->push($connection, json_encode($channels));
$this->usage->incr($project->getId(), 'connections');
$this->usage->incr($project->getId(), 'connectionsTotal');
} catch (\Throwable $th) {
$response = [
'code' => $th->getCode(),
'message' => $th->getMessage()
];
// Temporarily print debug logs by default for Alpha testing.
//if (App::isDevelopment()) {
Console::error("[Error] Connection Error");
Console::error("[Error] Code: " . $response['code']);
Console::error("[Error] Message: " . $response['message']);
//}
$server->push($connection, json_encode($response));
$server->close($connection);
}
/**
* Put used PDO and Redis Connections back into their pools.
*/
/** @var PDOPool $dbPool */
$dbPool = $this->register->get('dbPool');
$dbPool->put($db);
/** @var RedisPool $redisPool */
$redisPool = $this->register->get('redisPool');
$redisPool->put($redis);
}
/**
* This is executed when a message is received by the Realtime server.
*
* @param SwooleServer $server
* @param Frame $frame
* @return void
*/
public function onMessage(SwooleServer $server, Frame $frame)
{
$server->push($frame->fd, 'Sending messages is not allowed.');
$server->close($frame->fd);
}
/**
* This is executed when a Realtime connection is closed.
*
* @param SwooleServer $server
* @param int $connection
* @return void
*/
public function onClose(SwooleServer $server, int $connection)
{
if (array_key_exists($connection, $this->connections)) {
$this->usage->decr($this->connections[$connection]['projectId'], 'connectionsTotal');
}
Parser::unsubscribe($connection, $this->subscriptions, $this->connections);
Console::info('Connection close: ' . $connection);
}
/**
* This is executed when an event is published on realtime channel in Redis.
*
* Supported Resources:
* - Collection
* - Document
* - File
* - Account
* - Session
* - Team? (not implemented yet)
* - Membership? (not implemented yet)
* - Function
* - Execution
*
* @param string $payload
* @param SwooleServer $server
* @param int $workerId
* @return void
*/
public function onRedisPublish(string $payload, SwooleServer &$server, int $workerId)
{
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
$this->addPermission($event);
}
$receivers = Parser::identifyReceivers($event, $this->subscriptions);
// Temporarily print debug logs by default for Alpha testing.
// if (App::isDevelopment() && !empty($receivers)) {
if (!empty($receivers)) {
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
}
foreach ($receivers as $receiver) {
if ($server->exist($receiver) && $server->isEstablished($receiver)) {
$server->push(
$receiver,
json_encode($event['data']),
SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
);
} else {
$server->close($receiver);
}
}
if (($num = count($receivers)) > 0) {
$this->usage->incr($event['project'], 'messages', $num);
}
}
/**
* This sends the usage to the `console` channel.
*
* @param SwooleServer $server
* @return void
*/
public function tickSendProjectUsage(SwooleServer &$server)
{
if (
array_key_exists('console', $this->subscriptions)
&& array_key_exists('role:member', $this->subscriptions['console'])
&& array_key_exists('project', $this->subscriptions['console']['role:member'])
) {
$payload = [];
$list = $this->consoleDb->getCollection([
'filters' => [
'$collection='.Database::SYSTEM_COLLECTION_CONNECTIONS,
'timestamp>'.(time() - 15)
],
]);
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
}
foreach ($this->subscriptions['console']['role:member']['project'] as $connection => $value) {
$server->push(
$connection,
json_encode([
'event' => 'stats.connections',
'channels' => ['project'],
'timestamp' => time(),
'payload' => $payload
]),
SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
);
}
}
}
private function addPermission(array $event)
{
$project = $event['project'];
$userId = $event['userId'];
if (array_key_exists($project, $this->subscriptions) && array_key_exists('user:'.$userId, $this->subscriptions[$project])) {
$connection = array_key_first(reset($this->subscriptions[$project]['user:'.$userId]));
} else {
return;
}
$this->projectDb->setNamespace('app_'.$project);
$user = $this->projectDb->getDocument($userId);
Parser::setUser($user);
$roles = Parser::getRoles();
Parser::subscribe($project, $connection, $roles, $this->subscriptions, $this->connections, $this->connections[$connection]['channels']);
}
}
+2 -2
View File
@@ -47,9 +47,9 @@ class Func extends Model
'default' => '',
'example' => 'enabled',
])
->addRule('env', [
->addRule('runtime', [
'type' => self::TYPE_STRING,
'description' => 'Function execution environment.',
'description' => 'Function execution runtime.',
'default' => '',
'example' => 'python-3.8',
])