diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index e5336067c8..204ba0ad0a 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -756,38 +756,6 @@ App::get('/v1/health/queue/stats-usage') $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); }); -App::get('/v1/health/queue/stats-usage-dump') - ->desc('Get usage dump queue') - ->groups(['api', 'health']) - ->label('scope', 'health.read') - ->label('sdk', new Method( - auth: [AuthType::KEY], - namespace: 'health', - name: 'getQueueStatsUsageDump', - description: '/docs/references/health/get-queue-stats-usage-dump.md', - responses: [ - new SDKResponse( - code: Response::STATUS_CODE_OK, - model: Response::MODEL_HEALTH_QUEUE, - ) - ], - contentType: ContentType::JSON - )) - ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') - ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { - $threshold = \intval($threshold); - - $size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_DUMP_QUEUE_NAME)); - - if ($size >= $threshold) { - throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); - } - - $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }); - App::get('/v1/health/storage/local') ->desc('Get local storage') ->groups(['api', 'health']) diff --git a/app/worker.php b/app/worker.php index 90496c0430..e061a6a6d6 100644 --- a/app/worker.php +++ b/app/worker.php @@ -15,7 +15,6 @@ use Appwrite\Event\Messaging; use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Event\StatsUsageDump; use Appwrite\Event\Webhook; use Appwrite\Platform\Appwrite; use Swoole\Runtime; @@ -278,10 +277,6 @@ Server::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); -Server::setResource('queueForStatsUsageDump', function (Publisher $publisher) { - return new StatsUsageDump($publisher); -}, ['publisher']); - Server::setResource('queueForDatabase', function (Publisher $publisher) { return new EventDatabase($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 0edffdf4dc..14b1fd356a 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -24,23 +24,12 @@ class Event public const FUNCTIONS_QUEUE_NAME = 'v1-functions'; public const FUNCTIONS_CLASS_NAME = 'FunctionsV1'; - /** remove */ - public const USAGE_QUEUE_NAME = 'v1-usage'; - public const USAGE_CLASS_NAME = 'UsageV1'; - - public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump'; - public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1'; - /** /remove */ - public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources'; public const STATS_RESOURCES_CLASS_NAME = 'StatsResourcesV1'; public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage'; public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1'; - public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump'; - public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1'; - public const WEBHOOK_QUEUE_NAME = 'v1-webhooks'; public const WEBHOOK_CLASS_NAME = 'WebhooksV1'; diff --git a/src/Appwrite/Event/StatsUsageDump.php b/src/Appwrite/Event/StatsUsageDump.php deleted file mode 100644 index 0573a88040..0000000000 --- a/src/Appwrite/Event/StatsUsageDump.php +++ /dev/null @@ -1,44 +0,0 @@ -setQueue(Event::STATS_USAGE_DUMP_QUEUE_NAME) - ->setClass(Event::STATS_USAGE_DUMP_CLASS_NAME); - } - - /** - * Add Stats. - * - * @param array $stats - * @return self - */ - public function setStats(array $stats): self - { - $this->stats = $stats; - - return $this; - } - - /** - * Prepare the payload for the usage dump event. - * - * @return array - */ - protected function preparePayload(): array - { - return [ - 'stats' => $this->stats, - ]; - } -} diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 4f4095aca4..eb544c140e 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -13,7 +13,6 @@ use Appwrite\Platform\Workers\Messaging; use Appwrite\Platform\Workers\Migrations; use Appwrite\Platform\Workers\StatsResources; use Appwrite\Platform\Workers\StatsUsage; -use Appwrite\Platform\Workers\StatsUsageDump; use Appwrite\Platform\Workers\Webhooks; use Utopia\Platform\Service; @@ -32,7 +31,6 @@ class Workers extends Service ->addAction(Mails::getName(), new Mails()) ->addAction(Messaging::getName(), new Messaging()) ->addAction(Webhooks::getName(), new Webhooks()) - ->addAction(StatsUsageDump::getName(), new StatsUsageDump()) ->addAction(StatsUsage::getName(), new StatsUsage()) ->addAction(Migrations::getName(), new Migrations()) ->addAction(StatsResources::getName(), new StatsResources()) diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php deleted file mode 100644 index 2832e97000..0000000000 --- a/src/Appwrite/Platform/Workers/StatsUsageDump.php +++ /dev/null @@ -1,278 +0,0 @@ - true, - METRIC_BUCKETS => true, - METRIC_USERS => true, - METRIC_FUNCTIONS => true, - METRIC_TEAMS => true, - METRIC_MESSAGES => true, - METRIC_MAU => true, - METRIC_WEBHOOKS => true, - METRIC_PLATFORMS => true, - METRIC_PROVIDERS => true, - METRIC_TOPICS => true, - METRIC_KEYS => true, - METRIC_FILES => true, - METRIC_FILES_STORAGE => true, - METRIC_DEPLOYMENTS_STORAGE => true, - METRIC_BUILDS_STORAGE => true, - METRIC_DEPLOYMENTS => true, - METRIC_BUILDS => true, - METRIC_COLLECTIONS => true, - METRIC_DOCUMENTS => true, - METRIC_DATABASES_STORAGE => true, - ]; - - /** - * Skip metrics associated with parent IDs - * these need to be checked individually with `str_ends_with` - */ - protected array $skipParentIdMetrics = [ - '.files', - '.files.storage', - '.collections', - '.documents', - '.deployments', - '.deployments.storage', - '.builds', - '.builds.storage', - '.databases.storage' - ]; - - /** - * @var callable - */ - protected mixed $getLogsDB; - - protected array $periods = [ - '1h' => 'Y-m-d H:00', - '1d' => 'Y-m-d 00:00', - 'inf' => '0000-00-00 00:00' - ]; - - public static function getName(): string - { - return 'stats-usage-dump'; - } - - /** - * @throws \Exception - */ - public function __construct() - { - $this - ->inject('message') - ->inject('getProjectDB') - ->inject('getLogsDB') - ->inject('register') - ->callback([$this, 'action']); - } - - /** - * @param Message $message - * @param callable $getProjectDB - * @param callable $getLogsDB - * @param Registry $register - * @return void - * @throws Exception - * @throws \Throwable - * @throws \Utopia\Database\Exception - */ - public function action(Message $message, callable $getProjectDB, callable $getLogsDB, Registry $register): void - { - $this->getLogsDB = $getLogsDB; - $this->register = $register; - $payload = $message->getPayload() ?? []; - if (empty($payload)) { - throw new Exception('Missing payload'); - } - - foreach ($payload['stats'] ?? [] as $stats) { - $project = new Document($stats['project'] ?? []); - $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0; - $receivedAt = $stats['receivedAt'] ?? null; - if ($numberOfKeys === 0) { - continue; - } - - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); - - try { - foreach ($stats['keys'] ?? [] as $key => $value) { - if ($value == 0) { - continue; - } - - if (str_contains($key, METRIC_DATABASES_STORAGE)) { - // skip database storage calc as it's wrong and we plan to get this from StatsResources - continue; - } - - foreach ($this->periods as $period => $format) { - $time = null; - - if ($period !== 'inf') { - $time = !empty($receivedAt) ? (new \DateTime($receivedAt))->format($format) : date($format, time()); - } - $id = \md5("{$time}_{$period}_{$key}"); - - $document = new Document([ - '$id' => $id, - 'period' => $period, - 'time' => $time, - 'metric' => $key, - 'value' => $value, - 'region' => System::getEnv('_APP_REGION', 'default'), - ]); - - - $this->projects[$project->getInternalId()]['project'] = new Document([ - '$id' => $project->getId(), - '$internalId' => $project->getInternalId(), - 'database' => $project->getAttribute('database'), - ]); - $this->projects[$project->getInternalId()]['stats'][] = $document; - - $this->prepareForLogsDB($project, $document); - } - } - } catch (\Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); - } - } - - $batchSize = $this->getBatchSize(); - $shouldProcessBatch = \count($this->projects) >= $batchSize; - if (!$shouldProcessBatch && \count($this->projects) > 0) { - $shouldProcessBatch = (\time() - $this->lastDispatchTime) >= self::BATCH_AGGREGATION_INTERVAL; - } - - if ($shouldProcessBatch || App::isDevelopment()) { - foreach ($this->projects as $internalId => $projectStats) { - if (empty($internalId)) { - continue; - } - try { - /** @var \Utopia\Database\Database $dbForProject */ - $dbForProject = $getProjectDB($projectStats['project']); - Console::log('Processing batch with ' . count($projectStats['stats']) . ' stats'); - $dbForProject->createOrUpdateDocumentsWithIncrease('stats', 'value', $projectStats['stats']); - Console::success('Batch successfully written to DB'); - - unset($this->projects[$internalId]); - } catch (Throwable $e) { - Console::error('Error processing stats: ' . $e->getMessage()); - } - } - $this->lastDispatchTime = time(); - } - $this->writeToLogsDB(); - - } - - protected function prepareForLogsDB(Document $project, Document $stat) - { - if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') { - return; - } - if (array_key_exists($stat->getAttribute('metric'), $this->skipBaseMetrics)) { - return; - } - foreach ($this->skipParentIdMetrics as $skipMetric) { - if (str_ends_with($stat->getAttribute('metric'), $skipMetric)) { - return; - } - } - $documentClone = new Document($stat->getArrayCopy()); - $documentClone->setAttribute('$tenant', (int) $project->getInternalId()); - $this->statDocuments[] = $documentClone; - } - - protected function writeToLogsDB(): void - { - if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') { - Console::log('Dual Writing is disabled. Skipping...'); - return; - } - - $batchSize = $this->getBatchSize(); - $shouldProcessBatch = \count($this->statDocuments) >= $batchSize; - if (!$shouldProcessBatch && \count($this->statDocuments) > 0) { - $shouldProcessBatch = (\time() - $this->lastDispatchTimeLogsDB) >= self::BATCH_AGGREGATION_INTERVAL; - } - - if (!$shouldProcessBatch) { - return; - } - - /** @var \Utopia\Database\Database $dbForLogs*/ - $dbForLogs = call_user_func($this->getLogsDB); - $dbForLogs - ->setTenant(null) - ->setTenantPerDocument(true); - - try { - Console::log('Processing batch with ' . count($this->statDocuments) . ' stats'); - $dbForLogs->createOrUpdateDocumentsWithIncrease( - 'stats', - 'value', - $this->statDocuments - ); - Console::success('Usage logs pushed to Logs DB'); - } catch (Throwable $th) { - Console::error($th->getMessage()); - } finally { - $this->lastDispatchTimeLogsDB = time(); - } - - $this->register->get('pools')->get('logs')->reclaim(); - } -} diff --git a/tests/e2e/Services/Health/HealthCustomServerTest.php b/tests/e2e/Services/Health/HealthCustomServerTest.php index 04b1408cd0..4b7062dc22 100644 --- a/tests/e2e/Services/Health/HealthCustomServerTest.php +++ b/tests/e2e/Services/Health/HealthCustomServerTest.php @@ -541,28 +541,4 @@ class HealthCustomServerTest extends Scope ], $this->getHeaders()), []); $this->assertEquals(503, $response['headers']['status-code']); } - - public function testStatsUsageDumpSuccess() - { - /** - * Test for SUCCESS - */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $this->getProject()['$id'], - ], $this->getHeaders()), []); - - $this->assertEquals(200, $response['headers']['status-code']); - $this->assertIsInt($response['body']['size']); - $this->assertLessThan(100, $response['body']['size']); - - /** - * Test for FAILURE - */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump?threshold=0', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $this->getProject()['$id'], - ], $this->getHeaders()), []); - $this->assertEquals(503, $response['headers']['status-code']); - } }