mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
Merge branch '1.6.x' of https://github.com/appwrite/appwrite into multi-region-support
# Conflicts: # composer.lock
This commit is contained in:
@@ -71,7 +71,7 @@ App::post('/v1/console/assistant')
|
||||
->param('prompt', '', new Text(2000), 'Prompt. A string containing questions asked to the AI assistant.')
|
||||
->inject('response')
|
||||
->action(function (string $prompt, Response $response) {
|
||||
$ch = curl_init('http://appwrite-assistant:3003/');
|
||||
$ch = curl_init('http://appwrite-assistant:3003/v1/models/assistant/prompt');
|
||||
$responseHeaders = [];
|
||||
$query = json_encode(['prompt' => $prompt]);
|
||||
$headers = ['accept: text/event-stream'];
|
||||
|
||||
@@ -24,7 +24,6 @@ use Appwrite\Utopia\Response\Model\Rule;
|
||||
use Executor\Executor;
|
||||
use MaxMind\Db\Reader;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
@@ -180,22 +179,23 @@ App::post('/v1/functions')
|
||||
->inject('request')
|
||||
->inject('response')
|
||||
->inject('dbForProject')
|
||||
->inject('timelimit')
|
||||
->inject('project')
|
||||
->inject('user')
|
||||
->inject('queueForEvents')
|
||||
->inject('queueForBuilds')
|
||||
->inject('dbForPlatform')
|
||||
->inject('gitHub')
|
||||
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
|
||||
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
|
||||
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
|
||||
|
||||
// Temporary abuse check
|
||||
$abuseCheck = function () use ($project, $dbForProject, $response) {
|
||||
$abuseCheck = function () use ($project, $timelimit, $response) {
|
||||
$abuseKey = "projectId:{projectId},url:{url}";
|
||||
$abuseLimit = App::getEnv('_APP_FUNCTIONS_CREATION_ABUSE_LIMIT', 50);
|
||||
$abuseTime = 86400; // 1 day
|
||||
|
||||
$timeLimit = new TimeLimit($abuseKey, $abuseLimit, $abuseTime, $dbForProject);
|
||||
$timeLimit = $timelimit($abuseKey, $abuseLimit, $abuseTime);
|
||||
$timeLimit
|
||||
->setParam('{projectId}', $project->getId())
|
||||
->setParam('{url}', '/v1/functions');
|
||||
@@ -203,7 +203,7 @@ App::post('/v1/functions')
|
||||
$abuse = new Abuse($timeLimit);
|
||||
$remaining = $timeLimit->remaining();
|
||||
$limit = $timeLimit->limit();
|
||||
$time = (new \DateTime($timeLimit->time()))->getTimestamp() + $abuseTime;
|
||||
$time = $timeLimit->time() + $abuseTime;
|
||||
|
||||
$response
|
||||
->addHeader('X-RateLimit-Limit', $limit)
|
||||
|
||||
@@ -16,7 +16,6 @@ use Appwrite\Utopia\Database\Validator\Queries\Projects;
|
||||
use Appwrite\Utopia\Request;
|
||||
use Appwrite\Utopia\Response;
|
||||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\Cache\Cache;
|
||||
@@ -237,9 +236,6 @@ App::post('/v1/projects')
|
||||
if ($create || $projectTables) {
|
||||
$audit = new Audit($dbForProject);
|
||||
$audit->setup();
|
||||
|
||||
$abuse = new TimeLimit('', 0, 1, $dbForProject);
|
||||
$abuse->setup();
|
||||
}
|
||||
|
||||
if (!$create && $sharedTablesV1) {
|
||||
@@ -253,17 +249,6 @@ App::post('/v1/projects')
|
||||
'indexes' => $indexes,
|
||||
'documentSecurity' => true
|
||||
]));
|
||||
|
||||
$attributes = \array_map(fn ($attribute) => new Document($attribute), TimeLimit::ATTRIBUTES);
|
||||
$indexes = \array_map(fn (array $index) => new Document($index), TimeLimit::INDEXES);
|
||||
$dbForProject->createDocument(Database::METADATA, new Document([
|
||||
'$id' => ID::custom('abuse'),
|
||||
'$permissions' => [Permission::create(Role::any())],
|
||||
'name' => 'abuse',
|
||||
'attributes' => $attributes,
|
||||
'indexes' => $indexes,
|
||||
'documentSecurity' => true
|
||||
]));
|
||||
}
|
||||
|
||||
if ($create || $sharedTablesV1) {
|
||||
|
||||
@@ -91,6 +91,15 @@ function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, Sw
|
||||
$project = Authorization::skip(
|
||||
fn () => $dbForPlatform->getDocument('projects', $projectId)
|
||||
);
|
||||
|
||||
if (!$project->isEmpty() && $project->getId() !== 'console') {
|
||||
$accessedAt = $project->getAttribute('accessedAt', '');
|
||||
if (DateTime::formatTz(DateTime::addSeconds(new \DateTime(), -APP_PROJECT_ACCESS)) > $accessedAt) {
|
||||
$project->setAttribute('accessedAt', DateTime::now());
|
||||
Authorization::skip(fn () => $dbForPlatform->updateDocument('projects', $project->getId(), $project));
|
||||
}
|
||||
}
|
||||
|
||||
if (array_key_exists('proxy', $project->getAttribute('services', []))) {
|
||||
$status = $project->getAttribute('services', [])['proxy'];
|
||||
if (!$status) {
|
||||
|
||||
@@ -19,7 +19,6 @@ use Appwrite\Extend\Exception as AppwriteException;
|
||||
use Appwrite\Utopia\Request;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Filesystem;
|
||||
use Utopia\Cache\Cache;
|
||||
@@ -420,8 +419,9 @@ App::init()
|
||||
->inject('queueForBuilds')
|
||||
->inject('queueForUsage')
|
||||
->inject('dbForProject')
|
||||
->inject('timelimit')
|
||||
->inject('mode')
|
||||
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
|
||||
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
|
||||
|
||||
$route = $utopia->getRoute();
|
||||
|
||||
@@ -444,7 +444,7 @@ App::init()
|
||||
foreach ($abuseKeyLabel as $abuseKey) {
|
||||
$start = $request->getContentRangeStart();
|
||||
$end = $request->getContentRangeEnd();
|
||||
$timeLimit = new TimeLimit($abuseKey, $route->getLabel('abuse-limit', 0), $route->getLabel('abuse-time', 3600), $dbForProject);
|
||||
$timeLimit = $timelimit($abuseKey, $route->getLabel('abuse-limit', 0), $route->getLabel('abuse-time', 3600));
|
||||
$timeLimit
|
||||
->setParam('{projectId}', $project->getId())
|
||||
->setParam('{userId}', $user->getId())
|
||||
@@ -472,7 +472,7 @@ App::init()
|
||||
$abuse = new Abuse($timeLimit);
|
||||
$remaining = $timeLimit->remaining();
|
||||
$limit = $timeLimit->limit();
|
||||
$time = (new \DateTime($timeLimit->time()))->getTimestamp() + $route->getLabel('abuse-time', 3600);
|
||||
$time = $timeLimit->time() + $route->getLabel('abuse-time', 3600);
|
||||
|
||||
if ($limit && ($remaining < $closestLimit || is_null($closestLimit))) {
|
||||
$closestLimit = $remaining;
|
||||
|
||||
@@ -10,10 +10,8 @@ use Swoole\Http\Response as SwooleResponse;
|
||||
use Swoole\Http\Server;
|
||||
use Swoole\Process;
|
||||
use Swoole\Table;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Database;
|
||||
@@ -200,11 +198,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
|
||||
$audit->setup();
|
||||
}
|
||||
|
||||
if ($dbForPlatform->getCollection(TimeLimit::COLLECTION)->isEmpty()) {
|
||||
$adapter = new TimeLimit("", 0, 1, $dbForPlatform);
|
||||
$adapter->setup();
|
||||
}
|
||||
|
||||
/** @var array $collections */
|
||||
$collections = Config::getParam('collections', []);
|
||||
$consoleCollections = $collections['console'];
|
||||
|
||||
+34
-7
@@ -48,6 +48,7 @@ use Appwrite\Utopia\Request;
|
||||
use MaxMind\Db\Reader;
|
||||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Swoole\Database\PDOProxy;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Redis as RedisCache;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
@@ -231,6 +232,11 @@ const API_KEY_DYNAMIC = 'dynamic';
|
||||
// Usage metrics
|
||||
const METRIC_TEAMS = 'teams';
|
||||
const METRIC_USERS = 'users';
|
||||
const METRIC_WEBHOOKS_SENT = 'webhooks.events.sent';
|
||||
const METRIC_WEBHOOKS_FAILED = 'webhooks.events.failed';
|
||||
const METRIC_WEBHOOK_ID_SENT = '{webhookInternalId}.webhooks.events.sent';
|
||||
const METRIC_WEBHOOK_ID_FAILED = '{webhookInternalId}.webhooks.events.failed';
|
||||
|
||||
|
||||
const METRIC_AUTH_METHOD_PHONE = 'auth.method.phone';
|
||||
const METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE = METRIC_AUTH_METHOD_PHONE . '.{countryCode}';
|
||||
@@ -853,31 +859,31 @@ $register->set('pools', function () {
|
||||
$connections = [
|
||||
'console' => [
|
||||
'type' => 'database',
|
||||
'dsns' => System::getEnv('_APP_CONNECTIONS_DB_CONSOLE', $fallbackForDB),
|
||||
'dsns' => $fallbackForDB,
|
||||
'multiple' => false,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
],
|
||||
'database' => [
|
||||
'type' => 'database',
|
||||
'dsns' => System::getEnv('_APP_CONNECTIONS_DB_PROJECT', $fallbackForDB),
|
||||
'dsns' => $fallbackForDB,
|
||||
'multiple' => true,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
],
|
||||
'queue' => [
|
||||
'type' => 'queue',
|
||||
'dsns' => System::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis),
|
||||
'dsns' => $fallbackForRedis,
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
'pubsub' => [
|
||||
'type' => 'pubsub',
|
||||
'dsns' => System::getEnv('_APP_CONNECTIONS_PUBSUB', $fallbackForRedis),
|
||||
'dsns' => $fallbackForRedis,
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
'cache' => [
|
||||
'type' => 'cache',
|
||||
'dsns' => System::getEnv('_APP_CONNECTIONS_CACHE', $fallbackForRedis),
|
||||
'dsns' => $fallbackForRedis,
|
||||
'multiple' => true,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
@@ -949,12 +955,12 @@ $register->set('pools', function () {
|
||||
});
|
||||
},
|
||||
'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) {
|
||||
$redis = new Redis();
|
||||
$redis = new \Redis();
|
||||
@$redis->pconnect($dsnHost, (int)$dsnPort);
|
||||
if ($dsnPass) {
|
||||
$redis->auth($dsnPass);
|
||||
}
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
},
|
||||
@@ -1538,6 +1544,27 @@ App::setResource('cache', function (Group $pools) {
|
||||
return new Cache(new Sharding($adapters));
|
||||
}, ['pools']);
|
||||
|
||||
App::setResource('redis', function () {
|
||||
$host = System::getEnv('_APP_REDIS_HOST', 'localhost');
|
||||
$port = System::getEnv('_APP_REDIS_PORT', 6379);
|
||||
$pass = System::getEnv('_APP_REDIS_PASS', '');
|
||||
|
||||
$redis = new \Redis();
|
||||
@$redis->pconnect($host, (int)$port);
|
||||
if ($pass) {
|
||||
$redis->auth($pass);
|
||||
}
|
||||
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
});
|
||||
|
||||
App::setResource('timelimit', function (\Redis $redis) {
|
||||
return function (string $key, int $limit, int $time) use ($redis) {
|
||||
return new TimeLimitRedis($key, $limit, $time, $redis);
|
||||
};
|
||||
}, ['redis']);
|
||||
|
||||
App::setResource('deviceForLocal', function () {
|
||||
return new Local();
|
||||
});
|
||||
|
||||
+32
-6
@@ -13,7 +13,7 @@ use Swoole\Runtime;
|
||||
use Swoole\Table;
|
||||
use Swoole\Timer;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
@@ -138,6 +138,32 @@ if (!function_exists('getCache')) {
|
||||
}
|
||||
}
|
||||
|
||||
// Allows overriding
|
||||
if (!function_exists('getRedis')) {
|
||||
function getRedis(): \Redis
|
||||
{
|
||||
$host = System::getEnv('_APP_REDIS_HOST', 'localhost');
|
||||
$port = System::getEnv('_APP_REDIS_PORT', 6379);
|
||||
$pass = System::getEnv('_APP_REDIS_PASS', '');
|
||||
|
||||
$redis = new \Redis();
|
||||
@$redis->pconnect($host, (int)$port);
|
||||
if ($pass) {
|
||||
$redis->auth($pass);
|
||||
}
|
||||
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
}
|
||||
}
|
||||
|
||||
if (!function_exists('getTimelimit')) {
|
||||
function getTimelimit(): TimeLimitRedis
|
||||
{
|
||||
return new TimeLimitRedis("", 0, 1, getRedis());
|
||||
}
|
||||
}
|
||||
|
||||
if (!function_exists('getRealtime')) {
|
||||
function getRealtime(): Realtime
|
||||
{
|
||||
@@ -481,7 +507,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
||||
throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
|
||||
}
|
||||
|
||||
$dbForProject = getProjectDB($project);
|
||||
$timelimit = $app->getResource('timelimit');
|
||||
$console = $app->getResource('console'); /** @var Document $console */
|
||||
$user = $app->getResource('user'); /** @var Document $user */
|
||||
|
||||
@@ -490,12 +516,12 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
||||
*
|
||||
* Abuse limits are connecting 128 times per minute and ip address.
|
||||
*/
|
||||
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, $dbForProject);
|
||||
$timeLimit
|
||||
$timelimit = $timelimit('url:{url},ip:{ip}', 128, 60);
|
||||
$timelimit
|
||||
->setParam('{ip}', $request->getIP())
|
||||
->setParam('{url}', $request->getURI());
|
||||
|
||||
$abuse = new Abuse($timeLimit);
|
||||
$abuse = new Abuse($timelimit);
|
||||
|
||||
if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
|
||||
throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many requests');
|
||||
@@ -593,7 +619,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
||||
*
|
||||
* Abuse limits are sending 32 times per minute and connection.
|
||||
*/
|
||||
$timeLimit = new TimeLimit('url:{url},connection:{connection}', 32, 60, $database);
|
||||
$timeLimit = getTimelimit('url:{url},connection:{connection}', 32, 60);
|
||||
|
||||
$timeLimit
|
||||
->setParam('{connection}', $connection)
|
||||
|
||||
+23
-1
@@ -17,6 +17,7 @@ use Appwrite\Event\Usage;
|
||||
use Appwrite\Event\UsageDump;
|
||||
use Appwrite\Platform\Appwrite;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
@@ -173,7 +174,7 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf
|
||||
}, ['pools', 'dbForPlatform', 'cache']);
|
||||
|
||||
Server::setResource('abuseRetention', function () {
|
||||
return DateTime::addSeconds(new \DateTime(), -1 * System::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400));
|
||||
return time() - (int) System::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400);
|
||||
});
|
||||
|
||||
Server::setResource('auditRetention', function () {
|
||||
@@ -200,6 +201,27 @@ Server::setResource('cache', function (Registry $register) {
|
||||
return new Cache(new Sharding($adapters));
|
||||
}, ['register']);
|
||||
|
||||
Server::setResource('redis', function () {
|
||||
$host = System::getEnv('_APP_REDIS_HOST', 'localhost');
|
||||
$port = System::getEnv('_APP_REDIS_PORT', 6379);
|
||||
$pass = System::getEnv('_APP_REDIS_PASS', '');
|
||||
|
||||
$redis = new \Redis();
|
||||
@$redis->pconnect($host, (int)$port);
|
||||
if ($pass) {
|
||||
$redis->auth($pass);
|
||||
}
|
||||
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
});
|
||||
|
||||
Server::setResource('timelimit', function (\Redis $redis) {
|
||||
return function (string $key, int $limit, int $time) use ($redis) {
|
||||
return new TimeLimitRedis($key, $limit, $time, $redis);
|
||||
};
|
||||
}, ['redis']);
|
||||
|
||||
Server::setResource('log', fn () => new Log());
|
||||
|
||||
Server::setResource('queueForUsage', function (Connection $queue) {
|
||||
|
||||
+1
-1
@@ -45,7 +45,7 @@
|
||||
"ext-sockets": "*",
|
||||
"appwrite/php-runtimes": "0.16.*",
|
||||
"appwrite/php-clamav": "2.0.*",
|
||||
"utopia-php/abuse": "0.43.*",
|
||||
"utopia-php/abuse": "0.45.*",
|
||||
"utopia-php/analytics": "0.10.*",
|
||||
"utopia-php/audit": "0.43.*",
|
||||
"utopia-php/cache": "0.11.*",
|
||||
|
||||
+1
-1
@@ -870,7 +870,7 @@ services:
|
||||
|
||||
appwrite-assistant:
|
||||
container_name: appwrite-assistant
|
||||
image: appwrite/assistant:0.5.0
|
||||
image: appwrite/assistant:0.7.0
|
||||
networks:
|
||||
- appwrite
|
||||
environment:
|
||||
|
||||
@@ -42,6 +42,7 @@ class Usage extends Event
|
||||
*/
|
||||
public function addMetric(string $key, int $value): self
|
||||
{
|
||||
|
||||
$this->metrics[] = [
|
||||
'key' => $key,
|
||||
'value' => $value,
|
||||
@@ -62,10 +63,15 @@ class Usage extends Event
|
||||
}
|
||||
|
||||
$client = new Client($this->queue, $this->connection);
|
||||
return $client->enqueue([
|
||||
|
||||
$result = $client->enqueue([
|
||||
'project' => $this->getProject(),
|
||||
'reduce' => $this->reduce,
|
||||
'metrics' => $this->metrics,
|
||||
]);
|
||||
|
||||
$this->metrics = [];
|
||||
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
namespace Appwrite\Event;
|
||||
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Queue\Connection;
|
||||
|
||||
class Webhook extends Event
|
||||
@@ -14,4 +15,16 @@ class Webhook extends Event
|
||||
->setQueue(Event::WEBHOOK_QUEUE_NAME)
|
||||
->setClass(Event::WEBHOOK_CLASS_NAME);
|
||||
}
|
||||
|
||||
public function trigger(): string|bool
|
||||
{
|
||||
/** Filter out context and trim project to keep the payload small */
|
||||
$this->context = [];
|
||||
$this->project = new Document([
|
||||
'$id' => $this->project->getId(),
|
||||
'$internalId' => $this->project->getInternalId(),
|
||||
]);
|
||||
|
||||
return parent::trigger();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\System\System;
|
||||
@@ -39,6 +40,17 @@ abstract class ScheduleBase extends Action
|
||||
->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
|
||||
}
|
||||
|
||||
protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
|
||||
{
|
||||
if (!$project->isEmpty() && $project->getId() !== 'console') {
|
||||
$accessedAt = $project->getAttribute('accessedAt', '');
|
||||
if (DateTime::formatTz(DateTime::addSeconds(new \DateTime(), -APP_PROJECT_ACCESS)) > $accessedAt) {
|
||||
$project->setAttribute('accessedAt', DateTime::now());
|
||||
Authorization::skip(fn () => $dbForPlatform->updateDocument('projects', $project->getId(), $project));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. Load all documents from 'schedules' collection to create local copy
|
||||
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
||||
|
||||
@@ -57,6 +57,8 @@ class ScheduleExecutions extends ScheduleBase
|
||||
|
||||
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
|
||||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
\go(function () use ($queueForFunctions, $schedule, $delay, $data) {
|
||||
Co::sleep($delay);
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ class ScheduleFunctions extends ScheduleBase
|
||||
}
|
||||
|
||||
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
||||
\go(function () use ($delay, $scheduleKeys, $pools) {
|
||||
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
|
||||
\sleep($delay); // in seconds
|
||||
|
||||
$queue = $pools->get('queue')->pop();
|
||||
@@ -84,6 +84,8 @@ class ScheduleFunctions extends ScheduleBase
|
||||
|
||||
$schedule = $this->schedules[$scheduleKey];
|
||||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
$queueForFunctions = new Func($connection);
|
||||
|
||||
$queueForFunctions
|
||||
|
||||
@@ -45,6 +45,8 @@ class ScheduleMessages extends ScheduleBase
|
||||
$connection = $queue->getResource();
|
||||
$queueForMessaging = new Messaging($connection);
|
||||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
$queueForMessaging
|
||||
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
|
||||
->setMessageId($schedule['resourceId'])
|
||||
|
||||
@@ -8,7 +8,6 @@ use Appwrite\Extend\Exception;
|
||||
use Executor\Executor;
|
||||
use Throwable;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\Database\TimeLimit;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\Cache\Adapter\Filesystem;
|
||||
use Utopia\Cache\Cache;
|
||||
@@ -47,6 +46,7 @@ class Deletes extends Action
|
||||
->inject('message')
|
||||
->inject('dbForPlatform')
|
||||
->inject('getProjectDB')
|
||||
->inject('timelimit')
|
||||
->inject('deviceForFiles')
|
||||
->inject('deviceForFunctions')
|
||||
->inject('deviceForBuilds')
|
||||
@@ -57,8 +57,8 @@ class Deletes extends Action
|
||||
->inject('auditRetention')
|
||||
->inject('log')
|
||||
->callback(
|
||||
fn ($message, $dbForPlatform, callable $getProjectDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log) =>
|
||||
$this->action($message, $dbForPlatform, $getProjectDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $abuseRetention, $executionRetention, $auditRetention, $log)
|
||||
fn ($message, $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, int $abuseRetention, string $executionRetention, string $auditRetention, Log $log) =>
|
||||
$this->action($message, $dbForPlatform, $getProjectDB, $timelimit, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $abuseRetention, $executionRetention, $auditRetention, $log)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ class Deletes extends Action
|
||||
* @throws Exception
|
||||
* @throws Throwable
|
||||
*/
|
||||
public function action(Message $message, Database $dbForPlatform, callable $getProjectDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log): void
|
||||
public function action(Message $message, Database $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, int $abuseRetention, string $executionRetention, string $auditRetention, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
@@ -126,7 +126,7 @@ class Deletes extends Action
|
||||
}
|
||||
break;
|
||||
case DELETE_TYPE_ABUSE:
|
||||
$this->deleteAbuseLogs($project, $getProjectDB, $abuseRetention);
|
||||
$this->deleteAbuseLogs($project, $timelimit, $abuseRetention);
|
||||
break;
|
||||
case DELETE_TYPE_REALTIME:
|
||||
$this->deleteRealtimeUsage($dbForPlatform, $datetime);
|
||||
@@ -494,8 +494,7 @@ class Deletes extends Action
|
||||
|
||||
$projectCollectionIds = [
|
||||
...\array_keys(Config::getParam('collections', [])['projects']),
|
||||
Audit::COLLECTION,
|
||||
TimeLimit::COLLECTION,
|
||||
Audit::COLLECTION
|
||||
];
|
||||
|
||||
$limit = \count($projectCollectionIds) + 25;
|
||||
@@ -704,15 +703,14 @@ class Deletes extends Action
|
||||
/**
|
||||
* @param Database $dbForPlatform
|
||||
* @param callable $getProjectDB
|
||||
* @param string $datetime
|
||||
* @param int $abuseRetention
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
private function deleteAbuseLogs(Document $project, callable $getProjectDB, string $abuseRetention): void
|
||||
private function deleteAbuseLogs(Document $project, callable $timelimit, int $abuseRetention): void
|
||||
{
|
||||
$projectId = $project->getId();
|
||||
$dbForProject = $getProjectDB($project);
|
||||
$timeLimit = new TimeLimit("", 0, 1, $dbForProject);
|
||||
$timeLimit = $timelimit("", 0, 1);
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
try {
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Event\Mail;
|
||||
use Appwrite\Event\Usage;
|
||||
use Appwrite\Template\Template;
|
||||
use Exception;
|
||||
use Utopia\Database\Database;
|
||||
@@ -33,8 +34,9 @@ class Webhooks extends Action
|
||||
->inject('message')
|
||||
->inject('dbForPlatform')
|
||||
->inject('queueForMails')
|
||||
->inject('queueForUsage')
|
||||
->inject('log')
|
||||
->callback(fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Log $log) => $this->action($message, $dbForPlatform, $queueForMails, $log));
|
||||
->callback(fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log) => $this->action($message, $dbForPlatform, $queueForMails, $queueForUsage, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -45,7 +47,7 @@ class Webhooks extends Action
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Log $log): void
|
||||
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log): void
|
||||
{
|
||||
$this->errors = [];
|
||||
$payload = $message->getPayload() ?? [];
|
||||
@@ -56,14 +58,15 @@ class Webhooks extends Action
|
||||
|
||||
$events = $payload['events'];
|
||||
$webhookPayload = json_encode($payload['payload']);
|
||||
$project = new Document($payload['project']);
|
||||
$user = new Document($payload['user'] ?? []);
|
||||
|
||||
$project = new Document($payload['project']);
|
||||
$project = $dbForPlatform->getDocument('projects', $project->getId());
|
||||
$log->addTag('projectId', $project->getId());
|
||||
|
||||
foreach ($project->getAttribute('webhooks', []) as $webhook) {
|
||||
if (array_intersect($webhook->getAttribute('events', []), $events)) {
|
||||
$this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails);
|
||||
$this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails, $queueForUsage);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +85,7 @@ class Webhooks extends Action
|
||||
* @param Mail $queueForMails
|
||||
* @return void
|
||||
*/
|
||||
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails): void
|
||||
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage): void
|
||||
{
|
||||
if ($webhook->getAttribute('enabled') !== true) {
|
||||
return;
|
||||
@@ -165,11 +168,25 @@ class Webhooks extends Action
|
||||
$dbForPlatform->purgeCachedDocument('projects', $project->getId());
|
||||
|
||||
$this->errors[] = $logs;
|
||||
$queueForUsage
|
||||
->addMetric(METRIC_WEBHOOKS_FAILED, 1)
|
||||
->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_FAILED), 1)
|
||||
;
|
||||
|
||||
|
||||
} else {
|
||||
$webhook->setAttribute('attempts', 0); // Reset attempts on success
|
||||
$dbForPlatform->updateDocument('webhooks', $webhook->getId(), $webhook);
|
||||
$dbForPlatform->purgeCachedDocument('projects', $project->getId());
|
||||
$queueForUsage
|
||||
->addMetric(METRIC_WEBHOOKS_SENT, 1)
|
||||
->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_SENT), 1)
|
||||
;
|
||||
}
|
||||
|
||||
$queueForUsage
|
||||
->setProject($project)
|
||||
->trigger();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user