From d31a358392c83daa6b0ee020ddaa67ada1df2636 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Thu, 30 Jan 2025 07:07:10 +0000 Subject: [PATCH] Temp: everything to be removed --- app/controllers/api/health.php | 4 +- app/worker.php | 15 + bin/worker-usage | 3 + bin/worker-usage-dump | 3 + docker-compose.yml | 64 +++++ src/Appwrite/Event/Event.php | 8 + src/Appwrite/Event/StatsUsage.php | 4 +- src/Appwrite/Event/Usage.php | 78 +++++ src/Appwrite/Event/UsageDump.php | 44 +++ src/Appwrite/Platform/Services/Workers.php | 8 + src/Appwrite/Platform/Workers/Usage.php | 288 +++++++++++++++++++ src/Appwrite/Platform/Workers/UsageDump.php | 301 ++++++++++++++++++++ 12 files changed, 816 insertions(+), 4 deletions(-) create mode 100644 bin/worker-usage create mode 100644 bin/worker-usage-dump create mode 100644 src/Appwrite/Event/Usage.php create mode 100644 src/Appwrite/Event/UsageDump.php create mode 100644 src/Appwrite/Platform/Workers/Usage.php create mode 100644 src/Appwrite/Platform/Workers/UsageDump.php diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index ab3551fcd9..a4f6586262 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -788,7 +788,7 @@ App::get('/v1/health/queue/usage') ->action(function (int|string $threshold, Connection $queue, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::USAGE_QUEUE_NAME, $queue); + $client = new Client(Event::STATS_USAGE_QUEUE_NAME, $queue); $size = $client->getQueueSize(); if ($size >= $threshold) { @@ -995,7 +995,7 @@ App::get('/v1/health/queue/failed/:name') Event::AUDITS_QUEUE_NAME, Event::MAILS_QUEUE_NAME, Event::FUNCTIONS_QUEUE_NAME, - Event::USAGE_QUEUE_NAME, + Event::STATS_USAGE_QUEUE_NAME, Event::STATS_USAGE_DUMP_QUEUE_NAME, Event::WEBHOOK_QUEUE_NAME, Event::CERTIFICATES_QUEUE_NAME, diff --git a/app/worker.php b/app/worker.php index 415d6a6f27..d57a2712a0 100644 --- a/app/worker.php +++ b/app/worker.php @@ -13,8 +13,12 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +/** remove */ use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsageDump; +/** /remove */ +use Appwrite\Event\Usage; +use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; @@ -257,6 +261,17 @@ Server::setResource('timelimit', function (\Redis $redis) { Server::setResource('log', fn () => new Log()); +/** remove */ +Server::setResource('queueForUsage', function (Connection $queue) { + return new Usage($queue); + return new StatsUsage($queue); +}, ['queue']); + +Server::setResource('queueForUsageDump', function (Connection $queue) { + return new UsageDump($queue); + return new StatsUsageDump($queue); +}, ['queue']); +/** /remove */ Server::setResource('queueForStatsUsage', function (Connection $queue) { return new StatsUsage($queue); }, ['queue']); diff --git a/bin/worker-usage b/bin/worker-usage new file mode 100644 index 0000000000..e39ce8477c --- /dev/null +++ b/bin/worker-usage @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php usage $@ \ No newline at end of file diff --git a/bin/worker-usage-dump b/bin/worker-usage-dump new file mode 100644 index 0000000000..43ca87fcb3 --- /dev/null +++ b/bin/worker-usage-dump @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php usage-dump $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index a8193bdec8..4ea1408d9c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -845,7 +845,71 @@ services: - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL - _APP_DATABASE_SHARED_TABLES + + # remove + appwrite-worker-usage: + entrypoint: worker-usage + <<: *x-logging + container_name: appwrite-worker-usage + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + - _APP_DATABASE_SHARED_TABLES + appwrite-worker-usage-dump: + entrypoint: worker-usage-dump + <<: *x-logging + container_name: appwrite-worker-usage-dump + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + - _APP_DATABASE_SHARED_TABLES + # /remove + appwrite-task-scheduler-functions: entrypoint: schedule-functions <<: *x-logging diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index a1c5abd1d8..f6638d2cf6 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -24,12 +24,20 @@ class Event public const FUNCTIONS_QUEUE_NAME = 'v1-functions'; public const FUNCTIONS_CLASS_NAME = 'FunctionsV1'; + /** remove */ public const USAGE_QUEUE_NAME = 'v1-usage'; public const USAGE_CLASS_NAME = 'UsageV1'; + public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump'; + public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1'; + /** /remove */ + public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources'; public const STATS_RESOURCES_CLASS_NAME = 'StatsResources'; + public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage'; + public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1'; + public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump'; public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1'; diff --git a/src/Appwrite/Event/StatsUsage.php b/src/Appwrite/Event/StatsUsage.php index 7f54370fca..a1c027f797 100644 --- a/src/Appwrite/Event/StatsUsage.php +++ b/src/Appwrite/Event/StatsUsage.php @@ -16,8 +16,8 @@ class StatsUsage extends Event parent::__construct($connection); $this - ->setQueue(Event::USAGE_QUEUE_NAME) - ->setClass(Event::USAGE_CLASS_NAME); + ->setQueue(Event::STATS_USAGE_QUEUE_NAME) + ->setClass(Event::STATS_USAGE_CLASS_NAME); } /** diff --git a/src/Appwrite/Event/Usage.php b/src/Appwrite/Event/Usage.php new file mode 100644 index 0000000000..5609859f37 --- /dev/null +++ b/src/Appwrite/Event/Usage.php @@ -0,0 +1,78 @@ +setQueue(Event::USAGE_QUEUE_NAME) + ->setClass(Event::USAGE_CLASS_NAME); + } + + /** + * Add reduce. + * + * @param Document $document + * @return self + */ + public function addReduce(Document $document): self + { + $this->reduce[] = $document; + + return $this; + } + + /** + * Add metric. + * + * @param string $key + * @param int $value + * @return self + */ + public function addMetric(string $key, int $value): self + { + + $this->metrics[] = [ + 'key' => $key, + 'value' => $value, + ]; + + return $this; + } + + /** + * Prepare the payload for the usage event. + * + * @return array + */ + protected function preparePayload(): array + { + return [ + 'project' => $this->project, + 'reduce' => $this->reduce, + 'metrics' => $this->metrics, + ]; + } + + /** + * Sends metrics to the usage worker. + * + * @return string|bool + */ + public function trigger(): string|bool + { + parent::trigger(); + $this->metrics = []; + return true; + } +} diff --git a/src/Appwrite/Event/UsageDump.php b/src/Appwrite/Event/UsageDump.php new file mode 100644 index 0000000000..6f44de4eda --- /dev/null +++ b/src/Appwrite/Event/UsageDump.php @@ -0,0 +1,44 @@ +setQueue(Event::USAGE_DUMP_QUEUE_NAME) + ->setClass(Event::USAGE_DUMP_CLASS_NAME); + } + + /** + * Add Stats. + * + * @param array $stats + * @return self + */ + public function setStats(array $stats): self + { + $this->stats = $stats; + + return $this; + } + + /** + * Prepare the payload for the usage dump event. + * + * @return array + */ + protected function preparePayload(): array + { + return [ + 'stats' => $this->stats, + ]; + } +} diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 4f4095aca4..e121ee35f7 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -14,6 +14,10 @@ use Appwrite\Platform\Workers\Migrations; use Appwrite\Platform\Workers\StatsResources; use Appwrite\Platform\Workers\StatsUsage; use Appwrite\Platform\Workers\StatsUsageDump; +/** remove */ +use Appwrite\Platform\Workers\Usage; +use Appwrite\Platform\Workers\UsageDump; +/** /remove */ use Appwrite\Platform\Workers\Webhooks; use Utopia\Platform\Service; @@ -36,6 +40,10 @@ class Workers extends Service ->addAction(StatsUsage::getName(), new StatsUsage()) ->addAction(Migrations::getName(), new Migrations()) ->addAction(StatsResources::getName(), new StatsResources()) + /** Remove */ + ->addAction(UsageDump::getName(), new UsageDump()) + ->addAction(Usage::getName(), new Usage()) + /** /remove */ ; } } diff --git a/src/Appwrite/Platform/Workers/Usage.php b/src/Appwrite/Platform/Workers/Usage.php new file mode 100644 index 0000000000..3f7428d0dd --- /dev/null +++ b/src/Appwrite/Platform/Workers/Usage.php @@ -0,0 +1,288 @@ +desc('Usage worker') + ->inject('message') + ->inject('project') + ->inject('getProjectDB') + ->inject('queueForUsageDump') + ->callback([$this, 'action']); + + $this->aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20'); + $this->lastTriggeredTime = time(); + } + + /** + * @param Message $message + * @param Document $project + * @param callable $getProjectDB + * @param UsageDump $queueForUsageDump + * @return void + * @throws \Utopia\Database\Exception + * @throws Exception + */ + public function action(Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump): void + { + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + if (empty($project->getAttribute('database'))) { + var_dump($payload); + return; + } + + $projectId = $project->getInternalId(); + foreach ($payload['reduce'] ?? [] as $document) { + if (empty($document)) { + continue; + } + + $this->reduce( + project: $project, + document: new Document($document), + metrics: $payload['metrics'], + getProjectDB: $getProjectDB + ); + } + + + $this->stats[$projectId]['project'] = [ + '$id' => $project->getId(), + '$internalId' => $project->getInternalId(), + 'database' => $project->getAttribute('database'), + ]; + $this->stats[$projectId]['receivedAt'] = DateTime::now(); + foreach ($payload['metrics'] ?? [] as $metric) { + $this->keys++; + if (!isset($this->stats[$projectId]['keys'][$metric['key']])) { + $this->stats[$projectId]['keys'][$metric['key']] = $metric['value']; + continue; + } + + $this->stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + + // If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats) + if ( + $this->keys >= self::KEYS_THRESHOLD || + (time() - $this->lastTriggeredTime > $this->aggregationInterval && $this->keys > 0) + ) { + Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys'); + + $queueForUsageDump + ->setStats($this->stats) + ->trigger(); + + $this->stats = []; + $this->keys = 0; + $this->lastTriggeredTime = time(); + } + } + + /** + * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. + * When we remove a parent document we need to deduct his children aggregation from the project scope. + * @param Document $project + * @param Document $document + * @param array $metrics + * @param callable $getProjectDB + * @return void + */ + private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void + { + $dbForProject = $getProjectDB($project); + + try { + switch (true) { + case $document->getCollection() === 'users': // users + $sessions = count($document->getAttribute(METRIC_SESSIONS, 0)); + if (!empty($sessions)) { + $metrics[] = [ + 'key' => METRIC_SESSIONS, + 'value' => ($sessions * -1), + ]; + } + break; + case $document->getCollection() === 'databases': // databases + $collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS))); + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS))); + if (!empty($collections['value'])) { + $metrics[] = [ + 'key' => METRIC_COLLECTIONS, + 'value' => ($collections['value'] * -1), + ]; + } + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS))); + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + $metrics[] = [ + 'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), + 'value' => ($documents['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'buckets': + $files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES))); + $storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE))); + + if (!empty($files['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES, + 'value' => ($files['value'] * -1), + ]; + } + + if (!empty($storage['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES_STORAGE, + 'value' => ($storage['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'functions': + $deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS))); + $deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE))); + $builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS))); + $buildsSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_SUCCESS))); + $buildsFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_FAILED))); + $buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE))); + $buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE))); + $buildsComputeSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_SUCCESS))); + $buildsComputeFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_FAILED))); + $executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS))); + $executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE))); + + if (!empty($deployments['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS, + 'value' => ($deployments['value'] * -1), + ]; + } + + if (!empty($deploymentsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS_STORAGE, + 'value' => ($deploymentsStorage['value'] * -1), + ]; + } + + if (!empty($builds['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS, + 'value' => ($builds['value'] * -1), + ]; + } + + if (!empty($buildsSuccess['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_SUCCESS, + 'value' => ($buildsSuccess['value'] * -1), + ]; + } + + if (!empty($buildsFailed['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_FAILED, + 'value' => ($buildsFailed['value'] * -1), + ]; + } + + if (!empty($buildsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_STORAGE, + 'value' => ($buildsStorage['value'] * -1), + ]; + } + + if (!empty($buildsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_COMPUTE, + 'value' => ($buildsCompute['value'] * -1), + ]; + } + + if (!empty($buildsComputeSuccess['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_COMPUTE_SUCCESS, + 'value' => ($buildsComputeSuccess['value'] * -1), + ]; + } + + if (!empty($buildsComputeFailed['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_COMPUTE_FAILED, + 'value' => ($buildsComputeFailed['value'] * -1), + ]; + } + + if (!empty($executions['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS, + 'value' => ($executions['value'] * -1), + ]; + } + + if (!empty($executionsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS_COMPUTE, + 'value' => ($executionsCompute['value'] * -1), + ]; + } + break; + default: + break; + } + } catch (\Throwable $e) { + console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); + } + } +} diff --git a/src/Appwrite/Platform/Workers/UsageDump.php b/src/Appwrite/Platform/Workers/UsageDump.php new file mode 100644 index 0000000000..bb1d605442 --- /dev/null +++ b/src/Appwrite/Platform/Workers/UsageDump.php @@ -0,0 +1,301 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + public static function getName(): string + { + return 'usage-dump'; + } + + /** + * @throws \Exception + */ + public function __construct() + { + $this + ->inject('message') + ->inject('getProjectDB') + ->callback([$this, 'action']); + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @return void + * @throws Exception + * @throws \Utopia\Database\Exception + */ + public function action(Message $message, callable $getProjectDB): void + { + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + try { + foreach ($payload['stats'] ?? [] as $stats) { + $project = new Document($stats['project'] ?? []); + $numberOfKeys = !empty($stats['keys']) ? \count($stats['keys']) : 0; + $receivedAt = $stats['receivedAt'] ?? 'NONE'; + if ($numberOfKeys === 0) { + continue; + } + + $dbForProject = $getProjectDB($project); + $projectDocuments = []; + $databaseCache = []; + $collectionSizeCache = []; + + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys . ' Started'); + $start = \microtime(true); + + foreach ($stats['keys'] ?? [] as $key => $value) { + if ($value == 0) { + continue; + } + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : \date($format, \time()); + $id = \md5("{$time}_{$period}_{$key}"); + + if (\str_contains($key, METRIC_DATABASES_STORAGE)) { + $this->handleDatabaseStorage( + $id, + $key, + $time, + $period, + $dbForProject, + $projectDocuments, + $databaseCache, + $collectionSizeCache + ); + continue; + } + + $projectDocuments[] = new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => System::getEnv('_APP_REGION', 'default'), + ]); + } + } + + $dbForProject->createOrUpdateDocumentsWithIncrease( + collection: 'stats', + attribute: 'value', + documents: $projectDocuments + ); + + $end = \microtime(true); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys. ' Time: '.($end - $start).'s'); + } + } catch (\Exception $e) { + Console::error('[' . DateTime::now() . '] Error processing stats: ' . $e->getMessage()); + } + } + + private function handleDatabaseStorage( + string $id, + string $key, + ?string $time, + string $period, + Database $dbForProject, + array &$projectDocuments, + array &$databaseCache, + array &$collectionSizeCache, + ): void { + $data = \explode('.', $key); + $value = 0; + $previousValue = 0; + + try { + $previousValue = $dbForProject + ->getDocument('stats', $id) + ->getAttribute('value', 0); + } catch (\Exception) { + // No previous value + } + + switch (\count($data)) { + case METRIC_COLLECTION_LEVEL_STORAGE: + $databaseInternalId = $data[0]; + $collectionInternalId = $data[1]; + $collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}"; + + if (!isset($collectionSizeCache[$collectionId])) { + try { + $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $collectionSizeCache[$collectionId] = 0; + } + } + + $value = $collectionSizeCache[$collectionId]; + + $diff = $value - $previousValue; + if ($diff === 0) { + break; + } + + $keys = [ + $key, + \str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE), + METRIC_DATABASES_STORAGE + ]; + + foreach ($keys as $metric) { + $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff); + } + break; + + case METRIC_DATABASE_LEVEL_STORAGE: + $databaseInternalId = $data[0]; + $databaseId = "database_{$databaseInternalId}"; + + if (!isset($databaseCache[$databaseId])) { + try { + $databaseCache[$databaseId] = $dbForProject->find($databaseId); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $databaseCache[$databaseId] = []; + } + } + + foreach ($databaseCache[$databaseId] as $collection) { + $collectionId = "{$databaseId}_collection_{$collection->getInternalId()}"; + + if (!isset($collectionSizeCache[$collectionId])) { + try { + $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $collectionSizeCache[$collectionId] = 0; + } + } + $value += $collectionSizeCache[$collectionId]; + } + + $diff = $value - $previousValue; + if ($diff === 0) { + break; + } + + $keys = [ + \str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE), + METRIC_DATABASES_STORAGE + ]; + + foreach ($keys as $metric) { + $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff); + } + break; + + case METRIC_PROJECT_LEVEL_STORAGE: + if (!isset($databaseCache['*'])) { + try { + $databaseCache['*'] = $dbForProject->find('databases'); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $databaseCache['*'] = []; + } + } + + foreach ($databaseCache['*'] as $database) { + $databaseId = "database_{$database->getInternalId()}"; + if (!isset($databaseCache[$databaseId])) { + try { + $databaseCache[$databaseId] = $dbForProject->find($databaseId); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $databaseCache[$databaseId] = []; + } + } + + foreach ($databaseCache[$databaseId] as $collection) { + $collectionId = "{$databaseId}_collection_{$collection->getInternalId()}"; + + if (!isset($collectionSizeCache[$collectionId])) { + try { + $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId); + } catch (\Exception $e) { + if (!$e instanceof NotFound) { + throw $e; + } + $collectionSizeCache[$collectionId] = 0; + } + } + $value += $collectionSizeCache[$collectionId]; + } + } + + $diff = $value - $previousValue; + if ($diff === 0) { + break; + } + + $keys = [ + METRIC_DATABASES_STORAGE + ]; + + foreach ($keys as $metric) { + $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff); + } + + break; + } + } + + private function createStatsDocument( + string $id, + string $period, + ?string $time, + string $key, + int $diff, + ): Document { + return new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $diff, + 'region' => System::getEnv('_APP_REGION', 'default'), + ]); + } +}