mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
Temp: everything to be removed
This commit is contained in:
@@ -788,7 +788,7 @@ App::get('/v1/health/queue/usage')
|
||||
->action(function (int|string $threshold, Connection $queue, Response $response) {
|
||||
$threshold = \intval($threshold);
|
||||
|
||||
$client = new Client(Event::USAGE_QUEUE_NAME, $queue);
|
||||
$client = new Client(Event::STATS_USAGE_QUEUE_NAME, $queue);
|
||||
$size = $client->getQueueSize();
|
||||
|
||||
if ($size >= $threshold) {
|
||||
@@ -995,7 +995,7 @@ App::get('/v1/health/queue/failed/:name')
|
||||
Event::AUDITS_QUEUE_NAME,
|
||||
Event::MAILS_QUEUE_NAME,
|
||||
Event::FUNCTIONS_QUEUE_NAME,
|
||||
Event::USAGE_QUEUE_NAME,
|
||||
Event::STATS_USAGE_QUEUE_NAME,
|
||||
Event::STATS_USAGE_DUMP_QUEUE_NAME,
|
||||
Event::WEBHOOK_QUEUE_NAME,
|
||||
Event::CERTIFICATES_QUEUE_NAME,
|
||||
|
||||
@@ -13,8 +13,12 @@ use Appwrite\Event\Func;
|
||||
use Appwrite\Event\Mail;
|
||||
use Appwrite\Event\Messaging;
|
||||
use Appwrite\Event\Migration;
|
||||
/** remove */
|
||||
use Appwrite\Event\StatsUsage;
|
||||
use Appwrite\Event\StatsUsageDump;
|
||||
/** /remove */
|
||||
use Appwrite\Event\Usage;
|
||||
use Appwrite\Event\UsageDump;
|
||||
use Appwrite\Platform\Appwrite;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
@@ -257,6 +261,17 @@ Server::setResource('timelimit', function (\Redis $redis) {
|
||||
|
||||
Server::setResource('log', fn () => new Log());
|
||||
|
||||
/** remove */
|
||||
Server::setResource('queueForUsage', function (Connection $queue) {
|
||||
return new Usage($queue);
|
||||
return new StatsUsage($queue);
|
||||
}, ['queue']);
|
||||
|
||||
Server::setResource('queueForUsageDump', function (Connection $queue) {
|
||||
return new UsageDump($queue);
|
||||
return new StatsUsageDump($queue);
|
||||
}, ['queue']);
|
||||
/** /remove */
|
||||
Server::setResource('queueForStatsUsage', function (Connection $queue) {
|
||||
return new StatsUsage($queue);
|
||||
}, ['queue']);
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/worker.php usage $@
|
||||
@@ -0,0 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/worker.php usage-dump $@
|
||||
@@ -845,7 +845,71 @@ services:
|
||||
- _APP_LOGGING_CONFIG
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
|
||||
# remove
|
||||
appwrite-worker-usage:
|
||||
entrypoint: worker-usage
|
||||
<<: *x-logging
|
||||
container_name: appwrite-worker-usage
|
||||
image: appwrite-dev
|
||||
networks:
|
||||
- appwrite
|
||||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
depends_on:
|
||||
- redis
|
||||
- mariadb
|
||||
environment:
|
||||
- _APP_ENV
|
||||
- _APP_WORKER_PER_CORE
|
||||
- _APP_OPENSSL_KEY_V1
|
||||
- _APP_DB_HOST
|
||||
- _APP_DB_PORT
|
||||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_USAGE_STATS
|
||||
- _APP_LOGGING_CONFIG
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
|
||||
appwrite-worker-usage-dump:
|
||||
entrypoint: worker-usage-dump
|
||||
<<: *x-logging
|
||||
container_name: appwrite-worker-usage-dump
|
||||
image: appwrite-dev
|
||||
networks:
|
||||
- appwrite
|
||||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
depends_on:
|
||||
- redis
|
||||
- mariadb
|
||||
environment:
|
||||
- _APP_ENV
|
||||
- _APP_WORKER_PER_CORE
|
||||
- _APP_OPENSSL_KEY_V1
|
||||
- _APP_DB_HOST
|
||||
- _APP_DB_PORT
|
||||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_USAGE_STATS
|
||||
- _APP_LOGGING_CONFIG
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
# /remove
|
||||
|
||||
appwrite-task-scheduler-functions:
|
||||
entrypoint: schedule-functions
|
||||
<<: *x-logging
|
||||
|
||||
@@ -24,12 +24,20 @@ class Event
|
||||
public const FUNCTIONS_QUEUE_NAME = 'v1-functions';
|
||||
public const FUNCTIONS_CLASS_NAME = 'FunctionsV1';
|
||||
|
||||
/** remove */
|
||||
public const USAGE_QUEUE_NAME = 'v1-usage';
|
||||
public const USAGE_CLASS_NAME = 'UsageV1';
|
||||
|
||||
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
|
||||
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
|
||||
/** /remove */
|
||||
|
||||
public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources';
|
||||
public const STATS_RESOURCES_CLASS_NAME = 'StatsResources';
|
||||
|
||||
public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage';
|
||||
public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1';
|
||||
|
||||
public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump';
|
||||
public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1';
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ class StatsUsage extends Event
|
||||
parent::__construct($connection);
|
||||
|
||||
$this
|
||||
->setQueue(Event::USAGE_QUEUE_NAME)
|
||||
->setClass(Event::USAGE_CLASS_NAME);
|
||||
->setQueue(Event::STATS_USAGE_QUEUE_NAME)
|
||||
->setClass(Event::STATS_USAGE_CLASS_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Event;
|
||||
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Queue\Connection;
|
||||
|
||||
class Usage extends Event
|
||||
{
|
||||
protected array $metrics = [];
|
||||
protected array $reduce = [];
|
||||
|
||||
public function __construct(protected Connection $connection)
|
||||
{
|
||||
parent::__construct($connection);
|
||||
|
||||
$this
|
||||
->setQueue(Event::USAGE_QUEUE_NAME)
|
||||
->setClass(Event::USAGE_CLASS_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add reduce.
|
||||
*
|
||||
* @param Document $document
|
||||
* @return self
|
||||
*/
|
||||
public function addReduce(Document $document): self
|
||||
{
|
||||
$this->reduce[] = $document;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add metric.
|
||||
*
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @return self
|
||||
*/
|
||||
public function addMetric(string $key, int $value): self
|
||||
{
|
||||
|
||||
$this->metrics[] = [
|
||||
'key' => $key,
|
||||
'value' => $value,
|
||||
];
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the payload for the usage event.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function preparePayload(): array
|
||||
{
|
||||
return [
|
||||
'project' => $this->project,
|
||||
'reduce' => $this->reduce,
|
||||
'metrics' => $this->metrics,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends metrics to the usage worker.
|
||||
*
|
||||
* @return string|bool
|
||||
*/
|
||||
public function trigger(): string|bool
|
||||
{
|
||||
parent::trigger();
|
||||
$this->metrics = [];
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Event;
|
||||
|
||||
use Utopia\Queue\Connection;
|
||||
|
||||
class UsageDump extends Event
|
||||
{
|
||||
protected array $stats;
|
||||
|
||||
public function __construct(protected Connection $connection)
|
||||
{
|
||||
parent::__construct($connection);
|
||||
|
||||
$this
|
||||
->setQueue(Event::USAGE_DUMP_QUEUE_NAME)
|
||||
->setClass(Event::USAGE_DUMP_CLASS_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add Stats.
|
||||
*
|
||||
* @param array $stats
|
||||
* @return self
|
||||
*/
|
||||
public function setStats(array $stats): self
|
||||
{
|
||||
$this->stats = $stats;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the payload for the usage dump event.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function preparePayload(): array
|
||||
{
|
||||
return [
|
||||
'stats' => $this->stats,
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,10 @@ use Appwrite\Platform\Workers\Migrations;
|
||||
use Appwrite\Platform\Workers\StatsResources;
|
||||
use Appwrite\Platform\Workers\StatsUsage;
|
||||
use Appwrite\Platform\Workers\StatsUsageDump;
|
||||
/** remove */
|
||||
use Appwrite\Platform\Workers\Usage;
|
||||
use Appwrite\Platform\Workers\UsageDump;
|
||||
/** /remove */
|
||||
use Appwrite\Platform\Workers\Webhooks;
|
||||
use Utopia\Platform\Service;
|
||||
|
||||
@@ -36,6 +40,10 @@ class Workers extends Service
|
||||
->addAction(StatsUsage::getName(), new StatsUsage())
|
||||
->addAction(Migrations::getName(), new Migrations())
|
||||
->addAction(StatsResources::getName(), new StatsResources())
|
||||
/** Remove */
|
||||
->addAction(UsageDump::getName(), new UsageDump())
|
||||
->addAction(Usage::getName(), new Usage())
|
||||
/** /remove */
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,288 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Event\UsageDump;
|
||||
use Exception;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\System\System;
|
||||
|
||||
class Usage extends Action
|
||||
{
|
||||
private array $stats = [];
|
||||
private int $lastTriggeredTime = 0;
|
||||
private int $aggregationInterval = 20;
|
||||
private int $keys = 0;
|
||||
private const INFINITY_PERIOD = '_inf_';
|
||||
private const KEYS_THRESHOLD = 20000;
|
||||
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'usage';
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->desc('Usage worker')
|
||||
->inject('message')
|
||||
->inject('project')
|
||||
->inject('getProjectDB')
|
||||
->inject('queueForUsageDump')
|
||||
->callback([$this, 'action']);
|
||||
|
||||
$this->aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
|
||||
$this->lastTriggeredTime = time();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Document $project
|
||||
* @param callable $getProjectDB
|
||||
* @param UsageDump $queueForUsageDump
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
if (empty($project->getAttribute('database'))) {
|
||||
var_dump($payload);
|
||||
return;
|
||||
}
|
||||
|
||||
$projectId = $project->getInternalId();
|
||||
foreach ($payload['reduce'] ?? [] as $document) {
|
||||
if (empty($document)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->reduce(
|
||||
project: $project,
|
||||
document: new Document($document),
|
||||
metrics: $payload['metrics'],
|
||||
getProjectDB: $getProjectDB
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
$this->stats[$projectId]['project'] = [
|
||||
'$id' => $project->getId(),
|
||||
'$internalId' => $project->getInternalId(),
|
||||
'database' => $project->getAttribute('database'),
|
||||
];
|
||||
$this->stats[$projectId]['receivedAt'] = DateTime::now();
|
||||
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||
$this->keys++;
|
||||
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
|
||||
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
|
||||
}
|
||||
|
||||
// If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
|
||||
if (
|
||||
$this->keys >= self::KEYS_THRESHOLD ||
|
||||
(time() - $this->lastTriggeredTime > $this->aggregationInterval && $this->keys > 0)
|
||||
) {
|
||||
Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');
|
||||
|
||||
$queueForUsageDump
|
||||
->setStats($this->stats)
|
||||
->trigger();
|
||||
|
||||
$this->stats = [];
|
||||
$this->keys = 0;
|
||||
$this->lastTriggeredTime = time();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
|
||||
* When we remove a parent document we need to deduct his children aggregation from the project scope.
|
||||
* @param Document $project
|
||||
* @param Document $document
|
||||
* @param array $metrics
|
||||
* @param callable $getProjectDB
|
||||
* @return void
|
||||
*/
|
||||
private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
|
||||
{
|
||||
$dbForProject = $getProjectDB($project);
|
||||
|
||||
try {
|
||||
switch (true) {
|
||||
case $document->getCollection() === 'users': // users
|
||||
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
|
||||
if (!empty($sessions)) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_SESSIONS,
|
||||
'value' => ($sessions * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
case $document->getCollection() === 'databases': // databases
|
||||
$collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS)));
|
||||
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS)));
|
||||
if (!empty($collections['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_COLLECTIONS,
|
||||
'value' => ($collections['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($documents['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DOCUMENTS,
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
|
||||
$parts = explode('_', $document->getCollection());
|
||||
$databaseInternalId = $parts[1] ?? 0;
|
||||
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS)));
|
||||
|
||||
if (!empty($documents['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DOCUMENTS,
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
$metrics[] = [
|
||||
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
|
||||
'value' => ($documents['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case $document->getCollection() === 'buckets':
|
||||
$files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES)));
|
||||
$storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE)));
|
||||
|
||||
if (!empty($files['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_FILES,
|
||||
'value' => ($files['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($storage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_FILES_STORAGE,
|
||||
'value' => ($storage['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case $document->getCollection() === 'functions':
|
||||
$deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
|
||||
$deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
|
||||
$builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
|
||||
$buildsSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_SUCCESS)));
|
||||
$buildsFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_FAILED)));
|
||||
$buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
|
||||
$buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
|
||||
$buildsComputeSuccess = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_SUCCESS)));
|
||||
$buildsComputeFailed = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_FAILED)));
|
||||
$executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
|
||||
$executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
|
||||
|
||||
if (!empty($deployments['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DEPLOYMENTS,
|
||||
'value' => ($deployments['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($deploymentsStorage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_DEPLOYMENTS_STORAGE,
|
||||
'value' => ($deploymentsStorage['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($builds['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS,
|
||||
'value' => ($builds['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsSuccess['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_SUCCESS,
|
||||
'value' => ($buildsSuccess['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsFailed['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_FAILED,
|
||||
'value' => ($buildsFailed['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsStorage['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_STORAGE,
|
||||
'value' => ($buildsStorage['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsCompute['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_COMPUTE,
|
||||
'value' => ($buildsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsComputeSuccess['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_COMPUTE_SUCCESS,
|
||||
'value' => ($buildsComputeSuccess['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($buildsComputeFailed['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_BUILDS_COMPUTE_FAILED,
|
||||
'value' => ($buildsComputeFailed['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($executions['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_EXECUTIONS,
|
||||
'value' => ($executions['value'] * -1),
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($executionsCompute['value'])) {
|
||||
$metrics[] = [
|
||||
'key' => METRIC_EXECUTIONS_COMPUTE,
|
||||
'value' => ($executionsCompute['value'] * -1),
|
||||
];
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,301 @@
|
||||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Extend\Exception;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception\NotFound;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\System\System;
|
||||
|
||||
const METRIC_COLLECTION_LEVEL_STORAGE = 4;
|
||||
const METRIC_DATABASE_LEVEL_STORAGE = 3;
|
||||
const METRIC_PROJECT_LEVEL_STORAGE = 2;
|
||||
|
||||
class UsageDump extends Action
|
||||
{
|
||||
protected array $stats = [];
|
||||
protected array $periods = [
|
||||
'1h' => 'Y-m-d H:00',
|
||||
'1d' => 'Y-m-d 00:00',
|
||||
'inf' => '0000-00-00 00:00'
|
||||
];
|
||||
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'usage-dump';
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->inject('message')
|
||||
->inject('getProjectDB')
|
||||
->callback([$this, 'action']);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param callable $getProjectDB
|
||||
* @return void
|
||||
* @throws Exception
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
public function action(Message $message, callable $getProjectDB): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
try {
|
||||
foreach ($payload['stats'] ?? [] as $stats) {
|
||||
$project = new Document($stats['project'] ?? []);
|
||||
$numberOfKeys = !empty($stats['keys']) ? \count($stats['keys']) : 0;
|
||||
$receivedAt = $stats['receivedAt'] ?? 'NONE';
|
||||
if ($numberOfKeys === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$dbForProject = $getProjectDB($project);
|
||||
$projectDocuments = [];
|
||||
$databaseCache = [];
|
||||
$collectionSizeCache = [];
|
||||
|
||||
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys . ' Started');
|
||||
$start = \microtime(true);
|
||||
|
||||
foreach ($stats['keys'] ?? [] as $key => $value) {
|
||||
if ($value == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->periods as $period => $format) {
|
||||
$time = 'inf' === $period ? null : \date($format, \time());
|
||||
$id = \md5("{$time}_{$period}_{$key}");
|
||||
|
||||
if (\str_contains($key, METRIC_DATABASES_STORAGE)) {
|
||||
$this->handleDatabaseStorage(
|
||||
$id,
|
||||
$key,
|
||||
$time,
|
||||
$period,
|
||||
$dbForProject,
|
||||
$projectDocuments,
|
||||
$databaseCache,
|
||||
$collectionSizeCache
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
$projectDocuments[] = new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $key,
|
||||
'value' => $value,
|
||||
'region' => System::getEnv('_APP_REGION', 'default'),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
$dbForProject->createOrUpdateDocumentsWithIncrease(
|
||||
collection: 'stats',
|
||||
attribute: 'value',
|
||||
documents: $projectDocuments
|
||||
);
|
||||
|
||||
$end = \microtime(true);
|
||||
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys. ' Time: '.($end - $start).'s');
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::error('[' . DateTime::now() . '] Error processing stats: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private function handleDatabaseStorage(
|
||||
string $id,
|
||||
string $key,
|
||||
?string $time,
|
||||
string $period,
|
||||
Database $dbForProject,
|
||||
array &$projectDocuments,
|
||||
array &$databaseCache,
|
||||
array &$collectionSizeCache,
|
||||
): void {
|
||||
$data = \explode('.', $key);
|
||||
$value = 0;
|
||||
$previousValue = 0;
|
||||
|
||||
try {
|
||||
$previousValue = $dbForProject
|
||||
->getDocument('stats', $id)
|
||||
->getAttribute('value', 0);
|
||||
} catch (\Exception) {
|
||||
// No previous value
|
||||
}
|
||||
|
||||
switch (\count($data)) {
|
||||
case METRIC_COLLECTION_LEVEL_STORAGE:
|
||||
$databaseInternalId = $data[0];
|
||||
$collectionInternalId = $data[1];
|
||||
$collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
|
||||
|
||||
if (!isset($collectionSizeCache[$collectionId])) {
|
||||
try {
|
||||
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$collectionSizeCache[$collectionId] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
$value = $collectionSizeCache[$collectionId];
|
||||
|
||||
$diff = $value - $previousValue;
|
||||
if ($diff === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
$keys = [
|
||||
$key,
|
||||
\str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
|
||||
METRIC_DATABASES_STORAGE
|
||||
];
|
||||
|
||||
foreach ($keys as $metric) {
|
||||
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
|
||||
}
|
||||
break;
|
||||
|
||||
case METRIC_DATABASE_LEVEL_STORAGE:
|
||||
$databaseInternalId = $data[0];
|
||||
$databaseId = "database_{$databaseInternalId}";
|
||||
|
||||
if (!isset($databaseCache[$databaseId])) {
|
||||
try {
|
||||
$databaseCache[$databaseId] = $dbForProject->find($databaseId);
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$databaseCache[$databaseId] = [];
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($databaseCache[$databaseId] as $collection) {
|
||||
$collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
|
||||
|
||||
if (!isset($collectionSizeCache[$collectionId])) {
|
||||
try {
|
||||
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$collectionSizeCache[$collectionId] = 0;
|
||||
}
|
||||
}
|
||||
$value += $collectionSizeCache[$collectionId];
|
||||
}
|
||||
|
||||
$diff = $value - $previousValue;
|
||||
if ($diff === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
$keys = [
|
||||
\str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
|
||||
METRIC_DATABASES_STORAGE
|
||||
];
|
||||
|
||||
foreach ($keys as $metric) {
|
||||
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
|
||||
}
|
||||
break;
|
||||
|
||||
case METRIC_PROJECT_LEVEL_STORAGE:
|
||||
if (!isset($databaseCache['*'])) {
|
||||
try {
|
||||
$databaseCache['*'] = $dbForProject->find('databases');
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$databaseCache['*'] = [];
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($databaseCache['*'] as $database) {
|
||||
$databaseId = "database_{$database->getInternalId()}";
|
||||
if (!isset($databaseCache[$databaseId])) {
|
||||
try {
|
||||
$databaseCache[$databaseId] = $dbForProject->find($databaseId);
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$databaseCache[$databaseId] = [];
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($databaseCache[$databaseId] as $collection) {
|
||||
$collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
|
||||
|
||||
if (!isset($collectionSizeCache[$collectionId])) {
|
||||
try {
|
||||
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
|
||||
} catch (\Exception $e) {
|
||||
if (!$e instanceof NotFound) {
|
||||
throw $e;
|
||||
}
|
||||
$collectionSizeCache[$collectionId] = 0;
|
||||
}
|
||||
}
|
||||
$value += $collectionSizeCache[$collectionId];
|
||||
}
|
||||
}
|
||||
|
||||
$diff = $value - $previousValue;
|
||||
if ($diff === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
$keys = [
|
||||
METRIC_DATABASES_STORAGE
|
||||
];
|
||||
|
||||
foreach ($keys as $metric) {
|
||||
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private function createStatsDocument(
|
||||
string $id,
|
||||
string $period,
|
||||
?string $time,
|
||||
string $key,
|
||||
int $diff,
|
||||
): Document {
|
||||
return new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $key,
|
||||
'value' => $diff,
|
||||
'region' => System::getEnv('_APP_REGION', 'default'),
|
||||
]);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user