Files
Alexey Bezhan cb9237ec5a Replace separate docker-compose files with a configurable one (#214)
### Motivation

`Dockerfile` and `docker-compose.yaml` still used Swift 5.9 as the
default base image.
Having a separate compose file for each Swift version and separate
service for each command is potentially hard to maintain and not very
flexible.

### Modifications

#### Update Dockerfile to use swift:6.0 base image

Drop the separate swift format installation steps now that it's part of
the toolchain.

Merge dependency installation steps to reduce the number of image
layers.

#### Replace separate docker-compose files with a configurable one

Instead of having a service per command and separate compose files for
each Swift version we have a compose files with two services: kafka
broker and the client.

Client service will build and tag the docker image on demand using the
`SWIFT_VERSION` and `UBUNTU_VERSION` environment variables to set base
Swift image, removing the need for separate compose files for each
version.

Client service can be started with any command so we don't need separate
services for each one.

#### Add Makefile targets to run build and tests using docker-compose

#### Update README local development instructions

Also removes suggestions to install local Kafka and ZK for running tests
on macOS.

### Result

Tests can be executed inside docker for any support Swift version with:

```shell
SWIFT_VERSION=6.x make docker-test
```
2026-03-12 11:43:36 +00:00

8.1 KiB

Swift Kafka Client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Include "Kafka" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),

Finally, add import Kafka to your source code.

Usage

Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.

Producer API

The send(_:) method of KafkaProducer returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement indicates that producing a message was successful or returns an error.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [producer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task sending message and receiving events
    group.addTask {
        let messageID = try producer.send(
            KafkaProducerMessage(
                topic: "topic-name",
                value: "Hello, World!"
            )
        )

        for await event in events {
            switch event {
            case .deliveryReports(let deliveryReports):
                // Check what messages the delivery reports belong to
            default:
                break // Ignore any other events
            }
        }
    }
}

Consumer API

After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "topic-name"
    ),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Consumer Groups

Kafka also allows users to subscribe to an array of topics as part of a consumer group.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
            // ...
            try await consumer.commitSync(message)
        }
    }
}

Security Mechanisms

Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.

Plaintext

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext

TLS

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()

SASL

let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
    keytab: "KEYTAB_FILE"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)

SASL + TLS

let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
    username: "USERNAME",
    password: "PASSWORD"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism
)

librdkafka

The Package depends on the librdkafka library, which is included as a git submodule. It has source files that are excluded in Package.swift.

Dependencies

librdkafka depends on openssl, meaning that libssl-dev must be present at build time. openssl@3 can be installed on macOS, among others, through brew.

Development Setup

Running tests locally

Integration tests require a running Kafka broker which can be started with Docker:

docker run -d -p 9092:9092 apache/kafka:3.9.1

# After starting the container run the tests
swift test

Running tests in Docker

We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:

docker compose -f docker/docker-compose.yaml run client swift test

Alternatively you can use a Makefile target:

make docker-test

You can specify Swift compiler version using SWIFT_VERSION environment variable:

SWIFT_VERSION=6.2 make docker-test