Add Span tracing to Messaging worker

Replace Console logging with Span tracing in the Messaging worker
for better observability. Initialize Span storage and exporter in
worker.php to enable tracing in all workers.
This commit is contained in:
Chirag Aggarwal
2026-02-03 11:59:49 +05:30
parent 92d2310a53
commit 787e7aed43
2 changed files with 37 additions and 28 deletions
+5
View File
@@ -48,8 +48,13 @@ use Utopia\Storage\Device\Telemetry as TelemetryDevice;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\Span\Exporter;
use Utopia\Span\Span;
use Utopia\Span\Storage;
Runtime::enableCoroutine();
Span::setStorage(new Storage\Coroutine());
Span::addExporter(new Exporter\Stdout());
Server::setResource('register', fn () => $register);
+32 -28
View File
@@ -5,7 +5,7 @@ namespace Appwrite\Platform\Workers;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Status as MessageStatus;
use Swoole\Runtime;
use Utopia\CLI\Console;
use Utopia\Span\Span;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
@@ -100,20 +100,31 @@ class Messaging extends Action
$type = $payload['type'] ?? '';
switch ($type) {
case MESSAGE_SEND_TYPE_INTERNAL:
$message = new Document($payload['message'] ?? []);
$recipients = $payload['recipients'] ?? [];
Span::init('messaging');
Span::add('project', $project->getId());
Span::add('type', $type);
$this->sendInternalSMSMessage($message, $project, $recipients, $log);
break;
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
try {
switch ($type) {
case MESSAGE_SEND_TYPE_INTERNAL:
$message = new Document($payload['message'] ?? []);
$recipients = $payload['recipients'] ?? [];
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
$this->sendInternalSMSMessage($message, $project, $recipients, $log);
break;
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
}
} catch (\Throwable $e) {
Span::error($e);
throw $e;
} finally {
Span::current()->finish();
}
}
@@ -129,10 +140,7 @@ class Messaging extends Action
$userIds = $message->getAttribute('users', []);
$providerType = $message->getAttribute('providerType');
Console::log(json_encode([
'project' => $project->getId(),
'type' => $providerType,
]));
Span::add('providerType', $providerType);
/**
* @var array<Document> $allTargets
@@ -183,7 +191,7 @@ class Messaging extends Action
'deliveryErrors' => ['No valid recipients found.']
]));
Console::warning('No valid recipients found.');
Span::add('error', 'No valid recipients found.');
return;
}
@@ -198,7 +206,7 @@ class Messaging extends Action
'deliveryErrors' => ['No enabled provider found.']
]));
Console::warning('No enabled provider found.');
Span::add('error', 'No enabled provider found.');
return;
}
@@ -397,7 +405,7 @@ class Messaging extends Action
}
if ($this->adapter === null) {
Console::warning('Skipped SMS processing. SMS adapter is not set.');
Span::add('warning', 'Skipped SMS processing. SMS adapter is not set.');
return;
}
@@ -405,14 +413,10 @@ class Messaging extends Action
throw new \Exception('Project not set in payload');
}
Console::log(json_encode([
'project' => $project->getId(),
'type' => 'internal-sms'
]));
$denyList = System::getEnv('_APP_SMS_PROJECTS_DENY_LIST', '');
$denyList = explode(',', $denyList);
if (\in_array($project->getId(), $denyList)) {
Console::error('Project is in the deny list. Skipping...');
Span::add('error', 'Project is in the deny list. Skipping...');
return;
}
@@ -699,7 +703,7 @@ class Messaging extends Action
private function createInternalSMSAdapter(): ?SMSAdapter
{
if (empty(System::getEnv('_APP_SMS_PROVIDER')) || empty(System::getEnv('_APP_SMS_FROM'))) {
Console::warning('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
Span::add('warning', 'Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
return null;
}
@@ -745,13 +749,13 @@ class Messaging extends Action
$provider = $this->createProviderFromDSN($localDSN);
$adapter = $this->getSmsAdapter($provider);
} catch (\Exception) {
Console::warning('Unable to create adapter: ' . $localDSN->getHost());
Span::add('warning', 'Unable to create adapter: ' . $localDSN->getHost());
continue;
}
$callingCode = $localDSN->getParam('local', '');
if (empty($callingCode)) {
Console::warning('Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.');
Span::add('warning', 'Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.');
continue;
}