diff --git a/app/cli.php b/app/cli.php index a1e41289b2..9517347420 100644 --- a/app/cli.php +++ b/app/cli.php @@ -12,11 +12,13 @@ use Appwrite\Runtimes\Runtimes; use Executor\Executor; use Swoole\Runtime; use Swoole\Timer; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\CLI; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; @@ -24,6 +26,7 @@ use Utopia\DSN\DSN; use Utopia\Logger\Log; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; @@ -46,10 +49,7 @@ CLI::setResource('cache', function ($pools) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -69,12 +69,8 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $attempts++; try { // Prepare database connection - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $dbForPlatform = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); $dbForPlatform ->setNamespace('_console') @@ -92,7 +88,6 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $ready = true; } catch (\Throwable $err) { Console::warning($err->getMessage()); - $pools->get('console')->reclaim(); sleep($sleep); } } while ($attempts < $maxAttempts && !$ready); @@ -142,12 +137,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -173,21 +164,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -211,7 +196,7 @@ CLI::setResource('queueForStatsResources', function (Publisher $publisher) { return new StatsResources($publisher); }, ['publisher']); CLI::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); CLI::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index 8883e245b6..277bfae16a 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -3785,7 +3785,12 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage()); } - $document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries); + try { + $document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries); + } catch (QueryException $e) { + throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage()); + } + if ($document->isEmpty()) { throw new Exception(Exception::DOCUMENT_NOT_FOUND); } diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 11fc4cc244..b95eb432a1 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -3,13 +3,16 @@ use Appwrite\ClamAV\Network; use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Document; use Utopia\Domains\Validator\PublicDomain; use Utopia\Pools\Group; @@ -34,8 +37,8 @@ App::get('/v1/health') namespace: 'health', group: 'health', name: 'get', - auth: [AuthType::KEY], description: '/docs/references/health/get.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -70,11 +73,11 @@ App::get('/v1/health/db') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getDB', description: '/docs/references/health/get-db.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -86,8 +89,8 @@ App::get('/v1/health/db') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -97,7 +100,7 @@ App::get('/v1/health/db') foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); $checkStart = \microtime(true); @@ -108,16 +111,16 @@ App::get('/v1/health/db') 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $failure[] = $database; + $failures[] = $database; } - } catch (\Throwable $th) { - $failure[] = $database; + } catch (\Throwable) { + $failures[] = $database; } } } - if (!empty($failure)) { - throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failure)); + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failures)); } $response->dynamic(new Document([ @@ -131,11 +134,11 @@ App::get('/v1/health/cache') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCache', description: '/docs/references/health/get-cache.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -147,44 +150,39 @@ App::get('/v1/health/cache') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Cache' => Config::getParam('pools-cache'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $cache) { try { - /** @var \Utopia\Cache\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new CachePool($pools->get($cache)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($cache)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $cache; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $cache; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Cache failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -196,11 +194,11 @@ App::get('/v1/health/pubsub') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getPubSub', description: '/docs/references/health/get-pubsub.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -212,44 +210,39 @@ App::get('/v1/health/pubsub') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'PubSub' => Config::getParam('pools-pubsub'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $pubsub) { try { - /** @var \Appwrite\PubSub\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new PubSubPool($pools->get($pubsub)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($pubsub)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $pubsub; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $pubsub; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Pubsub failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -261,11 +254,11 @@ App::get('/v1/health/time') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getTime', description: '/docs/references/health/get-time.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -325,11 +318,11 @@ App::get('/v1/health/queue/webhooks') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueWebhooks', description: '/docs/references/health/get-queue-webhooks.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -351,18 +344,18 @@ App::get('/v1/health/queue/webhooks') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/logs') ->desc('Get logs queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueLogs', description: '/docs/references/health/get-queue-logs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -384,18 +377,18 @@ App::get('/v1/health/queue/logs') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/certificate') ->desc('Get the SSL certificate for a domain') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCertificate', description: '/docs/references/health/get-certificate.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -441,18 +434,18 @@ App::get('/v1/health/certificate') 'validTo' => $certificatePayload['validTo_time_t'], 'signatureTypeSN' => $certificatePayload['signatureTypeSN'], ]), Response::MODEL_HEALTH_CERTIFICATE); - }, ['response']); + }); App::get('/v1/health/queue/certificates') ->desc('Get certificates queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueCertificates', description: '/docs/references/health/get-queue-certificates.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -474,18 +467,18 @@ App::get('/v1/health/queue/certificates') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/builds') ->desc('Get builds queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueBuilds', description: '/docs/references/health/get-queue-builds.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -507,18 +500,18 @@ App::get('/v1/health/queue/builds') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/databases') ->desc('Get databases queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDatabases', description: '/docs/references/health/get-queue-databases.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -541,18 +534,18 @@ App::get('/v1/health/queue/databases') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/deletes') ->desc('Get deletes queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDeletes', description: '/docs/references/health/get-queue-deletes.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -574,18 +567,18 @@ App::get('/v1/health/queue/deletes') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/mails') ->desc('Get mails queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMails', description: '/docs/references/health/get-queue-mails.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -607,18 +600,18 @@ App::get('/v1/health/queue/mails') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/messaging') ->desc('Get messaging queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMessaging', description: '/docs/references/health/get-queue-messaging.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -640,18 +633,18 @@ App::get('/v1/health/queue/messaging') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/migrations') ->desc('Get migrations queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMigrations', description: '/docs/references/health/get-queue-migrations.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -673,18 +666,18 @@ App::get('/v1/health/queue/migrations') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/functions') ->desc('Get functions queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueFunctions', description: '/docs/references/health/get-queue-functions.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -706,18 +699,18 @@ App::get('/v1/health/queue/functions') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/stats-resources') ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueStatsResources', description: '/docs/references/health/get-queue-stats-resources.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -746,11 +739,11 @@ App::get('/v1/health/queue/stats-usage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueUsage', description: '/docs/references/health/get-queue-stats-usage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -779,11 +772,11 @@ App::get('/v1/health/storage/local') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorageLocal', description: '/docs/references/health/get-storage-local.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -829,11 +822,11 @@ App::get('/v1/health/storage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorage', description: '/docs/references/health/get-storage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -878,11 +871,11 @@ App::get('/v1/health/anti-virus') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getAntivirus', description: '/docs/references/health/get-storage-anti-virus.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -924,11 +917,11 @@ App::get('/v1/health/queue/failed/:name') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getFailedJobs', description: '/docs/references/health/get-failed-queue-jobs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, diff --git a/app/controllers/api/projects.php b/app/controllers/api/projects.php index e158cf52fb..5eda8e9a0e 100644 --- a/app/controllers/api/projects.php +++ b/app/controllers/api/projects.php @@ -24,6 +24,7 @@ use Utopia\App; use Utopia\Audit\Audit; use Utopia\Cache\Cache; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -223,7 +224,7 @@ App::post('/v1/projects') $sharedTables = $sharedTablesV1 || $sharedTablesV2; if (!$sharedTablesV2) { - $adapter = $pools->get($dsn->getHost())->pop()->getResource(); + $adapter = new DatabasePool($pools->get($dsn->getHost())); $dbForProject = new Database($adapter, $cache); if ($sharedTables) { diff --git a/app/controllers/api/storage.php b/app/controllers/api/storage.php index ee6279c3bc..b3b8fb906a 100644 --- a/app/controllers/api/storage.php +++ b/app/controllers/api/storage.php @@ -1241,12 +1241,15 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/download') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); + return; } if (!empty($rangeHeader)) { $response->send($deviceForFiles->read($path, $start, ($end - $start + 1))); + return; } if ($size > APP_STORAGE_READ_BUFFER) { @@ -1407,6 +1410,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/view') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); return; @@ -1558,6 +1562,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/push') if (!empty($source)) { if (!empty($rangeHeader)) { $response->send(substr($source, $start, ($end - $start + 1))); + return; } $response->send($source); return; diff --git a/app/http.php b/app/http.php index e413c5d248..6064dfdd4c 100644 --- a/app/http.php +++ b/app/http.php @@ -15,9 +15,11 @@ use Utopia\Audit\Audit; use Utopia\CLI\Console; use Utopia\Compression\Compression; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; +use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; @@ -170,7 +172,7 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co $sleep = 1; $attempts = 0; - do { + while (true) { try { $attempts++; $resource = $app->getResource($resourceKey); @@ -179,13 +181,12 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co break; // exit loop on success } catch (\Exception $e) { Console::warning(" └── Database not ready. Retrying connection ({$attempts})..."); - $pools->reclaim(); if ($attempts >= $max) { throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage()); } sleep($sleep); } - } while ($attempts < $max); + } Console::success("[Setup] - $dbName database init started..."); @@ -368,11 +369,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg $cache = $app->getResource('cache'); foreach ($sharedTablesV2 as $hostname) { - $adapter = $pools - ->get($hostname) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($hostname)); $dbForProject = (new Database($adapter, $cache)) ->setDatabase('appwrite') ->setSharedTables(true) @@ -382,7 +379,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg try { Console::success('[Setup] - Creating project database: ' . $hostname . '...'); $dbForProject->create(); - } catch (Duplicate) { + } catch (DuplicateException) { Console::success('[Setup] - Skip: metadata table already exists'); } @@ -408,7 +405,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg } } - $pools->reclaim(); Console::success('[Setup] - Server database init completed...'); }); @@ -523,6 +519,7 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool Console::error('[Error] Message: ' . $th->getMessage()); Console::error('[Error] File: ' . $th->getFile()); Console::error('[Error] Line: ' . $th->getLine()); + Console::error('[Error] Trace: ' . $th->getTraceAsString()); $swooleResponse->setStatusCode(500); @@ -540,8 +537,6 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool ]; $swooleResponse->end(\json_encode($output)); - } finally { - $pools->reclaim(); } }); diff --git a/app/init/registers.php b/app/init/registers.php index 1adaaf35ce..415730f936 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -216,13 +216,13 @@ $register->set('pools', function () { 'mysql', 'mariadb' => function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { return new PDOProxy(function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { - return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, array( + return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, [ \PDO::ATTR_TIMEOUT => 3, // Seconds \PDO::ATTR_PERSISTENT => false, \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC, \PDO::ATTR_EMULATE_PREPARES => true, \PDO::ATTR_STRINGIFY_FETCHES => true - )); + ]); }); }, 'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) { diff --git a/app/init/resources.php b/app/init/resources.php index e96432ad70..910e8369db 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -24,10 +24,12 @@ use Appwrite\Utopia\Request; use Executor\Executor; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime as DatabaseDateTime; use Utopia\Database\Document; @@ -38,6 +40,7 @@ use Utopia\DSN\DSN; use Utopia\Locale\Locale; use Utopia\Logger\Log; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Storage\Device; use Utopia\Storage\Device\AWS; @@ -74,10 +77,10 @@ App::setResource('localeCodes', function () { // Queues App::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); App::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); @@ -331,12 +334,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $database ->setMetadata('host', \gethostname()) @@ -362,12 +361,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform }, ['pools', 'dbForPlatform', 'cache', 'project']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, $cache); $database ->setNamespace('_console') @@ -380,7 +375,7 @@ App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { }, ['pools', 'cache']); App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) { - $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + $databases = []; return function (Document $project) use ($pools, $dbForPlatform, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { @@ -422,12 +417,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $configure($database); @@ -437,21 +428,15 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform App::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; - return function (?Document $project = null) use ($pools, $cache, $database) { + + return function (?Document $project = null) use ($pools, $cache, &$database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -475,10 +460,7 @@ App::setResource('cache', function (Group $pools) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); diff --git a/app/realtime.php b/app/realtime.php index 86f9c85fdd..7e6fc0e311 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,6 +5,7 @@ use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Http\Request as SwooleRequest; @@ -15,10 +16,12 @@ use Swoole\Timer; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -28,13 +31,15 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\DSN\DSN; use Utopia\Logger\Log; +use Utopia\Pools\Group; +use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Server; /** - * @var \Utopia\Registry\Registry $register + * @var Registry $register */ require_once __DIR__ . '/init.php'; @@ -46,17 +51,17 @@ if (!function_exists('getConsoleDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $database = null; + + if ($database !== null) { + return $database; + } + + /** @var Group $pools */ $pools = $register->get('pools'); - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource() - ; - - $database = new Database($dbAdapter, getCache()); - + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, getCache()); $database ->setNamespace('_console') ->setMetadata('host', \gethostname()) @@ -72,7 +77,13 @@ if (!function_exists('getProjectDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $databases = []; + + if (isset($databases[$project->getInternalId()])) { + return $databases[$project->getInternalId()]; + } + + /** @var Group $pools */ $pools = $register->get('pools'); if ($project->isEmpty() || $project->getId() === 'console') { @@ -86,11 +97,7 @@ if (!function_exists('getProjectDB')) { $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, getCache()); $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -111,7 +118,7 @@ if (!function_exists('getProjectDB')) { ->setMetadata('host', \gethostname()) ->setMetadata('project', $project->getId()); - return $database; + return $databases[$project->getInternalId()] = $database; } } @@ -121,20 +128,22 @@ if (!function_exists('getCache')) { { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + static $cache = null; + + if ($cache !== null) { + return $cache; + } + + $pools = $register->get('pools'); /** @var Group $pools */ $list = Config::getParam('pools-cache', []); $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } - return new Cache(new Sharding($adapters)); + return $cache = new Cache(new Sharding($adapters)); } } @@ -142,6 +151,12 @@ if (!function_exists('getCache')) { if (!function_exists('getRedis')) { function getRedis(): \Redis { + static $redis = null; + + if ($redis !== null) { + return $redis; + } + $host = System::getEnv('_APP_REDIS_HOST', 'localhost'); $port = System::getEnv('_APP_REDIS_PORT', 6379); $pass = System::getEnv('_APP_REDIS_PASS', ''); @@ -160,21 +175,39 @@ if (!function_exists('getRedis')) { if (!function_exists('getTimelimit')) { function getTimelimit(): TimeLimitRedis { - return new TimeLimitRedis("", 0, 1, getRedis()); + static $timelimit = null; + + if ($timelimit !== null) { + return $timelimit; + } + + return $timelimit = new TimeLimitRedis("", 0, 1, getRedis()); } } if (!function_exists('getRealtime')) { function getRealtime(): Realtime { - return new Realtime(); + static $realtime = null; + + if ($realtime !== null) { + return $realtime; + } + + return $realtime = new Realtime(); } } if (!function_exists('getTelemetry')) { function getTelemetry(int $workerId): Utopia\Telemetry\Adapter { - return new NoTelemetry(); + static $telemetry = null; + + if ($telemetry !== null) { + return $telemetry; + } + + return $telemetry = new NoTelemetry(); } } @@ -273,7 +306,6 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $register->get('pools')->reclaim(); }); /** @@ -299,9 +331,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument)); } catch (Throwable $th) { - call_user_func($logError, $th, "updateWorkerDocument"); - } finally { - $register->get('pools')->reclaim(); + $logError($th, "updateWorkerDocument"); } }); } @@ -370,8 +400,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, 'data' => $event['data'] ])); } - - $register->get('pools')->reclaim(); } } /** @@ -407,8 +435,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - /** @var \Appwrite\PubSub\Adapter $pubsub */ - $pubsub = $register->get('pools')->get('pubsub')->pop()->getResource(); + $pubsub = new PubSubPool($register->get('pools')->get('pubsub')); + if ($pubsub->ping(true)) { $attempts = 0; Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); @@ -436,8 +464,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime->unsubscribe($connection); $realtime->subscribe($projectId, $connection, $roles, $channels); - - $register->get('pools')->reclaim(); } } @@ -463,14 +489,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } }); } catch (Throwable $th) { - call_user_func($logError, $th, "pubSubConnection"); + $logError($th, "pubSubConnection"); Console::error('Pub/sub error: ' . $th->getMessage()); $attempts++; sleep(DATABASE_RECONNECT_SLEEP); continue; - } finally { - $register->get('pools')->reclaim(); } } @@ -572,7 +596,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $stats->incr($project->getId(), 'connections'); $stats->incr($project->getId(), 'connectionsTotal'); } catch (Throwable $th) { - call_user_func($logError, $th, "initServer"); + $logError($th, "initServer"); // Handle SQL error code is 'HY000' $code = $th->getCode(); @@ -596,8 +620,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Code: ' . $response['data']['code']); Console::error('[Error] Message: ' . $response['data']['message']); } - } finally { - $register->get('pools')->reclaim(); } }); @@ -696,8 +718,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re if ($th->getCode() === 1008) { $server->close($connection, $th->getCode()); } - } finally { - $register->get('pools')->reclaim(); } }); diff --git a/app/worker.php b/app/worker.php index 0dceb4f937..79cfffc8bb 100644 --- a/app/worker.php +++ b/app/worker.php @@ -20,10 +20,12 @@ use Appwrite\Platform\Appwrite; use Executor\Executor; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -33,6 +35,7 @@ use Utopia\Logger\Log; use Utopia\Logger\Logger; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Server; @@ -41,21 +44,17 @@ use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; Authorization::disable(); -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(); Server::setResource('register', fn () => $register); Server::setResource('dbForPlatform', function (Cache $cache, Registry $register) { $pools = $register->get('pools'); - $database = $pools - ->get('console') - ->pop() - ->getResource(); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); + $dbForPlatform->setNamespace('_console'); - $adapter = new Database($database, $cache); - $adapter->setNamespace('_console'); - - return $adapter; + return $dbForPlatform; }, ['cache', 'register']); Server::setResource('project', function (Message $message, Database $dbForPlatform) { @@ -83,20 +82,9 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, $cache); - try { - $dsn = new DSN($project->getAttribute('database')); - } catch (\InvalidArgumentException) { - // TODO: Temporary until all projects are using shared tables - $dsn = new DSN('mysql://' . $project->getAttribute('database')); - } - $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); if (\in_array($dsn->getHost(), $sharedTables)) { @@ -151,12 +139,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; @@ -188,15 +172,8 @@ Server::setResource('getLogsDB', function (Group $pools, Cache $cache) { return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -234,11 +211,7 @@ Server::setResource('cache', function (Registry $register) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -268,11 +241,11 @@ Server::setResource('timelimit', function (\Redis $redis) { Server::setResource('log', fn () => new Log()); Server::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); Server::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); Server::setResource('queueForStatsUsage', function (Publisher $publisher) { @@ -463,13 +436,6 @@ try { $worker = $platform->getWorker(); -$worker - ->shutdown() - ->inject('pools') - ->action(function (Group $pools) { - $pools->reclaim(); - }); - $worker ->error() ->inject('error') @@ -477,8 +443,7 @@ $worker ->inject('log') ->inject('pools') ->inject('project') - ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($queueName) { - $pools->reclaim(); + ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($worker, $queueName) { $version = System::getEnv('_APP_VERSION', 'UNKNOWN'); if ($logger) { diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index d699a45417..2c735ef2d4 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -286,13 +286,6 @@ class Event return $this; } - public function setParamSensitive(string $key): self - { - $this->sensitive[$key] = true; - - return $this; - } - /** * Get param of event. * diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 568132ceb1..d122841d68 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -2,14 +2,14 @@ namespace Appwrite\Messaging\Adapter; -use Appwrite\Messaging\Adapter; +use Appwrite\Messaging\Adapter as MessagingAdapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; -use Utopia\Pools\Pool; -class Realtime extends Adapter +class Realtime extends MessagingAdapter { /** * Connection Tree @@ -36,12 +36,12 @@ class Realtime extends Adapter */ public array $subscriptions = []; - private Pool $pubsubPool; + private PubSubPool $pubSubPool; public function __construct() { global $register; - $this->pubsubPool = $register->get('pools')->get('pubsub'); + $this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub')); } /** @@ -132,11 +132,12 @@ class Realtime extends Adapter * Sends an event to the Realtime Server * @param string $projectId * @param array $payload - * @param string $event + * @param array $events * @param array $channels * @param array $roles * @param array $options * @return void + * @throws \Exception */ public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void { @@ -147,7 +148,7 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $message = [ + $this->pubSubPool->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, @@ -158,9 +159,7 @@ class Realtime extends Adapter 'timestamp' => DateTime::formatTz(DateTime::now()), 'payload' => $payload ] - ]; - - $this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message))); + ])); } /** @@ -175,8 +174,9 @@ class Realtime extends Adapter * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions * * @param array $event + * @return int[]|string[] */ - public function getSubscribers(array $event) + public function getSubscribers(array $event): array { $receivers = []; @@ -230,7 +230,7 @@ class Realtime extends Adapter foreach ($channels as $key => $value) { switch (true) { - case strpos($key, 'account.') === 0: + case \str_starts_with($key, 'account.'): unset($channels[$key]); break; @@ -353,7 +353,6 @@ class Realtime extends Adapter } break; - case 'sites': if ($parts[2] === 'deployments') { $channels[] = 'console'; @@ -361,7 +360,6 @@ class Realtime extends Adapter $projectId = 'console'; $roles = [Role::team($project->getAttribute('teamId'))->toString()]; } - break; case 'migrations': $channels[] = 'console'; diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index 790f1a7290..b543555477 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,14 +3,19 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; -use Appwrite\PubSub\Adapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; +use PHPMailer\PHPMailer\PHPMailer; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Domains\Domain; use Utopia\DSN\DSN; use Utopia\Logger\Logger; use Utopia\Platform\Action; +use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Registry\Registry; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; @@ -89,9 +94,9 @@ class Doctor extends Action Console::log('🟢 Abuse protection is enabled'); } - $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT', null); - $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS', null); - $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS', null); + $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT'); + $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS'); + $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS'); if ( empty($authWhitelistRoot) @@ -127,19 +132,16 @@ class Doctor extends Action } else { Console::log('🟢 Logging adapter is enabled (' . $providerName . ')'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::log('🔴 Logging adapter is misconfigured'); } \usleep(200 * 1000); // Sleep for 0.2 seconds - try { - Console::log("\n" . '[Connectivity]'); - } catch (\Throwable $th) { - //throw $th; - } + Console::log("\n" . '[Connectivity]'); - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -149,20 +151,22 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected'); } } } - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); + $configs = [ 'Cache' => Config::getParam('pools-cache'), 'Queue' => Config::getParam('pools-queue'), @@ -172,15 +176,18 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { - /** @var Adapter $adapter */ - $adapter = $pools->get($pool)->pop()->getResource(); + $adapter = match($key) { + 'Cache' => new CachePool($pools->get($pool)), + 'Queue' => new BrokerPool($pools->get($pool)), + 'PubSub' => new PubSubPool($pools->get($pool)), + }; if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } } @@ -198,13 +205,14 @@ class Doctor extends Action } else { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } } try { - $mail = $register->get('smtp'); /* @var $mail \PHPMailer\PHPMailer\PHPMailer */ + /* @var PHPMailer $mail */ + $mail = $register->get('smtp'); $mail->addAddress('demo@example.com', 'Example.com'); $mail->Subject = 'Test SMTP Connection'; @@ -213,7 +221,7 @@ class Doctor extends Action $mail->send(); Console::success('🟢 ' . str_pad("SMTP", 50, '.') . 'connected'); - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("SMTP", 47, '.') . 'disconnected'); } @@ -287,7 +295,7 @@ class Doctor extends Action Console::error('Failed to check for a newer version' . "\n"); } } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('Failed to check for a newer version' . "\n"); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 3cf89f9f44..286ffe45cb 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -12,6 +12,7 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Gauge; @@ -24,6 +25,8 @@ abstract class ScheduleBase extends Action protected array $schedules = []; + protected BrokerPool $publisher; + private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; private ?Gauge $scheduleTelemetryCount = null; @@ -68,6 +71,7 @@ abstract class ScheduleBase extends Action Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + $this->publisher = new BrokerPool($pools->get('publisher')); $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); @@ -119,8 +123,6 @@ abstract class ScheduleBase extends Action $schedule->getAttribute('resourceId') ); - $pools->reclaim(); - return [ '$internalId' => $schedule->getInternalId(), '$id' => $schedule->getId(), diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 2a8fd99b7f..6bf2f93afe 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -29,9 +29,7 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($this->publisher); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -85,7 +83,5 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } - - $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6788748f3d..19e068107a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -79,9 +79,6 @@ class ScheduleFunctions extends ScheduleBase \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { \sleep($delay); // in seconds - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; // Ensure schedule was not deleted @@ -93,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($this->publisher); $queueForFunctions ->setType('schedule') @@ -105,8 +102,6 @@ class ScheduleFunctions extends ScheduleBase $this->recordEnqueueDelay($delayConfig['nextDate']); } - - $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index a15df6ed5b..5e65f7a8a6 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -41,9 +41,7 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); + $queueForMessaging = new Messaging($this->publisher); $this->updateProjectAccess($schedule['project'], $dbForPlatform); @@ -58,7 +56,6 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); - $queue->reclaim(); $this->recordEnqueueDelay($scheduledAt); unset($this->schedules[$schedule['$internalId']]); }); diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 12143b4106..43bc55583d 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -63,7 +63,7 @@ class Deletes extends Action ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback([$this, 'action']); + ->callback($this->action(...)); } /** diff --git a/src/Appwrite/Platform/Workers/StatsUsage.php b/src/Appwrite/Platform/Workers/StatsUsage.php index 07131593e2..25c80fabdc 100644 --- a/src/Appwrite/Platform/Workers/StatsUsage.php +++ b/src/Appwrite/Platform/Workers/StatsUsage.php @@ -357,7 +357,7 @@ class StatsUsage extends Action break; } } catch (Throwable $e) { - console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); + Console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); } } @@ -376,7 +376,7 @@ class StatsUsage extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { foreach ($stats['keys'] ?? [] as $key => $value) { @@ -413,7 +413,7 @@ class StatsUsage extends Action } } } catch (Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } @@ -437,7 +437,7 @@ class StatsUsage extends Action } - protected function prepareForLogsDB(Document $project, Document $stat) + protected function prepareForLogsDB(Document $project, Document $stat): void { if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') { return; @@ -462,8 +462,7 @@ class StatsUsage extends Action return; } - $dbForLogs = call_user_func($this->getLogsDB); - $dbForLogs + $dbForLogs = ($this->getLogsDB)() ->setTenant(null) ->setTenantPerDocument(true); @@ -478,6 +477,5 @@ class StatsUsage extends Action } catch (Throwable $th) { Console::error($th->getMessage()); } - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php index b9d486e0d8..77ec3f13e6 100644 --- a/src/Appwrite/Platform/Workers/StatsUsageDump.php +++ b/src/Appwrite/Platform/Workers/StatsUsageDump.php @@ -70,9 +70,9 @@ class StatsUsageDump extends Action ]; /** - * @var callable + * @var callable(Document): Database */ - protected mixed $getLogsDB; + protected $getLogsDB; protected array $periods = [ '1h' => 'Y-m-d H:00', @@ -126,10 +126,10 @@ class StatsUsageDump extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { - /** @var \Utopia\Database\Database $dbForProject */ + /** @var Database $dbForProject */ $dbForProject = $getProjectDB($project); foreach ($stats['keys'] ?? [] as $key => $value) { if ($value == 0) { @@ -169,7 +169,7 @@ class StatsUsageDump extends Action } } } catch (\Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } } @@ -190,8 +190,7 @@ class StatsUsageDump extends Action } } - /** @var \Utopia\Database\Database $dbForLogs*/ - $dbForLogs = call_user_func($this->getLogsDB, $project); + $dbForLogs = ($this->getLogsDB)($project); try { $dbForLogs->createOrUpdateDocumentsWithIncrease( @@ -203,7 +202,5 @@ class StatsUsageDump extends Action } catch (\Throwable $th) { Console::error($th->getMessage()); } - - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/PubSub/Adapter/Pool.php b/src/Appwrite/PubSub/Adapter/Pool.php new file mode 100644 index 0000000000..a498118dae --- /dev/null +++ b/src/Appwrite/PubSub/Adapter/Pool.php @@ -0,0 +1,46 @@ +delegate(__FUNCTION__, \func_get_args()); + } + + public function subscribe($channels, $callback): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + public function publish($channel, $message): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + /** + * Forward method calls to the internal adapter instance via the pool. + * + * Required because __call() can't be used to implement abstract methods. + * + * @param string $method + * @param array $args + * @return mixed + * @throws DatabaseException + */ + public function delegate(string $method, array $args): mixed + { + return $this->pool->use(function (Adapter $adapter) use ($method, $args) { + return $adapter->{$method}(...$args); + }); + } +} diff --git a/tests/resources/docker/docker-compose.yml b/tests/resources/docker/docker-compose.yml index 4bbca3e9c0..039ffb731a 100644 --- a/tests/resources/docker/docker-compose.yml +++ b/tests/resources/docker/docker-compose.yml @@ -35,7 +35,7 @@ services: - VERSION=dev restart: unless-stopped ports: - - 9501:80 + - "9501:80" networks: - appwrite labels: @@ -52,15 +52,12 @@ services: - ./phpunit.xml:/usr/src/code/phpunit.xml - ./tests:/usr/src/code/tests - ./app:/usr/src/code/app - # - ./vendor:/usr/src/code/vendor - ./docs:/usr/src/code/docs - ./public:/usr/src/code/public - ./src:/usr/src/code/src - - ./debug:/tmp depends_on: - mariadb - redis - # - clamav environment: - _APP_COMPRESSION_MIN_SIZE_BYTES - _APP_ENV @@ -353,33 +350,6 @@ services: volumes: - appwrite-redis:/data:rw - # clamav: - # image: appwrite/clamav:1.2.0 - # container_name: appwrite-clamav - # restart: unless-stopped - # networks: - # - appwrite - # volumes: - # - appwrite-uploads:/storage/uploads - - - # redis-commander: - # image: rediscommander/redis-commander:latest - # restart: unless-stopped - # networks: - # - appwrite - # environment: - # - REDIS_HOSTS=redis - # ports: - # - "8081:8081" - - # webgrind: - # image: 'jokkedk/webgrind:latest' - # volumes: - # - './debug:/tmp' - # ports: - # - '3001:80' - networks: gateway: appwrite: