diff --git a/app/init/registers.php b/app/init/registers.php index 54c0053a33..21ce536a8b 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -240,6 +240,12 @@ $register->set('pools', function () { 'multiple' => true, 'schemes' => ['redis'], ], + 'lock' => [ + 'type' => 'lock', + 'dsns' => $fallbackForRedis, + 'multiple' => false, + 'schemes' => ['redis'], + ], ]; $maxConnections = (int) System::getEnv('_APP_CONNECTIONS_MAX', 151); @@ -369,6 +375,8 @@ $register->set('pools', function () { } return $adapter; + case 'lock': + return $resource(); default: throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation."); } diff --git a/app/init/resources.php b/app/init/resources.php index 92b581157f..d48a60c06c 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -29,6 +29,7 @@ use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; use Utopia\DI\Container; use Utopia\DSN\DSN; +use Utopia\Lock\Distributed; use Utopia\Pools\Group; use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; @@ -248,6 +249,16 @@ $container->set('redis', function () { return $redis; }); +$container->set('locks', function (Group $pools) { + return function (string $key, int $ttl, callable $callback, float $timeout = 0.0) use ($pools): mixed { + return $pools->get('lock')->use(function (\Redis $redis) use ($key, $ttl, $callback, $timeout) { + $lock = new Distributed($redis, $key, ttl: $ttl); + + return $lock->withLock($callback, timeout: $timeout); + }); + }; +}, ['pools']); + $container->set('timelimit', function (\Redis $redis) { return function (string $key, int $limit, int $time) use ($redis) { return new TimeLimitRedis($key, $limit, $time, $redis); diff --git a/composer.json b/composer.json index c04bc28f14..400e3c1822 100644 --- a/composer.json +++ b/composer.json @@ -72,6 +72,7 @@ "utopia-php/validators": "0.2.*", "utopia-php/image": "0.8.*", "utopia-php/locale": "0.8.*", + "utopia-php/lock": "0.2.*", "utopia-php/logger": "0.8.*", "utopia-php/messaging": "0.22.*", "utopia-php/migration": "1.*", diff --git a/composer.lock b/composer.lock index 3e2e5e764b..66d8f62925 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "6404f075ed03ef6651ce9cea63518fa0", + "content-hash": "035685d1335039f13e16d0532c874b21", "packages": [ { "name": "adhocore/jwt", @@ -4498,6 +4498,57 @@ }, "time": "2025-08-12T12:58:26+00:00" }, + { + "name": "utopia-php/lock", + "version": "0.2.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/lock.git", + "reference": "49317c9493d8f747e4299aa24c22862aa5f6e106" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/lock/zipball/49317c9493d8f747e4299aa24c22862aa5f6e106", + "reference": "49317c9493d8f747e4299aa24c22862aa5f6e106", + "shasum": "" + }, + "require": { + "php": ">=8.3" + }, + "require-dev": { + "laravel/pint": "1.*", + "phpstan/phpstan": "2.*", + "phpunit/phpunit": "11.*", + "swoole/ide-helper": "*" + }, + "suggest": { + "ext-pcntl": "Required to run the File lock tests", + "ext-redis": "Required for the Distributed lock", + "ext-swoole": "Required for the Mutex and Semaphore locks (>=6.0)" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Lock\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Appwrite Team", + "email": "team@appwrite.io" + } + ], + "description": "Mutex, semaphore, file and distributed locks for PHP — one interface, four backends.", + "support": { + "issues": "https://github.com/utopia-php/lock/issues", + "source": "https://github.com/utopia-php/lock/tree/0.2.0" + }, + "time": "2026-04-24T10:47:56+00:00" + }, { "name": "utopia-php/logger", "version": "0.8.0", @@ -5141,16 +5192,16 @@ }, { "name": "utopia-php/storage", - "version": "2.0.2", + "version": "2.0.3", "source": { "type": "git", "url": "https://github.com/utopia-php/storage.git", - "reference": "64e132a3768e22243eda36fe4262da22fd204f3c" + "reference": "37129cf0bfcc03210172000e4388d4d3495ae013" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/storage/zipball/64e132a3768e22243eda36fe4262da22fd204f3c", - "reference": "64e132a3768e22243eda36fe4262da22fd204f3c", + "url": "https://api.github.com/repos/utopia-php/storage/zipball/37129cf0bfcc03210172000e4388d4d3495ae013", + "reference": "37129cf0bfcc03210172000e4388d4d3495ae013", "shasum": "" }, "require": { @@ -5187,9 +5238,9 @@ ], "support": { "issues": "https://github.com/utopia-php/storage/issues", - "source": "https://github.com/utopia-php/storage/tree/2.0.2" + "source": "https://github.com/utopia-php/storage/tree/2.0.3" }, - "time": "2026-05-01T15:06:16+00:00" + "time": "2026-05-15T09:42:32+00:00" }, { "name": "utopia-php/system", diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Create.php b/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Create.php index 57c465faef..9af5491598 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Create.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Deployments/Create.php @@ -21,6 +21,7 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\UID; use Utopia\Http\Adapter\Swoole\Request; +use Utopia\Lock\Exception\Contention as LockContention; use Utopia\Platform\Action; use Utopia\Platform\Scope\HTTP; use Utopia\Storage\Device; @@ -92,6 +93,7 @@ class Create extends Action ->inject('plan') ->inject('authorization') ->inject('platform') + ->inject('locks') ->callback($this->action(...)); } @@ -111,7 +113,8 @@ class Create extends Action BuildPublisher $publisherForBuilds, array $plan, Authorization $authorization, - array $platform + array $platform, + callable $locks ) { $activate = \strval($activate) === 'true' || \strval($activate) === '1'; @@ -193,20 +196,38 @@ class Create extends Action // Save to storage $fileSize ??= $deviceForLocal->getFileSize($fileTmpName); $path = $deviceForFunctions->getPath($deploymentId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION)); - $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + $lockKey = 'functions:deployment:' . $project->getId() . ':' . $functionId . ':' . $deploymentId; $metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)]; - if (!$deployment->isEmpty()) { - $chunks = $deployment->getAttribute('sourceChunksTotal', 1); - $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); - $metadata = $deployment->getAttribute('sourceMetadata', []); + $completed = false; - if ($uploaded === $chunks) { - $response - ->setStatusCode(Response::STATUS_CODE_ACCEPTED) - ->dynamic($deployment, Response::MODEL_DEPLOYMENT); - return; - } + try { + $locks($lockKey, 600, function () use (&$chunks, $dbForProject, $deploymentId, &$metadata, &$completed, $response): void { + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + if (!$deployment->isEmpty()) { + $chunks = $deployment->getAttribute('sourceChunksTotal', 1); + $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); + $metadata = $deployment->getAttribute('sourceMetadata', []); + + if ($uploaded === $chunks) { + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + + $completed = true; + return; + } + } + }, timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.'); + } + + if ($completed) { + return; } $chunksUploaded = $deviceForFunctions->upload($fileTmpName, $path, $chunk, $chunks, $metadata); @@ -217,118 +238,144 @@ class Create extends Action $type = $request->getHeader('x-sdk-language') === 'cli' ? 'cli' : 'manual'; - if ($chunksUploaded === $chunks) { - if ($activate) { - // Remove deploy for all other deployments. - $activeDeployments = $dbForProject->find('deployments', [ - Query::equal('activate', [true]), - Query::equal('resourceId', [$functionId]), - Query::equal('resourceType', ['functions']) - ]); + try { + $locks($lockKey, 600, function () use ($activate, &$chunks, $chunksUploaded, $commands, $dbForProject, $deploymentId, $deviceForFunctions, $entrypoint, $fileSize, &$function, $functionId, $path, &$metadata, $platform, $project, $publisherForBuilds, $queueForEvents, $response, $type): void { + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + $uploaded = 0; - foreach ($activeDeployments as $activeDeployment) { - $activeDeployment->setAttribute('activate', false); - $dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document([ - 'activate' => false, - ])); + if (!$deployment->isEmpty()) { + $chunks = $deployment->getAttribute('sourceChunksTotal', 1); + $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); + $metadata = \array_merge($deployment->getAttribute('sourceMetadata', []), $metadata); + + if ($uploaded === $chunks) { + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + return; + } } - } - $fileSize = $deviceForFunctions->getFileSize($path); + $chunksUploaded = max($uploaded, $chunksUploaded); - if ($deployment->isEmpty()) { - $deployment = $dbForProject->createDocument('deployments', new Document([ - '$id' => $deploymentId, - '$permissions' => [ - Permission::read(Role::any()), - Permission::update(Role::any()), - Permission::delete(Role::any()), - ], - 'resourceInternalId' => $function->getSequence(), - 'resourceId' => $function->getId(), - 'resourceType' => 'functions', - 'entrypoint' => $entrypoint, - 'buildCommands' => $commands, - 'startCommand' => $function->getAttribute('startCommand', ''), - 'sourcePath' => $path, - 'sourceSize' => $fileSize, - 'totalSize' => $fileSize, - 'sourceChunksTotal' => $chunks, - 'sourceChunksUploaded' => $chunksUploaded, - 'activate' => $activate, - 'sourceMetadata' => $metadata, - 'type' => $type - ])); + if ($chunksUploaded === $chunks && $uploaded < $chunks) { + if ($activate) { + // Remove deploy for all other deployments. + $activeDeployments = $dbForProject->find('deployments', [ + Query::equal('activate', [true]), + Query::equal('resourceId', [$functionId]), + Query::equal('resourceType', ['functions']) + ]); - $function = $dbForProject->updateDocument('functions', $function->getId(), new Document([ - 'latestDeploymentId' => $deployment->getId(), - 'latestDeploymentInternalId' => $deployment->getSequence(), - 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), - 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), - ])); - } else { - $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ - 'sourceSize' => $fileSize, - 'sourceChunksUploaded' => $chunksUploaded, - 'sourceMetadata' => $metadata, - ])); - } + foreach ($activeDeployments as $activeDeployment) { + $dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document([ + 'activate' => false, + ])); + } + } - // Start the build - $publisherForBuilds->enqueue(new BuildMessage( - project: $project, - resource: $function, - deployment: $deployment, - type: BUILD_TYPE_DEPLOYMENT, - platform: $platform, - )); - } else { - if ($deployment->isEmpty()) { - $deployment = $dbForProject->createDocument('deployments', new Document([ - '$id' => $deploymentId, - '$permissions' => [ - Permission::read(Role::any()), - Permission::update(Role::any()), - Permission::delete(Role::any()), - ], - 'resourceInternalId' => $function->getSequence(), - 'resourceId' => $function->getId(), - 'resourceType' => 'functions', - 'entrypoint' => $entrypoint, - 'buildCommands' => $commands, - 'startCommand' => $function->getAttribute('startCommand', ''), - 'sourcePath' => $path, - 'sourceSize' => $fileSize, - 'totalSize' => $fileSize, - 'sourceChunksTotal' => $chunks, - 'sourceChunksUploaded' => $chunksUploaded, - 'activate' => $activate, - 'sourceMetadata' => $metadata, - 'type' => $type - ])); + $fileSize = $deviceForFunctions->getFileSize($path); - $function = $dbForProject->updateDocument('functions', $function->getId(), new Document([ - 'latestDeploymentId' => $deployment->getId(), - 'latestDeploymentInternalId' => $deployment->getSequence(), - 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), - 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), - ])); - } else { - $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ - 'sourceChunksUploaded' => $chunksUploaded, - 'sourceMetadata' => $metadata, - ])); - } + if ($deployment->isEmpty()) { + $deployment = $dbForProject->createDocument('deployments', new Document([ + '$id' => $deploymentId, + '$permissions' => [ + Permission::read(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ], + 'resourceInternalId' => $function->getSequence(), + 'resourceId' => $function->getId(), + 'resourceType' => 'functions', + 'entrypoint' => $entrypoint, + 'buildCommands' => $commands, + 'startCommand' => $function->getAttribute('startCommand', ''), + 'sourcePath' => $path, + 'sourceSize' => $fileSize, + 'totalSize' => $fileSize, + 'sourceChunksTotal' => $chunks, + 'sourceChunksUploaded' => $chunksUploaded, + 'activate' => $activate, + 'sourceMetadata' => $metadata, + 'type' => $type + ])); + + $function = $dbForProject->updateDocument('functions', $function->getId(), new Document([ + 'latestDeploymentId' => $deployment->getId(), + 'latestDeploymentInternalId' => $deployment->getSequence(), + 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), + 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), + ])); + } else { + $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ + 'sourceSize' => $fileSize, + 'sourceChunksUploaded' => $chunksUploaded, + 'sourceMetadata' => $metadata, + ])); + } + + // Start the build + $publisherForBuilds->enqueue(new BuildMessage( + project: $project, + resource: $function, + deployment: $deployment, + type: BUILD_TYPE_DEPLOYMENT, + platform: $platform, + )); + } else { + if ($deployment->isEmpty()) { + $deployment = $dbForProject->createDocument('deployments', new Document([ + '$id' => $deploymentId, + '$permissions' => [ + Permission::read(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ], + 'resourceInternalId' => $function->getSequence(), + 'resourceId' => $function->getId(), + 'resourceType' => 'functions', + 'entrypoint' => $entrypoint, + 'buildCommands' => $commands, + 'startCommand' => $function->getAttribute('startCommand', ''), + 'sourcePath' => $path, + 'sourceSize' => $fileSize, + 'totalSize' => $fileSize, + 'sourceChunksTotal' => $chunks, + 'sourceChunksUploaded' => $chunksUploaded, + 'activate' => $activate, + 'sourceMetadata' => $metadata, + 'type' => $type + ])); + + $function = $dbForProject->updateDocument('functions', $function->getId(), new Document([ + 'latestDeploymentId' => $deployment->getId(), + 'latestDeploymentInternalId' => $deployment->getSequence(), + 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), + 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), + ])); + } else { + $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ + 'sourceChunksUploaded' => $chunksUploaded, + 'sourceMetadata' => $metadata, + ])); + } + } + + $metadata = null; + + if ($chunksUploaded === $chunks) { + $queueForEvents + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()); + } + + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + }, timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.'); } - - $metadata = null; - - $queueForEvents - ->setParam('functionId', $function->getId()) - ->setParam('deploymentId', $deployment->getId()); - - $response - ->setStatusCode(Response::STATUS_CODE_ACCEPTED) - ->dynamic($deployment, Response::MODEL_DEPLOYMENT); } } diff --git a/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Create.php b/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Create.php index 63ed776709..d27755d106 100644 --- a/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Create.php +++ b/src/Appwrite/Platform/Modules/Sites/Http/Deployments/Create.php @@ -21,6 +21,7 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\UID; use Utopia\Http\Adapter\Swoole\Request; +use Utopia\Lock\Exception\Contention as LockContention; use Utopia\Platform\Action; use Utopia\Platform\Scope\HTTP; use Utopia\Storage\Device; @@ -90,6 +91,7 @@ class Create extends Action ->inject('plan') ->inject('authorization') ->inject('platform') + ->inject('locks') ->callback($this->action(...)); } @@ -112,6 +114,7 @@ class Create extends Action array $plan, Authorization $authorization, array $platform, + callable $locks, ) { $activate = \strval($activate) === 'true' || \strval($activate) === '1'; @@ -193,20 +196,38 @@ class Create extends Action // Save to storage $fileSize ??= $deviceForLocal->getFileSize($fileTmpName); $path = $deviceForSites->getPath($deploymentId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION)); - $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + $lockKey = 'sites:deployment:' . $project->getId() . ':' . $siteId . ':' . $deploymentId; $metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)]; - if (!$deployment->isEmpty()) { - $chunks = $deployment->getAttribute('sourceChunksTotal', 1); - $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); - $metadata = $deployment->getAttribute('sourceMetadata', []); + $completed = false; - if ($uploaded === $chunks) { - $response - ->setStatusCode(Response::STATUS_CODE_ACCEPTED) - ->dynamic($deployment, Response::MODEL_DEPLOYMENT); - return; - } + try { + $locks($lockKey, 600, function () use (&$chunks, $dbForProject, $deploymentId, &$metadata, &$completed, $response): void { + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + if (!$deployment->isEmpty()) { + $chunks = $deployment->getAttribute('sourceChunksTotal', 1); + $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); + $metadata = $deployment->getAttribute('sourceMetadata', []); + + if ($uploaded === $chunks) { + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + + $completed = true; + return; + } + } + }, timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.'); + } + + if ($completed) { + return; } $chunksUploaded = $deviceForSites->upload($fileTmpName, $path, $chunk, $chunks, $metadata); @@ -225,184 +246,208 @@ class Create extends Action $commands[] = $buildCommand; } - if ($chunksUploaded === $chunks) { - if ($activate) { - // Remove deploy for all other deployments. - $activeDeployments = $dbForProject->find('deployments', [ - Query::equal('activate', [true]), - Query::equal('resourceId', [$siteId]), - Query::equal('resourceType', ['sites']) - ]); + try { + $locks($lockKey, 600, function () use ($activate, $authorization, $commands, &$chunks, $chunksUploaded, $dbForPlatform, $dbForProject, $deploymentId, $deviceForSites, $fileSize, &$metadata, $outputDirectory, $path, $platform, $project, $publisherForBuilds, $queueForEvents, $response, &$site, $siteId, $type): void { + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + $uploaded = 0; - foreach ($activeDeployments as $activeDeployment) { - $activeDeployment->setAttribute('activate', false); - $dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document(['activate' => false])); + if (!$deployment->isEmpty()) { + $chunks = $deployment->getAttribute('sourceChunksTotal', 1); + $uploaded = $deployment->getAttribute('sourceChunksUploaded', 0); + $metadata = \array_merge($deployment->getAttribute('sourceMetadata', []), $metadata); + + if ($uploaded === $chunks) { + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + return; + } } - } - $fileSize = $deviceForSites->getFileSize($path); + $chunksUploaded = max($uploaded, $chunksUploaded); - if ($deployment->isEmpty()) { - $deployment = $dbForProject->createDocument('deployments', new Document([ - '$id' => $deploymentId, - '$permissions' => [ - Permission::read(Role::any()), - Permission::update(Role::any()), - Permission::delete(Role::any()), - ], - 'resourceInternalId' => $site->getSequence(), - 'resourceId' => $site->getId(), - 'resourceType' => 'sites', - 'buildCommands' => \implode(' && ', $commands), - 'startCommand' => $site->getAttribute('startCommand', ''), - 'buildOutput' => $outputDirectory, - 'adapter' => $site->getAttribute('adapter', ''), - 'fallbackFile' => $site->getAttribute('fallbackFile', ''), - 'sourcePath' => $path, - 'sourceSize' => $fileSize, - 'totalSize' => $fileSize, - 'sourceChunksTotal' => $chunks, - 'sourceChunksUploaded' => $chunksUploaded, - 'activate' => $activate, - 'sourceMetadata' => $metadata, - 'type' => $type, - ])); + if ($chunksUploaded === $chunks && $uploaded < $chunks) { + if ($activate) { + // Remove deploy for all other deployments. + $activeDeployments = $dbForProject->find('deployments', [ + Query::equal('activate', [true]), + Query::equal('resourceId', [$siteId]), + Query::equal('resourceType', ['sites']) + ]); - $site = $site - ->setAttribute('latestDeploymentId', $deployment->getId()) - ->setAttribute('latestDeploymentInternalId', $deployment->getSequence()) - ->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt()) - ->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', '')); - $dbForProject->updateDocument('sites', $site->getId(), new Document([ - 'latestDeploymentId' => $deployment->getId(), - 'latestDeploymentInternalId' => $deployment->getSequence(), - 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), - 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), - ])); + foreach ($activeDeployments as $activeDeployment) { + $dbForProject->updateDocument('deployments', $activeDeployment->getId(), new Document(['activate' => false])); + } + } - $sitesDomain = $platform['sitesDomain']; - $domain = ID::unique() . "." . $sitesDomain; + $fileSize = $deviceForSites->getFileSize($path); - // TODO: (@Meldiron) Remove after 1.7.x migration - $isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5'; - $ruleId = $isMd5 ? md5($domain) : ID::unique(); + if ($deployment->isEmpty()) { + $deployment = $dbForProject->createDocument('deployments', new Document([ + '$id' => $deploymentId, + '$permissions' => [ + Permission::read(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ], + 'resourceInternalId' => $site->getSequence(), + 'resourceId' => $site->getId(), + 'resourceType' => 'sites', + 'buildCommands' => \implode(' && ', $commands), + 'startCommand' => $site->getAttribute('startCommand', ''), + 'buildOutput' => $outputDirectory, + 'adapter' => $site->getAttribute('adapter', ''), + 'fallbackFile' => $site->getAttribute('fallbackFile', ''), + 'sourcePath' => $path, + 'sourceSize' => $fileSize, + 'totalSize' => $fileSize, + 'sourceChunksTotal' => $chunks, + 'sourceChunksUploaded' => $chunksUploaded, + 'activate' => $activate, + 'sourceMetadata' => $metadata, + 'type' => $type, + ])); - $authorization->skip( - fn () => $dbForPlatform->createDocument('rules', new Document([ - '$id' => $ruleId, - 'projectId' => $project->getId(), - 'projectInternalId' => $project->getSequence(), - 'domain' => $domain, - 'type' => 'deployment', - 'trigger' => 'deployment', - 'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(), - 'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(), - 'deploymentResourceType' => 'site', - 'deploymentResourceId' => $site->getId(), - 'deploymentResourceInternalId' => $site->getSequence(), - 'status' => 'verified', - 'certificateId' => '', - 'search' => implode(' ', [$ruleId, $domain]), - 'owner' => 'Appwrite', - 'region' => $project->getAttribute('region') - ])) - ); - } else { - $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ - 'sourceSize' => $fileSize, - 'sourceChunksUploaded' => $chunksUploaded, - 'sourceMetadata' => $metadata, - ])); - } + $site = $site + ->setAttribute('latestDeploymentId', $deployment->getId()) + ->setAttribute('latestDeploymentInternalId', $deployment->getSequence()) + ->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt()) + ->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', '')); + $dbForProject->updateDocument('sites', $site->getId(), new Document([ + 'latestDeploymentId' => $deployment->getId(), + 'latestDeploymentInternalId' => $deployment->getSequence(), + 'latestDeploymentCreatedAt' => $deployment->getCreatedAt(), + 'latestDeploymentStatus' => $deployment->getAttribute('status', ''), + ])); - // Start the build - $publisherForBuilds->enqueue(new BuildMessage( - project: $project, - resource: $site, - deployment: $deployment, - type: BUILD_TYPE_DEPLOYMENT, - platform: $platform, - )); - } else { - if ($deployment->isEmpty()) { - $deployment = $dbForProject->createDocument('deployments', new Document([ - '$id' => $deploymentId, - '$permissions' => [ - Permission::read(Role::any()), - Permission::update(Role::any()), - Permission::delete(Role::any()), - ], - 'resourceInternalId' => $site->getSequence(), - 'resourceId' => $site->getId(), - 'resourceType' => 'sites', - 'buildCommands' => \implode(' && ', $commands), - 'startCommand' => $site->getAttribute('startCommand', ''), - 'buildOutput' => $outputDirectory, - 'adapter' => $site->getAttribute('adapter', ''), - 'fallbackFile' => $site->getAttribute('fallbackFile', ''), - 'sourcePath' => $path, - 'sourceSize' => $fileSize, - 'totalSize' => $fileSize, - 'sourceChunksTotal' => $chunks, - 'sourceChunksUploaded' => $chunksUploaded, - 'activate' => $activate, - 'sourceMetadata' => $metadata, - 'type' => $type, - ])); + $sitesDomain = $platform['sitesDomain']; + $domain = ID::unique() . "." . $sitesDomain; - $site = $site - ->setAttribute('latestDeploymentId', $deployment->getId()) - ->setAttribute('latestDeploymentInternalId', $deployment->getSequence()) - ->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt()) - ->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', '')); - $dbForProject->updateDocument('sites', $site->getId(), new Document([ - 'latestDeploymentId' => $site->getAttribute('latestDeploymentId'), - 'latestDeploymentInternalId' => $site->getAttribute('latestDeploymentInternalId'), - 'latestDeploymentCreatedAt' => $site->getAttribute('latestDeploymentCreatedAt'), - 'latestDeploymentStatus' => $site->getAttribute('latestDeploymentStatus'), - ])); + // TODO: (@Meldiron) Remove after 1.7.x migration + $isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5'; + $ruleId = $isMd5 ? md5($domain) : ID::unique(); - $sitesDomain = $platform['sitesDomain']; - $domain = ID::unique() . "." . $sitesDomain; - $ruleId = md5($domain); - $authorization->skip( - fn () => $dbForPlatform->createDocument('rules', new Document([ - '$id' => $ruleId, - 'projectId' => $project->getId(), - 'projectInternalId' => $project->getSequence(), - 'domain' => $domain, - 'type' => 'deployment', - 'trigger' => 'deployment', - 'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(), - 'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(), - 'deploymentResourceType' => 'site', - 'deploymentResourceId' => $site->getId(), - 'deploymentResourceInternalId' => $site->getSequence(), - 'status' => 'verified', - 'certificateId' => '', - 'search' => implode(' ', [$ruleId, $domain]), - 'owner' => 'Appwrite', - 'region' => $project->getAttribute('region') - ])) - ); - } else { - $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ - 'sourceChunksUploaded' => $chunksUploaded, - 'sourceMetadata' => $metadata, - ])); - } + $authorization->skip( + fn () => $dbForPlatform->createDocument('rules', new Document([ + '$id' => $ruleId, + 'projectId' => $project->getId(), + 'projectInternalId' => $project->getSequence(), + 'domain' => $domain, + 'type' => 'deployment', + 'trigger' => 'deployment', + 'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(), + 'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(), + 'deploymentResourceType' => 'site', + 'deploymentResourceId' => $site->getId(), + 'deploymentResourceInternalId' => $site->getSequence(), + 'status' => 'verified', + 'certificateId' => '', + 'search' => implode(' ', [$ruleId, $domain]), + 'owner' => 'Appwrite', + 'region' => $project->getAttribute('region') + ])) + ); + } else { + $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ + 'sourceSize' => $fileSize, + 'sourceChunksUploaded' => $chunksUploaded, + 'sourceMetadata' => $metadata, + ])); + } + + // Start the build + $publisherForBuilds->enqueue(new BuildMessage( + project: $project, + resource: $site, + deployment: $deployment, + type: BUILD_TYPE_DEPLOYMENT, + platform: $platform, + )); + } else { + if ($deployment->isEmpty()) { + $deployment = $dbForProject->createDocument('deployments', new Document([ + '$id' => $deploymentId, + '$permissions' => [ + Permission::read(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ], + 'resourceInternalId' => $site->getSequence(), + 'resourceId' => $site->getId(), + 'resourceType' => 'sites', + 'buildCommands' => \implode(' && ', $commands), + 'startCommand' => $site->getAttribute('startCommand', ''), + 'buildOutput' => $outputDirectory, + 'adapter' => $site->getAttribute('adapter', ''), + 'fallbackFile' => $site->getAttribute('fallbackFile', ''), + 'sourcePath' => $path, + 'sourceSize' => $fileSize, + 'totalSize' => $fileSize, + 'sourceChunksTotal' => $chunks, + 'sourceChunksUploaded' => $chunksUploaded, + 'activate' => $activate, + 'sourceMetadata' => $metadata, + 'type' => $type, + ])); + + $site = $site + ->setAttribute('latestDeploymentId', $deployment->getId()) + ->setAttribute('latestDeploymentInternalId', $deployment->getSequence()) + ->setAttribute('latestDeploymentCreatedAt', $deployment->getCreatedAt()) + ->setAttribute('latestDeploymentStatus', $deployment->getAttribute('status', '')); + $dbForProject->updateDocument('sites', $site->getId(), new Document([ + 'latestDeploymentId' => $site->getAttribute('latestDeploymentId'), + 'latestDeploymentInternalId' => $site->getAttribute('latestDeploymentInternalId'), + 'latestDeploymentCreatedAt' => $site->getAttribute('latestDeploymentCreatedAt'), + 'latestDeploymentStatus' => $site->getAttribute('latestDeploymentStatus'), + ])); + + $sitesDomain = $platform['sitesDomain']; + $domain = ID::unique() . "." . $sitesDomain; + $ruleId = md5($domain); + $authorization->skip( + fn () => $dbForPlatform->createDocument('rules', new Document([ + '$id' => $ruleId, + 'projectId' => $project->getId(), + 'projectInternalId' => $project->getSequence(), + 'domain' => $domain, + 'type' => 'deployment', + 'trigger' => 'deployment', + 'deploymentId' => $deployment->isEmpty() ? '' : $deployment->getId(), + 'deploymentInternalId' => $deployment->isEmpty() ? '' : $deployment->getSequence(), + 'deploymentResourceType' => 'site', + 'deploymentResourceId' => $site->getId(), + 'deploymentResourceInternalId' => $site->getSequence(), + 'status' => 'verified', + 'certificateId' => '', + 'search' => implode(' ', [$ruleId, $domain]), + 'owner' => 'Appwrite', + 'region' => $project->getAttribute('region') + ])) + ); + } else { + $deployment = $dbForProject->updateDocument('deployments', $deploymentId, new Document([ + 'sourceChunksUploaded' => $chunksUploaded, + 'sourceMetadata' => $metadata, + ])); + } + } + + $metadata = null; + + if ($chunksUploaded === $chunks) { + $queueForEvents + ->setParam('siteId', $site->getId()) + ->setParam('deploymentId', $deployment->getId()); + } + + $response + ->setStatusCode(Response::STATUS_CODE_ACCEPTED) + ->dynamic($deployment, Response::MODEL_DEPLOYMENT); + }, timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'Deployment upload is busy. Try again.'); } - - - - $metadata = null; - - $queueForEvents - ->setParam('siteId', $site->getId()) - ->setParam('deploymentId', $deployment->getId()); - - $response - ->setStatusCode(Response::STATUS_CODE_ACCEPTED) - ->dynamic($deployment, Response::MODEL_DEPLOYMENT); } } diff --git a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Create.php b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Create.php index 2ce5ef97f5..8530475f0c 100644 --- a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Create.php +++ b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Create.php @@ -29,6 +29,7 @@ use Utopia\Database\Validator\Authorization\Input; use Utopia\Database\Validator\Permissions; use Utopia\Database\Validator\UID; use Utopia\Http\Adapter\Swoole\Request; +use Utopia\Lock\Exception\Contention as LockContention; use Utopia\Platform\Action; use Utopia\Platform\Scope\HTTP; use Utopia\Storage\Device; @@ -86,12 +87,13 @@ class Create extends Action ->inject('request') ->inject('response') ->inject('dbForProject') + ->inject('project') ->inject('user') ->inject('queueForEvents') - ->inject('mode') ->inject('deviceForFiles') ->inject('deviceForLocal') ->inject('authorization') + ->inject('locks') ->callback($this->action(...)); } @@ -103,12 +105,13 @@ class Create extends Action Request $request, Response $response, Database $dbForProject, + Document $project, User $user, Event $queueForEvents, - string $mode, Device $deviceForFiles, Device $deviceForLocal, - Authorization $authorization + Authorization $authorization, + callable $locks ) { $bucket = $authorization->skip(fn () => $dbForProject->getDocument('buckets', $bucketId)); @@ -234,189 +237,242 @@ class Create extends Action $path = $deviceForFiles->getPath($fileId . '.' . \pathinfo($fileName, PATHINFO_EXTENSION)); $path = str_ireplace($deviceForFiles->getRoot(), $deviceForFiles->getRoot() . DIRECTORY_SEPARATOR . $bucket->getId(), $path); // Add bucket id to path after root - $file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId); + $lockKey = 'storage:file:' . $project->getId() . ':' . $bucket->getId() . ':' . $fileId; $metadata = ['content_type' => $deviceForLocal->getFileMimeType($fileTmpName)]; - if (!$file->isEmpty()) { - $chunks = $file->getAttribute('chunksTotal', 1); - $uploaded = $file->getAttribute('chunksUploaded', 0); - $metadata = $file->getAttribute('metadata', []); + $completed = false; - if ($uploaded === $chunks) { - if (empty($contentRange)) { - throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); + $mergeUploadMetadata = function (array $stored, array $current): array { + $merged = \array_merge($stored, $current); + + if (isset($stored['parts']) || isset($current['parts'])) { + $parts = $stored['parts'] ?? []; + foreach (($current['parts'] ?? []) as $part => $value) { + $parts[(int) $part] = $value; + } + \ksort($parts); + + $merged['parts'] = $parts; + $merged['chunks'] = \count($parts); + } + + return $merged; + }; + + try { + $locks($lockKey, 600, function () use ($bucket, &$chunks, $contentRange, $dbForProject, $deviceForFiles, $fileId, $fileName, $fileSize, &$metadata, $path, $permissions, $response, &$completed): void { + $file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId); + if (!$file->isEmpty()) { + $chunks = $file->getAttribute('chunksTotal', 1); + $uploaded = $file->getAttribute('chunksUploaded', 0); + $metadata = $file->getAttribute('metadata', []); + + if ($uploaded === $chunks) { + if (empty($contentRange)) { + throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); + } + + $response + ->setStatusCode(Response::STATUS_CODE_OK) + ->dynamic($file, Response::MODEL_FILE); + + $completed = true; + return; + } } - $response - ->setStatusCode(Response::STATUS_CODE_OK) - ->dynamic($file, Response::MODEL_FILE); - return; - } + if ($file->isEmpty()) { + $deviceForFiles->prepareUpload($path, $metadata['content_type'] ?? '', $chunks, $metadata); + + if (!empty($contentRange)) { + $doc = new Document([ + '$id' => ID::custom($fileId), + '$permissions' => $permissions, + 'bucketId' => $bucket->getId(), + 'bucketInternalId' => $bucket->getSequence(), + 'name' => $fileName, + 'path' => $path, + 'signature' => '', + 'mimeType' => '', + 'sizeOriginal' => $fileSize, + 'sizeActual' => 0, + 'algorithm' => '', + 'comment' => '', + 'chunksTotal' => $chunks, + 'chunksUploaded' => 0, + 'search' => implode(' ', [$fileId, $fileName]), + 'metadata' => $metadata, + ]); + + try { + $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc); + } catch (DuplicateException) { + throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); + } catch (NotFoundException) { + throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); + } + } + } + }, timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'File upload is busy. Try again.'); } - $chunksUploaded = $deviceForFiles->upload($fileTmpName, $path, $chunk, $chunks, $metadata); - - if (empty($chunksUploaded)) { - throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed uploading file'); + if ($completed) { + return; } - if ($chunksUploaded === $chunks) { - if (System::getEnv('_APP_STORAGE_ANTIVIRUS') === 'enabled' && $bucket->getAttribute('antivirus', true) && $fileSize <= APP_LIMIT_ANTIVIRUS && $deviceForFiles->getType() === Storage::DEVICE_LOCAL) { - $antivirus = new Network( - System::getEnv('_APP_STORAGE_ANTIVIRUS_HOST', 'clamav'), - (int) System::getEnv('_APP_STORAGE_ANTIVIRUS_PORT', 3310) - ); + $finalizeUpload = function (int $chunksUploaded) use ($authorization, $bucket, &$chunks, $contentRange, $dbForProject, $deviceForFiles, $fileId, $fileName, $fileSize, &$metadata, $mergeUploadMetadata, $path, $permissions, $queueForEvents, $response): void { + $file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId); + $uploaded = 0; - if (!$antivirus->fileScan($path)) { - $deviceForFiles->delete($path); - throw new Exception(Exception::STORAGE_INVALID_FILE); + if (!$file->isEmpty()) { + $chunks = $file->getAttribute('chunksTotal', 1); + $uploaded = $file->getAttribute('chunksUploaded', 0); + $metadata = $mergeUploadMetadata($file->getAttribute('metadata', []), $metadata); + + if ($uploaded === $chunks) { + if (empty($contentRange)) { + throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); + } + + $response + ->setStatusCode(Response::STATUS_CODE_OK) + ->dynamic($file, Response::MODEL_FILE); + + return; } } - $mimeType = $deviceForFiles->getFileMimeType($path); // Get mime-type before compression and encryption - $fileHash = $deviceForFiles->getFileHash($path); // Get file hash before compression and encryption - $data = ''; - $iv = ''; - $tag = null; - // Compression - $algorithm = $bucket->getAttribute('compression', Compression::NONE); - if ($fileSize <= APP_STORAGE_READ_BUFFER && $algorithm != Compression::NONE) { - $data = $deviceForFiles->read($path); - switch ($algorithm) { - case Compression::ZSTD: - $compressor = new Zstd(); - break; - case Compression::GZIP: - default: - $compressor = new GZIP(); - break; - } - $data = $compressor->compress($data); - } else { - // reset the algorithm to none as we do not compress the file - // if file size exceedes the APP_STORAGE_READ_BUFFER - // regardless the bucket compression algoorithm - $algorithm = Compression::NONE; - } + $chunksUploaded = max($uploaded, $chunksUploaded, (int) ($metadata['chunks'] ?? 0)); - if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) { - if (empty($data)) { + if ($chunksUploaded === $chunks && $uploaded < $chunks) { + $deviceForFiles->finalizeUpload($path, $chunks, $metadata); + + if (System::getEnv('_APP_STORAGE_ANTIVIRUS') === 'enabled' && $bucket->getAttribute('antivirus', true) && $fileSize <= APP_LIMIT_ANTIVIRUS && $deviceForFiles->getType() === Storage::DEVICE_LOCAL) { + $antivirus = new Network( + System::getEnv('_APP_STORAGE_ANTIVIRUS_HOST', 'clamav'), + (int) System::getEnv('_APP_STORAGE_ANTIVIRUS_PORT', 3310) + ); + + if (!$antivirus->fileScan($path)) { + $deviceForFiles->delete($path); + throw new Exception(Exception::STORAGE_INVALID_FILE); + } + } + + $mimeType = $deviceForFiles->getFileMimeType($path); // Get mime-type before compression and encryption + $fileHash = $deviceForFiles->getFileHash($path); // Get file hash before compression and encryption + $data = ''; + $iv = ''; + $tag = null; + // Compression + $algorithm = $bucket->getAttribute('compression', Compression::NONE); + if ($fileSize <= APP_STORAGE_READ_BUFFER && $algorithm != Compression::NONE) { $data = $deviceForFiles->read($path); + switch ($algorithm) { + case Compression::ZSTD: + $compressor = new Zstd(); + break; + case Compression::GZIP: + default: + $compressor = new GZIP(); + break; + } + $data = $compressor->compress($data); + } else { + // reset the algorithm to none as we do not compress the file + // if file size exceedes the APP_STORAGE_READ_BUFFER + // regardless the bucket compression algoorithm + $algorithm = Compression::NONE; } - $key = System::getEnv('_APP_OPENSSL_KEY_V1'); - $iv = OpenSSL::randomPseudoBytes(OpenSSL::cipherIVLength(OpenSSL::CIPHER_AES_128_GCM)); - $data = OpenSSL::encrypt($data, OpenSSL::CIPHER_AES_128_GCM, $key, 0, $iv, $tag); - } - if (!empty($data)) { - if (!$deviceForFiles->write($path, $data, $mimeType)) { - throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to save file'); + if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) { + if (empty($data)) { + $data = $deviceForFiles->read($path); + } + $key = System::getEnv('_APP_OPENSSL_KEY_V1'); + $iv = OpenSSL::randomPseudoBytes(OpenSSL::cipherIVLength(OpenSSL::CIPHER_AES_128_GCM)); + $data = OpenSSL::encrypt($data, OpenSSL::CIPHER_AES_128_GCM, $key, 0, $iv, $tag); } - } - $sizeActual = $deviceForFiles->getFileSize($path); - - $openSSLVersion = null; - $openSSLCipher = null; - $openSSLTag = null; - $openSSLIV = null; - - if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) { - $openSSLVersion = '1'; - $openSSLCipher = OpenSSL::CIPHER_AES_128_GCM; - $openSSLTag = \bin2hex($tag); - $openSSLIV = \bin2hex($iv); - } - - if ($file->isEmpty()) { - $doc = new Document([ - '$id' => $fileId, - '$permissions' => $permissions, - 'bucketId' => $bucket->getId(), - 'bucketInternalId' => $bucket->getSequence(), - 'name' => $fileName, - 'path' => $path, - 'signature' => $fileHash, - 'mimeType' => $mimeType, - 'sizeOriginal' => $fileSize, - 'sizeActual' => $sizeActual, - 'algorithm' => $algorithm, - 'comment' => '', - 'chunksTotal' => $chunks, - 'chunksUploaded' => $chunksUploaded, - 'openSSLVersion' => $openSSLVersion, - 'openSSLCipher' => $openSSLCipher, - 'openSSLTag' => $openSSLTag, - 'openSSLIV' => $openSSLIV, - 'search' => implode(' ', [$fileId, $fileName]), - 'metadata' => $metadata, - ]); - - try { - $file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc); - } catch (DuplicateException) { - throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); - } catch (NotFoundException) { - throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); + if (!empty($data)) { + if (!$deviceForFiles->write($path, $data, $mimeType)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to save file'); + } } + + $sizeActual = $deviceForFiles->getFileSize($path); + + $openSSLVersion = null; + $openSSLCipher = null; + $openSSLTag = null; + $openSSLIV = null; + + if ($bucket->getAttribute('encryption', true) && $fileSize <= APP_STORAGE_READ_BUFFER) { + $openSSLVersion = '1'; + $openSSLCipher = OpenSSL::CIPHER_AES_128_GCM; + $openSSLTag = \bin2hex($tag); + $openSSLIV = \bin2hex($iv); + } + + if ($file->isEmpty()) { + $doc = new Document([ + '$id' => $fileId, + '$permissions' => $permissions, + 'bucketId' => $bucket->getId(), + 'bucketInternalId' => $bucket->getSequence(), + 'name' => $fileName, + 'path' => $path, + 'signature' => $fileHash, + 'mimeType' => $mimeType, + 'sizeOriginal' => $fileSize, + 'sizeActual' => $sizeActual, + 'algorithm' => $algorithm, + 'comment' => '', + 'chunksTotal' => $chunks, + 'chunksUploaded' => $chunksUploaded, + 'openSSLVersion' => $openSSLVersion, + 'openSSLCipher' => $openSSLCipher, + 'openSSLTag' => $openSSLTag, + 'openSSLIV' => $openSSLIV, + 'search' => implode(' ', [$fileId, $fileName]), + 'metadata' => $metadata, + ]); + + try { + $file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc); + } catch (DuplicateException) { + throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); + } catch (NotFoundException) { + throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); + } + } else { + /** + * Skip authorization in updateDocument. + * Without this, the file creation will fail when user doesn't have update permission. + * However as with chunk upload even if we are updating, we are essentially creating a file + * adding it's new chunk so we rely on the create-permission check performed earlier. + */ + $file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, new Document([ + '$permissions' => $permissions, + 'signature' => $fileHash, + 'mimeType' => $mimeType, + 'sizeActual' => $sizeActual, + 'algorithm' => $algorithm, + 'openSSLVersion' => $openSSLVersion, + 'openSSLCipher' => $openSSLCipher, + 'openSSLTag' => $openSSLTag, + 'openSSLIV' => $openSSLIV, + 'metadata' => $metadata, + 'chunksUploaded' => $chunksUploaded, + ]))); + } + + // Trigger after create success hook + $this->afterCreateSuccess($file); } else { - $file = $file - ->setAttribute('$permissions', $permissions) - ->setAttribute('signature', $fileHash) - ->setAttribute('mimeType', $mimeType) - ->setAttribute('sizeActual', $sizeActual) - ->setAttribute('algorithm', $algorithm) - ->setAttribute('openSSLVersion', $openSSLVersion) - ->setAttribute('openSSLCipher', $openSSLCipher) - ->setAttribute('openSSLTag', $openSSLTag) - ->setAttribute('openSSLIV', $openSSLIV) - ->setAttribute('metadata', $metadata) - ->setAttribute('chunksUploaded', $chunksUploaded); - - /** - * Skip authorization in updateDocument. - * Without this, the file creation will fail when user doesn't have update permission. - * However as with chunk upload even if we are updating, we are essentially creating a file - * adding it's new chunk so we rely on the create-permission check performed earlier. - */ - $file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, $file)); - } - - // Trigger after create success hook - $this->afterCreateSuccess($file); - } else { - if ($file->isEmpty()) { - $doc = new Document([ - '$id' => ID::custom($fileId), - '$permissions' => $permissions, - 'bucketId' => $bucket->getId(), - 'bucketInternalId' => $bucket->getSequence(), - 'name' => $fileName, - 'path' => $path, - 'signature' => '', - 'mimeType' => '', - 'sizeOriginal' => $fileSize, - 'sizeActual' => 0, - 'algorithm' => '', - 'comment' => '', - 'chunksTotal' => $chunks, - 'chunksUploaded' => $chunksUploaded, - 'search' => implode(' ', [$fileId, $fileName]), - 'metadata' => $metadata, - ]); - - try { - $file = $dbForProject->createDocument('bucket_' . $bucket->getSequence(), $doc); - } catch (DuplicateException) { - throw new Exception(Exception::STORAGE_FILE_ALREADY_EXISTS); - } catch (NotFoundException) { - throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); - } - } else { - $file = $file - ->setAttribute('chunksUploaded', $chunksUploaded) - ->setAttribute('metadata', $metadata); - /** * Skip authorization in updateDocument. * Without this, the file creation will fail when user doesn't have update permission. @@ -424,23 +480,41 @@ class Create extends Action * adding it's new chunk so we rely on the create-permission check performed earlier. */ try { - $file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, $file)); + $file = $authorization->skip(fn () => $dbForProject->updateDocument('bucket_' . $bucket->getSequence(), $fileId, new Document([ + 'chunksUploaded' => $chunksUploaded, + 'metadata' => $metadata, + ]))); } catch (NotFoundException) { throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); } } + + if ($chunksUploaded === $chunks) { + $queueForEvents + ->setParam('bucketId', $bucket->getId()) + ->setParam('fileId', $file->getId()) + ->setContext('bucket', $bucket); + } + + $metadata = null; // was causing leaks as it was passed by reference + + $response + ->setStatusCode(Response::STATUS_CODE_CREATED) + ->dynamic($file, Response::MODEL_FILE); + }; + + try { + $chunksUploaded = $deviceForFiles->uploadChunk($fileTmpName, $path, $chunk, $chunks, $metadata); + + if (empty($chunksUploaded)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed uploading file'); + } + + $locks($lockKey, 600, fn () => $finalizeUpload($chunksUploaded), timeout: 120.0); + } catch (LockContention) { + $response->addHeader('Retry-After', '5'); + throw new Exception(Exception::GENERAL_RATE_LIMIT_EXCEEDED, 'File upload is busy. Try again.'); } - - $queueForEvents - ->setParam('bucketId', $bucket->getId()) - ->setParam('fileId', $file->getId()) - ->setContext('bucket', $bucket); - - $metadata = null; // was causing leaks as it was passed by reference - - $response - ->setStatusCode(Response::STATUS_CODE_CREATED) - ->dynamic($file, Response::MODEL_FILE); } /** diff --git a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Preview/Get.php b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Preview/Get.php index cb511d5231..68bc2cabae 100644 --- a/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Preview/Get.php +++ b/src/Appwrite/Platform/Modules/Storage/Http/Buckets/Files/Preview/Get.php @@ -131,7 +131,6 @@ class Get extends Action throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Imagick extension is missing'); } - /* @type Document $bucket */ $bucket = $authorization->skip(fn () => $dbForProject->getDocument('buckets', $bucketId)); $isAPIKey = $user->isApp($authorization->getRoles()); @@ -155,7 +154,6 @@ class Get extends Action if ($fileSecurity && !$valid && !$isToken) { $file = $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId); } else { - /* @type Document $file */ $file = $authorization->skip(fn () => $dbForProject->getDocument('bucket_' . $bucket->getSequence(), $fileId)); } diff --git a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php index f08b711fb2..b1f07c3f9d 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php @@ -1199,6 +1199,144 @@ class FunctionsCustomServerTest extends Scope }, 120000, 500); } + public function testCreateDeploymentParallelChunksLargeFile(): void + { + $functionId = $this->setupFunction([ + 'functionId' => ID::unique(), + 'name' => 'Test Parallel Chunk Deployment', + 'execute' => [Role::user($this->getUser()['$id'])->toString()], + 'runtime' => 'node-22', + 'entrypoint' => 'index.js', + 'timeout' => 10, + ]); + + $deploymentId = ID::unique(); + $tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-function-deployment-' . $deploymentId; + + mkdir($tmpDirectory); + + try { + copy(__DIR__ . '/../../../resources/functions/basic/index.js', $tmpDirectory . DIRECTORY_SEPARATOR . 'index.js'); + file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'large.bin', random_bytes(20 * 1024 * 1024)); + + $source = $tmpDirectory . DIRECTORY_SEPARATOR . 'code.tar.gz'; + Console::execute('cd ' . $tmpDirectory . ' && tar --exclude code.tar.gz -czf code.tar.gz .', '', $this->stdout, $this->stderr); + + $totalSize = filesize($source); + $chunkSize = 5 * 1024 * 1024; + $chunksTotal = (int) ceil($totalSize / $chunkSize); + + $this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test deployment must span at least 4 chunks'); + + $requests = []; + $sourceHandle = fopen($source, 'rb'); + $this->assertNotFalse($sourceHandle, 'Could not open deployment package'); + + try { + for ($i = 0; $i < $chunksTotal; $i++) { + $start = $i * $chunkSize; + $end = min($start + $chunkSize, $totalSize) - 1; + $length = $end - $start + 1; + $chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part'; + + fseek($sourceHandle, $start); + file_put_contents($chunkPath, fread($sourceHandle, $length)); + + $requests[] = [ + 'headers' => [ + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + 'x-appwrite-id' => $deploymentId, + 'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize, + ], + 'chunkPath' => $chunkPath, + ]; + } + } finally { + fclose($sourceHandle); + } + + $responses = []; + $endpoint = parse_url($this->client->getEndpoint()); + $scheme = $endpoint['scheme'] ?? 'http'; + $host = $endpoint['host'] ?? 'appwrite'; + $port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80); + $basePath = rtrim($endpoint['path'] ?? '', '/'); + + \Swoole\Coroutine\run(function () use ($basePath, $functionId, $host, $port, $requests, $scheme, &$responses): void { + $wg = new \Swoole\Coroutine\WaitGroup(); + + foreach ($requests as $index => $request) { + $wg->add(); + \Swoole\Coroutine::create(function () use ($basePath, $functionId, $host, $index, $port, $request, &$responses, $scheme, $wg): void { + try { + for ($attempt = 0; $attempt < 3; $attempt++) { + $client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https'); + $client->set([ + 'timeout' => 300, + 'ssl_verify_peer' => false, + 'ssl_verify_host' => false, + ]); + $client->setHeaders($request['headers']); + $client->setMethod(Client::METHOD_POST); + $client->setData([ + 'entrypoint' => 'index.js', + 'activate' => true, + ]); + $client->addFile($request['chunkPath'], 'code', 'application/x-gzip', 'code.tar.gz'); + $client->execute($basePath . '/functions/' . $functionId . '/deployments'); + + $responses[$index] = [ + 'body' => $client->body, + 'error' => $client->errMsg, + 'headers' => $client->headers ?? [], + 'statusCode' => $client->statusCode, + ]; + + $client->close(); + + if ($responses[$index]['statusCode'] !== 429) { + break; + } + + $retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1); + \Swoole\Coroutine::sleep(max($retryAfter, 0.1)); + } + } finally { + $wg->done(); + } + }); + } + + $wg->wait(); + }); + + ksort($responses); + + foreach ($responses as $response) { + $this->assertSame('', $response['error']); + $this->assertContains($response['statusCode'], [202], (string) $response['body']); + } + + $this->assertEventually(function () use ($functionId, $deploymentId) { + $deployment = $this->getDeployment($functionId, $deploymentId); + + $this->assertEquals(200, $deployment['headers']['status-code']); + $this->assertEquals('ready', $deployment['body']['status']); + $this->assertEquals($deploymentId, $deployment['body']['$id']); + }, 120000, 500); + } finally { + $this->cleanupFunction($functionId); + + if (is_dir($tmpDirectory)) { + foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) { + unlink($file); + } + rmdir($tmpDirectory); + } + } + } + public function testUpdateDeployment(): void { $data = $this->setupTestDeployment(); diff --git a/tests/e2e/Services/Sites/SitesCustomServerTest.php b/tests/e2e/Services/Sites/SitesCustomServerTest.php index a32b990b9e..9cca689780 100644 --- a/tests/e2e/Services/Sites/SitesCustomServerTest.php +++ b/tests/e2e/Services/Sites/SitesCustomServerTest.php @@ -1351,6 +1351,145 @@ class SitesCustomServerTest extends Scope $this->cleanupSite($siteId); } + public function testCreateDeploymentParallelChunksLargeFile(): void + { + $siteId = $this->setupSite([ + 'buildRuntime' => 'node-22', + 'fallbackFile' => '', + 'framework' => 'other', + 'name' => 'Test Site Parallel Chunk Deployment', + 'outputDirectory' => './', + 'providerBranch' => 'main', + 'providerRootDirectory' => './', + 'siteId' => ID::unique() + ]); + + $deploymentId = ID::unique(); + $tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-site-deployment-' . $deploymentId; + + mkdir($tmpDirectory); + + try { + file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'index.html', 'Hello World'); + file_put_contents($tmpDirectory . DIRECTORY_SEPARATOR . 'large.bin', random_bytes(20 * 1024 * 1024)); + + $source = $tmpDirectory . DIRECTORY_SEPARATOR . 'code.tar.gz'; + Console::execute('cd ' . $tmpDirectory . ' && tar --exclude code.tar.gz -czf code.tar.gz .', '', $this->stdout, $this->stderr); + + $totalSize = filesize($source); + $chunkSize = 5 * 1024 * 1024; + $chunksTotal = (int) ceil($totalSize / $chunkSize); + + $this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test deployment must span at least 4 chunks'); + + $requests = []; + $sourceHandle = fopen($source, 'rb'); + $this->assertNotFalse($sourceHandle, 'Could not open deployment package'); + + try { + for ($i = 0; $i < $chunksTotal; $i++) { + $start = $i * $chunkSize; + $end = min($start + $chunkSize, $totalSize) - 1; + $length = $end - $start + 1; + $chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part'; + + fseek($sourceHandle, $start); + file_put_contents($chunkPath, fread($sourceHandle, $length)); + + $requests[] = [ + 'headers' => [ + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + 'x-appwrite-id' => $deploymentId, + 'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize, + ], + 'chunkPath' => $chunkPath, + ]; + } + } finally { + fclose($sourceHandle); + } + + $responses = []; + $endpoint = parse_url($this->client->getEndpoint()); + $scheme = $endpoint['scheme'] ?? 'http'; + $host = $endpoint['host'] ?? 'appwrite'; + $port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80); + $basePath = rtrim($endpoint['path'] ?? '', '/'); + + \Swoole\Coroutine\run(function () use ($basePath, $host, $port, $requests, $scheme, $siteId, &$responses): void { + $wg = new \Swoole\Coroutine\WaitGroup(); + + foreach ($requests as $index => $request) { + $wg->add(); + \Swoole\Coroutine::create(function () use ($basePath, $host, $index, $port, $request, &$responses, $scheme, $siteId, $wg): void { + try { + for ($attempt = 0; $attempt < 3; $attempt++) { + $client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https'); + $client->set([ + 'timeout' => 300, + 'ssl_verify_peer' => false, + 'ssl_verify_host' => false, + ]); + $client->setHeaders($request['headers']); + $client->setMethod(Client::METHOD_POST); + $client->setData([ + 'activate' => true, + ]); + $client->addFile($request['chunkPath'], 'code', 'application/x-gzip', 'code.tar.gz'); + $client->execute($basePath . '/sites/' . $siteId . '/deployments'); + + $responses[$index] = [ + 'body' => $client->body, + 'error' => $client->errMsg, + 'headers' => $client->headers ?? [], + 'statusCode' => $client->statusCode, + ]; + + $client->close(); + + if ($responses[$index]['statusCode'] !== 429) { + break; + } + + $retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1); + \Swoole\Coroutine::sleep(max($retryAfter, 0.1)); + } + } finally { + $wg->done(); + } + }); + } + + $wg->wait(); + }); + + ksort($responses); + + foreach ($responses as $response) { + $this->assertSame('', $response['error']); + $this->assertContains($response['statusCode'], [202], (string) $response['body']); + } + + $this->assertEventually(function () use ($siteId, $deploymentId) { + $deployment = $this->getDeployment($siteId, $deploymentId); + + $this->assertEquals(200, $deployment['headers']['status-code']); + $this->assertEquals('ready', $deployment['body']['status']); + $this->assertEquals($deploymentId, $deployment['body']['$id']); + }, 120000, 500); + } finally { + $this->cleanupSite($siteId); + + if (is_dir($tmpDirectory)) { + foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) { + unlink($file); + } + rmdir($tmpDirectory); + } + } + } + public function testCreateDeployment() { $siteId = $this->setupSite([ diff --git a/tests/e2e/Services/Storage/StorageBase.php b/tests/e2e/Services/Storage/StorageBase.php index 5e09031a9c..375e526fcf 100644 --- a/tests/e2e/Services/Storage/StorageBase.php +++ b/tests/e2e/Services/Storage/StorageBase.php @@ -391,7 +391,7 @@ trait StorageBase 'bucketId' => ID::unique(), 'name' => 'Test Bucket 2', 'fileSecurity' => true, - 'maximumFileSize' => 6000000000, //6GB + 'maximumFileSize' => 6000000001, 'allowedFileExtensions' => ["jpg", "png"], 'permissions' => [ Permission::read(Role::any()), @@ -1436,6 +1436,184 @@ trait StorageBase ]); } + public function testCreateBucketFileParallelChunksLargeFile(): void + { + $totalSize = 20 * 1024 * 1024; + $chunkSize = 5 * 1024 * 1024; + $chunksTotal = (int) ceil($totalSize / $chunkSize); + + $this->assertGreaterThanOrEqual(4, $chunksTotal, 'Test file must span at least 4 chunks'); + + $bucket = $this->client->call(Client::METHOD_POST, '/storage/buckets', [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], [ + 'bucketId' => ID::unique(), + 'name' => 'Test Bucket Parallel Chunk Upload', + 'fileSecurity' => true, + 'maximumFileSize' => $totalSize, + 'permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::delete(Role::any()), + ], + ]); + + $this->assertEquals(201, $bucket['headers']['status-code']); + + $bucketId = $bucket['body']['$id']; + $fileId = ID::unique(); + $tmpDirectory = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'appwrite-parallel-upload-' . $fileId; + $source = $tmpDirectory . DIRECTORY_SEPARATOR . 'large-parallel-upload.bin'; + + mkdir($tmpDirectory); + + try { + $handle = fopen($source, 'wb'); + $this->assertNotFalse($handle, 'Could not create test file'); + + $remaining = $totalSize; + $block = str_repeat(hash('sha256', $fileId, binary: true), 1024); + while ($remaining > 0) { + $bytes = substr($block, 0, min(strlen($block), $remaining)); + fwrite($handle, $bytes); + $remaining -= strlen($bytes); + } + fclose($handle); + + $requests = []; + + $sourceHandle = fopen($source, 'rb'); + $this->assertNotFalse($sourceHandle, 'Could not open test file'); + + for ($i = 0; $i < $chunksTotal; $i++) { + $start = $i * $chunkSize; + $end = min($start + $chunkSize, $totalSize) - 1; + $length = $end - $start + 1; + $chunkPath = $tmpDirectory . DIRECTORY_SEPARATOR . 'chunk-' . $i . '.part'; + + fseek($sourceHandle, $start); + file_put_contents($chunkPath, fread($sourceHandle, $length)); + + $requests[] = [ + 'headers' => [ + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + 'content-range' => 'bytes ' . $start . '-' . $end . '/' . $totalSize, + ], + 'chunkPath' => $chunkPath, + ]; + } + fclose($sourceHandle); + + $responses = []; + $endpoint = parse_url($this->client->getEndpoint()); + $scheme = $endpoint['scheme'] ?? 'http'; + $host = $endpoint['host'] ?? 'appwrite'; + $port = $endpoint['port'] ?? ($scheme === 'https' ? 443 : 80); + $basePath = rtrim($endpoint['path'] ?? '', '/'); + + \Swoole\Coroutine\run(function () use ($basePath, $bucketId, $fileId, $host, $port, $requests, $scheme, &$responses): void { + $wg = new \Swoole\Coroutine\WaitGroup(); + + foreach ($requests as $index => $request) { + $wg->add(); + \Swoole\Coroutine::create(function () use ($basePath, $bucketId, $fileId, $host, $index, $port, $request, &$responses, $scheme, $wg): void { + try { + for ($attempt = 0; $attempt < 3; $attempt++) { + $client = new \Swoole\Coroutine\Http\Client($host, (int) $port, $scheme === 'https'); + $client->set([ + 'timeout' => 300, + 'ssl_verify_peer' => false, + 'ssl_verify_host' => false, + ]); + $client->setHeaders($request['headers']); + $client->setMethod(Client::METHOD_POST); + $client->setData([ + 'fileId' => $fileId, + 'permissions[0]' => Permission::read(Role::any()), + 'permissions[1]' => Permission::delete(Role::any()), + ]); + $client->addFile($request['chunkPath'], 'file', 'application/octet-stream', 'large-parallel-upload.bin'); + $client->execute($basePath . '/storage/buckets/' . $bucketId . '/files'); + + $responses[$index] = [ + 'body' => $client->body, + 'error' => $client->errMsg, + 'headers' => $client->headers ?? [], + 'statusCode' => $client->statusCode, + ]; + + $client->close(); + + if ($responses[$index]['statusCode'] !== 429) { + break; + } + + $retryAfter = (float) ($responses[$index]['headers']['retry-after'] ?? 0.1); + \Swoole\Coroutine::sleep(max($retryAfter, 0.1)); + } + } finally { + $wg->done(); + } + }); + } + + $wg->wait(); + }); + + ksort($responses); + + foreach ($responses as $response) { + $this->assertSame('', $response['error']); + $this->assertContains($response['statusCode'], [200, 201], (string) $response['body']); + } + + $uploadedFile = $this->client->call(Client::METHOD_GET, '/storage/buckets/' . $bucketId . '/files/' . $fileId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ])); + + $this->assertEquals(200, $uploadedFile['headers']['status-code']); + $this->assertEquals($chunksTotal, $uploadedFile['body']['chunksTotal']); + $this->assertEquals($chunksTotal, $uploadedFile['body']['chunksUploaded']); + + $download = $this->client->call(Client::METHOD_GET, '/storage/buckets/' . $bucketId . '/files/' . $fileId . '/download', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ])); + + $this->assertEquals(200, $download['headers']['status-code']); + $this->assertEquals($totalSize, strlen($download['body'])); + $this->assertEquals(hash_file('sha256', $source), hash('sha256', $download['body'])); + } finally { + if (isset($bucketId)) { + $this->client->call(Client::METHOD_DELETE, '/storage/buckets/' . $bucketId . '/files/' . $fileId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ])); + + $this->client->call(Client::METHOD_DELETE, '/storage/buckets/' . $bucketId, [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ]); + } + + foreach (glob($tmpDirectory . DIRECTORY_SEPARATOR . '*') ?: [] as $file) { + unlink($file); + } + + if (is_dir($tmpDirectory)) { + rmdir($tmpDirectory); + } + } + } + public function testDeleteBucketFile(): void { // Create a fresh file just for deletion testing (not using cache since we delete it)