addressing comments

This commit is contained in:
shimon
2022-12-12 11:37:10 +02:00
parent 93c3b73291
commit 2f34c566e5
5 changed files with 91 additions and 117 deletions
+51 -18
View File
@@ -57,7 +57,11 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$value = -1;
}
//var_dump($document->getCollection());
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files
* When we remove a parent document we need to deduct his children aggregation from the project scope
*/
switch (true) {
case $document->getCollection() === 'teams':
@@ -65,7 +69,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
break;
case $document->getCollection() === 'users':
$queueForUsage->addMetric("users", $value); // per project
// sessions
// Project sessions deduction
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$userSessions = (count($document->getAttribute('sessions')));
$sessions = $dbForProject->getDocument('stats', md5("_inf_sessions"));
@@ -86,7 +91,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$queueForUsage->addMetric("databases", $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
// Collections
//Project collections deduction
$dbCollections = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".collections"));
$projectCollections = $dbForProject->getDocument('stats', md5("_inf_collections"));
if (!$dbCollections->isEmpty()) {
@@ -97,7 +103,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$dbCollections['value']
);
}
// Documents
//Project documents deduction
$dbDocuments = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".documents"));
$projectDocuments = $dbForProject->getDocument('stats', md5("_inf_documents"));
if (!$dbDocuments->isEmpty()) {
@@ -117,7 +124,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
;
if ($event === Database::EVENT_DOCUMENT_DELETE) {
// Documents
//Project documents deduction
$dbDocuments = $dbForProject->getDocument('stats', md5("_inf_" . "{$document['databaseId']}" . ".documents"));
$projectDocuments = $dbForProject->getDocument('stats', md5("_inf_documents"));
if (!$dbDocuments->isEmpty()) {
@@ -132,9 +140,6 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
break;
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
var_dump($collection);
var_dump($document);
$queueForUsage
->addMetric("documents", $value) // per project
->addMetric("{$document->getAttribute('$databaseId')}" . ".documents", $value) // per database
@@ -145,7 +150,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$queueForUsage->addMetric("buckets", $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
// bucket Files
//Project files deduction
$bucketFiles = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".files"));
$projectFiles = $dbForProject->getDocument('stats', md5("_inf_files"));
if (!$bucketFiles->isEmpty()) {
@@ -156,7 +162,7 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$bucketFiles['value']
);
}
// bucket Storage
//Project files storage deduction
$bucketStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".files.storage"));
$projectStorage = $dbForProject->getDocument('stats', md5("_inf_files.storage"));
if (!$bucketStorage->isEmpty()) {
@@ -181,7 +187,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$queueForUsage->addMetric("functions", $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
// Deployments Storage
//Project deployments deduction
$functionDeployments = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getId()}" . ".deployments"));
$projectDeployments = $dbForProject->getDocument('stats', md5("_inf_deployments"));
if (!$functionDeployments->isEmpty()) {
@@ -192,7 +199,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$functionDeployments['value']
);
}
// Deployments Storage
//Project deployments storage deduction
$functionDeploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function." . "{$document->getId()}" . ".deployments.storage"));
$projectDeploymentsStorage = $dbForProject->getDocument('stats', md5("_inf_function.deployments.storage"));
if (!$functionDeployments->isEmpty()) {
@@ -204,9 +212,31 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
);
}
//TODO Missing solution for builds (functionId)
//Project builds deduction
$functionBuilds = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".builds"));
$projectBuilds = $dbForProject->getDocument('stats', md5("_inf_builds"));
if (!$functionBuilds->isEmpty()) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$projectBuilds->getId(),
'value',
$functionBuilds['value']
);
}
// Executions
//Project builds storage deduction
$functionBuildsStorage = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".builds.storage"));
$projectFunctionBuilds = $dbForProject->getDocument('stats', md5("_inf_builds.storage"));
if (!$functionBuildsStorage->isEmpty()) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$projectFunctionBuilds->getId(),
'value',
$functionBuildsStorage['value']
);
}
//Project executions deduction
$functionExecutions = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".executions"));
$projectExecutions = $dbForProject->getDocument('stats', md5("_inf_executions"));
if (!$functionExecutions->isEmpty()) {
@@ -217,7 +247,8 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
$functionExecutions['value']
);
}
// Executions Compute
//Project executions compute deduction
$functionExecutionsCompute = $dbForProject->getDocument('stats', md5("_inf_" . "{$document->getId()}" . ".executions.compute"));
$projectExecutionsCompute = $dbForProject->getDocument('stats', md5("_inf_executions.compute"));
if (!$functionExecutionsCompute->isEmpty()) {
@@ -238,12 +269,14 @@ $databaseListener = function (string $event, array $args, Document $project, Usa
->addMetric("{$document['resourceType']}" . "." . "{$document['resourceId']}" . ".deployments.storage", $document->getAttribute('size') * $value) // per function
;
break;
case $document->getCollection() === 'builds': // todo needs to extract functionId
case $document->getCollection() === 'builds':
$deployment = $dbForProject->getDocument('deployments', $document->getAttribute('deploymentId')); // Todo temp fix
$queueForUsage
->addMetric("builds", $value) // per project
->addMetric("builds.storage", $document->getAttribute('size') * $value) // per project
->addMetric("{$document['functionId']}" . ".builds", $value) // per function
->addMetric("{$document['functionId']}" . ".builds.storage", $document->getAttribute('size') * $value) // per function
->addMetric("{$deployment['resourceId']}" . ".builds", $value) // per function
->addMetric("{$deployment['resourceId']}" . ".builds.storage", $document->getAttribute('size') * $value) // per function
;
break;
case $document->getCollection() === 'executions':
+1 -1
View File
@@ -100,7 +100,7 @@ Server::setResource('pools', function ($register) {
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$workerNumber = 1;
if (empty(App::getEnv('QUEUE'))) {
throw new Exception('Please configure "QUEUE" environemnt variable.');
}
+37 -46
View File
@@ -43,60 +43,51 @@ $server->job()
}
});
$server->start();
$server
->workerStart()
->inject('register')
->inject('cache')
->inject('pools')
->action(function ($register, $cache, $pools) use ($periods, &$stats) {
Timer::tick(30000, function () use ($register, $cache, $pools, $periods, &$stats) {
Timer::tick(30000, function () use ($register, $cache, $pools, $periods, &$stats) {
$slice = array_slice($stats, 0, count($stats));
array_splice($stats, 0, count($stats));
foreach ($slice as $metric) {
foreach ($periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$metric['key']}");
$dbForProject = new Database($pools->get($metric['database'])->pop()->getResource(), $cache);
$dbForProject->setNamespace('_' . $metric['projectInternalId']);
$slice = array_slice($stats, 0, count($stats));
array_splice($stats, 0, count($stats));
var_dump($slice);
foreach ($slice as $metric) {
foreach ($periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$metric['key']}");
$dbForProject = new Database(
$pools
->get($metric['database'])
->pop()
->getResource(),
$cache
try {
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
//console::log("{$period}, {$time}, {$metric['key']}={$metric['value']}");
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $metric['key'],
'value' => $metric['value'],
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} else {
//console::info("{$document->getAttribute('period')}, {$document->getAttribute('time')}, {$document->getAttribute('metric')} = {$value}");
$dbForProject->increaseDocumentAttribute(
'stats',
$document->getId(),
'value',
$metric['value']
);
$dbForProject->setNamespace('_' . $metric['projectInternalId']);
try {
$document = $dbForProject->getDocument('stats', $id);
if ($document->isEmpty()) {
//console::log("{$period}, {$time}, {$metric['key']}={$metric['value']}");
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $metric['key'],
'value' => $metric['value'],
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} else {
//console::info("{$document->getAttribute('period')}, {$document->getAttribute('time')}, {$document->getAttribute('metric')} = {$value}");
$dbForProject->increaseDocumentAttribute(
'stats',
$document->getId(),
'value',
$metric['value']
);
}
} catch (\Exception $e) {
console::error($e->getMessage());
} finally {
$pools->reclaim();
}
}
} catch (\Exception $e) {
console::error($e->getMessage());
} finally {
$pools->reclaim();
}
});
}
}
});
});
$server->start();
+2 -2
View File
@@ -93,7 +93,7 @@ services:
- ./public:/usr/src/code/public
- ./src:/usr/src/code/src
- ./dev:/usr/local/dev
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
depends_on:
- mariadb
- redis
@@ -535,7 +535,7 @@ services:
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
depends_on:
- redis
- mariadb
-50
View File
@@ -1,50 +0,0 @@
<?php
namespace Appwrite\Platform\Tasks;
use Utopia\CLI\Console;
use Utopia\Database\Database as UtopiaDatabase;
use Utopia\Platform\Action;
class Usage extends Action
{
public static function getName(): string
{
return 'usage';
}
public function __construct()
{
$this
->desc('Schedules syncing data from influxdb to Appwrite console db')
->inject('dbForConsole')
->inject('getProjectDB')
->inject('logError')
// ->callback(fn ($dbForConsole, $influxDB, $register, $getProjectDB, $logError) => $this->action($dbForConsole, $influxDB, $register, $getProjectDB, $logError))
;
}
public function action()
{
// Console::title('Usage Aggregation V1');
// Console::success(APP_NAME . ' usage aggregation process v1 has started');
// $errorLogger = fn(Throwable $error, string $action = 'syncUsageStats') => $logError($error, "usage", $action);
// $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); // 30 seconds (by default)
// $region = App::getEnv('region', 'default');
// $usage = new TimeSeries($region, $dbForConsole, $influxDB, $getProjectDB, $register, $errorLogger);
// Console::loop(function () use ($interval, $usage) {
// $now = date('d-m-Y H:i:s', time());
// Console::info("[{$now}] Aggregating Timeseries Usage data every {$interval} seconds");
// $loopStart = microtime(true);
// $usage->collect();
// $loopTook = microtime(true) - $loopStart;
// $now = date('d-m-Y H:i:s', time());
// Console::info("[{$now}] Aggregation took {$loopTook} seconds");
// }, $interval);
}
}