mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
+12
-27
@@ -12,11 +12,13 @@ use Appwrite\Runtimes\Runtimes;
|
||||
use Executor\Executor;
|
||||
use Swoole\Runtime;
|
||||
use Swoole\Timer;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\CLI;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
@@ -24,6 +26,7 @@ use Utopia\DSN\DSN;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Service;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Broker\Pool as BrokerPool;
|
||||
use Utopia\Queue\Publisher;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\System\System;
|
||||
@@ -46,10 +49,7 @@ CLI::setResource('cache', function ($pools) {
|
||||
$adapters = [];
|
||||
|
||||
foreach ($list as $value) {
|
||||
$adapters[] = $pools
|
||||
->get($value)
|
||||
->pop()
|
||||
->getResource();
|
||||
$adapters[] = new CachePool($pools->get($value));
|
||||
}
|
||||
|
||||
return new Cache(new Sharding($adapters));
|
||||
@@ -69,12 +69,8 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) {
|
||||
$attempts++;
|
||||
try {
|
||||
// Prepare database connection
|
||||
$dbAdapter = $pools
|
||||
->get('console')
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$dbForPlatform = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get('console'));
|
||||
$dbForPlatform = new Database($adapter, $cache);
|
||||
|
||||
$dbForPlatform
|
||||
->setNamespace('_console')
|
||||
@@ -92,7 +88,6 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) {
|
||||
$ready = true;
|
||||
} catch (\Throwable $err) {
|
||||
Console::warning($err->getMessage());
|
||||
$pools->get('console')->reclaim();
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $maxAttempts && !$ready);
|
||||
@@ -142,12 +137,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, $cache);
|
||||
$databases[$dsn->getHost()] = $database;
|
||||
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
|
||||
|
||||
@@ -173,21 +164,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
|
||||
|
||||
CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) {
|
||||
$database = null;
|
||||
|
||||
return function (?Document $project = null) use ($pools, $cache, $database) {
|
||||
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
|
||||
$database->setTenant($project->getInternalId());
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get('logs')
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database(
|
||||
$dbAdapter,
|
||||
$cache
|
||||
);
|
||||
$adapter = new DatabasePool($pools->get('logs'));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$database
|
||||
->setSharedTables(true)
|
||||
@@ -211,7 +196,7 @@ CLI::setResource('queueForStatsResources', function (Publisher $publisher) {
|
||||
return new StatsResources($publisher);
|
||||
}, ['publisher']);
|
||||
CLI::setResource('publisher', function (Group $pools) {
|
||||
return $pools->get('publisher')->pop()->getResource();
|
||||
return new BrokerPool(publisher: $pools->get('publisher'));
|
||||
}, ['pools']);
|
||||
CLI::setResource('queueForFunctions', function (Publisher $publisher) {
|
||||
return new Func($publisher);
|
||||
|
||||
@@ -3789,7 +3789,12 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen
|
||||
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
|
||||
}
|
||||
|
||||
$document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries);
|
||||
try {
|
||||
$document = $dbForProject->getDocument('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $documentId, $queries);
|
||||
} catch (QueryException $e) {
|
||||
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
|
||||
}
|
||||
|
||||
if ($document->isEmpty()) {
|
||||
throw new Exception(Exception::DOCUMENT_NOT_FOUND);
|
||||
}
|
||||
|
||||
@@ -3,13 +3,16 @@
|
||||
use Appwrite\ClamAV\Network;
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Extend\Exception;
|
||||
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
|
||||
use Appwrite\SDK\AuthType;
|
||||
use Appwrite\SDK\ContentType;
|
||||
use Appwrite\SDK\Method;
|
||||
use Appwrite\SDK\Response as SDKResponse;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Domains\Validator\PublicDomain;
|
||||
use Utopia\Pools\Group;
|
||||
@@ -34,8 +37,8 @@ App::get('/v1/health')
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'get',
|
||||
auth: [AuthType::KEY],
|
||||
description: '/docs/references/health/get.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -70,11 +73,11 @@ App::get('/v1/health/db')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getDB',
|
||||
description: '/docs/references/health/get-db.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -86,8 +89,8 @@ App::get('/v1/health/db')
|
||||
->inject('response')
|
||||
->inject('pools')
|
||||
->action(function (Response $response, Group $pools) {
|
||||
|
||||
$output = [];
|
||||
$failures = [];
|
||||
|
||||
$configs = [
|
||||
'Console.DB' => Config::getParam('pools-console'),
|
||||
@@ -97,7 +100,7 @@ App::get('/v1/health/db')
|
||||
foreach ($configs as $key => $config) {
|
||||
foreach ($config as $database) {
|
||||
try {
|
||||
$adapter = $pools->get($database)->pop()->getResource();
|
||||
$adapter = new DatabasePool($pools->get($database));
|
||||
|
||||
$checkStart = \microtime(true);
|
||||
|
||||
@@ -108,16 +111,16 @@ App::get('/v1/health/db')
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} else {
|
||||
$failure[] = $database;
|
||||
$failures[] = $database;
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
$failure[] = $database;
|
||||
} catch (\Throwable) {
|
||||
$failures[] = $database;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($failure)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failure));
|
||||
if (!empty($failures)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failures));
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([
|
||||
@@ -131,11 +134,11 @@ App::get('/v1/health/cache')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getCache',
|
||||
description: '/docs/references/health/get-cache.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -147,44 +150,39 @@ App::get('/v1/health/cache')
|
||||
->inject('response')
|
||||
->inject('pools')
|
||||
->action(function (Response $response, Group $pools) {
|
||||
|
||||
$output = [];
|
||||
$failures = [];
|
||||
|
||||
$configs = [
|
||||
'Cache' => Config::getParam('pools-cache'),
|
||||
];
|
||||
|
||||
foreach ($configs as $key => $config) {
|
||||
foreach ($config as $database) {
|
||||
foreach ($config as $cache) {
|
||||
try {
|
||||
/** @var \Utopia\Cache\Adapter $adapter */
|
||||
$adapter = $pools->get($database)->pop()->getResource();
|
||||
$adapter = new CachePool($pools->get($cache));
|
||||
|
||||
$checkStart = \microtime(true);
|
||||
|
||||
if ($adapter->ping()) {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'name' => $key . " ($cache)",
|
||||
'status' => 'pass',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} else {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
$failures[] = $cache;
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} catch (\Throwable) {
|
||||
$failures[] = $cache;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($failures)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Cache failure on: ' . implode(", ", $failures));
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([
|
||||
'statuses' => $output,
|
||||
'total' => count($output),
|
||||
@@ -196,11 +194,11 @@ App::get('/v1/health/pubsub')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getPubSub',
|
||||
description: '/docs/references/health/get-pubsub.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -212,44 +210,39 @@ App::get('/v1/health/pubsub')
|
||||
->inject('response')
|
||||
->inject('pools')
|
||||
->action(function (Response $response, Group $pools) {
|
||||
|
||||
$output = [];
|
||||
$failures = [];
|
||||
|
||||
$configs = [
|
||||
'PubSub' => Config::getParam('pools-pubsub'),
|
||||
];
|
||||
|
||||
foreach ($configs as $key => $config) {
|
||||
foreach ($config as $database) {
|
||||
foreach ($config as $pubsub) {
|
||||
try {
|
||||
/** @var \Appwrite\PubSub\Adapter $adapter */
|
||||
$adapter = $pools->get($database)->pop()->getResource();
|
||||
$adapter = new PubSubPool($pools->get($pubsub));
|
||||
|
||||
$checkStart = \microtime(true);
|
||||
|
||||
if ($adapter->ping()) {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'name' => $key . " ($pubsub)",
|
||||
'status' => 'pass',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} else {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
$failures[] = $pubsub;
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
$output[] = new Document([
|
||||
'name' => $key . " ($database)",
|
||||
'status' => 'fail',
|
||||
'ping' => \round((\microtime(true) - $checkStart) / 1000)
|
||||
]);
|
||||
} catch (\Throwable) {
|
||||
$failures[] = $pubsub;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($failures)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Pubsub failure on: ' . implode(", ", $failures));
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([
|
||||
'statuses' => $output,
|
||||
'total' => count($output),
|
||||
@@ -261,11 +254,11 @@ App::get('/v1/health/time')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getTime',
|
||||
description: '/docs/references/health/get-time.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -325,11 +318,11 @@ App::get('/v1/health/queue/webhooks')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueWebhooks',
|
||||
description: '/docs/references/health/get-queue-webhooks.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -351,18 +344,18 @@ App::get('/v1/health/queue/webhooks')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/logs')
|
||||
->desc('Get logs queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueLogs',
|
||||
description: '/docs/references/health/get-queue-logs.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -384,18 +377,18 @@ App::get('/v1/health/queue/logs')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/certificate')
|
||||
->desc('Get the SSL certificate for a domain')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getCertificate',
|
||||
description: '/docs/references/health/get-certificate.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -441,18 +434,18 @@ App::get('/v1/health/certificate')
|
||||
'validTo' => $certificatePayload['validTo_time_t'],
|
||||
'signatureTypeSN' => $certificatePayload['signatureTypeSN'],
|
||||
]), Response::MODEL_HEALTH_CERTIFICATE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/certificates')
|
||||
->desc('Get certificates queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueCertificates',
|
||||
description: '/docs/references/health/get-queue-certificates.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -474,18 +467,18 @@ App::get('/v1/health/queue/certificates')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/builds')
|
||||
->desc('Get builds queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueBuilds',
|
||||
description: '/docs/references/health/get-queue-builds.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -507,18 +500,18 @@ App::get('/v1/health/queue/builds')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/databases')
|
||||
->desc('Get databases queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueDatabases',
|
||||
description: '/docs/references/health/get-queue-databases.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -541,18 +534,18 @@ App::get('/v1/health/queue/databases')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/deletes')
|
||||
->desc('Get deletes queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueDeletes',
|
||||
description: '/docs/references/health/get-queue-deletes.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -574,18 +567,18 @@ App::get('/v1/health/queue/deletes')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/mails')
|
||||
->desc('Get mails queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueMails',
|
||||
description: '/docs/references/health/get-queue-mails.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -607,18 +600,18 @@ App::get('/v1/health/queue/mails')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/messaging')
|
||||
->desc('Get messaging queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueMessaging',
|
||||
description: '/docs/references/health/get-queue-messaging.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -640,18 +633,18 @@ App::get('/v1/health/queue/messaging')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/migrations')
|
||||
->desc('Get migrations queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueMigrations',
|
||||
description: '/docs/references/health/get-queue-migrations.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -673,18 +666,18 @@ App::get('/v1/health/queue/migrations')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/functions')
|
||||
->desc('Get functions queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueFunctions',
|
||||
description: '/docs/references/health/get-queue-functions.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -706,18 +699,18 @@ App::get('/v1/health/queue/functions')
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
}, ['response']);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/stats-resources')
|
||||
->desc('Get stats resources queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueStatsResources',
|
||||
description: '/docs/references/health/get-queue-stats-resources.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -746,11 +739,11 @@ App::get('/v1/health/queue/stats-usage')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getQueueUsage',
|
||||
description: '/docs/references/health/get-queue-stats-usage.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -779,11 +772,11 @@ App::get('/v1/health/storage/local')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'storage',
|
||||
name: 'getStorageLocal',
|
||||
description: '/docs/references/health/get-storage-local.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -829,11 +822,11 @@ App::get('/v1/health/storage')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'storage',
|
||||
name: 'getStorage',
|
||||
description: '/docs/references/health/get-storage.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -878,11 +871,11 @@ App::get('/v1/health/anti-virus')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'health',
|
||||
name: 'getAntivirus',
|
||||
description: '/docs/references/health/get-storage-anti-virus.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
@@ -924,11 +917,11 @@ App::get('/v1/health/queue/failed/:name')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
group: 'queue',
|
||||
name: 'getFailedJobs',
|
||||
description: '/docs/references/health/get-failed-queue-jobs.md',
|
||||
auth: [AuthType::KEY],
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
|
||||
@@ -24,6 +24,7 @@ use Utopia\App;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
@@ -223,7 +224,7 @@ App::post('/v1/projects')
|
||||
$sharedTables = $sharedTablesV1 || $sharedTablesV2;
|
||||
|
||||
if (!$sharedTablesV2) {
|
||||
$adapter = $pools->get($dsn->getHost())->pop()->getResource();
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$dbForProject = new Database($adapter, $cache);
|
||||
|
||||
if ($sharedTables) {
|
||||
|
||||
@@ -1241,12 +1241,15 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/download')
|
||||
if (!empty($source)) {
|
||||
if (!empty($rangeHeader)) {
|
||||
$response->send(substr($source, $start, ($end - $start + 1)));
|
||||
return;
|
||||
}
|
||||
$response->send($source);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!empty($rangeHeader)) {
|
||||
$response->send($deviceForFiles->read($path, $start, ($end - $start + 1)));
|
||||
return;
|
||||
}
|
||||
|
||||
if ($size > APP_STORAGE_READ_BUFFER) {
|
||||
@@ -1407,6 +1410,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/view')
|
||||
if (!empty($source)) {
|
||||
if (!empty($rangeHeader)) {
|
||||
$response->send(substr($source, $start, ($end - $start + 1)));
|
||||
return;
|
||||
}
|
||||
$response->send($source);
|
||||
return;
|
||||
@@ -1558,6 +1562,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/push')
|
||||
if (!empty($source)) {
|
||||
if (!empty($rangeHeader)) {
|
||||
$response->send(substr($source, $start, ($end - $start + 1)));
|
||||
return;
|
||||
}
|
||||
$response->send($source);
|
||||
return;
|
||||
|
||||
+7
-12
@@ -15,9 +15,11 @@ use Utopia\Audit\Audit;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Compression\Compression;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception\Duplicate as DuplicateException;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\Database\Helpers\Permission;
|
||||
use Utopia\Database\Helpers\Role;
|
||||
@@ -170,7 +172,7 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co
|
||||
$sleep = 1;
|
||||
$attempts = 0;
|
||||
|
||||
do {
|
||||
while (true) {
|
||||
try {
|
||||
$attempts++;
|
||||
$resource = $app->getResource($resourceKey);
|
||||
@@ -179,13 +181,12 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co
|
||||
break; // exit loop on success
|
||||
} catch (\Exception $e) {
|
||||
Console::warning(" └── Database not ready. Retrying connection ({$attempts})...");
|
||||
$pools->reclaim();
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage());
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
}
|
||||
|
||||
Console::success("[Setup] - $dbName database init started...");
|
||||
|
||||
@@ -368,11 +369,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
|
||||
$cache = $app->getResource('cache');
|
||||
|
||||
foreach ($sharedTablesV2 as $hostname) {
|
||||
$adapter = $pools
|
||||
->get($hostname)
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$adapter = new DatabasePool($pools->get($hostname));
|
||||
$dbForProject = (new Database($adapter, $cache))
|
||||
->setDatabase('appwrite')
|
||||
->setSharedTables(true)
|
||||
@@ -382,7 +379,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
|
||||
try {
|
||||
Console::success('[Setup] - Creating project database: ' . $hostname . '...');
|
||||
$dbForProject->create();
|
||||
} catch (Duplicate) {
|
||||
} catch (DuplicateException) {
|
||||
Console::success('[Setup] - Skip: metadata table already exists');
|
||||
}
|
||||
|
||||
@@ -408,7 +405,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
|
||||
}
|
||||
}
|
||||
|
||||
$pools->reclaim();
|
||||
Console::success('[Setup] - Server database init completed...');
|
||||
});
|
||||
|
||||
@@ -523,6 +519,7 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
|
||||
Console::error('[Error] Message: ' . $th->getMessage());
|
||||
Console::error('[Error] File: ' . $th->getFile());
|
||||
Console::error('[Error] Line: ' . $th->getLine());
|
||||
Console::error('[Error] Trace: ' . $th->getTraceAsString());
|
||||
|
||||
$swooleResponse->setStatusCode(500);
|
||||
|
||||
@@ -540,8 +537,6 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
|
||||
];
|
||||
|
||||
$swooleResponse->end(\json_encode($output));
|
||||
} finally {
|
||||
$pools->reclaim();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -216,13 +216,13 @@ $register->set('pools', function () {
|
||||
'mysql',
|
||||
'mariadb' => function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) {
|
||||
return new PDOProxy(function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) {
|
||||
return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, array(
|
||||
return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, [
|
||||
\PDO::ATTR_TIMEOUT => 3, // Seconds
|
||||
\PDO::ATTR_PERSISTENT => false,
|
||||
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
|
||||
\PDO::ATTR_EMULATE_PREPARES => true,
|
||||
\PDO::ATTR_STRINGIFY_FETCHES => true
|
||||
));
|
||||
]);
|
||||
});
|
||||
},
|
||||
'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) {
|
||||
|
||||
+17
-35
@@ -24,10 +24,12 @@ use Appwrite\Utopia\Request;
|
||||
use Executor\Executor;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime as DatabaseDateTime;
|
||||
use Utopia\Database\Document;
|
||||
@@ -38,6 +40,7 @@ use Utopia\DSN\DSN;
|
||||
use Utopia\Locale\Locale;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Broker\Pool as BrokerPool;
|
||||
use Utopia\Queue\Publisher;
|
||||
use Utopia\Storage\Device;
|
||||
use Utopia\Storage\Device\AWS;
|
||||
@@ -74,10 +77,10 @@ App::setResource('localeCodes', function () {
|
||||
|
||||
// Queues
|
||||
App::setResource('publisher', function (Group $pools) {
|
||||
return $pools->get('publisher')->pop()->getResource();
|
||||
return new BrokerPool(publisher: $pools->get('publisher'));
|
||||
}, ['pools']);
|
||||
App::setResource('consumer', function (Group $pools) {
|
||||
return $pools->get('consumer')->pop()->getResource();
|
||||
return new BrokerPool(consumer: $pools->get('consumer'));
|
||||
}, ['pools']);
|
||||
App::setResource('queueForMessaging', function (Publisher $publisher) {
|
||||
return new Messaging($publisher);
|
||||
@@ -331,12 +334,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
|
||||
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$database
|
||||
->setMetadata('host', \gethostname())
|
||||
@@ -362,12 +361,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
|
||||
}, ['pools', 'dbForPlatform', 'cache', 'project']);
|
||||
|
||||
App::setResource('dbForPlatform', function (Group $pools, Cache $cache) {
|
||||
$dbAdapter = $pools
|
||||
->get('console')
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get('console'));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$database
|
||||
->setNamespace('_console')
|
||||
@@ -380,7 +375,7 @@ App::setResource('dbForPlatform', function (Group $pools, Cache $cache) {
|
||||
}, ['pools', 'cache']);
|
||||
|
||||
App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) {
|
||||
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
|
||||
$databases = [];
|
||||
|
||||
return function (Document $project) use ($pools, $dbForPlatform, $cache, &$databases) {
|
||||
if ($project->isEmpty() || $project->getId() === 'console') {
|
||||
@@ -422,12 +417,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, $cache);
|
||||
$databases[$dsn->getHost()] = $database;
|
||||
$configure($database);
|
||||
|
||||
@@ -437,21 +428,15 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
|
||||
|
||||
App::setResource('getLogsDB', function (Group $pools, Cache $cache) {
|
||||
$database = null;
|
||||
return function (?Document $project = null) use ($pools, $cache, $database) {
|
||||
|
||||
return function (?Document $project = null) use ($pools, $cache, &$database) {
|
||||
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
|
||||
$database->setTenant($project->getInternalId());
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get('logs')
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database(
|
||||
$dbAdapter,
|
||||
$cache
|
||||
);
|
||||
$adapter = new DatabasePool($pools->get('logs'));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$database
|
||||
->setSharedTables(true)
|
||||
@@ -475,10 +460,7 @@ App::setResource('cache', function (Group $pools) {
|
||||
$adapters = [];
|
||||
|
||||
foreach ($list as $value) {
|
||||
$adapters[] = $pools
|
||||
->get($value)
|
||||
->pop()
|
||||
->getResource();
|
||||
$adapters[] = new CachePool($pools->get($value));
|
||||
}
|
||||
|
||||
return new Cache(new Sharding($adapters));
|
||||
|
||||
+65
-45
@@ -5,6 +5,7 @@ use Appwrite\Extend\Exception;
|
||||
use Appwrite\Extend\Exception as AppwriteException;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Appwrite\Network\Validator\Origin;
|
||||
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
|
||||
use Appwrite\Utopia\Request;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Swoole\Http\Request as SwooleRequest;
|
||||
@@ -15,10 +16,12 @@ use Swoole\Timer;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
@@ -28,13 +31,15 @@ use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\DSN\DSN;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\System\System;
|
||||
use Utopia\Telemetry\Adapter\None as NoTelemetry;
|
||||
use Utopia\WebSocket\Adapter;
|
||||
use Utopia\WebSocket\Server;
|
||||
|
||||
/**
|
||||
* @var \Utopia\Registry\Registry $register
|
||||
* @var Registry $register
|
||||
*/
|
||||
require_once __DIR__ . '/init.php';
|
||||
|
||||
@@ -46,17 +51,17 @@ if (!function_exists('getConsoleDB')) {
|
||||
{
|
||||
global $register;
|
||||
|
||||
/** @var \Utopia\Pools\Group $pools */
|
||||
static $database = null;
|
||||
|
||||
if ($database !== null) {
|
||||
return $database;
|
||||
}
|
||||
|
||||
/** @var Group $pools */
|
||||
$pools = $register->get('pools');
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get('console')
|
||||
->pop()
|
||||
->getResource()
|
||||
;
|
||||
|
||||
$database = new Database($dbAdapter, getCache());
|
||||
|
||||
$adapter = new DatabasePool($pools->get('console'));
|
||||
$database = new Database($adapter, getCache());
|
||||
$database
|
||||
->setNamespace('_console')
|
||||
->setMetadata('host', \gethostname())
|
||||
@@ -72,7 +77,13 @@ if (!function_exists('getProjectDB')) {
|
||||
{
|
||||
global $register;
|
||||
|
||||
/** @var \Utopia\Pools\Group $pools */
|
||||
static $databases = [];
|
||||
|
||||
if (isset($databases[$project->getInternalId()])) {
|
||||
return $databases[$project->getInternalId()];
|
||||
}
|
||||
|
||||
/** @var Group $pools */
|
||||
$pools = $register->get('pools');
|
||||
|
||||
if ($project->isEmpty() || $project->getId() === 'console') {
|
||||
@@ -86,11 +97,7 @@ if (!function_exists('getProjectDB')) {
|
||||
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
|
||||
}
|
||||
|
||||
$adapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, getCache());
|
||||
|
||||
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
|
||||
@@ -111,7 +118,7 @@ if (!function_exists('getProjectDB')) {
|
||||
->setMetadata('host', \gethostname())
|
||||
->setMetadata('project', $project->getId());
|
||||
|
||||
return $database;
|
||||
return $databases[$project->getInternalId()] = $database;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,20 +128,22 @@ if (!function_exists('getCache')) {
|
||||
{
|
||||
global $register;
|
||||
|
||||
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
|
||||
static $cache = null;
|
||||
|
||||
if ($cache !== null) {
|
||||
return $cache;
|
||||
}
|
||||
|
||||
$pools = $register->get('pools'); /** @var Group $pools */
|
||||
|
||||
$list = Config::getParam('pools-cache', []);
|
||||
$adapters = [];
|
||||
|
||||
foreach ($list as $value) {
|
||||
$adapters[] = $pools
|
||||
->get($value)
|
||||
->pop()
|
||||
->getResource()
|
||||
;
|
||||
$adapters[] = new CachePool($pools->get($value));
|
||||
}
|
||||
|
||||
return new Cache(new Sharding($adapters));
|
||||
return $cache = new Cache(new Sharding($adapters));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,6 +151,12 @@ if (!function_exists('getCache')) {
|
||||
if (!function_exists('getRedis')) {
|
||||
function getRedis(): \Redis
|
||||
{
|
||||
static $redis = null;
|
||||
|
||||
if ($redis !== null) {
|
||||
return $redis;
|
||||
}
|
||||
|
||||
$host = System::getEnv('_APP_REDIS_HOST', 'localhost');
|
||||
$port = System::getEnv('_APP_REDIS_PORT', 6379);
|
||||
$pass = System::getEnv('_APP_REDIS_PASS', '');
|
||||
@@ -160,21 +175,39 @@ if (!function_exists('getRedis')) {
|
||||
if (!function_exists('getTimelimit')) {
|
||||
function getTimelimit(): TimeLimitRedis
|
||||
{
|
||||
return new TimeLimitRedis("", 0, 1, getRedis());
|
||||
static $timelimit = null;
|
||||
|
||||
if ($timelimit !== null) {
|
||||
return $timelimit;
|
||||
}
|
||||
|
||||
return $timelimit = new TimeLimitRedis("", 0, 1, getRedis());
|
||||
}
|
||||
}
|
||||
|
||||
if (!function_exists('getRealtime')) {
|
||||
function getRealtime(): Realtime
|
||||
{
|
||||
return new Realtime();
|
||||
static $realtime = null;
|
||||
|
||||
if ($realtime !== null) {
|
||||
return $realtime;
|
||||
}
|
||||
|
||||
return $realtime = new Realtime();
|
||||
}
|
||||
}
|
||||
|
||||
if (!function_exists('getTelemetry')) {
|
||||
function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
|
||||
{
|
||||
return new NoTelemetry();
|
||||
static $telemetry = null;
|
||||
|
||||
if ($telemetry !== null) {
|
||||
return $telemetry;
|
||||
}
|
||||
|
||||
return $telemetry = new NoTelemetry();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -273,7 +306,6 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
|
||||
sleep(DATABASE_RECONNECT_SLEEP);
|
||||
}
|
||||
} while (true);
|
||||
$register->get('pools')->reclaim();
|
||||
});
|
||||
|
||||
/**
|
||||
@@ -299,9 +331,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
|
||||
|
||||
Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
|
||||
} catch (Throwable $th) {
|
||||
call_user_func($logError, $th, "updateWorkerDocument");
|
||||
} finally {
|
||||
$register->get('pools')->reclaim();
|
||||
$logError($th, "updateWorkerDocument");
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -370,8 +400,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||
'data' => $event['data']
|
||||
]));
|
||||
}
|
||||
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
}
|
||||
/**
|
||||
@@ -407,8 +435,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||
}
|
||||
$start = time();
|
||||
|
||||
/** @var \Appwrite\PubSub\Adapter $pubsub */
|
||||
$pubsub = $register->get('pools')->get('pubsub')->pop()->getResource();
|
||||
$pubsub = new PubSubPool($register->get('pools')->get('pubsub'));
|
||||
|
||||
if ($pubsub->ping(true)) {
|
||||
$attempts = 0;
|
||||
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
|
||||
@@ -436,8 +464,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||
|
||||
$realtime->unsubscribe($connection);
|
||||
$realtime->subscribe($projectId, $connection, $roles, $channels);
|
||||
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,14 +489,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||
}
|
||||
});
|
||||
} catch (Throwable $th) {
|
||||
call_user_func($logError, $th, "pubSubConnection");
|
||||
$logError($th, "pubSubConnection");
|
||||
|
||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||
$attempts++;
|
||||
sleep(DATABASE_RECONNECT_SLEEP);
|
||||
continue;
|
||||
} finally {
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -572,7 +596,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
||||
$stats->incr($project->getId(), 'connections');
|
||||
$stats->incr($project->getId(), 'connectionsTotal');
|
||||
} catch (Throwable $th) {
|
||||
call_user_func($logError, $th, "initServer");
|
||||
$logError($th, "initServer");
|
||||
|
||||
// Handle SQL error code is 'HY000'
|
||||
$code = $th->getCode();
|
||||
@@ -596,8 +620,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
||||
Console::error('[Error] Code: ' . $response['data']['code']);
|
||||
Console::error('[Error] Message: ' . $response['data']['message']);
|
||||
}
|
||||
} finally {
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -696,8 +718,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
||||
if ($th->getCode() === 1008) {
|
||||
$server->close($connection, $th->getCode());
|
||||
}
|
||||
} finally {
|
||||
$register->get('pools')->reclaim();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
+17
-52
@@ -20,10 +20,12 @@ use Appwrite\Platform\Appwrite;
|
||||
use Executor\Executor;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
@@ -33,6 +35,7 @@ use Utopia\Logger\Log;
|
||||
use Utopia\Logger\Logger;
|
||||
use Utopia\Platform\Service;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Broker\Pool as BrokerPool;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Queue\Publisher;
|
||||
use Utopia\Queue\Server;
|
||||
@@ -41,21 +44,17 @@ use Utopia\System\System;
|
||||
use Utopia\Telemetry\Adapter\None as NoTelemetry;
|
||||
|
||||
Authorization::disable();
|
||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
Runtime::enableCoroutine();
|
||||
|
||||
Server::setResource('register', fn () => $register);
|
||||
|
||||
Server::setResource('dbForPlatform', function (Cache $cache, Registry $register) {
|
||||
$pools = $register->get('pools');
|
||||
$database = $pools
|
||||
->get('console')
|
||||
->pop()
|
||||
->getResource();
|
||||
$adapter = new DatabasePool($pools->get('console'));
|
||||
$dbForPlatform = new Database($adapter, $cache);
|
||||
$dbForPlatform->setNamespace('_console');
|
||||
|
||||
$adapter = new Database($database, $cache);
|
||||
$adapter->setNamespace('_console');
|
||||
|
||||
return $adapter;
|
||||
return $dbForPlatform;
|
||||
}, ['cache', 'register']);
|
||||
|
||||
Server::setResource('project', function (Message $message, Database $dbForPlatform) {
|
||||
@@ -83,20 +82,9 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register,
|
||||
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
|
||||
}
|
||||
|
||||
$adapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
try {
|
||||
$dsn = new DSN($project->getAttribute('database'));
|
||||
} catch (\InvalidArgumentException) {
|
||||
// TODO: Temporary until all projects are using shared tables
|
||||
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
|
||||
}
|
||||
|
||||
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
|
||||
|
||||
if (\in_array($dsn->getHost(), $sharedTables)) {
|
||||
@@ -151,12 +139,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get($dsn->getHost())
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
$adapter = new DatabasePool($pools->get($dsn->getHost()));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$databases[$dsn->getHost()] = $database;
|
||||
|
||||
@@ -188,15 +172,8 @@ Server::setResource('getLogsDB', function (Group $pools, Cache $cache) {
|
||||
return $database;
|
||||
}
|
||||
|
||||
$dbAdapter = $pools
|
||||
->get('logs')
|
||||
->pop()
|
||||
->getResource();
|
||||
|
||||
$database = new Database(
|
||||
$dbAdapter,
|
||||
$cache
|
||||
);
|
||||
$adapter = new DatabasePool($pools->get('logs'));
|
||||
$database = new Database($adapter, $cache);
|
||||
|
||||
$database
|
||||
->setSharedTables(true)
|
||||
@@ -234,11 +211,7 @@ Server::setResource('cache', function (Registry $register) {
|
||||
$adapters = [];
|
||||
|
||||
foreach ($list as $value) {
|
||||
$adapters[] = $pools
|
||||
->get($value)
|
||||
->pop()
|
||||
->getResource()
|
||||
;
|
||||
$adapters[] = new CachePool($pools->get($value));
|
||||
}
|
||||
|
||||
return new Cache(new Sharding($adapters));
|
||||
@@ -268,11 +241,11 @@ Server::setResource('timelimit', function (\Redis $redis) {
|
||||
Server::setResource('log', fn () => new Log());
|
||||
|
||||
Server::setResource('publisher', function (Group $pools) {
|
||||
return $pools->get('publisher')->pop()->getResource();
|
||||
return new BrokerPool(publisher: $pools->get('publisher'));
|
||||
}, ['pools']);
|
||||
|
||||
Server::setResource('consumer', function (Group $pools) {
|
||||
return $pools->get('consumer')->pop()->getResource();
|
||||
return new BrokerPool(consumer: $pools->get('consumer'));
|
||||
}, ['pools']);
|
||||
|
||||
Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
|
||||
@@ -463,13 +436,6 @@ try {
|
||||
|
||||
$worker = $platform->getWorker();
|
||||
|
||||
$worker
|
||||
->shutdown()
|
||||
->inject('pools')
|
||||
->action(function (Group $pools) {
|
||||
$pools->reclaim();
|
||||
});
|
||||
|
||||
$worker
|
||||
->error()
|
||||
->inject('error')
|
||||
@@ -477,8 +443,7 @@ $worker
|
||||
->inject('log')
|
||||
->inject('pools')
|
||||
->inject('project')
|
||||
->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($queueName) {
|
||||
$pools->reclaim();
|
||||
->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($worker, $queueName) {
|
||||
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
if ($logger) {
|
||||
|
||||
@@ -286,13 +286,6 @@ class Event
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function setParamSensitive(string $key): self
|
||||
{
|
||||
$this->sensitive[$key] = true;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get param of event.
|
||||
*
|
||||
|
||||
@@ -2,14 +2,14 @@
|
||||
|
||||
namespace Appwrite\Messaging\Adapter;
|
||||
|
||||
use Appwrite\Messaging\Adapter;
|
||||
use Appwrite\Messaging\Adapter as MessagingAdapter;
|
||||
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\Database\Helpers\Role;
|
||||
use Utopia\Pools\Pool;
|
||||
|
||||
class Realtime extends Adapter
|
||||
class Realtime extends MessagingAdapter
|
||||
{
|
||||
/**
|
||||
* Connection Tree
|
||||
@@ -36,12 +36,12 @@ class Realtime extends Adapter
|
||||
*/
|
||||
public array $subscriptions = [];
|
||||
|
||||
private Pool $pubsubPool;
|
||||
private PubSubPool $pubSubPool;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
global $register;
|
||||
$this->pubsubPool = $register->get('pools')->get('pubsub');
|
||||
$this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub'));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -132,11 +132,12 @@ class Realtime extends Adapter
|
||||
* Sends an event to the Realtime Server
|
||||
* @param string $projectId
|
||||
* @param array $payload
|
||||
* @param string $event
|
||||
* @param array $events
|
||||
* @param array $channels
|
||||
* @param array $roles
|
||||
* @param array $options
|
||||
* @return void
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
|
||||
{
|
||||
@@ -147,7 +148,7 @@ class Realtime extends Adapter
|
||||
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
|
||||
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
|
||||
|
||||
$message = [
|
||||
$this->pubSubPool->publish('realtime', json_encode([
|
||||
'project' => $projectId,
|
||||
'roles' => $roles,
|
||||
'permissionsChanged' => $permissionsChanged,
|
||||
@@ -158,9 +159,7 @@ class Realtime extends Adapter
|
||||
'timestamp' => DateTime::formatTz(DateTime::now()),
|
||||
'payload' => $payload
|
||||
]
|
||||
];
|
||||
|
||||
$this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message)));
|
||||
]));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,8 +174,9 @@ class Realtime extends Adapter
|
||||
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
|
||||
*
|
||||
* @param array $event
|
||||
* @return int[]|string[]
|
||||
*/
|
||||
public function getSubscribers(array $event)
|
||||
public function getSubscribers(array $event): array
|
||||
{
|
||||
|
||||
$receivers = [];
|
||||
@@ -230,7 +230,7 @@ class Realtime extends Adapter
|
||||
|
||||
foreach ($channels as $key => $value) {
|
||||
switch (true) {
|
||||
case strpos($key, 'account.') === 0:
|
||||
case \str_starts_with($key, 'account.'):
|
||||
unset($channels[$key]);
|
||||
break;
|
||||
|
||||
@@ -353,7 +353,6 @@ class Realtime extends Adapter
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case 'sites':
|
||||
if ($parts[2] === 'deployments') {
|
||||
$channels[] = 'console';
|
||||
@@ -361,7 +360,6 @@ class Realtime extends Adapter
|
||||
$projectId = 'console';
|
||||
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
|
||||
}
|
||||
|
||||
break;
|
||||
case 'migrations':
|
||||
$channels[] = 'console';
|
||||
|
||||
@@ -3,14 +3,19 @@
|
||||
namespace Appwrite\Platform\Tasks;
|
||||
|
||||
use Appwrite\ClamAV\Network;
|
||||
use Appwrite\PubSub\Adapter;
|
||||
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
|
||||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Pool as CachePool;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Adapter\Pool as DatabasePool;
|
||||
use Utopia\Domains\Domain;
|
||||
use Utopia\DSN\DSN;
|
||||
use Utopia\Logger\Logger;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Broker\Pool as BrokerPool;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Storage\Device\Local;
|
||||
use Utopia\Storage\Storage;
|
||||
@@ -89,9 +94,9 @@ class Doctor extends Action
|
||||
Console::log('🟢 Abuse protection is enabled');
|
||||
}
|
||||
|
||||
$authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT', null);
|
||||
$authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS', null);
|
||||
$authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS', null);
|
||||
$authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT');
|
||||
$authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS');
|
||||
$authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS');
|
||||
|
||||
if (
|
||||
empty($authWhitelistRoot)
|
||||
@@ -127,19 +132,16 @@ class Doctor extends Action
|
||||
} else {
|
||||
Console::log('🟢 Logging adapter is enabled (' . $providerName . ')');
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::log('🔴 Logging adapter is misconfigured');
|
||||
}
|
||||
|
||||
\usleep(200 * 1000); // Sleep for 0.2 seconds
|
||||
|
||||
try {
|
||||
Console::log("\n" . '[Connectivity]');
|
||||
} catch (\Throwable $th) {
|
||||
//throw $th;
|
||||
}
|
||||
Console::log("\n" . '[Connectivity]');
|
||||
|
||||
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
|
||||
/** @var Group $pools */
|
||||
$pools = $register->get('pools');
|
||||
|
||||
$configs = [
|
||||
'Console.DB' => Config::getParam('pools-console'),
|
||||
@@ -149,20 +151,22 @@ class Doctor extends Action
|
||||
foreach ($configs as $key => $config) {
|
||||
foreach ($config as $database) {
|
||||
try {
|
||||
$adapter = $pools->get($database)->pop()->getResource();
|
||||
$adapter = new DatabasePool($pools->get($database));
|
||||
|
||||
if ($adapter->ping()) {
|
||||
Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected');
|
||||
} else {
|
||||
Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected');
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
|
||||
/** @var Group $pools */
|
||||
$pools = $register->get('pools');
|
||||
|
||||
$configs = [
|
||||
'Cache' => Config::getParam('pools-cache'),
|
||||
'Queue' => Config::getParam('pools-queue'),
|
||||
@@ -172,15 +176,18 @@ class Doctor extends Action
|
||||
foreach ($configs as $key => $config) {
|
||||
foreach ($config as $pool) {
|
||||
try {
|
||||
/** @var Adapter $adapter */
|
||||
$adapter = $pools->get($pool)->pop()->getResource();
|
||||
$adapter = match($key) {
|
||||
'Cache' => new CachePool($pools->get($pool)),
|
||||
'Queue' => new BrokerPool($pools->get($pool)),
|
||||
'PubSub' => new PubSubPool($pools->get($pool)),
|
||||
};
|
||||
|
||||
if ($adapter->ping()) {
|
||||
Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected');
|
||||
} else {
|
||||
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
|
||||
}
|
||||
}
|
||||
@@ -198,13 +205,14 @@ class Doctor extends Action
|
||||
} else {
|
||||
Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected');
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected');
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
$mail = $register->get('smtp'); /* @var $mail \PHPMailer\PHPMailer\PHPMailer */
|
||||
/* @var PHPMailer $mail */
|
||||
$mail = $register->get('smtp');
|
||||
|
||||
$mail->addAddress('demo@example.com', 'Example.com');
|
||||
$mail->Subject = 'Test SMTP Connection';
|
||||
@@ -213,7 +221,7 @@ class Doctor extends Action
|
||||
|
||||
$mail->send();
|
||||
Console::success('🟢 ' . str_pad("SMTP", 50, '.') . 'connected');
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::error('🔴 ' . str_pad("SMTP", 47, '.') . 'disconnected');
|
||||
}
|
||||
|
||||
@@ -287,7 +295,7 @@ class Doctor extends Action
|
||||
Console::error('Failed to check for a newer version' . "\n");
|
||||
}
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
} catch (\Throwable) {
|
||||
Console::error('Failed to check for a newer version' . "\n");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Broker\Pool as BrokerPool;
|
||||
use Utopia\System\System;
|
||||
use Utopia\Telemetry\Adapter as Telemetry;
|
||||
use Utopia\Telemetry\Gauge;
|
||||
@@ -24,6 +25,8 @@ abstract class ScheduleBase extends Action
|
||||
|
||||
protected array $schedules = [];
|
||||
|
||||
protected BrokerPool $publisher;
|
||||
|
||||
private ?Histogram $collectSchedulesTelemetryDuration = null;
|
||||
private ?Gauge $collectSchedulesTelemetryCount = null;
|
||||
private ?Gauge $scheduleTelemetryCount = null;
|
||||
@@ -68,6 +71,7 @@ abstract class ScheduleBase extends Action
|
||||
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
|
||||
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
|
||||
|
||||
$this->publisher = new BrokerPool($pools->get('publisher'));
|
||||
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
|
||||
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');
|
||||
$this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count');
|
||||
@@ -119,8 +123,6 @@ abstract class ScheduleBase extends Action
|
||||
$schedule->getAttribute('resourceId')
|
||||
);
|
||||
|
||||
$pools->reclaim();
|
||||
|
||||
return [
|
||||
'$internalId' => $schedule->getInternalId(),
|
||||
'$id' => $schedule->getId(),
|
||||
|
||||
@@ -29,9 +29,7 @@ class ScheduleExecutions extends ScheduleBase
|
||||
|
||||
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
|
||||
{
|
||||
$queue = $pools->get('publisher')->pop();
|
||||
$connection = $queue->getResource();
|
||||
$queueForFunctions = new Func($connection);
|
||||
$queueForFunctions = new Func($this->publisher);
|
||||
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
|
||||
|
||||
foreach ($this->schedules as $schedule) {
|
||||
@@ -85,7 +83,5 @@ class ScheduleExecutions extends ScheduleBase
|
||||
|
||||
unset($this->schedules[$schedule['$internalId']]);
|
||||
}
|
||||
|
||||
$queue->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,9 +79,6 @@ class ScheduleFunctions extends ScheduleBase
|
||||
\go(function () use ($delay, $schedules, $pools, $dbForPlatform) {
|
||||
\sleep($delay); // in seconds
|
||||
|
||||
$queue = $pools->get('publisher')->pop();
|
||||
$connection = $queue->getResource();
|
||||
|
||||
foreach ($schedules as $delayConfig) {
|
||||
$scheduleKey = $delayConfig['key'];
|
||||
// Ensure schedule was not deleted
|
||||
@@ -93,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase
|
||||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
$queueForFunctions = new Func($connection);
|
||||
$queueForFunctions = new Func($this->publisher);
|
||||
|
||||
$queueForFunctions
|
||||
->setType('schedule')
|
||||
@@ -105,8 +102,6 @@ class ScheduleFunctions extends ScheduleBase
|
||||
|
||||
$this->recordEnqueueDelay($delayConfig['nextDate']);
|
||||
}
|
||||
|
||||
$queue->reclaim();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -41,9 +41,7 @@ class ScheduleMessages extends ScheduleBase
|
||||
}
|
||||
|
||||
\go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) {
|
||||
$queue = $pools->get('publisher')->pop();
|
||||
$connection = $queue->getResource();
|
||||
$queueForMessaging = new Messaging($connection);
|
||||
$queueForMessaging = new Messaging($this->publisher);
|
||||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
@@ -58,7 +56,6 @@ class ScheduleMessages extends ScheduleBase
|
||||
$schedule['$id'],
|
||||
);
|
||||
|
||||
$queue->reclaim();
|
||||
$this->recordEnqueueDelay($scheduledAt);
|
||||
unset($this->schedules[$schedule['$internalId']]);
|
||||
});
|
||||
|
||||
@@ -63,7 +63,7 @@ class Deletes extends Action
|
||||
->inject('executionRetention')
|
||||
->inject('auditRetention')
|
||||
->inject('log')
|
||||
->callback([$this, 'action']);
|
||||
->callback($this->action(...));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -357,7 +357,7 @@ class StatsUsage extends Action
|
||||
break;
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
|
||||
Console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,7 +376,7 @@ class StatsUsage extends Action
|
||||
continue;
|
||||
}
|
||||
|
||||
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
|
||||
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
|
||||
|
||||
try {
|
||||
foreach ($stats['keys'] ?? [] as $key => $value) {
|
||||
@@ -413,7 +413,7 @@ class StatsUsage extends Action
|
||||
}
|
||||
}
|
||||
} catch (Exception $e) {
|
||||
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||
Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -437,7 +437,7 @@ class StatsUsage extends Action
|
||||
|
||||
}
|
||||
|
||||
protected function prepareForLogsDB(Document $project, Document $stat)
|
||||
protected function prepareForLogsDB(Document $project, Document $stat): void
|
||||
{
|
||||
if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
|
||||
return;
|
||||
@@ -462,8 +462,7 @@ class StatsUsage extends Action
|
||||
return;
|
||||
}
|
||||
|
||||
$dbForLogs = call_user_func($this->getLogsDB);
|
||||
$dbForLogs
|
||||
$dbForLogs = ($this->getLogsDB)()
|
||||
->setTenant(null)
|
||||
->setTenantPerDocument(true);
|
||||
|
||||
@@ -478,6 +477,5 @@ class StatsUsage extends Action
|
||||
} catch (Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
}
|
||||
$this->register->get('pools')->get('logs')->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,9 +70,9 @@ class StatsUsageDump extends Action
|
||||
];
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
* @var callable(Document): Database
|
||||
*/
|
||||
protected mixed $getLogsDB;
|
||||
protected $getLogsDB;
|
||||
|
||||
protected array $periods = [
|
||||
'1h' => 'Y-m-d H:00',
|
||||
@@ -126,10 +126,10 @@ class StatsUsageDump extends Action
|
||||
continue;
|
||||
}
|
||||
|
||||
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
|
||||
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
|
||||
|
||||
try {
|
||||
/** @var \Utopia\Database\Database $dbForProject */
|
||||
/** @var Database $dbForProject */
|
||||
$dbForProject = $getProjectDB($project);
|
||||
foreach ($stats['keys'] ?? [] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
@@ -169,7 +169,7 @@ class StatsUsageDump extends Action
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||
Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -190,8 +190,7 @@ class StatsUsageDump extends Action
|
||||
}
|
||||
}
|
||||
|
||||
/** @var \Utopia\Database\Database $dbForLogs*/
|
||||
$dbForLogs = call_user_func($this->getLogsDB, $project);
|
||||
$dbForLogs = ($this->getLogsDB)($project);
|
||||
|
||||
try {
|
||||
$dbForLogs->createOrUpdateDocumentsWithIncrease(
|
||||
@@ -203,7 +202,5 @@ class StatsUsageDump extends Action
|
||||
} catch (\Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
}
|
||||
|
||||
$this->register->get('pools')->get('logs')->reclaim();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\PubSub\Adapter;
|
||||
|
||||
use Appwrite\PubSub\Adapter;
|
||||
use Utopia\Database\Exception as DatabaseException;
|
||||
use Utopia\Pools\Pool as UtopiaPool;
|
||||
|
||||
class Pool implements Adapter
|
||||
{
|
||||
public function __construct(private UtopiaPool $pool)
|
||||
{
|
||||
}
|
||||
|
||||
public function ping($message = null): bool
|
||||
{
|
||||
return $this->delegate(__FUNCTION__, \func_get_args());
|
||||
}
|
||||
|
||||
public function subscribe($channels, $callback): void
|
||||
{
|
||||
$this->delegate(__FUNCTION__, \func_get_args());
|
||||
}
|
||||
|
||||
public function publish($channel, $message): void
|
||||
{
|
||||
$this->delegate(__FUNCTION__, \func_get_args());
|
||||
}
|
||||
|
||||
/**
|
||||
* Forward method calls to the internal adapter instance via the pool.
|
||||
*
|
||||
* Required because __call() can't be used to implement abstract methods.
|
||||
*
|
||||
* @param string $method
|
||||
* @param array<mixed> $args
|
||||
* @return mixed
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
public function delegate(string $method, array $args): mixed
|
||||
{
|
||||
return $this->pool->use(function (Adapter $adapter) use ($method, $args) {
|
||||
return $adapter->{$method}(...$args);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -35,7 +35,7 @@ services:
|
||||
- VERSION=dev
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- 9501:80
|
||||
- "9501:80"
|
||||
networks:
|
||||
- appwrite
|
||||
labels:
|
||||
@@ -52,15 +52,12 @@ services:
|
||||
- ./phpunit.xml:/usr/src/code/phpunit.xml
|
||||
- ./tests:/usr/src/code/tests
|
||||
- ./app:/usr/src/code/app
|
||||
# - ./vendor:/usr/src/code/vendor
|
||||
- ./docs:/usr/src/code/docs
|
||||
- ./public:/usr/src/code/public
|
||||
- ./src:/usr/src/code/src
|
||||
- ./debug:/tmp
|
||||
depends_on:
|
||||
- mariadb
|
||||
- redis
|
||||
# - clamav
|
||||
environment:
|
||||
- _APP_COMPRESSION_MIN_SIZE_BYTES
|
||||
- _APP_ENV
|
||||
@@ -353,33 +350,6 @@ services:
|
||||
volumes:
|
||||
- appwrite-redis:/data:rw
|
||||
|
||||
# clamav:
|
||||
# image: appwrite/clamav:1.2.0
|
||||
# container_name: appwrite-clamav
|
||||
# restart: unless-stopped
|
||||
# networks:
|
||||
# - appwrite
|
||||
# volumes:
|
||||
# - appwrite-uploads:/storage/uploads
|
||||
|
||||
|
||||
# redis-commander:
|
||||
# image: rediscommander/redis-commander:latest
|
||||
# restart: unless-stopped
|
||||
# networks:
|
||||
# - appwrite
|
||||
# environment:
|
||||
# - REDIS_HOSTS=redis
|
||||
# ports:
|
||||
# - "8081:8081"
|
||||
|
||||
# webgrind:
|
||||
# image: 'jokkedk/webgrind:latest'
|
||||
# volumes:
|
||||
# - './debug:/tmp'
|
||||
# ports:
|
||||
# - '3001:80'
|
||||
|
||||
networks:
|
||||
gateway:
|
||||
appwrite:
|
||||
|
||||
Reference in New Issue
Block a user