From ea42b823d8563c2c0638eaedce3694c3b3ac57db Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 30 Apr 2025 22:47:41 +1200 Subject: [PATCH 01/14] Revert name merge issue --- src/Appwrite/Messaging/Adapter/Realtime.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 18b0d94e7d..be263aa655 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -36,12 +36,12 @@ class Realtime extends MessagingAdapter */ public array $subscriptions = []; - private PubSubPool $redis; + private PubSubPool $pubSubPool; public function __construct() { global $register; - $this->redis = new PubSubPool($register->get('pools')->get('pubsub')); + $this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub')); } /** @@ -148,7 +148,7 @@ class Realtime extends MessagingAdapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $this->redis->publish('realtime', json_encode([ + $this->pubSubPool->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, From 406630cb61eaa54b314e183a99f44c932199847a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 16:40:47 +1200 Subject: [PATCH 02/14] Update deps --- composer.lock | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/composer.lock b/composer.lock index cf15b7657e..5a3d58a047 100644 --- a/composer.lock +++ b/composer.lock @@ -1109,16 +1109,16 @@ }, { "name": "open-telemetry/api", - "version": "1.2.3", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1" + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/4e3bb38e069876fb73c2ce85c89583bf2b28cd86", + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86", "shasum": "" }, "require": { @@ -1175,7 +1175,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-05T21:42:54+00:00" + "time": "2025-05-07T12:32:21+00:00" }, { "name": "open-telemetry/context", @@ -1238,16 +1238,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.2.1", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747" + "reference": "19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/b7580440b7481a98da97aceabeb46e1b276c8747", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c", + "reference": "19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c", "shasum": "" }, "require": { @@ -1298,7 +1298,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-06T23:21:56+00:00" + "time": "2025-05-12T00:36:35+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -1365,16 +1365,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.3.0", + "version": "1.4.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "05d9ceb6773b5bddcf485af6d4a6f543bbeb980b" + "reference": "939d3a28395c249a763676458140dad44b3a8011" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/05d9ceb6773b5bddcf485af6d4a6f543bbeb980b", - "reference": "05d9ceb6773b5bddcf485af6d4a6f543bbeb980b", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/939d3a28395c249a763676458140dad44b3a8011", + "reference": "939d3a28395c249a763676458140dad44b3a8011", "shasum": "" }, "require": { @@ -1451,7 +1451,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-01T23:20:43+00:00" + "time": "2025-05-07T12:32:21+00:00" }, { "name": "open-telemetry/sem-conv", From 3e914a4f1b698485176bbcd171acbc8de9a0ed64 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 30 Apr 2025 22:47:41 +1200 Subject: [PATCH 03/14] Revert name merge issue --- src/Appwrite/Messaging/Adapter/Realtime.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 1963bdedd6..96793b2683 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -36,12 +36,12 @@ class Realtime extends Adapter */ public array $subscriptions = []; - private Pool $pubsubPool; + private PubSubPool $pubSubPool; public function __construct() { global $register; - $this->pubsubPool = $register->get('pools')->get('pubsub'); + $this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub')); } /** @@ -147,7 +147,7 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $message = [ + $this->pubSubPool->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, From 16b2449787954c2211f431323137d252356c3f3e Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:14:07 +1200 Subject: [PATCH 04/14] Revert "Merge pull request #9703 from appwrite/revert-9659-feat-pool-adapter" This reverts commit bf9deb09f588b59399b7207fc93311f3ec2b8fe3, reversing changes made to d312fe22ff264558e240565cac4fcb2f6a24a401. # Conflicts: # app/cli.php # app/init/registers.php # composer.lock # src/Appwrite/Messaging/Adapter/Realtime.php # src/Appwrite/Platform/Tasks/ScheduleBase.php # src/Appwrite/Platform/Tasks/ScheduleExecutions.php # src/Appwrite/Platform/Tasks/ScheduleFunctions.php # src/Appwrite/Platform/Tasks/ScheduleMessages.php --- app/cli.php | 42 ++---- app/controllers/api/health.php | 137 +++++++++--------- app/controllers/api/projects.php | 3 +- app/http.php | 19 +-- app/init/registers.php | 4 +- app/init/resources.php | 52 +++---- app/realtime.php | 110 ++++++++------ app/worker.php | 69 +++------ composer.json | 2 +- composer.lock | 31 ++-- src/Appwrite/Event/Event.php | 7 - src/Appwrite/Messaging/Adapter/Realtime.php | 25 ++-- src/Appwrite/Platform/Tasks/Doctor.php | 50 ++++--- src/Appwrite/Platform/Tasks/ScheduleBase.php | 6 +- .../Platform/Tasks/ScheduleExecutions.php | 12 +- .../Platform/Tasks/ScheduleFunctions.php | 13 +- .../Platform/Tasks/ScheduleMessages.php | 7 +- src/Appwrite/Platform/Workers/Deletes.php | 5 +- src/Appwrite/Platform/Workers/StatsUsage.php | 12 +- .../Platform/Workers/StatsUsageDump.php | 15 +- src/Appwrite/PubSub/Adapter/Pool.php | 46 ++++++ tests/resources/docker/docker-compose.yml | 32 +--- 22 files changed, 317 insertions(+), 382 deletions(-) create mode 100644 src/Appwrite/PubSub/Adapter/Pool.php diff --git a/app/cli.php b/app/cli.php index 9ade97e90c..61d8a90100 100644 --- a/app/cli.php +++ b/app/cli.php @@ -10,12 +10,13 @@ use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Executor\Executor; -use Swoole\Timer; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\CLI; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; @@ -23,11 +24,12 @@ use Utopia\DSN\DSN; use Utopia\Logger\Log; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; - +use Swoole\Timer; use function Swoole\Coroutine\run; // Overwriting runtimes to be architecture agnostic for CLI @@ -45,10 +47,7 @@ CLI::setResource('cache', function ($pools) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -68,12 +67,8 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $attempts++; try { // Prepare database connection - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $dbForPlatform = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); $dbForPlatform ->setNamespace('_console') @@ -91,7 +86,6 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $ready = true; } catch (\Throwable $err) { Console::warning($err->getMessage()); - $pools->get('console')->reclaim(); sleep($sleep); } } while ($attempts < $maxAttempts && !$ready); @@ -141,12 +135,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -172,21 +162,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -210,7 +194,7 @@ CLI::setResource('queueForStatsResources', function (Publisher $publisher) { return new StatsResources($publisher); }, ['publisher']); CLI::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); CLI::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 3602ab8ff9..5fe2de5549 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -3,13 +3,16 @@ use Appwrite\ClamAV\Network; use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Document; use Utopia\Domains\Validator\PublicDomain; use Utopia\Pools\Group; @@ -34,8 +37,8 @@ App::get('/v1/health') namespace: 'health', group: 'health', name: 'get', - auth: [AuthType::KEY], description: '/docs/references/health/get.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -70,11 +73,11 @@ App::get('/v1/health/db') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getDB', description: '/docs/references/health/get-db.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -86,8 +89,8 @@ App::get('/v1/health/db') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -97,7 +100,7 @@ App::get('/v1/health/db') foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); $checkStart = \microtime(true); @@ -108,16 +111,16 @@ App::get('/v1/health/db') 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $failure[] = $database; + $failures[] = $database; } - } catch (\Throwable $th) { - $failure[] = $database; + } catch (\Throwable) { + $failures[] = $database; } } } - if (!empty($failure)) { - throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failure)); + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failures)); } $response->dynamic(new Document([ @@ -131,11 +134,11 @@ App::get('/v1/health/cache') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCache', description: '/docs/references/health/get-cache.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -147,44 +150,39 @@ App::get('/v1/health/cache') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Cache' => Config::getParam('pools-cache'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $cache) { try { - /** @var \Utopia\Cache\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new CachePool($pools->get($cache)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($cache)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $cache; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $cache; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Cache failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -196,11 +194,11 @@ App::get('/v1/health/pubsub') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getPubSub', description: '/docs/references/health/get-pubsub.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -212,44 +210,39 @@ App::get('/v1/health/pubsub') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'PubSub' => Config::getParam('pools-pubsub'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $pubsub) { try { - /** @var \Appwrite\PubSub\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new PubSubPool($pools->get($pubsub)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($pubsub)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $pubsub; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $pubsub; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Pubsub failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -261,11 +254,11 @@ App::get('/v1/health/time') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getTime', description: '/docs/references/health/get-time.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -325,11 +318,11 @@ App::get('/v1/health/queue/webhooks') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueWebhooks', description: '/docs/references/health/get-queue-webhooks.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -351,18 +344,18 @@ App::get('/v1/health/queue/webhooks') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/logs') ->desc('Get logs queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueLogs', description: '/docs/references/health/get-queue-logs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -384,18 +377,18 @@ App::get('/v1/health/queue/logs') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/certificate') ->desc('Get the SSL certificate for a domain') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCertificate', description: '/docs/references/health/get-certificate.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -441,18 +434,18 @@ App::get('/v1/health/certificate') 'validTo' => $certificatePayload['validTo_time_t'], 'signatureTypeSN' => $certificatePayload['signatureTypeSN'], ]), Response::MODEL_HEALTH_CERTIFICATE); - }, ['response']); + }); App::get('/v1/health/queue/certificates') ->desc('Get certificates queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueCertificates', description: '/docs/references/health/get-queue-certificates.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -474,18 +467,18 @@ App::get('/v1/health/queue/certificates') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/builds') ->desc('Get builds queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueBuilds', description: '/docs/references/health/get-queue-builds.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -507,18 +500,18 @@ App::get('/v1/health/queue/builds') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/databases') ->desc('Get databases queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDatabases', description: '/docs/references/health/get-queue-databases.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -541,18 +534,18 @@ App::get('/v1/health/queue/databases') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/deletes') ->desc('Get deletes queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDeletes', description: '/docs/references/health/get-queue-deletes.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -574,18 +567,18 @@ App::get('/v1/health/queue/deletes') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/mails') ->desc('Get mails queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMails', description: '/docs/references/health/get-queue-mails.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -607,18 +600,18 @@ App::get('/v1/health/queue/mails') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/messaging') ->desc('Get messaging queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMessaging', description: '/docs/references/health/get-queue-messaging.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -640,18 +633,18 @@ App::get('/v1/health/queue/messaging') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/migrations') ->desc('Get migrations queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMigrations', description: '/docs/references/health/get-queue-migrations.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -673,18 +666,18 @@ App::get('/v1/health/queue/migrations') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/functions') ->desc('Get functions queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueFunctions', description: '/docs/references/health/get-queue-functions.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -706,18 +699,18 @@ App::get('/v1/health/queue/functions') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/stats-resources') ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueStatsResources', description: '/docs/references/health/get-queue-stats-resources.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -746,11 +739,11 @@ App::get('/v1/health/queue/stats-usage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueUsage', description: '/docs/references/health/get-queue-stats-usage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -779,11 +772,11 @@ App::get('/v1/health/storage/local') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorageLocal', description: '/docs/references/health/get-storage-local.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -829,11 +822,11 @@ App::get('/v1/health/storage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorage', description: '/docs/references/health/get-storage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -877,11 +870,11 @@ App::get('/v1/health/anti-virus') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getAntivirus', description: '/docs/references/health/get-storage-anti-virus.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -923,11 +916,11 @@ App::get('/v1/health/queue/failed/:name') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getFailedJobs', description: '/docs/references/health/get-failed-queue-jobs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, diff --git a/app/controllers/api/projects.php b/app/controllers/api/projects.php index 839a51a764..c4f0b6a9df 100644 --- a/app/controllers/api/projects.php +++ b/app/controllers/api/projects.php @@ -24,6 +24,7 @@ use Utopia\App; use Utopia\Audit\Audit; use Utopia\Cache\Cache; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -223,7 +224,7 @@ App::post('/v1/projects') $sharedTables = $sharedTablesV1 || $sharedTablesV2; if (!$sharedTablesV2) { - $adapter = $pools->get($dsn->getHost())->pop()->getResource(); + $adapter = new DatabasePool($pools->get($dsn->getHost())); $dbForProject = new Database($adapter, $cache); if ($sharedTables) { diff --git a/app/http.php b/app/http.php index 963f550b8b..4d78837a8a 100644 --- a/app/http.php +++ b/app/http.php @@ -14,9 +14,11 @@ use Utopia\App; use Utopia\Audit\Audit; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; +use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; @@ -167,7 +169,7 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co $sleep = 1; $attempts = 0; - do { + while (true) { try { $attempts++; $resource = $app->getResource($resourceKey); @@ -176,13 +178,12 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co break; // exit loop on success } catch (\Exception $e) { Console::warning(" └── Database not ready. Retrying connection ({$attempts})..."); - $pools->reclaim(); if ($attempts >= $max) { throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage()); } sleep($sleep); } - } while ($attempts < $max); + } Console::success("[Setup] - $dbName database init started..."); @@ -318,11 +319,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg $cache = $app->getResource('cache'); foreach ($sharedTablesV2 as $hostname) { - $adapter = $pools - ->get($hostname) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($hostname)); $dbForProject = (new Database($adapter, $cache)) ->setDatabase('appwrite') ->setSharedTables(true) @@ -332,7 +329,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg try { Console::success('[Setup] - Creating project database: ' . $hostname . '...'); $dbForProject->create(); - } catch (Duplicate) { + } catch (DuplicateException) { Console::success('[Setup] - Skip: metadata table already exists'); } @@ -358,7 +355,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg } } - $pools->reclaim(); Console::success('[Setup] - Server database init completed...'); }); @@ -473,6 +469,7 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool Console::error('[Error] Message: ' . $th->getMessage()); Console::error('[Error] File: ' . $th->getFile()); Console::error('[Error] Line: ' . $th->getLine()); + Console::error('[Error] Trace: ' . $th->getTraceAsString()); $swooleResponse->setStatusCode(500); @@ -490,8 +487,6 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool ]; $swooleResponse->end(\json_encode($output)); - } finally { - $pools->reclaim(); } }); diff --git a/app/init/registers.php b/app/init/registers.php index 1adaaf35ce..415730f936 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -216,13 +216,13 @@ $register->set('pools', function () { 'mysql', 'mariadb' => function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { return new PDOProxy(function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { - return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, array( + return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, [ \PDO::ATTR_TIMEOUT => 3, // Seconds \PDO::ATTR_PERSISTENT => false, \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC, \PDO::ATTR_EMULATE_PREPARES => true, \PDO::ATTR_STRINGIFY_FETCHES => true - )); + ]); }); }, 'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) { diff --git a/app/init/resources.php b/app/init/resources.php index c719a47344..f48efbe177 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -24,10 +24,12 @@ use Appwrite\Utopia\Request; use Executor\Executor; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; @@ -37,6 +39,7 @@ use Utopia\DSN\DSN; use Utopia\Locale\Locale; use Utopia\Logger\Log; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Storage\Device; use Utopia\Storage\Device\AWS; @@ -72,10 +75,10 @@ App::setResource('localeCodes', function () { // Queues App::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); App::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); @@ -329,12 +332,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $database ->setMetadata('host', \gethostname()) @@ -360,12 +359,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform }, ['pools', 'dbForPlatform', 'cache', 'project']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, $cache); $database ->setNamespace('_console') @@ -378,7 +373,7 @@ App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { }, ['pools', 'cache']); App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) { - $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + $databases = []; return function (Document $project) use ($pools, $dbForPlatform, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { @@ -420,12 +415,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $configure($database); @@ -435,21 +426,15 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform App::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; - return function (?Document $project = null) use ($pools, $cache, $database) { + + return function (?Document $project = null) use ($pools, $cache, &$database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -473,10 +458,7 @@ App::setResource('cache', function (Group $pools, Telemetry $telemetry) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } $cache = new Cache(new Sharding($adapters)); diff --git a/app/realtime.php b/app/realtime.php index 86f9c85fdd..7e6fc0e311 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,6 +5,7 @@ use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Http\Request as SwooleRequest; @@ -15,10 +16,12 @@ use Swoole\Timer; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -28,13 +31,15 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\DSN\DSN; use Utopia\Logger\Log; +use Utopia\Pools\Group; +use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Server; /** - * @var \Utopia\Registry\Registry $register + * @var Registry $register */ require_once __DIR__ . '/init.php'; @@ -46,17 +51,17 @@ if (!function_exists('getConsoleDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $database = null; + + if ($database !== null) { + return $database; + } + + /** @var Group $pools */ $pools = $register->get('pools'); - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource() - ; - - $database = new Database($dbAdapter, getCache()); - + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, getCache()); $database ->setNamespace('_console') ->setMetadata('host', \gethostname()) @@ -72,7 +77,13 @@ if (!function_exists('getProjectDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $databases = []; + + if (isset($databases[$project->getInternalId()])) { + return $databases[$project->getInternalId()]; + } + + /** @var Group $pools */ $pools = $register->get('pools'); if ($project->isEmpty() || $project->getId() === 'console') { @@ -86,11 +97,7 @@ if (!function_exists('getProjectDB')) { $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, getCache()); $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -111,7 +118,7 @@ if (!function_exists('getProjectDB')) { ->setMetadata('host', \gethostname()) ->setMetadata('project', $project->getId()); - return $database; + return $databases[$project->getInternalId()] = $database; } } @@ -121,20 +128,22 @@ if (!function_exists('getCache')) { { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + static $cache = null; + + if ($cache !== null) { + return $cache; + } + + $pools = $register->get('pools'); /** @var Group $pools */ $list = Config::getParam('pools-cache', []); $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } - return new Cache(new Sharding($adapters)); + return $cache = new Cache(new Sharding($adapters)); } } @@ -142,6 +151,12 @@ if (!function_exists('getCache')) { if (!function_exists('getRedis')) { function getRedis(): \Redis { + static $redis = null; + + if ($redis !== null) { + return $redis; + } + $host = System::getEnv('_APP_REDIS_HOST', 'localhost'); $port = System::getEnv('_APP_REDIS_PORT', 6379); $pass = System::getEnv('_APP_REDIS_PASS', ''); @@ -160,21 +175,39 @@ if (!function_exists('getRedis')) { if (!function_exists('getTimelimit')) { function getTimelimit(): TimeLimitRedis { - return new TimeLimitRedis("", 0, 1, getRedis()); + static $timelimit = null; + + if ($timelimit !== null) { + return $timelimit; + } + + return $timelimit = new TimeLimitRedis("", 0, 1, getRedis()); } } if (!function_exists('getRealtime')) { function getRealtime(): Realtime { - return new Realtime(); + static $realtime = null; + + if ($realtime !== null) { + return $realtime; + } + + return $realtime = new Realtime(); } } if (!function_exists('getTelemetry')) { function getTelemetry(int $workerId): Utopia\Telemetry\Adapter { - return new NoTelemetry(); + static $telemetry = null; + + if ($telemetry !== null) { + return $telemetry; + } + + return $telemetry = new NoTelemetry(); } } @@ -273,7 +306,6 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $register->get('pools')->reclaim(); }); /** @@ -299,9 +331,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument)); } catch (Throwable $th) { - call_user_func($logError, $th, "updateWorkerDocument"); - } finally { - $register->get('pools')->reclaim(); + $logError($th, "updateWorkerDocument"); } }); } @@ -370,8 +400,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, 'data' => $event['data'] ])); } - - $register->get('pools')->reclaim(); } } /** @@ -407,8 +435,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - /** @var \Appwrite\PubSub\Adapter $pubsub */ - $pubsub = $register->get('pools')->get('pubsub')->pop()->getResource(); + $pubsub = new PubSubPool($register->get('pools')->get('pubsub')); + if ($pubsub->ping(true)) { $attempts = 0; Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); @@ -436,8 +464,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime->unsubscribe($connection); $realtime->subscribe($projectId, $connection, $roles, $channels); - - $register->get('pools')->reclaim(); } } @@ -463,14 +489,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } }); } catch (Throwable $th) { - call_user_func($logError, $th, "pubSubConnection"); + $logError($th, "pubSubConnection"); Console::error('Pub/sub error: ' . $th->getMessage()); $attempts++; sleep(DATABASE_RECONNECT_SLEEP); continue; - } finally { - $register->get('pools')->reclaim(); } } @@ -572,7 +596,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $stats->incr($project->getId(), 'connections'); $stats->incr($project->getId(), 'connectionsTotal'); } catch (Throwable $th) { - call_user_func($logError, $th, "initServer"); + $logError($th, "initServer"); // Handle SQL error code is 'HY000' $code = $th->getCode(); @@ -596,8 +620,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Code: ' . $response['data']['code']); Console::error('[Error] Message: ' . $response['data']['message']); } - } finally { - $register->get('pools')->reclaim(); } }); @@ -696,8 +718,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re if ($th->getCode() === 1008) { $server->close($connection, $th->getCode()); } - } finally { - $register->get('pools')->reclaim(); } }); diff --git a/app/worker.php b/app/worker.php index 232e0b3684..1ae2108a62 100644 --- a/app/worker.php +++ b/app/worker.php @@ -20,10 +20,12 @@ use Appwrite\Platform\Appwrite; use Executor\Executor; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -33,6 +35,7 @@ use Utopia\Logger\Log; use Utopia\Logger\Logger; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Server; @@ -40,21 +43,17 @@ use Utopia\Registry\Registry; use Utopia\System\System; Authorization::disable(); -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(); Server::setResource('register', fn () => $register); Server::setResource('dbForPlatform', function (Cache $cache, Registry $register) { $pools = $register->get('pools'); - $database = $pools - ->get('console') - ->pop() - ->getResource(); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); + $dbForPlatform->setNamespace('_console'); - $adapter = new Database($database, $cache); - $adapter->setNamespace('_console'); - - return $adapter; + return $dbForPlatform; }, ['cache', 'register']); Server::setResource('project', function (Message $message, Database $dbForPlatform) { @@ -82,20 +81,9 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, $cache); - try { - $dsn = new DSN($project->getAttribute('database')); - } catch (\InvalidArgumentException) { - // TODO: Temporary until all projects are using shared tables - $dsn = new DSN('mysql://' . $project->getAttribute('database')); - } - $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); if (\in_array($dsn->getHost(), $sharedTables)) { @@ -150,12 +138,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; @@ -187,15 +171,8 @@ Server::setResource('getLogsDB', function (Group $pools, Cache $cache) { return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -233,11 +210,7 @@ Server::setResource('cache', function (Registry $register) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -267,11 +240,11 @@ Server::setResource('timelimit', function (\Redis $redis) { Server::setResource('log', fn () => new Log()); Server::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); Server::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); Server::setResource('queueForStatsUsage', function (Publisher $publisher) { @@ -448,13 +421,6 @@ try { $worker = $platform->getWorker(); -$worker - ->shutdown() - ->inject('pools') - ->action(function (Group $pools) { - $pools->reclaim(); - }); - $worker ->error() ->inject('error') @@ -462,8 +428,7 @@ $worker ->inject('log') ->inject('pools') ->inject('project') - ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($queueName) { - $pools->reclaim(); + ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($worker, $queueName) { $version = System::getEnv('_APP_VERSION', 'UNKNOWN'); if ($logger) { diff --git a/composer.json b/composer.json index 9b2cf7a1ab..914e664d7f 100644 --- a/composer.json +++ b/composer.json @@ -65,7 +65,7 @@ "utopia-php/platform": "0.7.*", "utopia-php/pools": "0.8.*", "utopia-php/preloader": "0.2.*", - "utopia-php/queue": "0.9.*", + "utopia-php/queue": "0.10.*", "utopia-php/registry": "0.5.*", "utopia-php/storage": "0.18.*", "utopia-php/swoole": "0.8.*", diff --git a/composer.lock b/composer.lock index e6bcd919e2..d04d67366e 100644 --- a/composer.lock +++ b/composer.lock @@ -4059,16 +4059,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.4", + "version": "0.7.5", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "a5b93d8177702ec458c3af9137663133c012b71b" + "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/a5b93d8177702ec458c3af9137663133c012b71b", - "reference": "a5b93d8177702ec458c3af9137663133c012b71b", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", + "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", "shasum": "" }, "require": { @@ -4077,11 +4077,11 @@ "php": ">=8.0", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", - "utopia-php/queue": "0.9.*" + "utopia-php/queue": "0.10.*" }, "require-dev": { - "laravel/pint": "1.2.*", - "phpunit/phpunit": "^9.3" + "laravel/pint": "1.*", + "phpunit/phpunit": "9.*" }, "type": "library", "autoload": { @@ -4103,9 +4103,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.4" + "source": "https://github.com/utopia-php/platform/tree/0.7.5" }, - "time": "2025-03-13T13:00:12+00:00" + "time": "2025-04-17T12:20:16+00:00" }, { "name": "utopia-php/pools", @@ -4214,16 +4214,16 @@ }, { "name": "utopia-php/queue", - "version": "0.9.1", + "version": "0.10.0", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32" + "reference": "0eccc559168ea72241c39a4c482d868314666be1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32", - "reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/0eccc559168ea72241c39a4c482d868314666be1", + "reference": "0eccc559168ea72241c39a4c482d868314666be1", "shasum": "" }, "require": { @@ -4232,6 +4232,7 @@ "utopia-php/cli": "0.15.*", "utopia-php/fetch": "0.4.*", "utopia-php/framework": "0.33.*", + "utopia-php/pools": "0.8.*", "utopia-php/telemetry": "0.1.*" }, "require-dev": { @@ -4273,9 +4274,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/0.9.1" + "source": "https://github.com/utopia-php/queue/tree/0.10.0" }, - "time": "2025-03-28T19:49:36+00:00" + "time": "2025-04-17T12:15:52+00:00" }, { "name": "utopia-php/registry", diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index d699a45417..2c735ef2d4 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -286,13 +286,6 @@ class Event return $this; } - public function setParamSensitive(string $key): self - { - $this->sensitive[$key] = true; - - return $this; - } - /** * Get param of event. * diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 96793b2683..be263aa655 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -2,14 +2,14 @@ namespace Appwrite\Messaging\Adapter; -use Appwrite\Messaging\Adapter; +use Appwrite\Messaging\Adapter as MessagingAdapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; -use Utopia\Pools\Pool; -class Realtime extends Adapter +class Realtime extends MessagingAdapter { /** * Connection Tree @@ -132,11 +132,12 @@ class Realtime extends Adapter * Sends an event to the Realtime Server * @param string $projectId * @param array $payload - * @param string $event + * @param array $events * @param array $channels * @param array $roles * @param array $options * @return void + * @throws \Exception */ public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void { @@ -158,9 +159,7 @@ class Realtime extends Adapter 'timestamp' => DateTime::formatTz(DateTime::now()), 'payload' => $payload ] - ]; - - $this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message))); + ])); } /** @@ -175,8 +174,9 @@ class Realtime extends Adapter * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions * * @param array $event + * @return int[]|string[] */ - public function getSubscribers(array $event) + public function getSubscribers(array $event): array { $receivers = []; @@ -230,7 +230,7 @@ class Realtime extends Adapter foreach ($channels as $key => $value) { switch (true) { - case strpos($key, 'account.') === 0: + case \str_starts_with($key, 'account.'): unset($channels[$key]); break; @@ -272,6 +272,7 @@ class Realtime extends Adapter $channels[] = 'account.' . $parts[1]; $roles = [Role::user(ID::custom($parts[1]))->toString()]; break; + case 'migrations': case 'rules': $channels[] = 'console'; $channels[] = 'projects.' . $project->getId(); @@ -352,12 +353,6 @@ class Realtime extends Adapter $roles = [Role::team($project->getAttribute('teamId'))->toString()]; } - break; - case 'migrations': - $channels[] = 'console'; - $channels[] = 'projects.' . $project->getId(); - $projectId = 'console'; - $roles = [Role::team($project->getAttribute('teamId'))->toString()]; break; } diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index c43afea527..5263133eba 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,14 +3,19 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; -use Appwrite\PubSub\Adapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; +use PHPMailer\PHPMailer\PHPMailer; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Domains\Domain; use Utopia\DSN\DSN; use Utopia\Logger\Logger; use Utopia\Platform\Action; +use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Registry\Registry; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; @@ -76,9 +81,9 @@ class Doctor extends Action Console::log('🟢 Abuse protection is enabled'); } - $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT', null); - $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS', null); - $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS', null); + $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT'); + $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS'); + $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS'); if ( empty($authWhitelistRoot) @@ -114,19 +119,16 @@ class Doctor extends Action } else { Console::log('🟢 Logging adapter is enabled (' . $providerName . ')'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::log('🔴 Logging adapter is misconfigured'); } \usleep(200 * 1000); // Sleep for 0.2 seconds - try { - Console::log("\n" . '[Connectivity]'); - } catch (\Throwable $th) { - //throw $th; - } + Console::log("\n" . '[Connectivity]'); - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -136,20 +138,22 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected'); } } } - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); + $configs = [ 'Cache' => Config::getParam('pools-cache'), 'Queue' => Config::getParam('pools-queue'), @@ -159,15 +163,18 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { - /** @var Adapter $adapter */ - $adapter = $pools->get($pool)->pop()->getResource(); + $adapter = match($key) { + 'Cache' => new CachePool($pools->get($pool)), + 'Queue' => new BrokerPool($pools->get($pool)), + 'PubSub' => new PubSubPool($pools->get($pool)), + }; if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } } @@ -185,13 +192,14 @@ class Doctor extends Action } else { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } } try { - $mail = $register->get('smtp'); /* @var $mail \PHPMailer\PHPMailer\PHPMailer */ + /* @var PHPMailer $mail */ + $mail = $register->get('smtp'); $mail->addAddress('demo@example.com', 'Example.com'); $mail->Subject = 'Test SMTP Connection'; @@ -200,7 +208,7 @@ class Doctor extends Action $mail->send(); Console::success('🟢 ' . str_pad("SMTP", 50, '.') . 'connected'); - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("SMTP", 47, '.') . 'disconnected'); } @@ -274,7 +282,7 @@ class Doctor extends Action Console::error('Failed to check for a newer version' . "\n"); } } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('Failed to check for a newer version' . "\n"); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index afd7d9d22a..3e9907f877 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -13,6 +13,7 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Gauge; @@ -25,6 +26,8 @@ abstract class ScheduleBase extends Action protected array $schedules = []; + protected BrokerPool $publisher; + private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; private ?Gauge $scheduleTelemetryCount = null; @@ -71,6 +74,7 @@ abstract class ScheduleBase extends Action Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + $this->publisher = new BrokerPool($pools->get('publisher')); $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); @@ -122,8 +126,6 @@ abstract class ScheduleBase extends Action $schedule->getAttribute('resourceId') ); - $pools->reclaim(); - return [ '$internalId' => $schedule->getInternalId(), '$id' => $schedule->getId(), diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 89d1609a33..331928d265 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -3,7 +3,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; -use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; @@ -29,9 +28,6 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForFunctions = new Func($connection); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -59,8 +55,10 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { - Co::sleep($delay); + \go(function () use ($schedule, $delay, $data, $pools) { + \Co::sleep($delay); + + $queueForFunctions = new Func($this->publisher); $queueForFunctions->setType('schedule') // Set functionId instead of function as we don't have $dbForProject @@ -85,7 +83,5 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } - - $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6788748f3d..649fdb333a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -75,12 +75,9 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } - foreach ($delayedExecutions as $delay => $schedules) { - \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { - \sleep($delay); // in seconds - - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { + \Co::sleep($delay); // in seconds foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; @@ -93,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($this->publisher); $queueForFunctions ->setType('schedule') @@ -105,8 +102,6 @@ class ScheduleFunctions extends ScheduleBase $this->recordEnqueueDelay($delayConfig['nextDate']); } - - $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index a15df6ed5b..4f7dfab6a0 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,10 +40,8 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); + \go(function () use ($schedule, $pools, $dbForPlatform) { + $queueForMessaging = new Messaging($this->publisher); $this->updateProjectAccess($schedule['project'], $dbForPlatform); @@ -58,7 +56,6 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); - $queue->reclaim(); $this->recordEnqueueDelay($scheduledAt); unset($this->schedules[$schedule['$internalId']]); }); diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index a61db63de6..427772a6e0 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -61,10 +61,7 @@ class Deletes extends Action ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback( - fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log) => - $this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executor, $executionRetention, $auditRetention, $log) - ); + ->callback([$this, 'action']); } /** diff --git a/src/Appwrite/Platform/Workers/StatsUsage.php b/src/Appwrite/Platform/Workers/StatsUsage.php index 66f285fcf5..60fab5d2ea 100644 --- a/src/Appwrite/Platform/Workers/StatsUsage.php +++ b/src/Appwrite/Platform/Workers/StatsUsage.php @@ -325,7 +325,7 @@ class StatsUsage extends Action break; } } catch (Throwable $e) { - console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); + Console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); } } @@ -344,7 +344,7 @@ class StatsUsage extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { foreach ($stats['keys'] ?? [] as $key => $value) { @@ -381,7 +381,7 @@ class StatsUsage extends Action } } } catch (Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } @@ -405,7 +405,7 @@ class StatsUsage extends Action } - protected function prepareForLogsDB(Document $project, Document $stat) + protected function prepareForLogsDB(Document $project, Document $stat): void { if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') { return; @@ -430,8 +430,7 @@ class StatsUsage extends Action return; } - $dbForLogs = call_user_func($this->getLogsDB); - $dbForLogs + $dbForLogs = ($this->getLogsDB)() ->setTenant(null) ->setTenantPerDocument(true); @@ -446,6 +445,5 @@ class StatsUsage extends Action } catch (Throwable $th) { Console::error($th->getMessage()); } - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php index b9d486e0d8..77ec3f13e6 100644 --- a/src/Appwrite/Platform/Workers/StatsUsageDump.php +++ b/src/Appwrite/Platform/Workers/StatsUsageDump.php @@ -70,9 +70,9 @@ class StatsUsageDump extends Action ]; /** - * @var callable + * @var callable(Document): Database */ - protected mixed $getLogsDB; + protected $getLogsDB; protected array $periods = [ '1h' => 'Y-m-d H:00', @@ -126,10 +126,10 @@ class StatsUsageDump extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { - /** @var \Utopia\Database\Database $dbForProject */ + /** @var Database $dbForProject */ $dbForProject = $getProjectDB($project); foreach ($stats['keys'] ?? [] as $key => $value) { if ($value == 0) { @@ -169,7 +169,7 @@ class StatsUsageDump extends Action } } } catch (\Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } } @@ -190,8 +190,7 @@ class StatsUsageDump extends Action } } - /** @var \Utopia\Database\Database $dbForLogs*/ - $dbForLogs = call_user_func($this->getLogsDB, $project); + $dbForLogs = ($this->getLogsDB)($project); try { $dbForLogs->createOrUpdateDocumentsWithIncrease( @@ -203,7 +202,5 @@ class StatsUsageDump extends Action } catch (\Throwable $th) { Console::error($th->getMessage()); } - - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/PubSub/Adapter/Pool.php b/src/Appwrite/PubSub/Adapter/Pool.php new file mode 100644 index 0000000000..a498118dae --- /dev/null +++ b/src/Appwrite/PubSub/Adapter/Pool.php @@ -0,0 +1,46 @@ +delegate(__FUNCTION__, \func_get_args()); + } + + public function subscribe($channels, $callback): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + public function publish($channel, $message): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + /** + * Forward method calls to the internal adapter instance via the pool. + * + * Required because __call() can't be used to implement abstract methods. + * + * @param string $method + * @param array $args + * @return mixed + * @throws DatabaseException + */ + public function delegate(string $method, array $args): mixed + { + return $this->pool->use(function (Adapter $adapter) use ($method, $args) { + return $adapter->{$method}(...$args); + }); + } +} diff --git a/tests/resources/docker/docker-compose.yml b/tests/resources/docker/docker-compose.yml index e549ac27a5..779d63d6ed 100644 --- a/tests/resources/docker/docker-compose.yml +++ b/tests/resources/docker/docker-compose.yml @@ -35,7 +35,7 @@ services: - VERSION=dev restart: unless-stopped ports: - - 9501:80 + - "9501:80" networks: - appwrite labels: @@ -52,15 +52,12 @@ services: - ./phpunit.xml:/usr/src/code/phpunit.xml - ./tests:/usr/src/code/tests - ./app:/usr/src/code/app - # - ./vendor:/usr/src/code/vendor - ./docs:/usr/src/code/docs - ./public:/usr/src/code/public - ./src:/usr/src/code/src - - ./debug:/tmp depends_on: - mariadb - redis - # - clamav environment: - _APP_COMPRESSION_MIN_SIZE_BYTES - _APP_ENV @@ -355,33 +352,6 @@ services: volumes: - appwrite-redis:/data:rw - # clamav: - # image: appwrite/clamav:1.2.0 - # container_name: appwrite-clamav - # restart: unless-stopped - # networks: - # - appwrite - # volumes: - # - appwrite-uploads:/storage/uploads - - - # redis-commander: - # image: rediscommander/redis-commander:latest - # restart: unless-stopped - # networks: - # - appwrite - # environment: - # - REDIS_HOSTS=redis - # ports: - # - "8081:8081" - - # webgrind: - # image: 'jokkedk/webgrind:latest' - # volumes: - # - './debug:/tmp' - # ports: - # - '3001:80' - networks: gateway: appwrite: From af3388a51f9226455f0924b596047eb97f84c5d6 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:16:54 +1200 Subject: [PATCH 05/14] Format --- app/cli.php | 3 ++- composer.lock | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/cli.php b/app/cli.php index 61d8a90100..663423b57a 100644 --- a/app/cli.php +++ b/app/cli.php @@ -10,6 +10,7 @@ use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Executor\Executor; +use Swoole\Timer; use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; @@ -29,7 +30,7 @@ use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; -use Swoole\Timer; + use function Swoole\Coroutine\run; // Overwriting runtimes to be architecture agnostic for CLI diff --git a/composer.lock b/composer.lock index d04d67366e..945948739a 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1eeb5a0f3560aefd8f71bd0955e30360", + "content-hash": "24b02f9535e6a202fde735137f0ec900", "packages": [ { "name": "adhocore/jwt", From 09152a08e1756620fcc18072e7ff96b54876ec76 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:27:51 +1200 Subject: [PATCH 06/14] Fix merge --- src/Appwrite/Platform/Tasks/ScheduleFunctions.php | 6 +++--- src/Appwrite/Platform/Workers/Deletes.php | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 649fdb333a..19e068107a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -75,9 +75,9 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } - foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { - \Co::sleep($delay); // in seconds + foreach ($delayedExecutions as $delay => $schedules) { + \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { + \sleep($delay); // in seconds foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 427772a6e0..d47ba9cb15 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -61,7 +61,7 @@ class Deletes extends Action ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback([$this, 'action']); + ->callback($this->action(...)); } /** From 33fbab7e3d0128afd17a9bdec1f50da4d1377f90 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 21:28:59 +1200 Subject: [PATCH 07/14] Fix missing capture --- src/Appwrite/Platform/Tasks/ScheduleMessages.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 4f7dfab6a0..5e65f7a8a6 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForPlatform) { + \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { $queueForMessaging = new Messaging($this->publisher); $this->updateProjectAccess($schedule['project'], $dbForPlatform); From 55236524e336b8108b94e6e8c0d332dce6db9735 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 21:42:39 +1200 Subject: [PATCH 08/14] Fix merge --- src/Appwrite/Platform/Tasks/ScheduleExecutions.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 331928d265..99c84e829a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; +use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; @@ -28,6 +29,7 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { + $queueForFunctions = new Func($this->publisher); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -55,10 +57,8 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($schedule, $delay, $data, $pools) { - \Co::sleep($delay); - - $queueForFunctions = new Func($this->publisher); + \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { + Co::sleep($delay); $queueForFunctions->setType('schedule') // Set functionId instead of function as we don't have $dbForProject From 117717888572314130278089cb597ddee895b444 Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Mon, 19 May 2025 12:40:11 +0200 Subject: [PATCH 09/14] fix: task coroutine hooks --- app/cli.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/cli.php b/app/cli.php index 9ade97e90c..0c91b3c34f 100644 --- a/app/cli.php +++ b/app/cli.php @@ -10,6 +10,7 @@ use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Executor\Executor; +use Swoole\Runtime; use Swoole\Timer; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; @@ -298,4 +299,6 @@ $cli $cli->shutdown()->action(fn () => Timer::clearAll()); +// Enable coroutines, but disable TCP hooks. These don't work until we use `\Utopia\Cache\Adapter\Pool` and `\Utopia\Database\Adapter\Pool`. +Runtime::enableCoroutine(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP); run($cli->run(...)); From 149dfb965153d8f51126017e1f3a2456fb53241c Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 20 May 2025 20:07:11 +1200 Subject: [PATCH 10/14] Update lock --- composer.lock | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/composer.lock b/composer.lock index c52ecba812..432676ca3e 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "24b02f9535e6a202fde735137f0ec900", + "content-hash": "63feb1a7cf4cfa2cc7fa0870236e61ea", "packages": [ { "name": "adhocore/jwt", @@ -3499,16 +3499,16 @@ }, { "name": "utopia-php/database", - "version": "0.69.2", + "version": "0.69.5", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "60591ab073bb80bb9843338754b679bb8169e4ed" + "reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/60591ab073bb80bb9843338754b679bb8169e4ed", - "reference": "60591ab073bb80bb9843338754b679bb8169e4ed", + "url": "https://api.github.com/repos/utopia-php/database/zipball/4abe53609dfc23b2ea82884d12b149df6a8af2f5", + "reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5", "shasum": "" }, "require": { @@ -3549,9 +3549,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/0.69.2" + "source": "https://github.com/utopia-php/database/tree/0.69.5" }, - "time": "2025-05-14T07:51:44+00:00" + "time": "2025-05-17T08:01:51+00:00" }, { "name": "utopia-php/domains", @@ -3701,16 +3701,16 @@ }, { "name": "utopia-php/framework", - "version": "0.33.19", + "version": "0.33.20", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0" + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", + "url": "https://api.github.com/repos/utopia-php/http/zipball/e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", "shasum": "" }, "require": { @@ -3742,9 +3742,9 @@ ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/0.33.19" + "source": "https://github.com/utopia-php/http/tree/0.33.20" }, - "time": "2025-03-06T11:37:49+00:00" + "time": "2025-05-18T23:51:21+00:00" }, { "name": "utopia-php/image", @@ -4059,16 +4059,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.5", + "version": "0.7.6", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf" + "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", - "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", + "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", "shasum": "" }, "require": { @@ -4103,9 +4103,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.5" + "source": "https://github.com/utopia-php/platform/tree/0.7.6" }, - "time": "2025-04-17T12:20:16+00:00" + "time": "2025-05-18T20:31:24+00:00" }, { "name": "utopia-php/pools", @@ -4771,16 +4771,16 @@ "packages-dev": [ { "name": "appwrite/sdk-generator", - "version": "0.40.16", + "version": "0.40.17", "source": { "type": "git", "url": "https://github.com/appwrite/sdk-generator.git", - "reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d" + "reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/f1f506da74033f0cb5a11e3dffcfd1ee8daf237d", - "reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d", + "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de", + "reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de", "shasum": "" }, "require": { @@ -4816,9 +4816,9 @@ "description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms", "support": { "issues": "https://github.com/appwrite/sdk-generator/issues", - "source": "https://github.com/appwrite/sdk-generator/tree/0.40.16" + "source": "https://github.com/appwrite/sdk-generator/tree/0.40.17" }, - "time": "2025-05-09T12:06:09+00:00" + "time": "2025-05-16T15:10:54+00:00" }, { "name": "doctrine/annotations", From 455554a6a2afe4f54ce3d36d542c0da09caf12b5 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 20 May 2025 11:06:09 +0100 Subject: [PATCH 11/14] fix: download endpoint --- app/controllers/api/storage.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/controllers/api/storage.php b/app/controllers/api/storage.php index 78c4c5fec9..a96b871bf2 100644 --- a/app/controllers/api/storage.php +++ b/app/controllers/api/storage.php @@ -1225,12 +1225,15 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/download') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); + return; } if (!empty($rangeHeader)) { $response->send($deviceForFiles->read($path, $start, ($end - $start + 1))); + return; } if ($size > APP_STORAGE_READ_BUFFER) { @@ -1383,6 +1386,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/view') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); return; @@ -1534,6 +1538,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/push') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); return; From fe763093df4d91d21cbc015c237078e26cbf0a0e Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 May 2025 00:58:32 +1200 Subject: [PATCH 12/14] Catch query exception on get document for selects --- app/controllers/api/databases.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index 675268b894..138428ef75 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -3785,7 +3785,12 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage()); } - $document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries); + try { + $document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries); + } catch (QueryException $e) { + throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage()); + } + if ($document->isEmpty()) { throw new Exception(Exception::DOCUMENT_NOT_FOUND); } From bc34f95646043aa10224e1015ec7122b2135800c Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 May 2025 01:13:08 +1200 Subject: [PATCH 13/14] Update lock --- app/controllers/api/databases.php | 2 +- composer.lock | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index 138428ef75..ec402333e4 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -3790,7 +3790,7 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen } catch (QueryException $e) { throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage()); } - + if ($document->isEmpty()) { throw new Exception(Exception::DOCUMENT_NOT_FOUND); } diff --git a/composer.lock b/composer.lock index 432676ca3e..557b61f36c 100644 --- a/composer.lock +++ b/composer.lock @@ -4059,16 +4059,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.6", + "version": "0.7.7", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b" + "reference": "8c43cd866148a7c4c495e3401268429e338004b3" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", - "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/8c43cd866148a7c4c495e3401268429e338004b3", + "reference": "8c43cd866148a7c4c495e3401268429e338004b3", "shasum": "" }, "require": { @@ -4103,9 +4103,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.6" + "source": "https://github.com/utopia-php/platform/tree/0.7.7" }, - "time": "2025-05-18T20:31:24+00:00" + "time": "2025-05-20T09:23:44+00:00" }, { "name": "utopia-php/pools", From 20f2cbffda764cf3c26fb78b3071855d98f39c6a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 20 May 2025 14:37:10 +0000 Subject: [PATCH 14/14] Merge pull request #9811 from ArnabChatterjee20k/upsert-single-document-route Upsert single document route --- app/controllers/api/databases.php | 238 ++++++++++++++ docs/references/databases/upsert-document.md | 1 + .../e2e/Services/Databases/DatabasesBase.php | 303 +++++++++++++++++- 3 files changed, 541 insertions(+), 1 deletion(-) create mode 100644 docs/references/databases/upsert-document.md diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index ec402333e4..68d8891067 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -4207,6 +4207,244 @@ App::patch('/v1/databases/:databaseId/collections/:collectionId/documents/:docum $response->dynamic($document, Response::MODEL_DOCUMENT); }); +App::put('/v1/databases/:databaseId/collections/:collectionId/documents/:documentId') + ->desc('Upsert document') + ->groups(['api', 'database']) + ->label('event', 'databases.[databaseId].collections.[collectionId].documents.[documentId].upsert') + ->label('scope', 'documents.write') + ->label('resourceType', RESOURCE_TYPE_DATABASES) + ->label('audits.event', 'document.upsert') + ->label('audits.resource', 'database/{request.databaseId}/collection/{request.collectionId}/document/{response.$id}') + ->label('abuse-key', 'ip:{ip},method:{method},url:{url},userId:{userId}') + ->label('abuse-limit', APP_LIMIT_WRITE_RATE_DEFAULT * 2) + ->label('abuse-time', APP_LIMIT_WRITE_RATE_PERIOD_DEFAULT) + ->label('sdk', new Method( + namespace: 'databases', + group: 'documents', + name: 'upsertDocument', + description: '/docs/references/databases/upsert-document.md', + auth: [AuthType::SESSION, AuthType::KEY, AuthType::JWT], + responses: [ + new SDKResponse( + code: Response::STATUS_CODE_OK, + model: Response::MODEL_DOCUMENT, + ) + ], + contentType: ContentType::JSON + )) + ->param('databaseId', '', new UID(), 'Database ID.') + ->param('collectionId', '', new UID(), 'Collection ID.') + ->param('documentId', '', new CustomId(), 'Document ID.') + ->param('data', [], new JSON(), 'Document data as JSON object. Include all required attributes of the document to be created or updated.') + ->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, the current permissions are inherited. [Learn more about permissions](https://appwrite.io/docs/permissions).', true) + ->inject('requestTimestamp') + ->inject('response') + ->inject('dbForProject') + ->inject('queueForEvents') + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage) { + $data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array + + if (empty($data) && \is_null($permissions)) { + throw new Exception(Exception::DOCUMENT_MISSING_PAYLOAD); + } + + $isAPIKey = Auth::isAppUser(Authorization::getRoles()); + $isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles()); + + $database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId)); + if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) { + throw new Exception(Exception::DATABASE_NOT_FOUND); + } + + $collection = Authorization::skip(fn () => $dbForProject->getDocument('database_' . $database->getInternalId(), $collectionId)); + if ($collection->isEmpty() || (!$collection->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) { + throw new Exception(Exception::COLLECTION_NOT_FOUND); + } + + // Map aggregate permissions into the multiple permissions they represent. + $permissions = Permission::aggregate($permissions, [ + Database::PERMISSION_READ, + Database::PERMISSION_UPDATE, + Database::PERMISSION_DELETE, + ]); + + // Users can only manage their own roles, API keys and Admin users can manage any + $roles = Authorization::getRoles(); + if (!$isAPIKey && !$isPrivilegedUser && !\is_null($permissions)) { + foreach (Database::PERMISSIONS as $type) { + foreach ($permissions as $permission) { + $permission = Permission::parse($permission); + if ($permission->getPermission() != $type) { + continue; + } + $role = (new Role( + $permission->getRole(), + $permission->getIdentifier(), + $permission->getDimension() + ))->toString(); + if (!Authorization::isRole($role)) { + throw new Exception(Exception::USER_UNAUTHORIZED, 'Permissions must be one of: (' . \implode(', ', $roles) . ')'); + } + } + } + } + + $data['$id'] = $documentId; + $data['$permissions'] = $permissions; + $newDocument = new Document($data); + + $operations = 0; + + $setCollection = (function (Document $collection, Document $document) use (&$setCollection, $dbForProject, $database, &$operations) { + + $operations++; + + $relationships = \array_filter( + $collection->getAttribute('attributes', []), + fn ($attribute) => $attribute->getAttribute('type') === Database::VAR_RELATIONSHIP + ); + + foreach ($relationships as $relationship) { + $related = $document->getAttribute($relationship->getAttribute('key')); + + if (empty($related)) { + continue; + } + + $isList = \is_array($related) && \array_values($related) === $related; + + if ($isList) { + $relations = $related; + } else { + $relations = [$related]; + } + + $relatedCollectionId = $relationship->getAttribute('relatedCollection'); + $relatedCollection = Authorization::skip( + fn () => $dbForProject->getDocument('database_' . $database->getInternalId(), $relatedCollectionId) + ); + + foreach ($relations as &$relation) { + // If the relation is an array it can be either update or create a child document. + if ( + \is_array($relation) + && \array_values($relation) !== $relation + && !isset($relation['$id']) + ) { + $relation['$id'] = ID::unique(); + $relation = new Document($relation); + } + if ($relation instanceof Document) { + $oldDocument = Authorization::skip(fn () => $dbForProject->getDocument( + 'database_' . $database->getInternalId() . '_collection_' . $relatedCollection->getInternalId(), + $relation->getId() + )); + $relation->removeAttribute('$collectionId'); + $relation->removeAttribute('$databaseId'); + // Attribute $collection is required for Utopia. + $relation->setAttribute( + '$collection', + 'database_' . $database->getInternalId() . '_collection_' . $relatedCollection->getInternalId() + ); + + if ($oldDocument->isEmpty()) { + if (isset($relation['$id']) && $relation['$id'] === 'unique()') { + $relation['$id'] = ID::unique(); + } + } + $setCollection($relatedCollection, $relation); + } + } + + if ($isList) { + $document->setAttribute($relationship->getAttribute('key'), \array_values($relations)); + } else { + $document->setAttribute($relationship->getAttribute('key'), \reset($relations)); + } + } + }); + + $setCollection($collection, $newDocument); + + $queueForStatsUsage + ->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, \max(1, $operations)) + ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), \max(1, $operations)); + + $upserted = []; + try { + $modified = $dbForProject->createOrUpdateDocuments( + 'database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), + [$newDocument], + onNext: function (Document $document) use (&$upserted) { + $upserted[] = $document; + }, + ); + } catch (ConflictException) { + throw new Exception(Exception::DOCUMENT_UPDATE_CONFLICT); + } catch (DuplicateException) { + throw new Exception(Exception::DOCUMENT_ALREADY_EXISTS); + } catch (RelationshipException $e) { + throw new Exception(Exception::RELATIONSHIP_VALUE_INVALID, $e->getMessage()); + } catch (StructureException $e) { + throw new Exception(Exception::DOCUMENT_INVALID_STRUCTURE, $e->getMessage()); + } + + $document = $upserted[0]; + // Add $collectionId and $databaseId for all documents + $processDocument = function (Document $collection, Document $document) use (&$processDocument, $dbForProject, $database) { + $document->setAttribute('$databaseId', $database->getId()); + $document->setAttribute('$collectionId', $collection->getId()); + + $relationships = \array_filter( + $collection->getAttribute('attributes', []), + fn ($attribute) => $attribute->getAttribute('type') === Database::VAR_RELATIONSHIP + ); + + foreach ($relationships as $relationship) { + $related = $document->getAttribute($relationship->getAttribute('key')); + + if (empty($related)) { + continue; + } + if (!\is_array($related)) { + $related = [$related]; + } + + $relatedCollectionId = $relationship->getAttribute('relatedCollection'); + $relatedCollection = Authorization::skip( + fn () => $dbForProject->getDocument('database_' . $database->getInternalId(), $relatedCollectionId) + ); + + foreach ($related as $relation) { + if ($relation instanceof Document) { + $processDocument($relatedCollection, $relation); + } + } + } + }; + + $processDocument($collection, $document); + + $relationships = \array_map( + fn ($document) => $document->getAttribute('key'), + \array_filter( + $collection->getAttribute('attributes', []), + fn ($attribute) => $attribute->getAttribute('type') === Database::VAR_RELATIONSHIP + ) + ); + + $queueForEvents + ->setParam('databaseId', $databaseId) + ->setParam('collectionId', $collection->getId()) + ->setParam('documentId', $document->getId()) + ->setContext('collection', $collection) + ->setContext('database', $database) + ->setPayload($response->getPayload(), sensitive: $relationships); + + $response->dynamic($document, Response::MODEL_DOCUMENT); + }); + App::patch('/v1/databases/:databaseId/collections/:collectionId/documents') ->desc('Update documents') ->groups(['api', 'database']) diff --git a/docs/references/databases/upsert-document.md b/docs/references/databases/upsert-document.md new file mode 100644 index 0000000000..a67694cfa6 --- /dev/null +++ b/docs/references/databases/upsert-document.md @@ -0,0 +1 @@ +Create or update a Document. Before using this route, you should create a new collection resource using either a [server integration](https://appwrite.io/docs/server/databases#databasesCreateCollection) API or directly from your database console. \ No newline at end of file diff --git a/tests/e2e/Services/Databases/DatabasesBase.php b/tests/e2e/Services/Databases/DatabasesBase.php index 2b60dee856..7c0060ecaa 100644 --- a/tests/e2e/Services/Databases/DatabasesBase.php +++ b/tests/e2e/Services/Databases/DatabasesBase.php @@ -12,7 +12,6 @@ use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; use Utopia\Database\Query; use Utopia\Database\Validator\Datetime as DatetimeValidator; -use Utopia\Validator\JSON; trait DatabasesBase { @@ -1683,6 +1682,308 @@ trait DatabasesBase return $data; } + /** + * @depends testCreateIndexes + */ + public function testUpsertDocument(array $data): void + { + $databaseId = $data['databaseId']; + $documentId = ID::unique(); + $document = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'title' => 'Thor: Ragnarok', + 'releaseYear' => 2000 + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ], + ]); + + $this->assertEquals(200, $document['headers']['status-code']); + $this->assertCount(3, $document['body']['$permissions']); + $document = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders())); + + $this->assertEquals('Thor: Ragnarok', $document['body']['title']); + + $document = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'title' => 'Thor: Love and Thunder', + 'releaseYear' => 2000 + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ], + ]); + + $this->assertEquals(200, $document['headers']['status-code']); + $this->assertEquals('Thor: Love and Thunder', $document['body']['title']); + + $document = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders())); + + $this->assertEquals('Thor: Love and Thunder', $document['body']['title']); + + // removing permission to read and delete + $document = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'title' => 'Thor: Love and Thunder', + 'releaseYear' => 2000 + ], + 'permissions' => [ + Permission::update(Role::users()) + ], + ]); + // shouldn't be able to read as no read permission + $document = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders())); + switch ($this->getSide()) { + case 'client': + $this->assertEquals(404, $document['headers']['status-code']); + break; + case 'server': + $this->assertEquals(200, $document['headers']['status-code']); + break; + } + // shouldn't be able to delete as no delete permission + $document = $this->client->call(Client::METHOD_DELETE, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders())); + // simulating for the client + // the document should not be allowed to be deleted as needed downward + if ($this->getSide() === 'client') { + $this->assertEquals(401, $document['headers']['status-code']); + } + // giving the delete permission + $document = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'title' => 'Thor: Love and Thunder', + 'releaseYear' => 2000 + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()) + ], + ]); + $document = $this->client->call(Client::METHOD_DELETE, '/databases/' . $databaseId . '/collections/' . $data['moviesId'] . '/documents/' . $documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders())); + $this->assertEquals(204, $document['headers']['status-code']); + + // relationship behaviour + $person = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => 'person-upsert', + 'name' => 'person', + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + Permission::create(Role::users()), + ], + 'documentSecurity' => true, + ]); + + $this->assertEquals(201, $person['headers']['status-code']); + + $library = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => 'library-upsert', + 'name' => 'library', + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::create(Role::users()), + Permission::delete(Role::users()), + ], + 'documentSecurity' => true, + ]); + + $this->assertEquals(201, $library['headers']['status-code']); + + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'fullName', + 'size' => 255, + 'required' => false, + ]); + + sleep(1); // Wait for worker + + $relation = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/attributes/relationship', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'relatedCollectionId' => 'library-upsert', + 'type' => Database::RELATION_ONE_TO_ONE, + 'key' => 'library', + 'twoWay' => true, + 'onDelete' => Database::RELATION_MUTATE_CASCADE, + ]); + + sleep(1); // Wait for worker + + $libraryName = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $library['body']['$id'] . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'libraryName', + 'size' => 255, + 'required' => true, + ]); + + sleep(1); // Wait for worker + + $this->assertEquals(202, $libraryName['headers']['status-code']); + + // upserting values + $documentId = ID::unique(); + $person1 = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents/'.$documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'library' => [ + '$id' => 'library1', + '$permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ], + 'libraryName' => 'Library 1', + ], + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ] + ]); + + $this->assertEquals('Library 1', $person1['body']['library']['libraryName']); + $documents = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'queries' => [ + Query::select(['fullName', 'library.*'])->toString(), + Query::equal('library', ['library1'])->toString(), + ], + ]); + + $this->assertEquals(1, $documents['body']['total']); + $this->assertEquals('Library 1', $documents['body']['documents'][0]['library']['libraryName']); + + + $person1 = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents/'.$documentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'library' => [ + '$id' => 'library1', + '$permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ], + 'libraryName' => 'Library 2', + ], + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ] + ]); + + // data should get updated + $this->assertEquals('Library 2', $person1['body']['library']['libraryName']); + $documents = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'queries' => [ + Query::select(['fullName', 'library.*'])->toString(), + Query::equal('library', ['library1'])->toString(), + ], + ]); + + $this->assertEquals(1, $documents['body']['total']); + $this->assertEquals('Library 2', $documents['body']['documents'][0]['library']['libraryName']); + + + // data should get added + $person1 = $this->client->call(Client::METHOD_PUT, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents/'.ID::unique(), array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'data' => [ + 'library' => [ + '$id' => 'library2', + '$permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ], + 'libraryName' => 'Library 2', + ], + ], + 'permissions' => [ + Permission::read(Role::users()), + Permission::update(Role::users()), + Permission::delete(Role::users()), + ] + ]); + + $this->assertEquals('Library 2', $person1['body']['library']['libraryName']); + $documents = $this->client->call(Client::METHOD_GET, '/databases/' . $databaseId . '/collections/' . $person['body']['$id'] . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'queries' => [ + Query::select(['fullName', 'library.*'])->toString() + ], + ]); + $this->assertEquals(2, $documents['body']['total']); + } + /** * @depends testCreateDocument */