### Motivation `Dockerfile` and `docker-compose.yaml` still used Swift 5.9 as the default base image. Having a separate compose file for each Swift version and separate service for each command is potentially hard to maintain and not very flexible. ### Modifications #### Update Dockerfile to use swift:6.0 base image Drop the separate swift format installation steps now that it's part of the toolchain. Merge dependency installation steps to reduce the number of image layers. #### Replace separate docker-compose files with a configurable one Instead of having a service per command and separate compose files for each Swift version we have a compose files with two services: kafka broker and the client. Client service will build and tag the docker image on demand using the `SWIFT_VERSION` and `UBUNTU_VERSION` environment variables to set base Swift image, removing the need for separate compose files for each version. Client service can be started with any command so we don't need separate services for each one. #### Add Makefile targets to run build and tests using docker-compose #### Update README local development instructions Also removes suggestions to install local Kafka and ZK for running tests on macOS. ### Result Tests can be executed inside docker for any support Swift version with: ```shell SWIFT_VERSION=6.x make docker-test ```
8.1 KiB
Swift Kafka Client
The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.
Adding Kafka as a Dependency
To use the Kafka library in a SwiftPM project,
add the following line to the dependencies in your Package.swift file:
.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")
Include "Kafka" as a dependency for your executable target:
.target(name: "<target>", dependencies: [
.product(name: "Kafka", package: "swift-kafka-client"),
]),
Finally, add import Kafka to your source code.
Usage
Kafka should be used within a Swift Service Lifecycle
ServiceGroup for proper startup and shutdown handling.
Both the KafkaProducer and the KafkaConsumer implement the Service protocol.
Producer API
The send(_:) method of KafkaProducer returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement indicates that producing a message was successful or returns an error.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [producer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task sending message and receiving events
group.addTask {
let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)
for await event in events {
switch event {
case .deliveryReports(let deliveryReports):
// Check what messages the delivery reports belong to
default:
break // Ignore any other events
}
}
}
}
Consumer API
After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .partition(
KafkaPartition(rawValue: 0),
topic: "topic-name"
),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
Consumer Groups
Kafka also allows users to subscribe to an array of topics as part of a consumer group.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
Manual commits
By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
// ...
try await consumer.commitSync(message)
}
}
}
Security Mechanisms
Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.
Plaintext
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext
TLS
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()
SASL
let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
keytab: "KEYTAB_FILE"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)
SASL + TLS
let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
username: "USERNAME",
password: "PASSWORD"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
saslMechanism: saslMechanism
)
librdkafka
The Package depends on the librdkafka library, which is included as a git submodule.
It has source files that are excluded in Package.swift.
Dependencies
librdkafka depends on openssl, meaning that libssl-dev must be present at build time.
openssl@3 can be installed on macOS, among others, through brew.
Development Setup
Running tests locally
Integration tests require a running Kafka broker which can be started with Docker:
docker run -d -p 9092:9092 apache/kafka:3.9.1
# After starting the container run the tests
swift test
Running tests in Docker
We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:
docker compose -f docker/docker-compose.yaml run client swift test
Alternatively you can use a Makefile target:
make docker-test
You can specify Swift compiler version using SWIFT_VERSION environment variable:
SWIFT_VERSION=6.2 make docker-test