Remove usage and usage dump in favor of stats-usage and stats-usage-dump

This commit is contained in:
Damodar Lohani
2025-02-11 07:52:37 +00:00
parent 2f95b0e2e0
commit 940f4d12af
8 changed files with 1 additions and 606 deletions
+1 -3
View File
@@ -88,9 +88,7 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/worker-stats-usage && \
chmod +x /usr/local/bin/worker-stats-usage-dump && \
chmod +x /usr/local/bin/stats-resources && \
chmod +x /usr/local/bin/worker-stats-resources && \
chmod +x /usr/local/bin/worker-usage && \
chmod +x /usr/local/bin/worker-usage-dump
chmod +x /usr/local/bin/worker-stats-resources
# Letsencrypt Permissions
RUN mkdir -p /etc/letsencrypt/live/ && chmod -Rf 755 /etc/letsencrypt/live/
-3
View File
@@ -1197,9 +1197,6 @@ App::setResource('queueForAudits', function (Queue\Publisher $publisher) {
App::setResource('queueForFunctions', function (Queue\Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
App::setResource('queueForUsage', function (Queue\Publisher $publisher) {
return new Usage($publisher);
}, ['publisher']);
App::setResource('queueForCertificates', function (Queue\Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
-8
View File
@@ -269,14 +269,6 @@ Server::setResource('consumer', function (Group $pools) {
return $pools->get('consumer')->pop()->getResource();
}, ['pools']);
Server::setResource('queueForUsage', function (Publisher $publisher) {
return new Usage($publisher);
}, ['publisher']);
Server::setResource('queueForUsageDump', function (Publisher $publisher) {
return new UsageDump($publisher);
}, ['publisher']);
Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
-3
View File
@@ -1,3 +0,0 @@
#!/bin/sh
php /usr/src/code/app/worker.php usage $@
-3
View File
@@ -1,3 +0,0 @@
#!/bin/sh
php /usr/src/code/app/worker.php usage-dump $@
@@ -14,10 +14,6 @@ use Appwrite\Platform\Workers\Migrations;
use Appwrite\Platform\Workers\StatsResources;
use Appwrite\Platform\Workers\StatsUsage;
use Appwrite\Platform\Workers\StatsUsageDump;
/** remove */
use Appwrite\Platform\Workers\Usage;
use Appwrite\Platform\Workers\UsageDump;
/** /remove */
use Appwrite\Platform\Workers\Webhooks;
use Utopia\Platform\Service;
@@ -40,10 +36,6 @@ class Workers extends Service
->addAction(StatsUsage::getName(), new StatsUsage())
->addAction(Migrations::getName(), new Migrations())
->addAction(StatsResources::getName(), new StatsResources())
/** Remove */
->addAction(UsageDump::getName(), new UsageDump())
->addAction(Usage::getName(), new Usage())
/** /remove */
;
}
}
-292
View File
@@ -1,292 +0,0 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Event\UsageDump;
use Exception;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
class Usage extends Action
{
private array $stats = [];
private int $lastTriggeredTime = 0;
private int $aggregationInterval = 20;
private int $keys = 0;
private const INFINITY_PERIOD = '_inf_';
private const KEYS_THRESHOLD = 20000;
public static function getName(): string
{
return 'usage';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
->desc('Usage worker')
->inject('message')
->inject('project')
->inject('getProjectDB')
->inject('queueForUsageDump')
->callback(function (Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump) {
$this->action($message, $project, $getProjectDB, $queueForUsageDump);
});
$this->aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
$this->lastTriggeredTime = time();
}
/**
* @param Message $message
* @param Document $project
* @param callable $getProjectDB
* @param UsageDump $queueForUsageDump
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
if (empty($project->getAttribute('database'))) {
var_dump($payload);
return;
}
$projectId = $project->getInternalId();
foreach ($payload['reduce'] ?? [] as $document) {
if (empty($document)) {
continue;
}
$this->reduce(
project: $project,
document: new Document($document),
metrics: $payload['metrics'],
getProjectDB: $getProjectDB
);
}
$this->stats[$projectId]['project'] = [
'$id' => $project->getId(),
'$internalId' => $project->getInternalId(),
'database' => $project->getAttribute('database'),
];
$this->stats[$projectId]['receivedAt'] = DateTime::now();
foreach ($payload['metrics'] ?? [] as $metric) {
$this->keys++;
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
continue;
}
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
}
// If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
if (
$this->keys >= self::KEYS_THRESHOLD ||
(time() - $this->lastTriggeredTime > $this->aggregationInterval && $this->keys > 0)
) {
Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');
$queueForUsageDump
->setStats($this->stats)
->trigger();
$this->stats = [];
$this->keys = 0;
$this->lastTriggeredTime = time();
}
}
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope.
* @param Document $project
* @param Document $document
* @param array $metrics
* @param callable $getProjectDB
* @return void
*/
private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
{
$dbForProject = $getProjectDB($project);
try {
switch (true) {
case $document->getCollection() === 'users': // users
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
if (!empty($sessions)) {
$metrics[] = [
'key' => METRIC_SESSIONS,
'value' => ($sessions * -1),
];
}
break;
case $document->getCollection() === 'databases': // databases
$collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS)));
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS)));
if (!empty($collections['value'])) {
$metrics[] = [
'key' => METRIC_COLLECTIONS,
'value' => ($collections['value'] * -1),
];
}
if (!empty($documents['value'])) {
$metrics[] = [
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
];
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS)));
if (!empty($documents['value'])) {
$metrics[] = [
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
];
$metrics[] = [
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
'value' => ($documents['value'] * -1),
];
}
break;
case $document->getCollection() === 'buckets':
$files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES)));
$storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE)));
if (!empty($files['value'])) {
$metrics[] = [
'key' => METRIC_FILES,
'value' => ($files['value'] * -1),
];
}
if (!empty($storage['value'])) {
$metrics[] = [
'key' => METRIC_FILES_STORAGE,
'value' => ($storage['value'] * -1),
];
}
break;
case $document->getCollection() === 'functions':
$deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
$deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
$builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
$buildsSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_SUCCESS)));
$buildsFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_FAILED)));
$buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
$buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
$buildsComputeSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_SUCCESS)));
$buildsComputeFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_FAILED)));
$executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
$executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
if (!empty($deployments['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS,
'value' => ($deployments['value'] * -1),
];
}
if (!empty($deploymentsStorage['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS_STORAGE,
'value' => ($deploymentsStorage['value'] * -1),
];
}
if (!empty($builds['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS,
'value' => ($builds['value'] * -1),
];
}
if (!empty($buildsSuccess['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_SUCCESS,
'value' => ($buildsSuccess['value'] * -1),
];
}
if (!empty($buildsFailed['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_FAILED,
'value' => ($buildsFailed['value'] * -1),
];
}
if (!empty($buildsStorage['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_STORAGE,
'value' => ($buildsStorage['value'] * -1),
];
}
if (!empty($buildsCompute['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_COMPUTE,
'value' => ($buildsCompute['value'] * -1),
];
}
if (!empty($buildsComputeSuccess['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_COMPUTE_SUCCESS,
'value' => ($buildsComputeSuccess['value'] * -1),
];
}
if (!empty($buildsComputeFailed['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_COMPUTE_FAILED,
'value' => ($buildsComputeFailed['value'] * -1),
];
}
if (!empty($executions['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS,
'value' => ($executions['value'] * -1),
];
}
if (!empty($executionsCompute['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS_COMPUTE,
'value' => ($executionsCompute['value'] * -1),
];
}
break;
default:
break;
}
} catch (\Throwable $e) {
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
}
}
}
-286
View File
@@ -1,286 +0,0 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Extend\Exception;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
const METRIC_COLLECTION_LEVEL_STORAGE = 4;
const METRIC_DATABASE_LEVEL_STORAGE = 3;
const METRIC_PROJECT_LEVEL_STORAGE = 2;
class UsageDump extends Action
{
protected array $stats = [];
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
public static function getName(): string
{
return 'usage-dump';
}
/**
* @throws \Exception
*/
public function __construct()
{
$this
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
});
}
/**
* @param Message $message
* @param callable $getProjectDB
* @return void
* @throws Exception
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, callable $getProjectDB): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
/**
* End temp bug fallback
*/
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
$receivedAt = $stats['receivedAt'] ?? 'NONE';
if ($numberOfKeys === 0) {
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
if (str_contains($key, METRIC_DATABASES_STORAGE)) {
try {
$this->handleDatabaseStorage($key, $dbForProject);
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
}
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
}
private function handleDatabaseStorage(string $key, Database $dbForProject): void
{
$data = explode('.', $key);
$start = microtime(true);
$updateMetric = function (Database $dbForProject, int $value, string $key, string $period, string|null $time) {
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats',
$id,
'value',
$value
);
}
}
};
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
$value = 0;
$previousValue = 0;
try {
$previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0);
} catch (\Exception $e) {
// No previous value
}
switch (count($data)) {
// Collection Level
case METRIC_COLLECTION_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collectionInternalId = $data[1];
try {
$value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId);
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
// Compare with previous value
$diff = $value - $previousValue;
if ($diff === 0) {
break;
}
// Update Collection
$updateMetric($dbForProject, $diff, $key, $period, $time);
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
break;
// Database Level
case METRIC_DATABASE_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collections = [];
try {
$collections = $dbForProject->find('database_' . $databaseInternalId);
} catch (\Exception $e) {
// Database not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
foreach ($collections as $collection) {
try {
$value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
}
$diff = $value - $previousValue;
if ($diff === 0) {
break;
}
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
break;
// Project Level
case METRIC_PROJECT_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']');
// Get all project databases
$databases = $dbForProject->find('database');
// Recalculate all databases
foreach ($databases as $database) {
$collections = $dbForProject->find('database_' . $database->getInternalId());
foreach ($collections as $collection) {
try {
$value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
}
}
$diff = $value - $previousValue;
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
break;
}
}
$end = microtime(true);
console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
}
}