Add transformer

This commit is contained in:
Jake Barnby
2026-02-04 23:15:21 +13:00
parent ce49b8ecb8
commit 98fc16640a
13 changed files with 76 additions and 220 deletions
+26 -3
View File
@@ -28,6 +28,24 @@ class TransactionState
$this->authorization = $authorization;
}
/**
* Apply database transformer to document if configured
*
* @param Document $document
* @param string $collectionId
* @return Document
*/
private function transformDocument(Document $document, string $collectionId): Document
{
$transformer = $this->dbForProject->getTransformer();
if ($transformer === null || $document->isEmpty()) {
return $document;
}
// Create minimal collection Document - transformer only uses getId()
$collection = new Document(['$id' => $collectionId]);
return $transformer($document, $collection, $this->dbForProject);
}
/**
* Get a document with transaction-aware logic
@@ -61,7 +79,7 @@ class TransactionState
}
if ($docState['action'] === 'create') {
return $this->applyProjection($docState['document'], $queries);
return $this->transformDocument($this->applyProjection($docState['document'], $queries), $collectionId);
}
if ($docState['action'] === 'update' || $docState['action'] === 'upsert') {
@@ -74,9 +92,9 @@ class TransactionState
}
}
// Reapply projection in case transaction added new fields
return $this->applyProjection($committedDoc, $queries);
return $this->transformDocument($this->applyProjection($committedDoc, $queries), $collectionId);
} elseif ($docState['action'] === 'upsert') {
return $this->applyProjection($docState['document'], $queries);
return $this->transformDocument($this->applyProjection($docState['document'], $queries), $collectionId);
}
}
}
@@ -141,6 +159,11 @@ class TransactionState
}
}
// Transform all documents before returning
foreach ($documentMap as $docId => $doc) {
$documentMap[$docId] = $this->transformDocument($doc, $collectionId);
}
return array_values($documentMap);
}
@@ -281,99 +281,48 @@ abstract class Action extends DatabasesAction
}
/**
* Resolves relationships in a document and attaches metadata.
* Get the database document and configure the transformer automatically.
* Replaces manual database fetch + processDocument() pattern.
*
* @param Database $dbForProject
* @param string $databaseId
* @param Authorization $authorization
* @param bool $isAPIKey
* @param bool $isPrivilegedUser
* @param int|null &$operations Optional counter incremented for each document processed
* @return Document The database document
* @throws Exception
*/
protected function processDocument(
/* database */
Document $database,
Document $collection,
Document $document,
protected function getDatabaseDocument(
Database $dbForProject,
/* options */
array &$collectionsCache,
string $databaseId,
Authorization $authorization,
?int &$operations = null,
int $depth = 0,
): bool {
if ($operations !== null && $document->isEmpty()) {
return false;
bool $isAPIKey,
bool $isPrivilegedUser,
?int &$operations = null
): Document {
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
if ($operations !== null) {
$operations++;
}
// Auto-configure transformer
$contextKey = '$' . $this->getCollectionsEventsContext() . 'Id';
$collectionId = $collection->getId();
$document->removeAttribute('$collection');
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$' . $this->getCollectionsEventsContext() . 'Id', $collectionId);
// Stop processing relationships if max depth reached
if ($depth >= Database::RELATION_MAX_DEPTH) {
return true;
}
$relationships = $collectionsCache[$collectionId] ??= \array_filter(
$collection->getAttribute('attributes', []),
fn ($attr) => $attr->getAttribute('type') === Database::VAR_RELATIONSHIP
);
foreach ($relationships as $relationship) {
$key = $relationship->getAttribute('key');
$related = $document->getAttribute($key);
if (empty($related)) {
if (\in_array(\gettype($related), ['array', 'object']) && $operations !== null) {
$dbForProject->setTransformer(
function (Document $document, Document $collection, Database $db) use ($database, $contextKey, &$operations): Document {
if ($operations !== null) {
$operations++;
}
continue;
$document->removeAttribute('$collection');
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute($contextKey, $collection->getId());
return $document;
}
);
$relations = \is_array($related) ? $related : [$related];
$relatedCollectionId = $relationship->getAttribute('relatedCollection');
if (!isset($collectionsCache[$relatedCollectionId])) {
$relatedCollectionDoc = $authorization->skip(
fn () => $dbForProject->getDocument(
'database_' . $database->getSequence(),
$relatedCollectionId
)
);
$collectionsCache[$relatedCollectionId] = \array_filter(
$relatedCollectionDoc->getAttribute('attributes', []),
fn ($attr) => $attr->getAttribute('type') === Database::VAR_RELATIONSHIP
);
}
foreach ($relations as $relation) {
if ($relation instanceof Document) {
$relatedCollection = new Document([
'$id' => $relatedCollectionId,
'attributes' => $collectionsCache[$relatedCollectionId],
]);
$this->processDocument(
database: $database,
collection: $relatedCollection,
document: $relation,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization,
operations: $operations,
depth: $depth + 1
);
}
}
if (\is_array($related)) {
$document->setAttribute($relationship->getAttribute('key'), \array_values($relations));
} elseif (empty($relations)) {
$document->setAttribute($relationship->getAttribute('key'), null);
}
}
return true;
return $database;
}
/**
@@ -94,10 +94,7 @@ class Decrement extends Action
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
if ($collection->isEmpty()) {
@@ -178,8 +175,6 @@ class Decrement extends Action
value: $value,
min: $min
);
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$' . $this->getCollectionsEventsContext() . 'Id', $collectionId);
} catch (ConflictException) {
throw new Exception($this->getConflictException());
} catch (NotFoundException) {
@@ -94,10 +94,7 @@ class Increment extends Action
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
if ($collection->isEmpty()) {
@@ -178,8 +175,6 @@ class Increment extends Action
value: $value,
max: $max
);
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$' . $this->getCollectionsEventsContext() . 'Id', $collectionId);
} catch (ConflictException) {
throw new Exception($this->getConflictException());
} catch (NotFoundException) {
@@ -88,10 +88,7 @@ class Delete extends Action
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $dbForProject->getAuthorization(), true, true);
$collection = $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId);
if ($collection->isEmpty()) {
@@ -183,11 +180,6 @@ class Delete extends Action
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
foreach ($documents as $document) {
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$'.$this->getCollectionsEventsContext().'Id', $collection->getId());
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, \max(1, $modified))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_WRITES), \max(1, $modified));
@@ -100,10 +100,7 @@ class Update extends Action
throw new Exception($this->getMissingPayloadException());
}
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $dbForProject->getAuthorization(), true, true);
$collection = $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId);
if ($collection->isEmpty()) {
@@ -214,11 +211,6 @@ class Update extends Action
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
foreach ($documents as $document) {
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$'.$this->getCollectionsEventsContext().'Id', $collection->getId());
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, \max(1, $modified))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_WRITES), \max(1, $modified));
@@ -90,10 +90,7 @@ class Upsert extends Action
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $dbForProject->getAuthorization(), true, true);
$collection = $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId);
if ($collection->isEmpty()) {
@@ -189,11 +186,6 @@ class Upsert extends Action
throw new Exception($this->getStructureException(), $e->getMessage());
}
foreach ($upserted as $document) {
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute('$'.$this->getCollectionsEventsContext().'Id', $collection->getId());
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, \max(1, $modified))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_WRITES), \max(1, $modified));
@@ -189,10 +189,7 @@ class Create extends Action
throw new Exception(Exception::GENERAL_UNAUTHORIZED_SCOPE);
}
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
if ($collection->isEmpty() || (!$collection->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
@@ -471,18 +468,6 @@ class Create extends Action
->setParam('tableId', $collection->getId())
->setContext($this->getCollectionsEventsContext(), $collection);
$collectionsCache = [];
foreach ($documents as $document) {
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization
);
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, \max(1, $operations))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_WRITES), \max(1, $operations)); // per collection
@@ -101,14 +101,10 @@ class Delete extends Action
array $plan,
Authorization $authorization
): void {
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
@@ -199,16 +195,11 @@ class Delete extends Action
throw new Exception($this->getRestrictedException());
}
$collectionsCache = [];
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization
);
// Apply transformation to deleted document for events
$contextKey = '$' . $this->getCollectionsEventsContext() . 'Id';
$document->removeAttribute('$collection');
$document->setAttribute('$databaseId', $database->getId());
$document->setAttribute($contextKey, $collection->getId());
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, 1)
@@ -79,10 +79,8 @@ class Get extends Action
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$operations = 0;
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser, $operations);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
@@ -118,18 +116,6 @@ class Get extends Action
throw new Exception($this->getNotFoundException(), params: [$documentId]);
}
$operations = 0;
$collectionsCache = [];
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization,
operations: $operations
);
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_READS, max($operations, 1))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations);
@@ -99,14 +99,10 @@ class Update extends Action
throw new Exception($this->getMissingPayloadException());
}
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
@@ -337,16 +333,6 @@ class Update extends Action
throw new Exception($this->getStructureException(), $e->getMessage());
}
$collectionsCache = [];
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization,
);
$response->dynamic($document, $this->getResponseModel());
$relationships = \array_map(
@@ -110,10 +110,7 @@ class Upsert extends Action
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
if ($collection->isEmpty() || (!$collection->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
@@ -346,8 +343,6 @@ class Upsert extends Action
throw new Exception($this->getStructureException(), $e->getMessage());
}
$collectionsCache = [];
if (empty($upserted[0])) {
if ($transactionId !== null) {
// For transactions, get the document with transaction changes applied
@@ -359,15 +354,6 @@ class Upsert extends Action
$document = $upserted[0];
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization
);
$relationships = \array_map(
fn ($document) => $document->getAttribute('key'),
\array_filter(
@@ -83,10 +83,8 @@ class XList extends Action
$isAPIKey = User::isApp($authorization->getRoles());
$isPrivilegedUser = User::isPrivileged($authorization->getRoles());
$database = $authorization->skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty() || (!$database->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
throw new Exception(Exception::DATABASE_NOT_FOUND, params: [$databaseId]);
}
$operations = 0;
$database = $this->getDatabaseDocument($dbForProject, $databaseId, $authorization, $isAPIKey, $isPrivilegedUser, $operations);
$collection = $authorization->skip(fn () => $dbForProject->getDocument('database_' . $database->getSequence(), $collectionId));
if ($collection->isEmpty() || (!$collection->getAttribute('enabled', false) && !$isAPIKey && !$isPrivilegedUser)) {
@@ -153,20 +151,6 @@ class XList extends Action
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
$operations = 0;
$collectionsCache = [];
foreach ($documents as $document) {
$this->processDocument(
database: $database,
collection: $collection,
document: $document,
dbForProject: $dbForProject,
collectionsCache: $collectionsCache,
authorization: $authorization,
operations: $operations
);
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_READS, max($operations, 1))
->addMetric(str_replace('{databaseInternalId}', $database->getSequence(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations);