ssikka a83e841554 Add commit-all stored offsets API (#230)
## Summary

Add parameterless `scheduleCommit()` and `commit()` overloads to
`KafkaConsumer` that commit **all stored offsets** to the broker. This
is the standard pattern for at-least-once consumers that combine
`storeOffset()` with manual commits. It matches the commit-all behavior
of librdkafka's `rd_kafka_commit(rk, NULL, ...)`, which Kafka clients
for Rust, Java, Go, and Python also offer in some form.

## New API

```swift
// Fire-and-forget — schedules commit, returns immediately
try consumer.scheduleCommit()

// Awaitable — waits for broker acknowledgement
try await consumer.commit()
```

Both methods:
- Commit all offsets currently in librdkafka's local offset store
- Require auto-commit to be disabled (`isAutoCommitEnabled = false`) and throw a configuration error otherwise
- Throw `connectionClosed` on a closed consumer
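
The rules listed above can be modelled without a broker. The following is a minimal in-memory sketch, not the library's implementation: `OffsetStoreModel`, `TopicPartition`, and `CommitModelError` are hypothetical names, and the order of the two precondition checks is an assumption.

```swift
/// Hypothetical in-memory model; none of these names are the library's API.
enum CommitModelError: Error, Equatable {
    case autoCommitEnabled
    case connectionClosed
}

struct TopicPartition: Hashable {
    let topic: String
    let partition: Int
}

final class OffsetStoreModel {
    let isAutoCommitEnabled: Bool
    var isClosed = false
    private(set) var stored: [TopicPartition: Int] = [:]    // local offset store
    private(set) var committed: [TopicPartition: Int] = [:] // what the broker knows

    init(isAutoCommitEnabled: Bool) {
        self.isAutoCommitEnabled = isAutoCommitEnabled
    }

    /// Records an offset for a partition locally; nothing is sent yet.
    func storeOffset(_ offset: Int, for partition: TopicPartition) {
        stored[partition] = offset
    }

    /// Commits every stored offset in one call, after checking the preconditions.
    func commit() throws {
        guard !isAutoCommitEnabled else { throw CommitModelError.autoCommitEnabled }
        guard !isClosed else { throw CommitModelError.connectionClosed }
        committed.merge(stored) { _, newest in newest }
    }
}

// Usage: offsets for two partitions are stored, then committed together.
let firstPartition = TopicPartition(topic: "topic-name", partition: 0)
let secondPartition = TopicPartition(topic: "topic-name", partition: 1)
let model = OffsetStoreModel(isAutoCommitEnabled: false)
model.storeOffset(5, for: firstPartition)
model.storeOffset(9, for: secondPartition)
try model.commit()
```

The point of the model is that a single parameterless call covers every partition with a stored offset, so the caller does not have to track the last message per partition.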

## Fixes (commit safety)

While adding the new API, we identified and fixed safety issues in the
existing `commit(_ message:)` that also apply to the new `commit()`:

**CommitPromise** replaces `CapturedCommitCallback`:
- 3-state enum (`initial` → `waiting` → `completed`) guarded by a
`NIOLockedValueBox` guarantees that the continuation is resumed exactly once
- `withTaskCancellationHandler` ensures cancelled tasks get
`CancellationError` instead of leaking the continuation
- `passRetained`/`takeRetainedValue` properly balances ARC across the C
callback boundary
- Error check on `rd_kafka_commit_queue` return value with retain count
cleanup on the synchronous error path
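
To illustrate the ideas in this list, here is a simplified, synchronous sketch. It is not the actual `CommitPromise`: `OneShotPromise`, `CommitError`, `enqueueCommit`, and `startCommit` are hypothetical names, an `NSLock` stands in for `NIOLockedValueBox`, a plain closure stands in for the continuation, and a Swift function stands in for `rd_kafka_commit_queue`.

```swift
import Foundation

struct CommitError: Error { let code: Int32 }

/// Hypothetical one-shot promise: the handler runs exactly once,
/// whichever of `wait` and `complete` arrives first.
final class OneShotPromise {
    typealias Handler = (Result<Void, Error>) -> Void
    private enum State {
        case initial
        case waiting(Handler)
        case completed(Result<Void, Error>?) // nil once the result was handed over
    }
    private let lock = NSLock()
    private var state = State.initial

    /// Registers the handler (a continuation resume in real async code).
    func wait(_ handler: @escaping Handler) {
        lock.lock()
        var ready: Result<Void, Error>? = nil
        switch state {
        case .initial:
            state = .waiting(handler)
        case .completed(let result?):
            state = .completed(nil)
            ready = result
        case .waiting, .completed(.none):
            break // a second waiter is ignored in this sketch
        }
        lock.unlock()
        if let result = ready { handler(result) }
    }

    func complete(_ result: Result<Void, Error>) {
        lock.lock()
        var handler: Handler? = nil
        switch state {
        case .initial:
            state = .completed(result)
        case .waiting(let waiting):
            state = .completed(nil)
            handler = waiting
        case .completed:
            break // duplicate or late completion is dropped
        }
        lock.unlock()
        handler?(result)
    }
}

/// C-style callback: it cannot capture context, so the promise arrives as an opaque pointer.
func commitCallback(code: Int32, opaque: UnsafeMutableRawPointer?) {
    guard let opaque = opaque else { return }
    // takeRetainedValue balances the passRetained in startCommit.
    let promise = Unmanaged<OneShotPromise>.fromOpaque(opaque).takeRetainedValue()
    promise.complete(code == 0 ? .success(()) : .failure(CommitError(code: code)))
}

/// Stand-in for the C enqueue call: non-zero means it failed synchronously
/// and the callback will never fire.
func enqueueCommit(
    fail: Bool,
    _ callback: @convention(c) (Int32, UnsafeMutableRawPointer?) -> Void,
    _ opaque: UnsafeMutableRawPointer
) -> Int32 {
    if fail { return -1 }
    callback(0, opaque) // a real client would invoke this later, from its poll loop
    return 0
}

func startCommit(fail: Bool) -> OneShotPromise {
    let promise = OneShotPromise()
    let retained = Unmanaged.passRetained(promise) // +1 while the C side holds the pointer
    let code = enqueueCommit(fail: fail, commitCallback, retained.toOpaque())
    if code != 0 {
        retained.release() // undo the +1 on the synchronous error path
        promise.complete(.failure(CommitError(code: code)))
    }
    return promise
}

// Usage: each promise delivers its result to exactly one handler call.
var delivered = 0
startCommit(fail: false).wait { _ in delivered += 1 }
startCommit(fail: true).wait { _ in delivered += 1 }
```

In this model, task cancellation would simply be one more `complete(.failure(...))` call racing with the callback; the state machine ensures that only the first completion reaches the handler.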

## Test plan

**Unit tests** (mock broker, no Kafka needed):
- `scheduleCommitAllFailsWithAutoCommitEnabled`
- `commitAllFailsWithAutoCommitEnabled`
- `scheduleCommitAllFailsOnClosedConsumer`
- `commitAllFailsOnClosedConsumer`
- `scheduleCommitAllSucceeds`

**Integration tests** (requires broker):
- `commitAllCommitsStoredOffsets` — produce → consume → storeOffset →
`commit()` → verify via `committed()`
- `scheduleCommitAllCommitsStoredOffsets` — same flow with
`scheduleCommit()` (fire-and-forget)
2026-04-28 15:03:53 -04:00

Swift Kafka Client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Include "Kafka" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),

Finally, add import Kafka to your source code.
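
Put together, a complete manifest could look like the sketch below. The package and target name `MyKafkaApp`, the tools version, and the platform requirement are assumptions for illustration; adjust them to your project and to what the Kafka package actually requires.

```swift
// swift-tools-version:5.9
import PackageDescription

let package = Package(
    name: "MyKafkaApp", // hypothetical package name
    platforms: [
        .macOS(.v13), // assumption; check the minimum platform the Kafka package declares
    ],
    dependencies: [
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp", // hypothetical target name
            dependencies: [
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```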

Usage

Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.

Producer API

The send(_:) method of KafkaProducer returns a message ID that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement either confirms that a message was produced successfully or reports an error.

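// The examples in this README assume these imports and a swift-log logger;
// the logger label is an arbitrary placeholder.
import Kafka
import Logging
import ServiceLifecycle

let logger = Logger(label: "kafka-example")

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)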
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [producer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task sending message and receiving events
    group.addTask {
        let messageID = try producer.send(
            KafkaProducerMessage(
                topic: "topic-name",
                value: "Hello, World!"
            )
        )

        for await event in events {
            switch event {
            case .deliveryReports(let deliveryReports):
                // Check which messages the delivery reports belong to,
                // for example by matching them against messageID
                print(messageID, deliveryReports)
            default:
                break // Ignore any other events
            }
        }
    }
}
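
One way to match acknowledgements to sent messages is to remember each message under its ID until a report for that ID arrives. The sketch below shows only that bookkeeping. `MessageID`, `DeliveryReport`, and `PendingMessages` are hypothetical stand-ins defined here, and the library's own message-ID and delivery-report types may look different.

```swift
/// Hypothetical stand-ins; not the library's types.
struct MessageID: Hashable { let rawValue: UInt }

struct DeliveryReport {
    enum Status { case acknowledged, failed(String) }
    let id: MessageID
    let status: Status
}

/// Remembers what was sent until its delivery report arrives.
struct PendingMessages {
    private var pending: [MessageID: String] = [:]

    var count: Int { pending.count }

    mutating func didSend(_ id: MessageID, value: String) {
        pending[id] = value
    }

    /// Resolves a batch of reports and returns the values that should be retried.
    mutating func resolve(_ reports: [DeliveryReport]) -> [String] {
        var toRetry: [String] = []
        for report in reports {
            // Reports for unknown IDs are skipped.
            guard let value = pending.removeValue(forKey: report.id) else { continue }
            if case .failed = report.status { toRetry.append(value) }
        }
        return toRetry
    }
}

// Usage: two messages are sent, one is acknowledged and one fails.
var pending = PendingMessages()
pending.didSend(MessageID(rawValue: 1), value: "Hello, World!")
pending.didSend(MessageID(rawValue: 2), value: "Goodbye")
let toRetry = pending.resolve([
    DeliveryReport(id: MessageID(rawValue: 1), status: .acknowledged),
    DeliveryReport(id: MessageID(rawValue: 2), status: .failed("timed out")),
])
```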

Consumer API

After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "topic-name"
    ),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Consumer Groups

Kafka also allows users to subscribe to an array of topics as part of a consumer group.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
            // ...
            try await consumer.commit(message)
        }
    }
}
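
Committing only after a message has been processed gives at-least-once delivery: if the consumer stops between processing and committing, the message is delivered again after a restart. The following self-contained simulation illustrates this. `PartitionSimulation` and `SimulatedCrash` are hypothetical names, and no broker or library API is involved.

```swift
/// Hypothetical simulation: a partition log, a committed offset that survives
/// restarts, and a consumer that commits only after processing.
struct SimulatedCrash: Error {}

final class PartitionSimulation {
    let log: [String]
    private(set) var committedOffset = 0      // next offset to read after a restart
    private(set) var processed: [String] = []

    init(log: [String]) { self.log = log }

    /// Runs one consumer lifetime, starting at the committed offset.
    /// Optionally crashes after processing a message but before committing it.
    func run(crashBeforeCommitAt crashOffset: Int? = nil) throws {
        for offset in committedOffset..<log.count {
            processed.append(log[offset])     // do something with the message
            if offset == crashOffset { throw SimulatedCrash() }
            committedOffset = offset + 1      // commit only after processing succeeded
        }
    }
}

// Usage: the first run crashes before committing offset 1, the second run resumes there.
let simulation = PartitionSimulation(log: ["a", "b", "c"])
_ = try? simulation.run(crashBeforeCommitAt: 1)
try simulation.run()
```

After both runs, the message at offset 1 has been processed twice and no message has been skipped, so processing should be idempotent or able to tolerate duplicates.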

Security Mechanisms

Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.

Plaintext

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext

TLS

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()

SASL

let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
    keytab: "KEYTAB_FILE"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)

SASL + TLS

let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
    username: "USERNAME",
    password: "PASSWORD"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism
)

librdkafka

The package depends on the librdkafka library, which is included as a git submodule. Some of its source files are excluded in Package.swift.

Dependencies

librdkafka depends on openssl, meaning that libssl-dev must be present at build time. On macOS, openssl@3 can be installed through brew, among other options.

Development Setup

Running tests locally

Integration tests require a running Kafka broker which can be started with Docker:

docker run -d -p 9092:9092 apache/kafka:3.9.1

# After starting the container run the tests
swift test

Running tests in Docker

We provide a Docker environment for this package. This will automatically start a local Kafka server and run the tests:

docker compose -f docker/docker-compose.yaml run client swift test

Alternatively, you can use a Makefile target:

make docker-test

You can specify the Swift compiler version using the SWIFT_VERSION environment variable:

SWIFT_VERSION=6.2 make docker-test