Merge branch '1.9.x' into feat-email-template-reset

This commit is contained in:
Harsh Mahajan
2026-05-17 22:53:54 +05:30
committed by GitHub
11 changed files with 1172 additions and 482 deletions
+8
View File
@@ -240,6 +240,12 @@ $register->set('pools', function () {
'multiple' => true,
'schemes' => ['redis'],
],
'lock' => [
'type' => 'lock',
'dsns' => $fallbackForRedis,
'multiple' => false,
'schemes' => ['redis'],
],
];
$maxConnections = (int) System::getEnv('_APP_CONNECTIONS_MAX', 151);
@@ -369,6 +375,8 @@ $register->set('pools', function () {
}
return $adapter;
case 'lock':
return $resource();
default:
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation.");
}
+11
View File
@@ -29,6 +29,7 @@ use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\DI\Container;
use Utopia\DSN\DSN;
use Utopia\Lock\Distributed;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
@@ -248,6 +249,16 @@ $container->set('redis', function () {
return $redis;
});
$container->set('locks', function (Group $pools) {
return function (string $key, int $ttl, callable $callback, float $timeout = 0.0) use ($pools): mixed {
return $pools->get('lock')->use(function (\Redis $redis) use ($key, $ttl, $callback, $timeout) {
$lock = new Distributed($redis, $key, ttl: $ttl);
return $lock->withLock($callback, timeout: $timeout);
});
};
}, ['pools']);
$container->set('timelimit', function (\Redis $redis) {
return function (string $key, int $limit, int $time) use ($redis) {
return new TimeLimitRedis($key, $limit, $time, $redis);
+1
View File
@@ -72,6 +72,7 @@
"utopia-php/validators": "0.2.*",
"utopia-php/image": "0.8.*",
"utopia-php/locale": "0.8.*",
"utopia-php/lock": "0.2.*",
"utopia-php/logger": "0.8.*",
"utopia-php/messaging": "0.22.*",
"utopia-php/migration": "1.*",
Generated
+58 -7
View File
@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "6404f075ed03ef6651ce9cea63518fa0",
"content-hash": "035685d1335039f13e16d0532c874b21",
"packages": [
{
"name": "adhocore/jwt",
@@ -4498,6 +4498,57 @@
},
"time": "2025-08-12T12:58:26+00:00"
},
{
"name": "utopia-php/lock",
"version": "0.2.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/lock.git",
"reference": "49317c9493d8f747e4299aa24c22862aa5f6e106"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/lock/zipball/49317c9493d8f747e4299aa24c22862aa5f6e106",
"reference": "49317c9493d8f747e4299aa24c22862aa5f6e106",
"shasum": ""
},
"require": {
"php": ">=8.3"
},
"require-dev": {
"laravel/pint": "1.*",
"phpstan/phpstan": "2.*",
"phpunit/phpunit": "11.*",
"swoole/ide-helper": "*"
},
"suggest": {
"ext-pcntl": "Required to run the File lock tests",
"ext-redis": "Required for the Distributed lock",
"ext-swoole": "Required for the Mutex and Semaphore locks (>=6.0)"
},
"type": "library",
"autoload": {
"psr-4": {
"Utopia\\Lock\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Appwrite Team",
"email": "team@appwrite.io"
}
],
"description": "Mutex, semaphore, file and distributed locks for PHP — one interface, four backends.",
"support": {
"issues": "https://github.com/utopia-php/lock/issues",
"source": "https://github.com/utopia-php/lock/tree/0.2.0"
},
"time": "2026-04-24T10:47:56+00:00"
},
{
"name": "utopia-php/logger",
"version": "0.8.0",
@@ -5141,16 +5192,16 @@
},
{
"name": "utopia-php/storage",
"version": "2.0.2",
"version": "2.0.3",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/storage.git",
"reference": "64e132a3768e22243eda36fe4262da22fd204f3c"
"reference": "37129cf0bfcc03210172000e4388d4d3495ae013"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/storage/zipball/64e132a3768e22243eda36fe4262da22fd204f3c",
"reference": "64e132a3768e22243eda36fe4262da22fd204f3c",
"url": "https://api.github.com/repos/utopia-php/storage/zipball/37129cf0bfcc03210172000e4388d4d3495ae013",
"reference": "37129cf0bfcc03210172000e4388d4d3495ae013",
"shasum": ""
},
"require": {
@@ -5187,9 +5238,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/storage/issues",
"source": "https://github.com/utopia-php/storage/tree/2.0.2"
"source": "https://github.com/utopia-php/storage/tree/2.0.3"
},
"time": "2026-05-01T15:06:16+00:00"
"time": "2026-05-15T09:42:32+00:00"
},
{
"name": "utopia-php/system",
@@ -21,6 +21,7 @@ use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Validator\UID;
use Utopia\Http\Adapter\Swoole\Request;
use Utopia\Lock\Exception\Contention as LockContention;
use Utopia\Platform\Action;
use Utopia\Platform\Scope\HTTP;
use Utopia\Storage\Device;
@@ -92,6 +93,7 @@ class Create extends Action
->inject('plan')
->inject('authorization')
->inject('platform')
->inject('locks')
->callback($this->action(...));
}
@@ -111,7 +113,8 @@ class Create extends Action
BuildPublisher $publisherForBuilds,
array $plan,
Authorization $authorization,
array $platform
array $platform,
callable $locks
) {
$activate = \strval($activate) === 'true' || \strval($activate) === '1';
@@ -193,20 +196,38 @@ class Create extends Action
// Save to storage
$fileSize ??= $deviceForLocal->getFileSize($fileTmpName);
$path = $deviceForFunctions->getPath($deploymentId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION));
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$lockKey = 'functions:deployment:' . $project->getId() . ':' . $functionId . ':' . $deploymentId;
$metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)];
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = $deployment->getAttribute('sourceMetadata', []);
$completed = false;
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
return;
}
try {
$locks($lockKey, 600, function () use (&$chunks, $dbForProject, $deploymentId, &$metadata, &$completed, $response): void {
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = $deployment->getAttribute('sourceMetadata', []);
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
$completed = true;
return;
}
}
}, timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.');
}
if ($completed) {
return;
}
$chunksUploaded = $deviceForFunctions->upload($fileTmpName, $path, $chunk, $chunks, $metadata);
@@ -217,118 +238,144 @@ class Create extends Action
$type = $request->getHeader('x-sdk-language') === 'cli' ? 'cli' : 'manual';
if ($chunksUploaded === $chunks) {
if ($activate) {
// Remove deploy for all other deployments.
$activeDeployments = $dbForProject->find('deployments', [
Query::equal('activate', [true]),
Query::equal('resourceId', [$functionId]),
Query::equal('resourceType', ['functions'])
]);
try {
$locks($lockKey, 600, function () use ($activate, &$chunks, $chunksUploaded, $commands, $dbForProject, $deploymentId, $deviceForFunctions, $entrypoint, $fileSize, &$function, $functionId, $path, &$metadata, $platform, $project, $publisherForBuilds, $queueForEvents, $response, $type): void {
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$uploaded = 0;
foreach ($activeDeployments as $activeDeployment) {
$activeDeployment->setAttribute('activate', false);
$dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document([
'activate' => false,
]));
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = \array_merge($deployment->getAttribute('sourceMetadata', []), $metadata);
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
return;
}
}
}
$fileSize = $deviceForFunctions->getFileSize($path);
$chunksUploaded = max($uploaded, $chunksUploaded);
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $function->getSequence(),
'resourceId' => $function->getId(),
'resourceType' => 'functions',
'entrypoint' => $entrypoint,
'buildCommands' => $commands,
'startCommand' => $function->getAttribute('startCommand', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type
]));
if ($chunksUploaded === $chunks && $uploaded < $chunks) {
if ($activate) {
// Remove deploy for all other deployments.
$activeDeployments = $dbForProject->find('deployments', [
Query::equal('activate', [true]),
Query::equal('resourceId', [$functionId]),
Query::equal('resourceType', ['functions'])
]);
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceSize' => $fileSize,
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
foreach ($activeDeployments as $activeDeployment) {
$dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document([
'activate' => false,
]));
}
}
// Start the build
$publisherForBuilds->enqueue(new BuildMessage(
project: $project,
resource: $function,
deployment: $deployment,
type: BUILD_TYPE_DEPLOYMENT,
platform: $platform,
));
} else {
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $function->getSequence(),
'resourceId' => $function->getId(),
'resourceType' => 'functions',
'entrypoint' => $entrypoint,
'buildCommands' => $commands,
'startCommand' => $function->getAttribute('startCommand', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type
]));
$fileSize = $deviceForFunctions->getFileSize($path);
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $function->getSequence(),
'resourceId' => $function->getId(),
'resourceType' => 'functions',
'entrypoint' => $entrypoint,
'buildCommands' => $commands,
'startCommand' => $function->getAttribute('startCommand', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type
]));
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceSize' => $fileSize,
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
// Start the build
$publisherForBuilds->enqueue(new BuildMessage(
project: $project,
resource: $function,
deployment: $deployment,
type: BUILD_TYPE_DEPLOYMENT,
platform: $platform,
));
} else {
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $function->getSequence(),
'resourceId' => $function->getId(),
'resourceType' => 'functions',
'entrypoint' => $entrypoint,
'buildCommands' => $commands,
'startCommand' => $function->getAttribute('startCommand', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type
]));
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
}
$metadata = null;
if ($chunksUploaded === $chunks) {
$queueForEvents
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId());
}
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
}, timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.');
}
$metadata = null;
$queueForEvents
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId());
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
}
}
@@ -21,6 +21,7 @@ use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Validator\UID;
use Utopia\Http\Adapter\Swoole\Request;
use Utopia\Lock\Exception\Contention as LockContention;
use Utopia\Platform\Action;
use Utopia\Platform\Scope\HTTP;
use Utopia\Storage\Device;
@@ -90,6 +91,7 @@ class Create extends Action
->inject('plan')
->inject('authorization')
->inject('platform')
->inject('locks')
->callback($this->action(...));
}
@@ -112,6 +114,7 @@ class Create extends Action
array $plan,
Authorization $authorization,
array $platform,
callable $locks,
) {
$activate = \strval($activate) === 'true' || \strval($activate) === '1';
@@ -193,20 +196,38 @@ class Create extends Action
// Save to storage
$fileSize ??= $deviceForLocal->getFileSize($fileTmpName);
$path = $deviceForSites->getPath($deploymentId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION));
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$lockKey = 'sites:deployment:' . $project->getId() . ':' . $siteId . ':' . $deploymentId;
$metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)];
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = $deployment->getAttribute('sourceMetadata', []);
$completed = false;
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
return;
}
try {
$locks($lockKey, 600, function () use (&$chunks, $dbForProject, $deploymentId, &$metadata, &$completed, $response): void {
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = $deployment->getAttribute('sourceMetadata', []);
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
$completed = true;
return;
}
}
}, timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.');
}
if ($completed) {
return;
}
$chunksUploaded = $deviceForSites->upload($fileTmpName, $path, $chunk, $chunks, $metadata);
@@ -225,184 +246,208 @@ class Create extends Action
$commands[] = $buildCommand;
}
if ($chunksUploaded === $chunks) {
if ($activate) {
// Remove deploy for all other deployments.
$activeDeployments = $dbForProject->find('deployments', [
Query::equal('activate', [true]),
Query::equal('resourceId', [$siteId]),
Query::equal('resourceType', ['sites'])
]);
try {
$locks($lockKey, 600, function () use ($activate, $authorization, $commands, &$chunks, $chunksUploaded, $dbForPlatform, $dbForProject, $deploymentId, $deviceForSites, $fileSize, &$metadata, $outputDirectory, $path, $platform, $project, $publisherForBuilds, $queueForEvents, $response, &$site, $siteId, $type): void {
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$uploaded = 0;
foreach ($activeDeployments as $activeDeployment) {
$activeDeployment->setAttribute('activate', false);
$dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document(['activate' => false]));
if (!$deployment->isEmpty()) {
$chunks = $deployment->getAttribute('sourceChunksTotal', 1);
$uploaded = $deployment->getAttribute('sourceChunksUploaded', 0);
$metadata = \array_merge($deployment->getAttribute('sourceMetadata', []), $metadata);
if ($uploaded === $chunks) {
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
return;
}
}
}
$fileSize = $deviceForSites->getFileSize($path);
$chunksUploaded = max($uploaded, $chunksUploaded);
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $site->getSequence(),
'resourceId' => $site->getId(),
'resourceType' => 'sites',
'buildCommands' => \implode(' && ', $commands),
'startCommand' => $site->getAttribute('startCommand', ''),
'buildOutput' => $outputDirectory,
'adapter' => $site->getAttribute('adapter', ''),
'fallbackFile' => $site->getAttribute('fallbackFile', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type,
]));
if ($chunksUploaded === $chunks && $uploaded < $chunks) {
if ($activate) {
// Remove deploy for all other deployments.
$activeDeployments = $dbForProject->find('deployments', [
Query::equal('activate', [true]),
Query::equal('resourceId', [$siteId]),
Query::equal('resourceType', ['sites'])
]);
$site = $site
->setAttribute('latestDeploymentId', $deployment->getId())
->setAttribute('latestDeploymentInternalId', $deployment->getSequence())
->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt())
->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', ''));
$dbForProject->updateDocument('sites', $site->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
foreach ($activeDeployments as $activeDeployment) {
$dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document(['activate' => false]));
}
}
$sitesDomain = $platform['sitesDomain'];
$domain = ID::unique() . "." . $sitesDomain;
$fileSize = $deviceForSites->getFileSize($path);
// TODO: (@Meldiron) Remove after 1.7.x migration
$isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5';
$ruleId = $isMd5 ? md5($domain) : ID::unique();
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $site->getSequence(),
'resourceId' => $site->getId(),
'resourceType' => 'sites',
'buildCommands' => \implode(' && ', $commands),
'startCommand' => $site->getAttribute('startCommand', ''),
'buildOutput' => $outputDirectory,
'adapter' => $site->getAttribute('adapter', ''),
'fallbackFile' => $site->getAttribute('fallbackFile', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type,
]));
$authorization->skip(
fn () => $dbForPlatform->createDocument('rules', new Document([
'$id' => $ruleId,
'projectId' => $project->getId(),
'projectInternalId' => $project->getSequence(),
'domain' => $domain,
'type' => 'deployment',
'trigger' => 'deployment',
'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(),
'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(),
'deploymentResourceType' => 'site',
'deploymentResourceId' => $site->getId(),
'deploymentResourceInternalId' => $site->getSequence(),
'status' => 'verified',
'certificateId' => '',
'search' => implode(' ', [$ruleId, $domain]),
'owner' => 'Appwrite',
'region' => $project->getAttribute('region')
]))
);
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceSize' => $fileSize,
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
$site = $site
->setAttribute('latestDeploymentId', $deployment->getId())
->setAttribute('latestDeploymentInternalId', $deployment->getSequence())
->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt())
->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', ''));
$dbForProject->updateDocument('sites', $site->getId(), new Document([
'latestDeploymentId' => $deployment->getId(),
'latestDeploymentInternalId' => $deployment->getSequence(),
'latestDeploymentCreatedAt' => $deployment->getCreatedAt(),
'latestDeploymentStatus' => $deployment->getAttribute('status', ''),
]));
// Start the build
$publisherForBuilds->enqueue(new BuildMessage(
project: $project,
resource: $site,
deployment: $deployment,
type: BUILD_TYPE_DEPLOYMENT,
platform: $platform,
));
} else {
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $site->getSequence(),
'resourceId' => $site->getId(),
'resourceType' => 'sites',
'buildCommands' => \implode(' && ', $commands),
'startCommand' => $site->getAttribute('startCommand', ''),
'buildOutput' => $outputDirectory,
'adapter' => $site->getAttribute('adapter', ''),
'fallbackFile' => $site->getAttribute('fallbackFile', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type,
]));
$sitesDomain = $platform['sitesDomain'];
$domain = ID::unique() . "." . $sitesDomain;
$site = $site
->setAttribute('latestDeploymentId', $deployment->getId())
->setAttribute('latestDeploymentInternalId', $deployment->getSequence())
->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt())
->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', ''));
$dbForProject->updateDocument('sites', $site->getId(), new Document([
'latestDeploymentId' => $site->getAttribute('latestDeploymentId'),
'latestDeploymentInternalId' => $site->getAttribute('latestDeploymentInternalId'),
'latestDeploymentCreatedAt' => $site->getAttribute('latestDeploymentCreatedAt'),
'latestDeploymentStatus' => $site->getAttribute('latestDeploymentStatus'),
]));
// TODO: (@Meldiron) Remove after 1.7.x migration
$isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5';
$ruleId = $isMd5 ? md5($domain) : ID::unique();
$sitesDomain = $platform['sitesDomain'];
$domain = ID::unique() . "." . $sitesDomain;
$ruleId = md5($domain);
$authorization->skip(
fn () => $dbForPlatform->createDocument('rules', new Document([
'$id' => $ruleId,
'projectId' => $project->getId(),
'projectInternalId' => $project->getSequence(),
'domain' => $domain,
'type' => 'deployment',
'trigger' => 'deployment',
'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(),
'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(),
'deploymentResourceType' => 'site',
'deploymentResourceId' => $site->getId(),
'deploymentResourceInternalId' => $site->getSequence(),
'status' => 'verified',
'certificateId' => '',
'search' => implode(' ', [$ruleId, $domain]),
'owner' => 'Appwrite',
'region' => $project->getAttribute('region')
]))
);
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
$authorization->skip(
fn () => $dbForPlatform->createDocument('rules', new Document([
'$id' => $ruleId,
'projectId' => $project->getId(),
'projectInternalId' => $project->getSequence(),
'domain' => $domain,
'type' => 'deployment',
'trigger' => 'deployment',
'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(),
'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(),
'deploymentResourceType' => 'site',
'deploymentResourceId' => $site->getId(),
'deploymentResourceInternalId' => $site->getSequence(),
'status' => 'verified',
'certificateId' => '',
'search' => implode(' ', [$ruleId, $domain]),
'owner' => 'Appwrite',
'region' => $project->getAttribute('region')
]))
);
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceSize' => $fileSize,
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
// Start the build
$publisherForBuilds->enqueue(new BuildMessage(
project: $project,
resource: $site,
deployment: $deployment,
type: BUILD_TYPE_DEPLOYMENT,
platform: $platform,
));
} else {
if ($deployment->isEmpty()) {
$deployment = $dbForProject->createDocument('deployments', new Document([
'$id' => $deploymentId,
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'resourceInternalId' => $site->getSequence(),
'resourceId' => $site->getId(),
'resourceType' => 'sites',
'buildCommands' => \implode(' && ', $commands),
'startCommand' => $site->getAttribute('startCommand', ''),
'buildOutput' => $outputDirectory,
'adapter' => $site->getAttribute('adapter', ''),
'fallbackFile' => $site->getAttribute('fallbackFile', ''),
'sourcePath' => $path,
'sourceSize' => $fileSize,
'totalSize' => $fileSize,
'sourceChunksTotal' => $chunks,
'sourceChunksUploaded' => $chunksUploaded,
'activate' => $activate,
'sourceMetadata' => $metadata,
'type' => $type,
]));
$site = $site
->setAttribute('latestDeploymentId', $deployment->getId())
->setAttribute('latestDeploymentInternalId', $deployment->getSequence())
->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt())
->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', ''));
$dbForProject->updateDocument('sites', $site->getId(), new Document([
'latestDeploymentId' => $site->getAttribute('latestDeploymentId'),
'latestDeploymentInternalId' => $site->getAttribute('latestDeploymentInternalId'),
'latestDeploymentCreatedAt' => $site->getAttribute('latestDeploymentCreatedAt'),
'latestDeploymentStatus' => $site->getAttribute('latestDeploymentStatus'),
]));
$sitesDomain = $platform['sitesDomain'];
$domain = ID::unique() . "." . $sitesDomain;
$ruleId = md5($domain);
$authorization->skip(
fn () => $dbForPlatform->createDocument('rules', new Document([
'$id' => $ruleId,
'projectId' => $project->getId(),
'projectInternalId' => $project->getSequence(),
'domain' => $domain,
'type' => 'deployment',
'trigger' => 'deployment',
'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(),
'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(),
'deploymentResourceType' => 'site',
'deploymentResourceId' => $site->getId(),
'deploymentResourceInternalId' => $site->getSequence(),
'status' => 'verified',
'certificateId' => '',
'search' => implode(' ', [$ruleId, $domain]),
'owner' => 'Appwrite',
'region' => $project->getAttribute('region')
]))
);
} else {
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([
'sourceChunksUploaded' => $chunksUploaded,
'sourceMetadata' => $metadata,
]));
}
}
$metadata = null;
if ($chunksUploaded === $chunks) {
$queueForEvents
->setParam('siteId', $site->getId())
->setParam('deploymentId', $deployment->getId());
}
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
}, timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.');
}
$metadata = null;
$queueForEvents
->setParam('siteId', $site->getId())
->setParam('deploymentId', $deployment->getId());
$response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($deployment, Response::MODEL_DEPLOYMENT);
}
}
@@ -29,6 +29,7 @@ use Utopia\Database\Validator\Authorization\Input;
use Utopia\Database\Validator\Permissions;
use Utopia\Database\Validator\UID;
use Utopia\Http\Adapter\Swoole\Request;
use Utopia\Lock\Exception\Contention as LockContention;
use Utopia\Platform\Action;
use Utopia\Platform\Scope\HTTP;
use Utopia\Storage\Device;
@@ -86,12 +87,13 @@ class Create extends Action
->inject('request')
->inject('response')
->inject('dbForProject')
->inject('project')
->inject('user')
->inject('queueForEvents')
->inject('mode')
->inject('deviceForFiles')
->inject('deviceForLocal')
->inject('authorization')
->inject('locks')
->callback($this->action(...));
}
@@ -103,12 +105,13 @@ class Create extends Action
Request $request,
Response $response,
Database $dbForProject,
Document $project,
User $user,
Event $queueForEvents,
string $mode,
Device $deviceForFiles,
Device $deviceForLocal,
Authorization $authorization
Authorization $authorization,
callable $locks
) {
$bucket = $authorization->skip(fn () => $dbForProject->getDocument('buckets', $bucketId));
@@ -234,189 +237,242 @@ class Create extends Action
$path = $deviceForFiles->getPath($fileId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION));
$path = str_ireplace($deviceForFiles->getRoot(), $deviceForFiles->getRoot() . DIRECTORY_SEPARATOR . $bucket->getId(), $path); // Add bucket id to path after root
$file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId);
$lockKey = 'storage:file:' . $project->getId() . ':' . $bucket->getId() . ':' . $fileId;
$metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)];
if (!$file->isEmpty()) {
$chunks = $file->getAttribute('chunksTotal', 1);
$uploaded = $file->getAttribute('chunksUploaded', 0);
$metadata = $file->getAttribute('metadata', []);
$completed = false;
if ($uploaded === $chunks) {
if (empty($contentRange)) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
$mergeUploadMetadata = function (array $stored, array $current): array {
$merged = \array_merge($stored, $current);
if (isset($stored['parts']) || isset($current['parts'])) {
$parts = $stored['parts'] ?? [];
foreach (($current['parts'] ?? []) as $part => $value) {
$parts[(int) $part] = $value;
}
\ksort($parts);
$merged['parts'] = $parts;
$merged['chunks'] = \count($parts);
}
return $merged;
};
try {
$locks($lockKey, 600, function () use ($bucket, &$chunks, $contentRange, $dbForProject, $deviceForFiles, $fileId, $fileName, $fileSize, &$metadata, $path, $permissions, $response, &$completed): void {
$file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId);
if (!$file->isEmpty()) {
$chunks = $file->getAttribute('chunksTotal', 1);
$uploaded = $file->getAttribute('chunksUploaded', 0);
$metadata = $file->getAttribute('metadata', []);
if ($uploaded === $chunks) {
if (empty($contentRange)) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
}
$response
->setStatusCode(Response::STATUS_CODE_OK)
->dynamic($file, Response::MODEL_FILE);
$completed = true;
return;
}
}
$response
->setStatusCode(Response::STATUS_CODE_OK)
->dynamic($file, Response::MODEL_FILE);
return;
}
if ($file->isEmpty()) {
$deviceForFiles->prepareUpload($path, $metadata['content_type'] ?? '', $chunks, $metadata);
if (!empty($contentRange)) {
$doc = new Document([
'$id' => ID::custom($fileId),
'$permissions' => $permissions,
'bucketId' => $bucket->getId(),
'bucketInternalId' => $bucket->getSequence(),
'name' => $fileName,
'path' => $path,
'signature' => '',
'mimeType' => '',
'sizeOriginal' => $fileSize,
'sizeActual' => 0,
'algorithm' => '',
'comment' => '',
'chunksTotal' => $chunks,
'chunksUploaded' => 0,
'search' => implode(' ', [$fileId, $fileName]),
'metadata' => $metadata,
]);
try {
$dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc);
} catch (DuplicateException) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
} catch (NotFoundException) {
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
}
}
}
}, timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'File upload is busy. Try again.');
}
$chunksUploaded = $deviceForFiles->upload($fileTmpName, $path, $chunk, $chunks, $metadata);
if (empty($chunksUploaded)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed uploading file');
if ($completed) {
return;
}
if ($chunksUploaded === $chunks) {
if (System::getEnv('_APP_STORAGE_ANTIVIRUS') === 'enabled' && $bucket->getAttribute('antivirus', true) && $fileSize <= APP_LIMIT_ANTIVIRUS && $deviceForFiles->getType() === Storage::DEVICE_LOCAL) {
$antivirus = new Network(
System::getEnv('_APP_STORAGE_ANTIVIRUS_HOST', 'clamav'),
(int) System::getEnv('_APP_STORAGE_ANTIVIRUS_PORT', 3310)
);
$finalizeUpload = function (int $chunksUploaded) use ($authorization, $bucket, &$chunks, $contentRange, $dbForProject, $deviceForFiles, $fileId, $fileName, $fileSize, &$metadata, $mergeUploadMetadata, $path, $permissions, $queueForEvents, $response): void {
$file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId);
$uploaded = 0;
if (!$antivirus->fileScan($path)) {
$deviceForFiles->delete($path);
throw new Exception(Exception::STORAGE_INVALID_FILE);
if (!$file->isEmpty()) {
$chunks = $file->getAttribute('chunksTotal', 1);
$uploaded = $file->getAttribute('chunksUploaded', 0);
$metadata = $mergeUploadMetadata($file->getAttribute('metadata', []), $metadata);
if ($uploaded === $chunks) {
if (empty($contentRange)) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
}
$response
->setStatusCode(Response::STATUS_CODE_OK)
->dynamic($file, Response::MODEL_FILE);
return;
}
}
$mimeType = $deviceForFiles->getFileMimeType($path); // Get mime-type before compression and encryption
$fileHash = $deviceForFiles->getFileHash($path); // Get file hash before compression and encryption
$data = '';
$iv = '';
$tag = null;
// Compression
$algorithm = $bucket->getAttribute('compression', Compression::NONE);
if ($fileSize <= APP_STORAGE_READ_BUFFER && $algorithm != Compression::NONE) {
$data = $deviceForFiles->read($path);
switch ($algorithm) {
case Compression::ZSTD:
$compressor = new Zstd();
break;
case Compression::GZIP:
default:
$compressor = new GZIP();
break;
}
$data = $compressor->compress($data);
} else {
// reset the algorithm to none as we do not compress the file
// if file size exceedes the APP_STORAGE_READ_BUFFER
// regardless the bucket compression algoorithm
$algorithm = Compression::NONE;
}
$chunksUploaded = max($uploaded, $chunksUploaded, (int) ($metadata['chunks'] ?? 0));
if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) {
if (empty($data)) {
if ($chunksUploaded === $chunks && $uploaded < $chunks) {
$deviceForFiles->finalizeUpload($path, $chunks, $metadata);
if (System::getEnv('_APP_STORAGE_ANTIVIRUS') === 'enabled' && $bucket->getAttribute('antivirus', true) && $fileSize <= APP_LIMIT_ANTIVIRUS && $deviceForFiles->getType() === Storage::DEVICE_LOCAL) {
$antivirus = new Network(
System::getEnv('_APP_STORAGE_ANTIVIRUS_HOST', 'clamav'),
(int) System::getEnv('_APP_STORAGE_ANTIVIRUS_PORT', 3310)
);
if (!$antivirus->fileScan($path)) {
$deviceForFiles->delete($path);
throw new Exception(Exception::STORAGE_INVALID_FILE);
}
}
$mimeType = $deviceForFiles->getFileMimeType($path); // Get mime-type before compression and encryption
$fileHash = $deviceForFiles->getFileHash($path); // Get file hash before compression and encryption
$data = '';
$iv = '';
$tag = null;
// Compression
$algorithm = $bucket->getAttribute('compression', Compression::NONE);
if ($fileSize <= APP_STORAGE_READ_BUFFER && $algorithm != Compression::NONE) {
$data = $deviceForFiles->read($path);
switch ($algorithm) {
case Compression::ZSTD:
$compressor = new Zstd();
break;
case Compression::GZIP:
default:
$compressor = new GZIP();
break;
}
$data = $compressor->compress($data);
} else {
// reset the algorithm to none as we do not compress the file
// if file size exceedes the APP_STORAGE_READ_BUFFER
// regardless the bucket compression algoorithm
$algorithm = Compression::NONE;
}
$key = System::getEnv('_APP_OPENSSL_KEY_V1');
$iv = OpenSSL::randomPseudoBytes(OpenSSL::cipherIVLength(OpenSSL::CIPHER_AES_128_GCM));
$data = OpenSSL::encrypt($data, OpenSSL::CIPHER_AES_128_GCM, $key, 0, $iv, $tag);
}
if (!empty($data)) {
if (!$deviceForFiles->write($path, $data, $mimeType)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to save file');
if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) {
if (empty($data)) {
$data = $deviceForFiles->read($path);
}
$key = System::getEnv('_APP_OPENSSL_KEY_V1');
$iv = OpenSSL::randomPseudoBytes(OpenSSL::cipherIVLength(OpenSSL::CIPHER_AES_128_GCM));
$data = OpenSSL::encrypt($data, OpenSSL::CIPHER_AES_128_GCM, $key, 0, $iv, $tag);
}
}
$sizeActual = $deviceForFiles->getFileSize($path);
$openSSLVersion = null;
$openSSLCipher = null;
$openSSLTag = null;
$openSSLIV = null;
if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) {
$openSSLVersion = '1';
$openSSLCipher = OpenSSL::CIPHER_AES_128_GCM;
$openSSLTag = \bin2hex($tag);
$openSSLIV = \bin2hex($iv);
}
if ($file->isEmpty()) {
$doc = new Document([
'$id' => $fileId,
'$permissions' => $permissions,
'bucketId' => $bucket->getId(),
'bucketInternalId' => $bucket->getSequence(),
'name' => $fileName,
'path' => $path,
'signature' => $fileHash,
'mimeType' => $mimeType,
'sizeOriginal' => $fileSize,
'sizeActual' => $sizeActual,
'algorithm' => $algorithm,
'comment' => '',
'chunksTotal' => $chunks,
'chunksUploaded' => $chunksUploaded,
'openSSLVersion' => $openSSLVersion,
'openSSLCipher' => $openSSLCipher,
'openSSLTag' => $openSSLTag,
'openSSLIV' => $openSSLIV,
'search' => implode(' ', [$fileId, $fileName]),
'metadata' => $metadata,
]);
try {
$file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc);
} catch (DuplicateException) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
} catch (NotFoundException) {
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
if (!empty($data)) {
if (!$deviceForFiles->write($path, $data, $mimeType)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to save file');
}
}
$sizeActual = $deviceForFiles->getFileSize($path);
$openSSLVersion = null;
$openSSLCipher = null;
$openSSLTag = null;
$openSSLIV = null;
if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) {
$openSSLVersion = '1';
$openSSLCipher = OpenSSL::CIPHER_AES_128_GCM;
$openSSLTag = \bin2hex($tag);
$openSSLIV = \bin2hex($iv);
}
if ($file->isEmpty()) {
$doc = new Document([
'$id' => $fileId,
'$permissions' => $permissions,
'bucketId' => $bucket->getId(),
'bucketInternalId' => $bucket->getSequence(),
'name' => $fileName,
'path' => $path,
'signature' => $fileHash,
'mimeType' => $mimeType,
'sizeOriginal' => $fileSize,
'sizeActual' => $sizeActual,
'algorithm' => $algorithm,
'comment' => '',
'chunksTotal' => $chunks,
'chunksUploaded' => $chunksUploaded,
'openSSLVersion' => $openSSLVersion,
'openSSLCipher' => $openSSLCipher,
'openSSLTag' => $openSSLTag,
'openSSLIV' => $openSSLIV,
'search' => implode(' ', [$fileId, $fileName]),
'metadata' => $metadata,
]);
try {
$file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc);
} catch (DuplicateException) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
} catch (NotFoundException) {
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
}
} else {
/**
* Skip authorization in updateDocument.
* Without this, the file creation will fail when user doesn't have update permission.
* However as with chunk upload even if we are updating, we are essentially creating a file
* adding it's new chunk so we rely on the create-permission check performed earlier.
*/
$file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, new Document([
'$permissions' => $permissions,
'signature' => $fileHash,
'mimeType' => $mimeType,
'sizeActual' => $sizeActual,
'algorithm' => $algorithm,
'openSSLVersion' => $openSSLVersion,
'openSSLCipher' => $openSSLCipher,
'openSSLTag' => $openSSLTag,
'openSSLIV' => $openSSLIV,
'metadata' => $metadata,
'chunksUploaded' => $chunksUploaded,
])));
}
// Trigger after create success hook
$this->afterCreateSuccess($file);
} else {
$file = $file
->setAttribute('$permissions', $permissions)
->setAttribute('signature', $fileHash)
->setAttribute('mimeType', $mimeType)
->setAttribute('sizeActual', $sizeActual)
->setAttribute('algorithm', $algorithm)
->setAttribute('openSSLVersion', $openSSLVersion)
->setAttribute('openSSLCipher', $openSSLCipher)
->setAttribute('openSSLTag', $openSSLTag)
->setAttribute('openSSLIV', $openSSLIV)
->setAttribute('metadata', $metadata)
->setAttribute('chunksUploaded', $chunksUploaded);
/**
* Skip authorization in updateDocument.
* Without this, the file creation will fail when user doesn't have update permission.
* However as with chunk upload even if we are updating, we are essentially creating a file
* adding it's new chunk so we rely on the create-permission check performed earlier.
*/
$file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, $file));
}
// Trigger after create success hook
$this->afterCreateSuccess($file);
} else {
if ($file->isEmpty()) {
$doc = new Document([
'$id' => ID::custom($fileId),
'$permissions' => $permissions,
'bucketId' => $bucket->getId(),
'bucketInternalId' => $bucket->getSequence(),
'name' => $fileName,
'path' => $path,
'signature' => '',
'mimeType' => '',
'sizeOriginal' => $fileSize,
'sizeActual' => 0,
'algorithm' => '',
'comment' => '',
'chunksTotal' => $chunks,
'chunksUploaded' => $chunksUploaded,
'search' => implode(' ', [$fileId, $fileName]),
'metadata' => $metadata,
]);
try {
$file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc);
} catch (DuplicateException) {
throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS);
} catch (NotFoundException) {
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
}
} else {
$file = $file
->setAttribute('chunksUploaded', $chunksUploaded)
->setAttribute('metadata', $metadata);
/**
* Skip authorization in updateDocument.
* Without this, the file creation will fail when user doesn't have update permission.
@@ -424,23 +480,41 @@ class Create extends Action
* adding it's new chunk so we rely on the create-permission check performed earlier.
*/
try {
$file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, $file));
$file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, new Document([
'chunksUploaded' => $chunksUploaded,
'metadata' => $metadata,
])));
} catch (NotFoundException) {
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
}
}
if ($chunksUploaded === $chunks) {
$queueForEvents
->setParam('bucketId', $bucket->getId())
->setParam('fileId', $file->getId())
->setContext('bucket', $bucket);
}
$metadata = null; // was causing leaks as it was passed by reference
$response
->setStatusCode(Response::STATUS_CODE_CREATED)
->dynamic($file, Response::MODEL_FILE);
};
try {
$chunksUploaded = $deviceForFiles->uploadChunk($fileTmpName, $path, $chunk, $chunks, $metadata);
if (empty($chunksUploaded)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed uploading file');
}
$locks($lockKey, 600, fn () => $finalizeUpload($chunksUploaded), timeout: 120.0);
} catch (LockContention) {
$response->addHeader('Retry-After', '5');
throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'File upload is busy. Try again.');
}
$queueForEvents
->setParam('bucketId', $bucket->getId())
->setParam('fileId', $file->getId())
->setContext('bucket', $bucket);
$metadata = null; // was causing leaks as it was passed by reference
$response
->setStatusCode(Response::STATUS_CODE_CREATED)
->dynamic($file, Response::MODEL_FILE);
}
/**
@@ -131,7 +131,6 @@ class Get extends Action
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Imagick extension is missing');
}
/* @type Document $bucket */
$bucket = $authorization->skip(fn () => $dbForProject->getDocument('buckets', $bucketId));
$isAPIKey = $user->isApp($authorization->getRoles());
@@ -155,7 +154,6 @@ class Get extends Action
if ($fileSecurity && !$valid && !$isToken) {
$file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId);
} else {
/* @type Document $file */
$file = $authorization->skip(fn () => $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId));
}
@@ -1199,6 +1199,144 @@ class FunctionsCustomServerTest extends Scope
}, 120000, 500);
}
public function testCreateDeploymentParallelChunksLargeFile(): void
{
$functionId = $this->setupFunction([
'functionId' => ID::unique(),
'name' => 'Test Parallel Chunk Deployment',
'execute' => [Role::user($this->getUser()['$id'])->toString()],
'runtime' => 'node-22',
'entrypoint' => 'index.js',
'timeout' => 10,
]);
$deploymentId = ID::unique();
$tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-function-deployment-' . $deploymentId;
mkdir($tmpDirectory);
try {
copy(__DIR__ . '/../../../resources/functions/basic/index.js', $tmpDirectory . DIRECTORY_SEPARATOR . 'index.js');
file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'large.bin', random_bytes(20 * 1024 * 1024));
$source = $tmpDirectory . DIRECTORY_SEPARATOR . 'code.tar.gz';
Console::execute('cd ' . $tmpDirectory . ' && tar --exclude code.tar.gz -czf code.tar.gz .', '', $this->stdout, $this->stderr);
$totalSize = filesize($source);
$chunkSize = 5 * 1024 * 1024;
$chunksTotal = (int) ceil($totalSize / $chunkSize);
$this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test deployment must span at least 4 chunks');
$requests = [];
$sourceHandle = fopen($source, 'rb');
$this->assertNotFalse($sourceHandle, 'Could not open deployment package');
try {
for ($i = 0; $i < $chunksTotal; $i++) {
$start = $i * $chunkSize;
$end = min($start + $chunkSize, $totalSize) - 1;
$length = $end - $start + 1;
$chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part';
fseek($sourceHandle, $start);
file_put_contents($chunkPath, fread($sourceHandle, $length));
$requests[] = [
'headers' => [
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
'x-appwrite-id' => $deploymentId,
'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize,
],
'chunkPath' => $chunkPath,
];
}
} finally {
fclose($sourceHandle);
}
$responses = [];
$endpoint = parse_url($this->client->getEndpoint());
$scheme = $endpoint['scheme'] ?? 'http';
$host = $endpoint['host'] ?? 'appwrite';
$port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80);
$basePath = rtrim($endpoint['path'] ?? '', '/');
\Swoole\Coroutine\run(function () use ($basePath, $functionId, $host, $port, $requests, $scheme, &$responses): void {
$wg = new \Swoole\Coroutine\WaitGroup();
foreach ($requests as $index => $request) {
$wg->add();
\Swoole\Coroutine::create(function () use ($basePath, $functionId, $host, $index, $port, $request, &$responses, $scheme, $wg): void {
try {
for ($attempt = 0; $attempt < 3; $attempt++) {
$client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https');
$client->set([
'timeout' => 300,
'ssl_verify_peer' => false,
'ssl_verify_host' => false,
]);
$client->setHeaders($request['headers']);
$client->setMethod(Client::METHOD_POST);
$client->setData([
'entrypoint' => 'index.js',
'activate' => true,
]);
$client->addFile($request['chunkPath'], 'code', 'application/x-gzip', 'code.tar.gz');
$client->execute($basePath . '/functions/' . $functionId . '/deployments');
$responses[$index] = [
'body' => $client->body,
'error' => $client->errMsg,
'headers' => $client->headers ?? [],
'statusCode' => $client->statusCode,
];
$client->close();
if ($responses[$index]['statusCode'] !== 429) {
break;
}
$retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1);
\Swoole\Coroutine::sleep(max($retryAfter, 0.1));
}
} finally {
$wg->done();
}
});
}
$wg->wait();
});
ksort($responses);
foreach ($responses as $response) {
$this->assertSame('', $response['error']);
$this->assertContains($response['statusCode'], [202], (string) $response['body']);
}
$this->assertEventually(function () use ($functionId, $deploymentId) {
$deployment = $this->getDeployment($functionId, $deploymentId);
$this->assertEquals(200, $deployment['headers']['status-code']);
$this->assertEquals('ready', $deployment['body']['status']);
$this->assertEquals($deploymentId, $deployment['body']['$id']);
}, 120000, 500);
} finally {
$this->cleanupFunction($functionId);
if (is_dir($tmpDirectory)) {
foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) {
unlink($file);
}
rmdir($tmpDirectory);
}
}
}
public function testUpdateDeployment(): void
{
$data = $this->setupTestDeployment();
@@ -1351,6 +1351,145 @@ class SitesCustomServerTest extends Scope
$this->cleanupSite($siteId);
}
public function testCreateDeploymentParallelChunksLargeFile(): void
{
$siteId = $this->setupSite([
'buildRuntime' => 'node-22',
'fallbackFile' => '',
'framework' => 'other',
'name' => 'Test Site Parallel Chunk Deployment',
'outputDirectory' => './',
'providerBranch' => 'main',
'providerRootDirectory' => './',
'siteId' => ID::unique()
]);
$deploymentId = ID::unique();
$tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-site-deployment-' . $deploymentId;
mkdir($tmpDirectory);
try {
file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'index.html', '<html><body>Hello World</body></html>');
file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'large.bin', random_bytes(20 * 1024 * 1024));
$source = $tmpDirectory . DIRECTORY_SEPARATOR . 'code.tar.gz';
Console::execute('cd ' . $tmpDirectory . ' && tar --exclude code.tar.gz -czf code.tar.gz .', '', $this->stdout, $this->stderr);
$totalSize = filesize($source);
$chunkSize = 5 * 1024 * 1024;
$chunksTotal = (int) ceil($totalSize / $chunkSize);
$this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test deployment must span at least 4 chunks');
$requests = [];
$sourceHandle = fopen($source, 'rb');
$this->assertNotFalse($sourceHandle, 'Could not open deployment package');
try {
for ($i = 0; $i < $chunksTotal; $i++) {
$start = $i * $chunkSize;
$end = min($start + $chunkSize, $totalSize) - 1;
$length = $end - $start + 1;
$chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part';
fseek($sourceHandle, $start);
file_put_contents($chunkPath, fread($sourceHandle, $length));
$requests[] = [
'headers' => [
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
'x-appwrite-id' => $deploymentId,
'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize,
],
'chunkPath' => $chunkPath,
];
}
} finally {
fclose($sourceHandle);
}
$responses = [];
$endpoint = parse_url($this->client->getEndpoint());
$scheme = $endpoint['scheme'] ?? 'http';
$host = $endpoint['host'] ?? 'appwrite';
$port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80);
$basePath = rtrim($endpoint['path'] ?? '', '/');
\Swoole\Coroutine\run(function () use ($basePath, $host, $port, $requests, $scheme, $siteId, &$responses): void {
$wg = new \Swoole\Coroutine\WaitGroup();
foreach ($requests as $index => $request) {
$wg->add();
\Swoole\Coroutine::create(function () use ($basePath, $host, $index, $port, $request, &$responses, $scheme, $siteId, $wg): void {
try {
for ($attempt = 0; $attempt < 3; $attempt++) {
$client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https');
$client->set([
'timeout' => 300,
'ssl_verify_peer' => false,
'ssl_verify_host' => false,
]);
$client->setHeaders($request['headers']);
$client->setMethod(Client::METHOD_POST);
$client->setData([
'activate' => true,
]);
$client->addFile($request['chunkPath'], 'code', 'application/x-gzip', 'code.tar.gz');
$client->execute($basePath . '/sites/' . $siteId . '/deployments');
$responses[$index] = [
'body' => $client->body,
'error' => $client->errMsg,
'headers' => $client->headers ?? [],
'statusCode' => $client->statusCode,
];
$client->close();
if ($responses[$index]['statusCode'] !== 429) {
break;
}
$retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1);
\Swoole\Coroutine::sleep(max($retryAfter, 0.1));
}
} finally {
$wg->done();
}
});
}
$wg->wait();
});
ksort($responses);
foreach ($responses as $response) {
$this->assertSame('', $response['error']);
$this->assertContains($response['statusCode'], [202], (string) $response['body']);
}
$this->assertEventually(function () use ($siteId, $deploymentId) {
$deployment = $this->getDeployment($siteId, $deploymentId);
$this->assertEquals(200, $deployment['headers']['status-code']);
$this->assertEquals('ready', $deployment['body']['status']);
$this->assertEquals($deploymentId, $deployment['body']['$id']);
}, 120000, 500);
} finally {
$this->cleanupSite($siteId);
if (is_dir($tmpDirectory)) {
foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) {
unlink($file);
}
rmdir($tmpDirectory);
}
}
}
public function testCreateDeployment()
{
$siteId = $this->setupSite([
+179 -1
View File
@@ -391,7 +391,7 @@ trait StorageBase
'bucketId' => ID::unique(),
'name' => 'Test Bucket 2',
'fileSecurity' => true,
'maximumFileSize' => 6000000000, //6GB
'maximumFileSize' => 6000000001,
'allowedFileExtensions' => ["jpg", "png"],
'permissions' => [
Permission::read(Role::any()),
@@ -1436,6 +1436,184 @@ trait StorageBase
]);
}
public function testCreateBucketFileParallelChunksLargeFile(): void
{
$totalSize = 20 * 1024 * 1024;
$chunkSize = 5 * 1024 * 1024;
$chunksTotal = (int) ceil($totalSize / $chunkSize);
$this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test file must span at least 4 chunks');
$bucket = $this->client->call(Client::METHOD_POST, '/storage/buckets', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], [
'bucketId' => ID::unique(),
'name' => 'Test Bucket Parallel Chunk Upload',
'fileSecurity' => true,
'maximumFileSize' => $totalSize,
'permissions' => [
Permission::read(Role::any()),
Permission::create(Role::any()),
Permission::delete(Role::any()),
],
]);
$this->assertEquals(201, $bucket['headers']['status-code']);
$bucketId = $bucket['body']['$id'];
$fileId = ID::unique();
$tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-upload-' . $fileId;
$source = $tmpDirectory . DIRECTORY_SEPARATOR . 'large-parallel-upload.bin';
mkdir($tmpDirectory);
try {
$handle = fopen($source, 'wb');
$this->assertNotFalse($handle, 'Could not create test file');
$remaining = $totalSize;
$block = str_repeat(hash('sha256', $fileId, binary: true), 1024);
while ($remaining > 0) {
$bytes = substr($block, 0, min(strlen($block), $remaining));
fwrite($handle, $bytes);
$remaining -= strlen($bytes);
}
fclose($handle);
$requests = [];
$sourceHandle = fopen($source, 'rb');
$this->assertNotFalse($sourceHandle, 'Could not open test file');
for ($i = 0; $i < $chunksTotal; $i++) {
$start = $i * $chunkSize;
$end = min($start + $chunkSize, $totalSize) - 1;
$length = $end - $start + 1;
$chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part';
fseek($sourceHandle, $start);
file_put_contents($chunkPath, fread($sourceHandle, $length));
$requests[] = [
'headers' => [
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize,
],
'chunkPath' => $chunkPath,
];
}
fclose($sourceHandle);
$responses = [];
$endpoint = parse_url($this->client->getEndpoint());
$scheme = $endpoint['scheme'] ?? 'http';
$host = $endpoint['host'] ?? 'appwrite';
$port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80);
$basePath = rtrim($endpoint['path'] ?? '', '/');
\Swoole\Coroutine\run(function () use ($basePath, $bucketId, $fileId, $host, $port, $requests, $scheme, &$responses): void {
$wg = new \Swoole\Coroutine\WaitGroup();
foreach ($requests as $index => $request) {
$wg->add();
\Swoole\Coroutine::create(function () use ($basePath, $bucketId, $fileId, $host, $index, $port, $request, &$responses, $scheme, $wg): void {
try {
for ($attempt = 0; $attempt < 3; $attempt++) {
$client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https');
$client->set([
'timeout' => 300,
'ssl_verify_peer' => false,
'ssl_verify_host' => false,
]);
$client->setHeaders($request['headers']);
$client->setMethod(Client::METHOD_POST);
$client->setData([
'fileId' => $fileId,
'permissions[0]' => Permission::read(Role::any()),
'permissions[1]' => Permission::delete(Role::any()),
]);
$client->addFile($request['chunkPath'], 'file', 'application/octet-stream', 'large-parallel-upload.bin');
$client->execute($basePath . '/storage/buckets/' . $bucketId . '/files');
$responses[$index] = [
'body' => $client->body,
'error' => $client->errMsg,
'headers' => $client->headers ?? [],
'statusCode' => $client->statusCode,
];
$client->close();
if ($responses[$index]['statusCode'] !== 429) {
break;
}
$retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1);
\Swoole\Coroutine::sleep(max($retryAfter, 0.1));
}
} finally {
$wg->done();
}
});
}
$wg->wait();
});
ksort($responses);
foreach ($responses as $response) {
$this->assertSame('', $response['error']);
$this->assertContains($response['statusCode'], [200, 201], (string) $response['body']);
}
$uploadedFile = $this->client->call(Client::METHOD_GET, '/storage/buckets/' . $bucketId . '/files/' . $fileId, array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]));
$this->assertEquals(200, $uploadedFile['headers']['status-code']);
$this->assertEquals($chunksTotal, $uploadedFile['body']['chunksTotal']);
$this->assertEquals($chunksTotal, $uploadedFile['body']['chunksUploaded']);
$download = $this->client->call(Client::METHOD_GET, '/storage/buckets/' . $bucketId . '/files/' . $fileId . '/download', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]));
$this->assertEquals(200, $download['headers']['status-code']);
$this->assertEquals($totalSize, strlen($download['body']));
$this->assertEquals(hash_file('sha256', $source), hash('sha256', $download['body']));
} finally {
if (isset($bucketId)) {
$this->client->call(Client::METHOD_DELETE, '/storage/buckets/' . $bucketId . '/files/' . $fileId, array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]));
$this->client->call(Client::METHOD_DELETE, '/storage/buckets/' . $bucketId, [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
}
foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) {
unlink($file);
}
if (is_dir($tmpDirectory)) {
rmdir($tmpDirectory);
}
}
}
public function testDeleteBucketFile(): void
{
// Create a fresh file just for deletion testing (not using cache since we delete it)