## Summary

Add `scheduleCommit()` and `commit()` parameterless overloads to `KafkaConsumer` that commit **all stored offsets** to the broker. This is the standard pattern for at-least-once consumers using `storeOffset()` + manual commit, matching the `rd_kafka_commit(rk, NULL, ...)` API available in Rust, Java, Go, and Python Kafka clients.

## New API

```swift
// Fire-and-forget: schedules commit, returns immediately
try consumer.scheduleCommit()

// Awaitable: waits for broker acknowledgement
try await consumer.commit()
```

Both methods:

- Commit all offsets currently in librdkafka's local offset store
- Require `enableAutoCommit = false` (throws config error otherwise)
- Throw `connectionClosed` on a closed consumer

## Fixes (commit safety)

While adding the new API, we identified and fixed safety issues in the existing `commit(_ message:)` that also apply to the new `commit()`:

**CommitPromise** replaces `CapturedCommitCallback`:

- 3-state enum (`initial` → `waiting` → `completed`) with `NIOLockedValueBox` guarantees exactly-one continuation resume
- `withTaskCancellationHandler` ensures cancelled tasks get `CancellationError` instead of leaking the continuation
- `passRetained`/`takeRetainedValue` properly balances ARC across the C callback boundary
- Error check on `rd_kafka_commit_queue` return value with retain count cleanup on the synchronous error path

## Test plan

**Unit tests** (mock broker, no Kafka needed):

- `scheduleCommitAllFailsWithAutoCommitEnabled`
- `commitAllFailsWithAutoCommitEnabled`
- `scheduleCommitAllFailsOnClosedConsumer`
- `commitAllFailsOnClosedConsumer`
- `scheduleCommitAllSucceeds`

**Integration tests** (requires broker):

- `commitAllCommitsStoredOffsets`: produce → consume → storeOffset → `commit()` → verify via `committed()`
- `scheduleCommitAllCommitsStoredOffsets`: same flow with `scheduleCommit()` (fire-and-forget)
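To make the exactly-one-resume guarantee concrete, here is a minimal, self-contained sketch of a three-state promise guarded by a lock. It is not the `CommitPromise` from this PR: the type name `OneShotPromise`, the `AlreadyAwaitedError` type, and the use of Foundation's `NSLock` in place of `NIOLockedValueBox` are all assumptions made so the example runs without extra dependencies.

```swift
import Foundation

/// Hypothetical error for awaiting the same promise twice (not part of the PR).
struct AlreadyAwaitedError: Error {}

/// Hypothetical one-shot promise bridging a callback to async/await.
/// NSLock stands in for NIOLockedValueBox to keep the sketch dependency-free.
final class OneShotPromise<Value: Sendable>: @unchecked Sendable {
    private enum State {
        case initial
        case waiting(CheckedContinuation<Value, Error>)
        // Holds a parked result if completion arrived before anyone awaited.
        case completed(Result<Value, Error>?)
    }

    private let lock = NSLock()
    private var state: State = .initial

    /// Called from the callback side or on cancellation. Only the first call has an effect.
    func complete(with result: Result<Value, Error>) {
        let continuation: CheckedContinuation<Value, Error>? = self.lock.withLock {
            switch self.state {
            case .initial:
                self.state = .completed(result) // park the result for a later wait()
                return nil
            case .waiting(let continuation):
                self.state = .completed(nil)
                return continuation
            case .completed:
                return nil // already completed: ignore
            }
        }
        // Resume outside the lock so the waiter never runs while the lock is held.
        continuation?.resume(with: result)
    }

    /// Suspends until `complete(with:)` is called or the task is cancelled.
    func wait() async throws -> Value {
        try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Value, Error>) in
                let immediate: Result<Value, Error>? = self.lock.withLock {
                    switch self.state {
                    case .initial:
                        self.state = .waiting(continuation)
                        return nil
                    case .completed(let parked?):
                        self.state = .completed(nil)
                        return parked
                    case .waiting, .completed:
                        return .failure(AlreadyAwaitedError())
                    }
                }
                if let immediate { continuation.resume(with: immediate) }
            }
        } onCancel: {
            // Cancellation goes through the same state machine, so it cannot double-resume.
            self.complete(with: .failure(CancellationError()))
        }
    }
}
```

Because every transition happens under the lock and the continuation is handed out at most once, a late broker callback after cancellation (or a cancellation after the callback) falls into the `completed` branch and is ignored.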
Swift Kafka Client
The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.
Adding Kafka as a Dependency
To use the Kafka library in a SwiftPM project,
add the following line to the dependencies in your Package.swift file:
.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")
Include "Kafka" as a dependency for your executable target:
.target(name: "<target>", dependencies: [
.product(name: "Kafka", package: "swift-kafka-client"),
]),
Finally, add import Kafka to your source code.
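For orientation, a complete manifest combining the two snippets above might look like the following sketch. The package name `MyKafkaApp`, the tools version, and the platform requirement are assumptions for illustration; check the library's own Package.swift for the actual minimum versions.

```swift
// swift-tools-version:5.9
import PackageDescription

let package = Package(
    name: "MyKafkaApp", // hypothetical package name
    platforms: [.macOS(.v13)], // assumption: adjust to the library's requirements
    dependencies: [
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp", // hypothetical target name
            dependencies: [
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```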
Usage
Kafka should be used within a Swift Service Lifecycle
ServiceGroup for proper startup and shutdown handling.
Both the KafkaProducer and the KafkaConsumer implement the Service protocol.
Producer API
The send(_:) method of KafkaProducer returns a message ID that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement either confirms that the message was produced successfully or carries an error.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [producer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task sending message and receiving events
group.addTask {
let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)
for await event in events {
switch event {
case .deliveryReports(let deliveryReports):
// Check what messages the delivery reports belong to
_ = deliveryReports // Placeholder statement: a Swift case body must not be empty
default:
break // Ignore any other events
}
}
}
}
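One way to "check what messages the delivery reports belong to" is to remember each message ID returned by send(_:) and remove it again when its report arrives. The sketch below models that bookkeeping with stand-in types so it runs on its own; `MessageID`, `DeliveryStatus`, `DeliveryReport`, and `PendingDeliveries` are hypothetical names and not the library's actual types.

```swift
/// Hypothetical stand-ins for the library's message ID and delivery report types.
struct MessageID: Hashable { let rawValue: UInt }
enum DeliveryStatus { case acknowledged, failure(String) }
struct DeliveryReport { let id: MessageID; let status: DeliveryStatus }

/// Tracks messages that were sent but not yet acknowledged.
struct PendingDeliveries {
    private(set) var pending: Set<MessageID> = []
    private(set) var failed: [MessageID: String] = [:]

    /// Record the ID returned when a message is handed to the producer.
    mutating func didSend(_ id: MessageID) {
        pending.insert(id)
    }

    /// Match incoming reports against the pending set.
    mutating func handle(_ reports: [DeliveryReport]) {
        for report in reports {
            // Skip reports for IDs that were never recorded as sent.
            guard pending.remove(report.id) != nil else { continue }
            if case .failure(let reason) = report.status {
                failed[report.id] = reason
            }
        }
    }
}
```

After the events loop has drained, anything left in `pending` was never acknowledged, and `failed` lists the messages that may need to be retried.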
Consumer API
After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .partition(
KafkaPartition(rawValue: 0),
topic: "topic-name"
),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
Consumer Groups
Kafka also allows users to subscribe to an array of topics as part of a consumer group.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
Manual commits
By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
// ...
try await consumer.commit(message)
}
}
}
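Committing after every message costs one broker round trip per message. The store-then-commit pattern from the summary above separates the two steps: processed offsets are recorded locally, and a single commit later publishes all of them. The following is a self-contained model of that idea, not the library's implementation; `TopicPartition` and `OffsetStore` are hypothetical names standing in for librdkafka's local offset store.

```swift
/// Hypothetical key identifying one partition of one topic.
struct TopicPartition: Hashable {
    let topic: String
    let partition: Int
}

/// Hypothetical in-memory model of a local offset store plus broker-side commits.
struct OffsetStore {
    private(set) var stored: [TopicPartition: Int64] = [:]
    private(set) var committed: [TopicPartition: Int64] = [:]

    /// Record a processed message. Kafka commits the *next* offset to read,
    /// so the stored value is the processed offset plus one.
    mutating func store(topic: String, partition: Int, offset: Int64) {
        let key = TopicPartition(topic: topic, partition: partition)
        stored[key] = max(stored[key] ?? 0, offset + 1)
    }

    /// Publish every stored offset in one step, as a parameterless commit would.
    mutating func commitAll() {
        committed.merge(stored) { _, new in new }
    }
}
```

If the consumer crashes between `store` and `commitAll`, the stored offsets are lost and those messages are delivered again after restart, which is what makes the pattern at-least-once rather than exactly-once.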
Security Mechanisms
Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.
Plaintext
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext
TLS
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()
SASL
let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
keytab: "KEYTAB_FILE"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)
SASL + TLS
let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
username: "USERNAME",
password: "PASSWORD"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
saslMechanism: saslMechanism
)
librdkafka
The package depends on the librdkafka library, which is included as a git submodule.
Some of its source files are excluded in Package.swift.
Dependencies
librdkafka depends on openssl, meaning that libssl-dev must be present at build time.
On macOS, openssl@3 can be installed through brew, among other options.
Development Setup
Running tests locally
Integration tests require a running Kafka broker which can be started with Docker:
docker run -d -p 9092:9092 apache/kafka:3.9.1
# After starting the container run the tests
swift test
Running tests in Docker
We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:
docker compose -f docker/docker-compose.yaml run client swift test
Alternatively, you can use a Makefile target:
make docker-test
You can specify the Swift compiler version using the SWIFT_VERSION environment variable:
SWIFT_VERSION=6.2 make docker-test