mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
fix: remove stats usage dump
This commit is contained in:
@@ -756,38 +756,6 @@ App::get('/v1/health/queue/stats-usage')
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
});
|
||||
|
||||
App::get('/v1/health/queue/stats-usage-dump')
|
||||
->desc('Get usage dump queue')
|
||||
->groups(['api', 'health'])
|
||||
->label('scope', 'health.read')
|
||||
->label('sdk', new Method(
|
||||
auth: [AuthType::KEY],
|
||||
namespace: 'health',
|
||||
name: 'getQueueStatsUsageDump',
|
||||
description: '/docs/references/health/get-queue-stats-usage-dump.md',
|
||||
responses: [
|
||||
new SDKResponse(
|
||||
code: Response::STATUS_CODE_OK,
|
||||
model: Response::MODEL_HEALTH_QUEUE,
|
||||
)
|
||||
],
|
||||
contentType: ContentType::JSON
|
||||
))
|
||||
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
|
||||
->inject('publisher')
|
||||
->inject('response')
|
||||
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
|
||||
$threshold = \intval($threshold);
|
||||
|
||||
$size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_DUMP_QUEUE_NAME));
|
||||
|
||||
if ($size >= $threshold) {
|
||||
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
|
||||
}
|
||||
|
||||
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
|
||||
});
|
||||
|
||||
App::get('/v1/health/storage/local')
|
||||
->desc('Get local storage')
|
||||
->groups(['api', 'health'])
|
||||
|
||||
@@ -15,7 +15,6 @@ use Appwrite\Event\Messaging;
|
||||
use Appwrite\Event\Migration;
|
||||
use Appwrite\Event\Realtime;
|
||||
use Appwrite\Event\StatsUsage;
|
||||
use Appwrite\Event\StatsUsageDump;
|
||||
use Appwrite\Event\Webhook;
|
||||
use Appwrite\Platform\Appwrite;
|
||||
use Swoole\Runtime;
|
||||
@@ -278,10 +277,6 @@ Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
|
||||
return new StatsUsage($publisher);
|
||||
}, ['publisher']);
|
||||
|
||||
Server::setResource('queueForStatsUsageDump', function (Publisher $publisher) {
|
||||
return new StatsUsageDump($publisher);
|
||||
}, ['publisher']);
|
||||
|
||||
Server::setResource('queueForDatabase', function (Publisher $publisher) {
|
||||
return new EventDatabase($publisher);
|
||||
}, ['publisher']);
|
||||
|
||||
@@ -24,23 +24,12 @@ class Event
|
||||
public const FUNCTIONS_QUEUE_NAME = 'v1-functions';
|
||||
public const FUNCTIONS_CLASS_NAME = 'FunctionsV1';
|
||||
|
||||
/** remove */
|
||||
public const USAGE_QUEUE_NAME = 'v1-usage';
|
||||
public const USAGE_CLASS_NAME = 'UsageV1';
|
||||
|
||||
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
|
||||
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
|
||||
/** /remove */
|
||||
|
||||
public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources';
|
||||
public const STATS_RESOURCES_CLASS_NAME = 'StatsResourcesV1';
|
||||
|
||||
public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage';
|
||||
public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1';
|
||||
|
||||
public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump';
|
||||
public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1';
|
||||
|
||||
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
|
||||
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';
|
||||
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Event;
|
||||
|
||||
use Utopia\Queue\Publisher;
|
||||
|
||||
class StatsUsageDump extends Event
|
||||
{
|
||||
protected array $stats;
|
||||
|
||||
public function __construct(protected Publisher $publisher)
|
||||
{
|
||||
parent::__construct($publisher);
|
||||
|
||||
$this
|
||||
->setQueue(Event::STATS_USAGE_DUMP_QUEUE_NAME)
|
||||
->setClass(Event::STATS_USAGE_DUMP_CLASS_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add Stats.
|
||||
*
|
||||
* @param array $stats
|
||||
* @return self
|
||||
*/
|
||||
public function setStats(array $stats): self
|
||||
{
|
||||
$this->stats = $stats;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the payload for the usage dump event.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function preparePayload(): array
|
||||
{
|
||||
return [
|
||||
'stats' => $this->stats,
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,6 @@ use Appwrite\Platform\Workers\Messaging;
|
||||
use Appwrite\Platform\Workers\Migrations;
|
||||
use Appwrite\Platform\Workers\StatsResources;
|
||||
use Appwrite\Platform\Workers\StatsUsage;
|
||||
use Appwrite\Platform\Workers\StatsUsageDump;
|
||||
use Appwrite\Platform\Workers\Webhooks;
|
||||
use Utopia\Platform\Service;
|
||||
|
||||
@@ -32,7 +31,6 @@ class Workers extends Service
|
||||
->addAction(Mails::getName(), new Mails())
|
||||
->addAction(Messaging::getName(), new Messaging())
|
||||
->addAction(Webhooks::getName(), new Webhooks())
|
||||
->addAction(StatsUsageDump::getName(), new StatsUsageDump())
|
||||
->addAction(StatsUsage::getName(), new StatsUsage())
|
||||
->addAction(Migrations::getName(), new Migrations())
|
||||
->addAction(StatsResources::getName(), new StatsResources())
|
||||
|
||||
@@ -1,278 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Extend\Exception;
|
||||
use Throwable;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\System\System;
|
||||
|
||||
class StatsUsageDump extends Action
|
||||
{
|
||||
protected const BATCH_AGGREGATION_INTERVAL = 60; // in seconds
|
||||
|
||||
private int $lastDispatchTime = 0;
|
||||
private int $lastDispatchTimeLogsDB = 0;
|
||||
protected array $stats = [];
|
||||
|
||||
/**
|
||||
* Stats for batch write separated per project
|
||||
* @var array
|
||||
*/
|
||||
private array $projects = [];
|
||||
|
||||
/**
|
||||
* Array of stat documents to batch write to logsDB
|
||||
* @var array
|
||||
*/
|
||||
private array $statDocuments = [];
|
||||
|
||||
protected function getBatchSize(): int
|
||||
{
|
||||
return intval(System::getEnv('_APP_QUEUE_PREFETCH_COUNT', 1));
|
||||
}
|
||||
|
||||
protected Registry $register;
|
||||
|
||||
/**
|
||||
* Metrics to skip writing to logsDB
|
||||
* As these metrics are calculated separately
|
||||
* by logs DB
|
||||
* @var array
|
||||
*/
|
||||
protected array $skipBaseMetrics = [
|
||||
METRIC_DATABASES => true,
|
||||
METRIC_BUCKETS => true,
|
||||
METRIC_USERS => true,
|
||||
METRIC_FUNCTIONS => true,
|
||||
METRIC_TEAMS => true,
|
||||
METRIC_MESSAGES => true,
|
||||
METRIC_MAU => true,
|
||||
METRIC_WEBHOOKS => true,
|
||||
METRIC_PLATFORMS => true,
|
||||
METRIC_PROVIDERS => true,
|
||||
METRIC_TOPICS => true,
|
||||
METRIC_KEYS => true,
|
||||
METRIC_FILES => true,
|
||||
METRIC_FILES_STORAGE => true,
|
||||
METRIC_DEPLOYMENTS_STORAGE => true,
|
||||
METRIC_BUILDS_STORAGE => true,
|
||||
METRIC_DEPLOYMENTS => true,
|
||||
METRIC_BUILDS => true,
|
||||
METRIC_COLLECTIONS => true,
|
||||
METRIC_DOCUMENTS => true,
|
||||
METRIC_DATABASES_STORAGE => true,
|
||||
];
|
||||
|
||||
/**
|
||||
* Skip metrics associated with parent IDs
|
||||
* these need to be checked individually with `str_ends_with`
|
||||
*/
|
||||
protected array $skipParentIdMetrics = [
|
||||
'.files',
|
||||
'.files.storage',
|
||||
'.collections',
|
||||
'.documents',
|
||||
'.deployments',
|
||||
'.deployments.storage',
|
||||
'.builds',
|
||||
'.builds.storage',
|
||||
'.databases.storage'
|
||||
];
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
protected mixed $getLogsDB;
|
||||
|
||||
protected array $periods = [
|
||||
'1h' => 'Y-m-d H:00',
|
||||
'1d' => 'Y-m-d 00:00',
|
||||
'inf' => '0000-00-00 00:00'
|
||||
];
|
||||
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'stats-usage-dump';
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->inject('message')
|
||||
->inject('getProjectDB')
|
||||
->inject('getLogsDB')
|
||||
->inject('register')
|
||||
->callback([$this, 'action']);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param callable $getProjectDB
|
||||
* @param callable $getLogsDB
|
||||
* @param Registry $register
|
||||
* @return void
|
||||
* @throws Exception
|
||||
* @throws \Throwable
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
public function action(Message $message, callable $getProjectDB, callable $getLogsDB, Registry $register): void
|
||||
{
|
||||
$this->getLogsDB = $getLogsDB;
|
||||
$this->register = $register;
|
||||
$payload = $message->getPayload() ?? [];
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
foreach ($payload['stats'] ?? [] as $stats) {
|
||||
$project = new Document($stats['project'] ?? []);
|
||||
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
|
||||
$receivedAt = $stats['receivedAt'] ?? null;
|
||||
if ($numberOfKeys === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
|
||||
|
||||
try {
|
||||
foreach ($stats['keys'] ?? [] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (str_contains($key, METRIC_DATABASES_STORAGE)) {
|
||||
// skip database storage calc as it's wrong and we plan to get this from StatsResources
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->periods as $period => $format) {
|
||||
$time = null;
|
||||
|
||||
if ($period !== 'inf') {
|
||||
$time = !empty($receivedAt) ? (new \DateTime($receivedAt))->format($format) : date($format, time());
|
||||
}
|
||||
$id = \md5("{$time}_{$period}_{$key}");
|
||||
|
||||
$document = new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $key,
|
||||
'value' => $value,
|
||||
'region' => System::getEnv('_APP_REGION', 'default'),
|
||||
]);
|
||||
|
||||
|
||||
$this->projects[$project->getInternalId()]['project'] = new Document([
|
||||
'$id' => $project->getId(),
|
||||
'$internalId' => $project->getInternalId(),
|
||||
'database' => $project->getAttribute('database'),
|
||||
]);
|
||||
$this->projects[$project->getInternalId()]['stats'][] = $document;
|
||||
|
||||
$this->prepareForLogsDB($project, $document);
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
$batchSize = $this->getBatchSize();
|
||||
$shouldProcessBatch = \count($this->projects) >= $batchSize;
|
||||
if (!$shouldProcessBatch && \count($this->projects) > 0) {
|
||||
$shouldProcessBatch = (\time() - $this->lastDispatchTime) >= self::BATCH_AGGREGATION_INTERVAL;
|
||||
}
|
||||
|
||||
if ($shouldProcessBatch || App::isDevelopment()) {
|
||||
foreach ($this->projects as $internalId => $projectStats) {
|
||||
if (empty($internalId)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
/** @var \Utopia\Database\Database $dbForProject */
|
||||
$dbForProject = $getProjectDB($projectStats['project']);
|
||||
Console::log('Processing batch with ' . count($projectStats['stats']) . ' stats');
|
||||
$dbForProject->createOrUpdateDocumentsWithIncrease('stats', 'value', $projectStats['stats']);
|
||||
Console::success('Batch successfully written to DB');
|
||||
|
||||
unset($this->projects[$internalId]);
|
||||
} catch (Throwable $e) {
|
||||
Console::error('Error processing stats: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
$this->lastDispatchTime = time();
|
||||
}
|
||||
$this->writeToLogsDB();
|
||||
|
||||
}
|
||||
|
||||
protected function prepareForLogsDB(Document $project, Document $stat)
|
||||
{
|
||||
if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
|
||||
return;
|
||||
}
|
||||
if (array_key_exists($stat->getAttribute('metric'), $this->skipBaseMetrics)) {
|
||||
return;
|
||||
}
|
||||
foreach ($this->skipParentIdMetrics as $skipMetric) {
|
||||
if (str_ends_with($stat->getAttribute('metric'), $skipMetric)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
$documentClone = new Document($stat->getArrayCopy());
|
||||
$documentClone->setAttribute('$tenant', (int) $project->getInternalId());
|
||||
$this->statDocuments[] = $documentClone;
|
||||
}
|
||||
|
||||
protected function writeToLogsDB(): void
|
||||
{
|
||||
if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
|
||||
Console::log('Dual Writing is disabled. Skipping...');
|
||||
return;
|
||||
}
|
||||
|
||||
$batchSize = $this->getBatchSize();
|
||||
$shouldProcessBatch = \count($this->statDocuments) >= $batchSize;
|
||||
if (!$shouldProcessBatch && \count($this->statDocuments) > 0) {
|
||||
$shouldProcessBatch = (\time() - $this->lastDispatchTimeLogsDB) >= self::BATCH_AGGREGATION_INTERVAL;
|
||||
}
|
||||
|
||||
if (!$shouldProcessBatch) {
|
||||
return;
|
||||
}
|
||||
|
||||
/** @var \Utopia\Database\Database $dbForLogs*/
|
||||
$dbForLogs = call_user_func($this->getLogsDB);
|
||||
$dbForLogs
|
||||
->setTenant(null)
|
||||
->setTenantPerDocument(true);
|
||||
|
||||
try {
|
||||
Console::log('Processing batch with ' . count($this->statDocuments) . ' stats');
|
||||
$dbForLogs->createOrUpdateDocumentsWithIncrease(
|
||||
'stats',
|
||||
'value',
|
||||
$this->statDocuments
|
||||
);
|
||||
Console::success('Usage logs pushed to Logs DB');
|
||||
} catch (Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
} finally {
|
||||
$this->lastDispatchTimeLogsDB = time();
|
||||
}
|
||||
|
||||
$this->register->get('pools')->get('logs')->reclaim();
|
||||
}
|
||||
}
|
||||
@@ -541,28 +541,4 @@ class HealthCustomServerTest extends Scope
|
||||
], $this->getHeaders()), []);
|
||||
$this->assertEquals(503, $response['headers']['status-code']);
|
||||
}
|
||||
|
||||
public function testStatsUsageDumpSuccess()
|
||||
{
|
||||
/**
|
||||
* Test for SUCCESS
|
||||
*/
|
||||
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump', array_merge([
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), []);
|
||||
|
||||
$this->assertEquals(200, $response['headers']['status-code']);
|
||||
$this->assertIsInt($response['body']['size']);
|
||||
$this->assertLessThan(100, $response['body']['size']);
|
||||
|
||||
/**
|
||||
* Test for FAILURE
|
||||
*/
|
||||
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump?threshold=0', array_merge([
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), []);
|
||||
$this->assertEquals(503, $response['headers']['status-code']);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user