From d28643bd2bdc0f9dfaed5985f1c7ba83b37540d5 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 5 Sep 2025 01:02:30 +1200 Subject: [PATCH] Update DB --- composer.json | 2 +- composer.lock | 68 +-- .../Collections/Documents/Bulk/Upsert.php | 2 +- .../Collections/Documents/Upsert.php | 2 +- .../Databases/Http/Transactions/Update.php | 539 +++++++++++++----- .../Platform/Workers/StatsResources.php | 2 +- src/Appwrite/Platform/Workers/StatsUsage.php | 4 +- .../Transactions/TransactionsTest.php | 4 +- 8 files changed, 433 insertions(+), 190 deletions(-) diff --git a/composer.json b/composer.json index 0c662c775f..7d9176d2aa 100644 --- a/composer.json +++ b/composer.json @@ -52,7 +52,7 @@ "utopia-php/cache": "0.13.*", "utopia-php/cli": "0.15.*", "utopia-php/config": "0.2.*", - "utopia-php/database": "1.*", + "utopia-php/database": "2.*", "utopia-php/detector": "0.1.*", "utopia-php/domains": "0.8.*", "utopia-php/dns": "0.3.*", diff --git a/composer.lock b/composer.lock index ed66af8572..bf76f13a3e 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "0da713ee5642eba1d30bc51c1a04a723", + "content-hash": "3565fcc2471b5d18a159b6da1c8fad31", "packages": [ { "name": "adhocore/jwt", @@ -3296,16 +3296,16 @@ }, { "name": "utopia-php/abuse", - "version": "1.0.0", + "version": "1.0.1", "source": { "type": "git", "url": "https://github.com/utopia-php/abuse.git", - "reference": "c5e2232033b507a07f72180dc56d37e1872ee7be" + "reference": "cd591568791556d246d901d6aaf9935ab02c3f9a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/abuse/zipball/c5e2232033b507a07f72180dc56d37e1872ee7be", - "reference": "c5e2232033b507a07f72180dc56d37e1872ee7be", + "url": "https://api.github.com/repos/utopia-php/abuse/zipball/cd591568791556d246d901d6aaf9935ab02c3f9a", + "reference": "cd591568791556d246d901d6aaf9935ab02c3f9a", "shasum": "" }, "require": { @@ -3313,7 +3313,7 @@ "ext-pdo": "*", "ext-redis": "*", "php": ">=8.0", - "utopia-php/database": "1.*" + "utopia-php/database": "2.*" }, "require-dev": { "laravel/pint": "1.*", @@ -3341,9 +3341,9 @@ ], "support": { "issues": "https://github.com/utopia-php/abuse/issues", - "source": "https://github.com/utopia-php/abuse/tree/1.0.0" + "source": "https://github.com/utopia-php/abuse/tree/1.0.1" }, - "time": "2025-08-13T09:12:54+00:00" + "time": "2025-09-04T12:46:54+00:00" }, { "name": "utopia-php/analytics", @@ -3393,21 +3393,21 @@ }, { "name": "utopia-php/audit", - "version": "1.0.0", + "version": "1.0.1", "source": { "type": "git", "url": "https://github.com/utopia-php/audit.git", - "reference": "c0ed75f4d068f1f6c2e7149a909490d4214e72bb" + "reference": "5ef26d6a2ab2db7bb86288a1a6ef970307b46f22" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/audit/zipball/c0ed75f4d068f1f6c2e7149a909490d4214e72bb", - "reference": "c0ed75f4d068f1f6c2e7149a909490d4214e72bb", + "url": "https://api.github.com/repos/utopia-php/audit/zipball/5ef26d6a2ab2db7bb86288a1a6ef970307b46f22", + "reference": "5ef26d6a2ab2db7bb86288a1a6ef970307b46f22", "shasum": "" }, "require": { "php": ">=8.0", - "utopia-php/database": "1.*" + "utopia-php/database": "2.*" }, "require-dev": { "laravel/pint": "1.*", @@ -3434,9 +3434,9 @@ ], "support": { "issues": "https://github.com/utopia-php/audit/issues", - "source": "https://github.com/utopia-php/audit/tree/1.0.0" + "source": "https://github.com/utopia-php/audit/tree/1.0.1" }, - "time": "2025-08-13T09:09:00+00:00" + "time": "2025-09-04T12:46:43+00:00" }, { "name": "utopia-php/cache", @@ -3638,16 +3638,16 @@ }, { "name": "utopia-php/database", - "version": "1.3.1", + "version": "2.0.0", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "fcd166b715a14cfea11f7a9c47d4c0076bedcecd" + "reference": "e4a03ba543abc4e436ec1b450750a14bd36011d5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/fcd166b715a14cfea11f7a9c47d4c0076bedcecd", - "reference": "fcd166b715a14cfea11f7a9c47d4c0076bedcecd", + "url": "https://api.github.com/repos/utopia-php/database/zipball/e4a03ba543abc4e436ec1b450750a14bd36011d5", + "reference": "e4a03ba543abc4e436ec1b450750a14bd36011d5", "shasum": "" }, "require": { @@ -3688,9 +3688,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/1.3.1" + "source": "https://github.com/utopia-php/database/tree/2.0.0" }, - "time": "2025-09-03T15:50:41+00:00" + "time": "2025-09-04T12:36:53+00:00" }, { "name": "utopia-php/detector", @@ -3942,16 +3942,16 @@ }, { "name": "utopia-php/framework", - "version": "0.33.23", + "version": "0.33.24", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "88e8002365c10a727014ecc56322bcd1d780ceed" + "reference": "5112b1023342163e3fbedec99f38fc32c8700aa0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/88e8002365c10a727014ecc56322bcd1d780ceed", - "reference": "88e8002365c10a727014ecc56322bcd1d780ceed", + "url": "https://api.github.com/repos/utopia-php/http/zipball/5112b1023342163e3fbedec99f38fc32c8700aa0", + "reference": "5112b1023342163e3fbedec99f38fc32c8700aa0", "shasum": "" }, "require": { @@ -3983,9 +3983,9 @@ ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/0.33.23" + "source": "https://github.com/utopia-php/http/tree/0.33.24" }, - "time": "2025-09-03T11:58:14+00:00" + "time": "2025-09-04T04:18:39+00:00" }, { "name": "utopia-php/image", @@ -4190,16 +4190,16 @@ }, { "name": "utopia-php/migration", - "version": "1.0.1", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/utopia-php/migration.git", - "reference": "38171023efd3abe650d2abc5ac65f5df52311da6" + "reference": "eb60a61934be1d6f2f4fdabd9903a841ba078bc9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/migration/zipball/38171023efd3abe650d2abc5ac65f5df52311da6", - "reference": "38171023efd3abe650d2abc5ac65f5df52311da6", + "url": "https://api.github.com/repos/utopia-php/migration/zipball/eb60a61934be1d6f2f4fdabd9903a841ba078bc9", + "reference": "eb60a61934be1d6f2f4fdabd9903a841ba078bc9", "shasum": "" }, "require": { @@ -4207,7 +4207,7 @@ "ext-curl": "*", "ext-openssl": "*", "php": ">=8.1", - "utopia-php/database": "1.*", + "utopia-php/database": "2.*", "utopia-php/dsn": "0.2.*", "utopia-php/framework": "0.33.*", "utopia-php/storage": "0.18.*" @@ -4240,9 +4240,9 @@ ], "support": { "issues": "https://github.com/utopia-php/migration/issues", - "source": "https://github.com/utopia-php/migration/tree/1.0.1" + "source": "https://github.com/utopia-php/migration/tree/1.0.2" }, - "time": "2025-08-28T13:41:25+00:00" + "time": "2025-09-04T12:47:05+00:00" }, { "name": "utopia-php/orchestration", diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index 2debdfcf6d..618d640d95 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -161,7 +161,7 @@ class Upsert extends Action try { $modified = $dbForProject->withPreserveDates(function () use ($dbForProject, $database, $collection, $documents, $plan, &$upserted) { - return $dbForProject->createOrUpdateDocuments( + return $dbForProject->upsertDocuments( 'database_' . $database->getSequence() . '_collection_' . $collection->getSequence(), $documents, onNext: function (Document $document) use ($plan, &$upserted) { diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Upsert.php index c117691348..7a6c7e960b 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Upsert.php @@ -299,7 +299,7 @@ class Upsert extends Action $upserted = []; try { $dbForProject->withPreserveDates(function () use (&$upserted, $dbForProject, $database, $collection, $newDocument) { - return $dbForProject->createOrUpdateDocuments( + return $dbForProject->upsertDocuments( 'database_' . $database->getSequence() . '_collection_' . $collection->getSequence(), [$newDocument], onNext: function (Document $document) use (&$upserted) { diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php index b8b77d79ef..ed277e7b2f 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php @@ -12,9 +12,11 @@ use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response as UtopiaResponse; use Utopia\Database\Database; use Utopia\Database\Document; +use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Conflict as ConflictException; use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Exception\NotFound as NotFoundException; +use Utopia\Database\Exception\Structure; use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Query; use Utopia\Database\Validator\UID; @@ -65,6 +67,23 @@ class Update extends Action ->callback($this->action(...)); } + /** + * @param string $transactionId + * @param bool $commit + * @param bool $rollback + * @param UtopiaResponse $response + * @param Database $dbForProject + * @param Delete $queueForDeletes + * @return void + * @throws ConflictException + * @throws Exception + * @throws \DateMalformedStringException + * @throws \Throwable + * @throws \Utopia\Database\Exception + * @throws Authorization + * @throws Structure + * @throws \Utopia\Exception + */ public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Delete $queueForDeletes): void { if (!$commit && !$rollback) { @@ -113,149 +132,38 @@ class Update extends Action $action = $operation['action']; $data = $operation['data']; - // Check if this operation depends on documents created in same transaction - $dependent = \in_array($action, ['update', 'increment', 'decrement']) - && isset($state[$collectionId][$documentId]); - - if ($dependent) { - // Don't use timestamp wrapper for dependent operations - switch ($action) { - case 'update': - // Update the state document directly - $existing = $state[$collectionId][$documentId]; - foreach ($data as $key => $value) { - $existing->setAttribute($key, $value); - } - $state[$collectionId][$documentId] = $dbForProject->updateDocument( - $collectionId, - $documentId, - $existing - ); - break; - - case 'increment': - $state[$collectionId][$documentId] = $dbForProject->increaseDocumentAttribute( - collection: $collectionId, - id: $documentId, - attribute: $data['attribute'], - value: $data['value'] ?? 1, - max: $data['max'] ?? null - ); - break; - - case 'decrement': - $state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute( - collection: $collectionId, - id: $documentId, - attribute: $data['attribute'], - value: $data['value'] ?? 1, - min: $data['min'] ?? null - ); - break; - } - } else { - // Use timestamp wrapper for independent operations - $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $queueForDeletes, $action, $collectionId, $documentId, $data, &$state) { - switch ($action) { - case 'create': - if ($documentId && !isset($data['$id'])) { - $data['$id'] = $documentId; - } - $state[$collectionId][$documentId] = $dbForProject->createDocument( - $collectionId, - new Document($data), - ); - break; - - case 'update': - $document = $dbForProject->updateDocument( - $collectionId, - $documentId, - new Document($data), - ); - - if ($document->isEmpty()) { - throw new ConflictException(''); - } - - $state[$collectionId][$documentId] = $document; - break; - - case 'upsert': - $document = new Document($data); - $dbForProject->createOrUpdateDocuments( - $collectionId, - [$document], - onNext: function (Document $document) use (&$state, $collectionId) { - $state[$collectionId][$document->getId()] = $document; - } - ); - break; - - case 'delete': - $dbForProject->deleteDocument($collectionId, $documentId); - - if (isset($state[$collectionId][$documentId])) { - unset($state[$collectionId][$documentId]); - } - break; - - case 'increment': - $dbForProject->increaseDocumentAttribute( - collection: $collectionId, - id: $documentId, - attribute: $data['attribute'], - value: $data['value'] ?? 1, - max: $data['max'] ?? null - ); - break; - - case 'decrement': - $dbForProject->decreaseDocumentAttribute( - collection: $collectionId, - id: $documentId, - attribute: $data['attribute'], - value: $data['value'] ?? 1, - min: $data['min'] ?? null - ); - break; - - case 'bulkCreate': - $dbForProject->createDocuments( - $collectionId, - $data, - onNext: function (Document $document) use (&$state, $collectionId) { - $state[$collectionId][$document->getId()] = $document; - } - ); - break; - - case 'bulkUpdate': - $dbForProject->updateDocuments( - $collectionId, - new Document($data['data']), - Query::parseQueries($data['queries'] ?? []) - ); - break; - - case 'bulkUpsert': - $dbForProject->createOrUpdateDocuments( - $collectionId, - $data, - onNext: function (Document $document) use (&$state, $collectionId) { - $state[$collectionId][$document->getId()] = $document; - } - ); - break; - - case 'bulkDelete': - $dbForProject->deleteDocuments( - $collectionId, - Query::parseQueries($data['queries'] ?? []) - ); - break; - } - }); + // Execute the operation based on its type + switch ($action) { + case 'create': + $this->handleCreateOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); + break; + case 'update': + $this->handleUpdateOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); + break; + case 'upsert': + $this->handleUpsertOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); + break; + case 'delete': + $this->handleDeleteOperation($dbForProject, $collectionId, $documentId, $createdAt, $state); + break; + case 'increment': + $this->handleIncrementOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); + break; + case 'decrement': + $this->handleDecrementOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); + break; + case 'bulkCreate': + $this->handleBulkCreateOperation($dbForProject, $collectionId, $data, $createdAt, $state); + break; + case 'bulkUpdate': + $this->handleBulkUpdateOperation($dbForProject, $collectionId, $data, $createdAt, $state); + break; + case 'bulkUpsert': + $this->handleBulkUpsertOperation($dbForProject, $collectionId, $data, $createdAt, $state); + break; + case 'bulkDelete': + $this->handleBulkDeleteOperation($dbForProject, $collectionId, $data, $createdAt, $state); + break; } } @@ -269,7 +177,6 @@ class Update extends Action $queueForDeletes ->setType(DELETE_TYPE_DOCUMENT) ->setDocument($transaction); - } catch (NotFoundException $e) { $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', @@ -290,17 +197,351 @@ class Update extends Action } if ($rollback) { - $queueForDeletes - ->setType(DELETE_TYPE_DOCUMENT) - ->setDocument($transaction); - $transaction = $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'rolledBack', ])); + + $queueForDeletes + ->setType(DELETE_TYPE_DOCUMENT) + ->setDocument($transaction); } $response ->setStatusCode(SwooleResponse::STATUS_CODE_OK) ->dynamic($transaction, UtopiaResponse::MODEL_TRANSACTION); } -} + + /** + * Handle create operation + * @throws \Utopia\Database\Exception + */ + private function handleCreateOperation( + Database $dbForProject, + string $collectionId, + ?string $documentId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + if ($documentId && !isset($data['$id'])) { + $data['$id'] = $documentId; + } + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $data, &$state) { + $state[$collectionId][$data['$id']] = $dbForProject->createDocument( + $collectionId, + new Document($data), + ); + }); + } + + /** + * Handle update operation + * @throws ConflictException + * @throws \Utopia\Database\Exception + */ + private function handleUpdateOperation( + Database $dbForProject, + string $collectionId, + string $documentId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $dependent = isset($state[$collectionId][$documentId]); + + if ($dependent) { + // Update the state document directly without timestamp wrapper + $state[$collectionId][$documentId] = $dbForProject->updateDocument( + $collectionId, + $documentId, + new Document($data), + ); + return; + } + + // Use timestamp wrapper for independent operations + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) { + $document = $dbForProject->updateDocument( + $collectionId, + $documentId, + new Document($data), + ); + if ($document->isEmpty()) { + throw new ConflictException(''); + } + $state[$collectionId][$documentId] = $document; + }); + } + + /** + * Handle upsert operation + * @throws \Utopia\Database\Exception + */ + private function handleUpsertOperation( + Database $dbForProject, + string $collectionId, + ?string $documentId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $dependent = isset($state[$collectionId][$documentId]); + + if ($dependent) { + // Upsert the state document directly without timestamp wrapper + $state[$collectionId][$documentId] = $dbForProject->upsertDocument( + $collectionId, + new Document($data), + ); + return; + } + + // Use timestamp wrapper for independent operations + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) { + $state[$collectionId][$documentId] = $dbForProject->upsertDocument( + $collectionId, + new Document($data), + ); + }); + } + + /** + * Handle delete operation + */ + private function handleDeleteOperation( + Database $dbForProject, + string $collectionId, + string $documentId, + \DateTime $createdAt, + array &$state + ): void + { + $dependent = isset($state[$collectionId][$documentId]); + + if ($dependent) { + // Delete without timestamp wrapper + $dbForProject->deleteDocument($collectionId, $documentId); + unset($state[$collectionId][$documentId]); + return; + } + + // Use timestamp wrapper for independent operations + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, &$state) { + $dbForProject->deleteDocument($collectionId, $documentId); + if (isset($state[$collectionId][$documentId])) { + unset($state[$collectionId][$documentId]); + } + }); + } + + /** + * Handle increment operation + */ + private function handleIncrementOperation( + Database $dbForProject, + string $collectionId, + string $documentId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $dependent = isset($state[$collectionId][$documentId]); + + if ($dependent) { + // Increment without timestamp wrapper + $state[$collectionId][$documentId] = $dbForProject->increaseDocumentAttribute( + collection: $collectionId, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + max: $data['max'] ?? null + ); + return; + } + + // Use timestamp wrapper for independent operations + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data) { + $dbForProject->increaseDocumentAttribute( + collection: $collectionId, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + max: $data['max'] ?? null + ); + }); + } + + /** + * Handle decrement operation + */ + private function handleDecrementOperation( + Database $dbForProject, + string $collectionId, + string $documentId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $dependent = isset($state[$collectionId][$documentId]); + + if ($dependent) { + // Decrement without timestamp wrapper + $state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute( + collection: $collectionId, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + min: $data['min'] ?? null + ); + return; + } + + // Use timestamp wrapper for independent operations + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data) { + $dbForProject->decreaseDocumentAttribute( + collection: $collectionId, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + min: $data['min'] ?? null + ); + }); + } + + /** + * Handle bulk create operation + */ + private function handleBulkCreateOperation( + Database $dbForProject, + string $collectionId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $data, &$state) { + $dbForProject->createDocuments( + $collectionId, + $data, + onNext: function (Document $document) use (&$state, $collectionId) { + $state[$collectionId][$document->getId()] = $document; + } + ); + }); + } + + /** + * Handle bulk update operation with manual timestamp checking + * @throws \Utopia\Database\Exception + * @throws \Utopia\Database\Exception\Query + * @throws ConflictException + */ + private function handleBulkUpdateOperation( + Database $dbForProject, + string $collectionId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $queries = Query::parseQueries($data['queries'] ?? []); + + $dbForProject->updateDocuments( + $collectionId, + new Document($data['data']), + $queries, + onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt) { + // Check if this document was created/modified in this transaction + $dependent = isset($state[$collectionId][$updated->getId()]); + + // If not in transaction state, check for timestamp conflicts + if (!$dependent) { + $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); + if ($oldUpdatedAt > $createdAt) { + throw new ConflictException('Document was updated after the request timestamp'); + } + } + + $state[$collectionId][$updated->getId()] = $updated; + } + ); + } + + /** + * Handle bulk upsert operation with manual timestamp checking + * @throws ConflictException + */ + private function handleBulkUpsertOperation( + Database $dbForProject, + string $collectionId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + // Run bulk upsert without timestamp wrapper, checking manually in callback + $dbForProject->upsertDocuments( + $collectionId, + $data, + onNext: function (Document $upserted, ?Document $old) use (&$state, $collectionId, $createdAt) { + if ($old !== null) { + // This is an update - check if document was created/modified in this transaction + $dependent = isset($state[$collectionId][$upserted->getId()]); + + // If not in transaction state, check for timestamp conflicts + if (!$dependent) { + $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); + if ($oldUpdatedAt > $createdAt) { + throw new ConflictException('Document was updated after the request timestamp'); + } + } + } + + // If $old is null, this is a create operation - no timestamp check needed + $state[$collectionId][$upserted->getId()] = $upserted; + } + ); + } + + /** + * Handle bulk delete operation with manual timestamp checking + * @throws \Utopia\Database\Exception\Query + * @throws ConflictException + */ + private function handleBulkDeleteOperation( + Database $dbForProject, + string $collectionId, + array $data, + \DateTime $createdAt, + array &$state + ): void + { + $queries = Query::parseQueries($data['queries'] ?? []); + + $dbForProject->deleteDocuments( + $collectionId, + $queries, + onNext: function (Document $deleted, Document $old) use (&$state, $collectionId, $createdAt) { + $dependent = isset($state[$collectionId][$deleted->getId()]); + + // If not in transaction state, check for timestamp conflicts + if (!$dependent) { + $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); + if ($oldUpdatedAt > $createdAt) { + throw new ConflictException('Document was updated after the transaction operation'); + } + } + + // Remove from state after successful deletion + if (isset($state[$collectionId][$deleted->getId()])) { + unset($state[$collectionId][$deleted->getId()]); + } + } + ); + } +} \ No newline at end of file diff --git a/src/Appwrite/Platform/Workers/StatsResources.php b/src/Appwrite/Platform/Workers/StatsResources.php index 98c9d01a87..6f334437b0 100644 --- a/src/Appwrite/Platform/Workers/StatsResources.php +++ b/src/Appwrite/Platform/Workers/StatsResources.php @@ -432,7 +432,7 @@ class StatsResources extends Action protected function writeDocuments(Database $dbForLogs, Document $project): void { - $dbForLogs->createOrUpdateDocuments( + $dbForLogs->upsertDocuments( 'stats', $this->documents ); diff --git a/src/Appwrite/Platform/Workers/StatsUsage.php b/src/Appwrite/Platform/Workers/StatsUsage.php index 3610381d5a..3a615072df 100644 --- a/src/Appwrite/Platform/Workers/StatsUsage.php +++ b/src/Appwrite/Platform/Workers/StatsUsage.php @@ -424,7 +424,7 @@ class StatsUsage extends Action try { $dbForProject = $getProjectDB($projectStats['project']); Console::log('Processing batch with ' . count($projectStats['stats']) . ' stats'); - $dbForProject->createOrUpdateDocumentsWithIncrease('stats', 'value', $projectStats['stats']); + $dbForProject->upsertDocumentsWithIncrease('stats', 'value', $projectStats['stats']); Console::success('Batch successfully written to DB'); unset($this->projects[$sequence]); @@ -468,7 +468,7 @@ class StatsUsage extends Action try { Console::log('Processing batch with ' . count($this->statDocuments) . ' stats'); - $dbForLogs->createOrUpdateDocumentsWithIncrease( + $dbForLogs->upsertDocumentsWithIncrease( 'stats', 'value', $this->statDocuments diff --git a/tests/e2e/Services/Databases/Transactions/TransactionsTest.php b/tests/e2e/Services/Databases/Transactions/TransactionsTest.php index cb0f3f3827..f1e8b64d58 100644 --- a/tests/e2e/Services/Databases/Transactions/TransactionsTest.php +++ b/tests/e2e/Services/Databases/Transactions/TransactionsTest.php @@ -1044,6 +1044,8 @@ class TransactionsTest extends Scope 'required' => true, ]); + sleep(2); + // Create unique index on email $this->client->call(Client::METHOD_POST, "/databases/{$databaseId}/collections/{$collectionId}/indexes", array_merge([ 'content-type' => 'application/json', @@ -1055,7 +1057,7 @@ class TransactionsTest extends Scope 'attributes' => ['email'], ]); - sleep(3); + sleep(2); // Create an existing document $this->client->call(Client::METHOD_POST, "/databases/{$databaseId}/collections/{$collectionId}/documents", array_merge([