mirror of
https://github.com/appwrite/appwrite.git
synced 2026-05-26 13:51:13 +00:00
feat(lock): include project internal id in lock key + telemetry
Per-manager request, lock keys are now prefixed with the project's
internal id (sequence) so that:
- Locks are partitioned by project — Redis cluster slot affinity
if/when sharded.
- Cross-project requests can't compete on the same key for
collection-scoped resources.
- Telemetry (counter + Sentry tags) carries 'project' alongside
'target', so dashboards can filter contention by project.
Key shapes:
set: lock:platform:{project}:{collection}:{id}:{attribute}
run/orFail: lock:platform:{project}:{collection}:{id}
withKey: raw (caller-provided)
Lock now requires a project document at construction. All existing
call sites (4 in CE + 2 in cloud) run inside Http::init()-resolved
request scope where the project document is set, so no migration
needed. Workers/CLI without project context can use withKey directly.
This commit is contained in:
@@ -75,9 +75,9 @@ return function (Container $container): void {
|
||||
return $register->get('logger');
|
||||
}, ['register']);
|
||||
|
||||
$container->set('lock', function (\Redis $redis, Telemetry $telemetry, Database $dbForPlatform, Authorization $authorization, Log $log, ?Logger $logger): Lock {
|
||||
return new Lock($redis, $telemetry, $dbForPlatform, $authorization, $log, $logger);
|
||||
}, ['redis', 'telemetry', 'dbForPlatform', 'authorization', 'log', 'logger']);
|
||||
$container->set('lock', function (\Redis $redis, Telemetry $telemetry, Database $dbForPlatform, Authorization $authorization, Log $log, ?Logger $logger, Document $project): Lock {
|
||||
return new Lock($redis, $telemetry, $dbForPlatform, $authorization, $log, $logger, $project);
|
||||
}, ['redis', 'telemetry', 'dbForPlatform', 'authorization', 'log', 'logger', 'project']);
|
||||
|
||||
$container->set('authorization', function () {
|
||||
return new Authorization();
|
||||
|
||||
@@ -22,6 +22,8 @@ final class Lock
|
||||
|
||||
private readonly mixed $attempts;
|
||||
|
||||
private readonly string $projectInternalId;
|
||||
|
||||
/** @var array<string,int> */
|
||||
private static array $lastReportAt = [];
|
||||
|
||||
@@ -32,9 +34,12 @@ final class Lock
|
||||
private readonly Authorization $authorization,
|
||||
private readonly Log $log,
|
||||
private readonly ?Logger $logger,
|
||||
Document $project,
|
||||
) {
|
||||
$this->enabled = System::getEnv('_APP_LOCKING_ENABLED', 'enabled') !== 'disabled';
|
||||
$this->attempts = $telemetry->createCounter('lock.attempts', null, 'Distributed lock acquire outcomes');
|
||||
$sequence = $project->getSequence();
|
||||
$this->projectInternalId = ($sequence !== null && $sequence !== '') ? (string) $sequence : 'unknown';
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -49,16 +54,14 @@ final class Lock
|
||||
string $attribute = 'accessedAt',
|
||||
?string $value = null,
|
||||
): void {
|
||||
$this->withKey(
|
||||
"lock:platform:{$collection}:{$id}:{$attribute}",
|
||||
function () use ($collection, $id, $attribute, $value) {
|
||||
$this->authorization->skip(fn () => $this->dbForPlatform->updateDocument(
|
||||
$collection,
|
||||
$id,
|
||||
new Document([$attribute => $value ?? DateTime::now()])
|
||||
));
|
||||
}
|
||||
);
|
||||
$key = "lock:platform:{$this->projectInternalId}:{$collection}:{$id}:{$attribute}";
|
||||
$this->execute($key, $collection, function () use ($collection, $id, $attribute, $value) {
|
||||
$this->authorization->skip(fn () => $this->dbForPlatform->updateDocument(
|
||||
$collection,
|
||||
$id,
|
||||
new Document([$attribute => $value ?? DateTime::now()])
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -67,7 +70,8 @@ final class Lock
|
||||
*/
|
||||
public function run(string $collection, string $id, Closure $fn): void
|
||||
{
|
||||
$this->withKey("lock:platform:{$collection}:{$id}", $fn);
|
||||
$key = "lock:platform:{$this->projectInternalId}:{$collection}:{$id}";
|
||||
$this->execute($key, $collection, $fn);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -77,18 +81,19 @@ final class Lock
|
||||
*/
|
||||
public function runOrFail(string $collection, string $id, Closure $fn): mixed
|
||||
{
|
||||
return $this->withKey(
|
||||
"lock:platform:{$collection}:{$id}",
|
||||
$fn,
|
||||
ttl: 10,
|
||||
orFail: true,
|
||||
);
|
||||
$key = "lock:platform:{$this->projectInternalId}:{$collection}:{$id}";
|
||||
|
||||
return $this->execute($key, $collection, $fn, ttl: 10, orFail: true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic lock primitive with full control over key, TTL, contention
|
||||
* behavior, and wait timeout. Escape hatch for non-platform keys
|
||||
* (cache, queue, edge) and for unusual TTL/timeout requirements.
|
||||
*
|
||||
* Caller may pass `target` for telemetry; otherwise it's extracted by
|
||||
* position from the key (best-effort for keys following the standard
|
||||
* `lock:<scope>:<...>:<target>:<...>` shape).
|
||||
*/
|
||||
public function withKey(
|
||||
string $key,
|
||||
@@ -96,18 +101,37 @@ final class Lock
|
||||
int $ttl = 5,
|
||||
bool $orFail = false,
|
||||
float $waitTimeout = 3.0,
|
||||
?string $target = null,
|
||||
): mixed {
|
||||
return $this->execute(
|
||||
$key,
|
||||
$target ?? self::targetOf($key),
|
||||
$fn,
|
||||
ttl: $ttl,
|
||||
orFail: $orFail,
|
||||
waitTimeout: $waitTimeout,
|
||||
);
|
||||
}
|
||||
|
||||
private function execute(
|
||||
string $key,
|
||||
string $target,
|
||||
Closure $fn,
|
||||
int $ttl = 5,
|
||||
bool $orFail = false,
|
||||
float $waitTimeout = 3.0,
|
||||
): mixed {
|
||||
if (! $this->enabled) {
|
||||
return $fn();
|
||||
}
|
||||
|
||||
$target = self::targetOf($key);
|
||||
$lock = new DistributedLock($this->redis, $key, $ttl);
|
||||
$labels = ['target' => $target, 'project' => $this->projectInternalId];
|
||||
|
||||
try {
|
||||
$acquired = $orFail ? $lock->acquire($waitTimeout) : $lock->tryAcquire();
|
||||
} catch (\RedisException $e) {
|
||||
$this->attempts->add(1, ['outcome' => 'backend_error', 'target' => $target]);
|
||||
$this->attempts->add(1, ['outcome' => 'backend_error', ...$labels]);
|
||||
$this->reportError('backend_error', $key, $target, $e);
|
||||
|
||||
return $fn();
|
||||
@@ -115,23 +139,23 @@ final class Lock
|
||||
|
||||
if (! $acquired) {
|
||||
if ($orFail) {
|
||||
$this->attempts->add(1, ['outcome' => 'contended', 'target' => $target]);
|
||||
$this->attempts->add(1, ['outcome' => 'contended', ...$labels]);
|
||||
// No custom message — the lock key embeds collection + document id.
|
||||
throw new Exception(Exception::GENERAL_RESOURCE_LOCKED);
|
||||
}
|
||||
$this->attempts->add(1, ['outcome' => 'skipped', 'target' => $target]);
|
||||
$this->attempts->add(1, ['outcome' => 'skipped', ...$labels]);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
$this->attempts->add(1, ['outcome' => 'acquired', 'target' => $target]);
|
||||
$this->attempts->add(1, ['outcome' => 'acquired', ...$labels]);
|
||||
try {
|
||||
return $fn();
|
||||
} finally {
|
||||
try {
|
||||
$lock->release();
|
||||
} catch (Throwable $e) {
|
||||
$this->attempts->add(1, ['outcome' => 'release_error', 'target' => $target]);
|
||||
$this->attempts->add(1, ['outcome' => 'release_error', ...$labels]);
|
||||
$this->reportError('release_error', $key, $target, $e);
|
||||
}
|
||||
}
|
||||
@@ -173,6 +197,7 @@ final class Lock
|
||||
? Log::ENVIRONMENT_PRODUCTION
|
||||
: Log::ENVIRONMENT_STAGING);
|
||||
$this->log->addTag('lock.target', $target);
|
||||
$this->log->addTag('lock.project', $this->projectInternalId);
|
||||
// Strip trailing document ID to keep aggregator cardinality bounded.
|
||||
$this->log->addTag('lock.key_pattern', preg_replace('/:[^:]+$/', ':*', $key));
|
||||
$this->log->addTag('code', $e->getCode());
|
||||
|
||||
Reference in New Issue
Block a user