add approx. transactional api #98

Draft · wants to merge 9 commits into main
45 changes: 3 additions & 42 deletions Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
@@ -105,48 +105,7 @@ public struct KafkaProducerConfiguration {

extension KafkaProducerConfiguration {
    internal var dictionary: [String: String] {
-       var resultDict: [String: String] = [:]
-
-       resultDict["enable.idempotence"] = String(self.enableIdempotence)
-       resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
-       resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
-       resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
-       resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
-       resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)
-
-       resultDict["client.id"] = self.clientID
-       resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
-       resultDict["message.max.bytes"] = String(self.message.maxBytes)
-       resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
-       resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
-       resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
-       resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
-       resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
-       resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
-       resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
-       resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
-       resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
-       if !self.debug.isEmpty {
-           resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
-       }
-       resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
-       resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
-       resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
-       resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
-       resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
-       resultDict["socket.max.fails"] = String(self.socket.maxFails)
-       resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
-       resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
-       resultDict["broker.address.family"] = self.broker.addressFamily.description
-       resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
-       resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)
-
-       // Merge with SecurityProtocol configuration dictionary
-       resultDict.merge(self.securityProtocol.dictionary) { _, _ in
-           fatalError("securityProtocol and \(#file) should not have duplicate keys")
-       }
-
-       return resultDict
+       sharedPropsDictionary
    }
}

@@ -158,6 +117,8 @@ extension KafkaProducerConfiguration: Hashable {}

extension KafkaProducerConfiguration: Sendable {}

+extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
Collaborator (Author) commented:
I've introduced an internal shared protocol to unify most of the properties of the producer and the transactional producer instead of copying them between the two configurations.
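A minimal sketch of that pattern (simplified, with hypothetical names; the real protocol below carries the full librdkafka key set): each conforming configuration only declares its stored properties, and a protocol extension derives the shared dictionary once for all of them.

internal protocol SharedConfigProperties: Sendable, Hashable {
    var clientID: String { get }
}

extension SharedConfigProperties {
    // Derived once in the protocol extension; every conforming
    // configuration gets it for free.
    internal var sharedPropsDictionary: [String: String] {
        ["client.id": self.clientID]
    }
}

struct PlainProducerConfig: SharedConfigProperties {
    var clientID = "rdkafka"
    var dictionary: [String: String] { self.sharedPropsDictionary }
}

struct TransactionalProducerConfig: SharedConfigProperties {
    var clientID = "rdkafka"
    var transactionalID: String
    // Layers transaction-specific keys on top of the shared ones.
    var dictionary: [String: String] {
        var dict = self.sharedPropsDictionary
        dict["transactional.id"] = self.transactionalID
        return dict
    }
}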


// MARK: - KafkaConfiguration + Producer Additions

extension KafkaConfiguration {
142 changes: 142 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaProducerSharedProperties.swift
@@ -0,0 +1,142 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

internal protocol KafkaProducerSharedProperties: Sendable, Hashable {
    // MARK: - SwiftKafka-specific Config properties

    /// The time between two consecutive polls.
    /// Effectively controls the rate at which incoming events are consumed.
    /// Default: `.milliseconds(100)`
    var pollInterval: Duration { get }

    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
    /// Default: `10000`
    var flushTimeoutMilliseconds: Int { get }

    // MARK: - Producer-specific Config Properties

    /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if the user-supplied configuration is incompatible.
    /// Default: `false`
    var enableIdempotence: Bool { get }

    /// Producer queue options.
    var queue: KafkaConfiguration.QueueOptions { get }

    /// How many times to retry sending a failing message. Note: retrying may cause reordering unless enable.idempotence is set to true.
    /// Default: `2_147_483_647`
    var messageSendMaxRetries: Int { get }

    /// Allow automatic topic creation on the broker when producing to non-existent topics.
    /// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
    /// Default: `true`
    var allowAutoCreateTopics: Bool { get }

    // MARK: - Common Client Config Properties

    /// Client identifier.
    /// Default: `"rdkafka"`
    var clientID: String { get }

    /// Initial list of brokers.
    /// Default: `[]`
    var bootstrapServers: [KafkaConfiguration.Broker] { get }

    /// Message options.
    var message: KafkaConfiguration.MessageOptions { get }

    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
    /// Default: `100_000_000`
    var receiveMessageMaxBytes: Int { get }

    /// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.
    /// Default: `1_000_000`
    var maxInFlightRequestsPerConnection: Int { get }

    /// Metadata cache max age.
    /// Default: `900_000`
    var metadataMaxAgeMilliseconds: Int { get }

    /// Topic metadata options.
    var topicMetadata: KafkaConfiguration.TopicMetadataOptions { get }

    /// Topic denylist.
    /// Default: `[]`
    var topicDenylist: [String] { get }

    /// Debug options.
    /// Default: `[]`
    var debug: [KafkaConfiguration.DebugOption] { get }

    /// Socket options.
    var socket: KafkaConfiguration.SocketOptions { get }

    /// Broker options.
    var broker: KafkaConfiguration.BrokerOptions { get }

    /// Reconnect options.
    var reconnect: KafkaConfiguration.ReconnectOptions { get }

    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
    /// Default: `.plaintext`
    var securityProtocol: KafkaConfiguration.SecurityProtocol { get }

    var dictionary: [String: String] { get }
}

extension KafkaProducerSharedProperties {
    internal var sharedPropsDictionary: [String: String] {
        var resultDict: [String: String] = [:]

        resultDict["enable.idempotence"] = String(self.enableIdempotence)
        resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
        resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
        resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
        resultDict["message.send.max.retries"] = String(self.messageSendMaxRetries)
        resultDict["allow.auto.create.topics"] = String(self.allowAutoCreateTopics)

        resultDict["client.id"] = self.clientID
        resultDict["bootstrap.servers"] = self.bootstrapServers.map(\.description).joined(separator: ",")
        resultDict["message.max.bytes"] = String(self.message.maxBytes)
        resultDict["message.copy.max.bytes"] = String(self.message.copyMaxBytes)
        resultDict["receive.message.max.bytes"] = String(self.receiveMessageMaxBytes)
        resultDict["max.in.flight.requests.per.connection"] = String(self.maxInFlightRequestsPerConnection)
        resultDict["metadata.max.age.ms"] = String(self.metadataMaxAgeMilliseconds)
        resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshIntervalMilliseconds)
        resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastIntervalMilliseconds)
        resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.refreshSparse)
        resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.propagationMaxMilliseconds)
        resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",")
        if !self.debug.isEmpty {
            resultDict["debug"] = self.debug.map(\.description).joined(separator: ",")
        }
        resultDict["socket.timeout.ms"] = String(self.socket.timeoutMilliseconds)
        resultDict["socket.send.buffer.bytes"] = String(self.socket.sendBufferBytes)
        resultDict["socket.receive.buffer.bytes"] = String(self.socket.receiveBufferBytes)
        resultDict["socket.keepalive.enable"] = String(self.socket.keepaliveEnable)
        resultDict["socket.nagle.disable"] = String(self.socket.nagleDisable)
        resultDict["socket.max.fails"] = String(self.socket.maxFails)
        resultDict["socket.connection.setup.timeout.ms"] = String(self.socket.connectionSetupTimeoutMilliseconds)
        resultDict["broker.address.ttl"] = String(self.broker.addressTTL)
        resultDict["broker.address.family"] = self.broker.addressFamily.description
        resultDict["reconnect.backoff.ms"] = String(self.reconnect.backoffMilliseconds)
        resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.backoffMaxMilliseconds)

        // Merge with the SecurityProtocol configuration dictionary
        resultDict.merge(self.securityProtocol.dictionary) { _, _ in
            fatalError("securityProtocol and \(#file) should not have duplicate keys")
        }

        return resultDict
    }
}
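Note that Dictionary.merge(_:uniquingKeysWith:) only invokes its closure for duplicate keys, so the fatalError above doubles as an assertion that the shared dictionary and the security-protocol dictionary stay disjoint. A standalone sketch of that pattern:

var base = ["client.id": "rdkafka"]
let security = ["security.protocol": "plaintext"]
// The closure runs only when both dictionaries contain the same key,
// so reaching it means a configuration key was defined twice.
base.merge(security) { _, _ in
    fatalError("duplicate configuration key")
}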
134 changes: 134 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaTransactionalProducerConfiguration.swift
@@ -0,0 +1,134 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

// FIXME: should we really duplicate `KafkaProducerConfiguration`
// FIXME: once the public API is updated?
public struct KafkaTransactionalProducerConfiguration {
    // MARK: - SwiftKafka-specific Config properties

    /// The time between two consecutive polls.
    /// Effectively controls the rate at which incoming events are consumed.
    /// Default: `.milliseconds(100)`
    public var pollInterval: Duration = .milliseconds(100)

    /// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
    /// Default: `10000`
    public var flushTimeoutMilliseconds: Int = 10000 {
        didSet {
            precondition(
                0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
                "Flush timeout outside of valid range \(0...Int32.max)"
            )
        }
    }

    // MARK: - Producer-specific Config Properties

    /// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if the user-supplied configuration is incompatible.
    /// Default: `true` (always enabled for the transactional producer)
    internal let enableIdempotence: Bool = true
Collaborator (Author) commented:
KafkaTransactionalProducerConfiguration differs from KafkaProducerConfiguration in only a few properties (a usage sketch follows the list):

  1. enableIdempotence is always true for transactions
  2. maxInFlightRequestsPerConnection must not be greater than 5
  3. transactionsTimeout must be set to the socket timeout or greater

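A hypothetical usage sketch of those constraints (the transactional ID value below is made up):

var config = KafkaTransactionalProducerConfiguration(transactionalId: "order-service-tx-1")
config.maxInFlightRequestsPerConnection = 5 // allowed: the didSet precondition accepts 0...5
// config.maxInFlightRequestsPerConnection = 6 // would trap in the didSet precondition
// enableIdempotence is an internal constant and always true; callers cannot disable it.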

    /// Producer queue options.
    public var queue: KafkaConfiguration.QueueOptions = .init()

    /// How many times to retry sending a failing message. Note: retrying may cause reordering unless enable.idempotence is set to true.
    /// Default: `2_147_483_647`
    public var messageSendMaxRetries: Int = 2_147_483_647

    /// Allow automatic topic creation on the broker when producing to non-existent topics.
    /// The broker must also be configured with auto.create.topics.enable=true for this configuration to take effect.
    /// Default: `true`
    public var allowAutoCreateTopics: Bool = true

    // MARK: - Common Client Config Properties

    /// Client identifier.
    /// Default: `"rdkafka"`
    public var clientID: String = "rdkafka"

    /// Initial list of brokers.
    /// Default: `[]`
    public var bootstrapServers: [KafkaConfiguration.Broker] = []

    /// Message options.
    public var message: KafkaConfiguration.MessageOptions = .init()

    /// Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set.
    /// Default: `100_000_000`
    public var receiveMessageMaxBytes: Int = 100_000_000

    /// Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.
    /// Default: `5` (the transactional producer caps this value at 5)
    public var maxInFlightRequestsPerConnection: Int = 5 {
        didSet {
            precondition(
                0...5 ~= self.maxInFlightRequestsPerConnection,
                "A transactional producer can have no more than 5 in-flight requests"
            )
        }
    }

    /// Metadata cache max age.
    /// Default: `900_000`
    public var metadataMaxAgeMilliseconds: Int = 900_000

    /// Topic metadata options.
    public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()

    /// Topic denylist.
    /// Default: `[]`
    public var topicDenylist: [String] = []

    /// Debug options.
    /// Default: `[]`
    public var debug: [KafkaConfiguration.DebugOption] = []

    /// Socket options.
    public var socket: KafkaConfiguration.SocketOptions = .init()

    /// Broker options.
    public var broker: KafkaConfiguration.BrokerOptions = .init()

    /// Reconnect options.
    public var reconnect: KafkaConfiguration.ReconnectOptions = .init()

    /// Security protocol to use (plaintext, ssl, sasl_plaintext, sasl_ssl).
    /// Default: `.plaintext`
    public var securityProtocol: KafkaConfiguration.SecurityProtocol = .plaintext

    // TODO: add DocC documentation
    var transactionalId: String
    var transactionsTimeout: Duration = .seconds(60) // equal to the socket timeout; TODO: add didSet (see sketch below)

    public init(transactionalId: String) {
        self.transactionalId = transactionalId
    }
}
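The `TODO: add didSet` above could be resolved in the same precondition style as the other properties; a minimal sketch under the assumption (from the review comment above) that the transaction timeout must be at least the socket timeout. This is hypothetical and not part of the diff:

struct TransactionTimeoutSketch {
    var socketTimeoutMilliseconds: Int = 60_000

    var transactionsTimeout: Duration = .seconds(60) {
        didSet {
            // The transaction timeout must be equal to or greater
            // than the socket timeout.
            precondition(
                self.transactionsTimeout >= .milliseconds(self.socketTimeoutMilliseconds),
                "Transaction timeout must be >= socket timeout"
            )
        }
    }
}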

// MARK: - KafkaTransactionalProducerConfiguration + Hashable

extension KafkaTransactionalProducerConfiguration: Hashable {}

// MARK: - KafkaTransactionalProducerConfiguration + Sendable

extension KafkaTransactionalProducerConfiguration: Sendable {}

extension KafkaTransactionalProducerConfiguration: KafkaProducerSharedProperties {
    internal var dictionary: [String: String] {
        var resultDict: [String: String] = sharedPropsDictionary
        resultDict["transactional.id"] = self.transactionalId
        resultDict["transaction.timeout.ms"] = String(self.transactionsTimeout.totalMilliseconds)
        return resultDict
    }
}
21 changes: 21 additions & 0 deletions Sources/SwiftKafka/KafkaConsumer.swift
@@ -408,6 +408,10 @@ public final class KafkaConsumer: Sendable, Service {
            }
        }
    }

+    func client() throws -> RDKafkaClient {
+        return try self.stateMachine.withLockedValue { try $0.client() }
+    }
}

// MARK: - KafkaConsumer + StateMachine
@@ -655,5 +659,22 @@ extension KafkaConsumer {
                return nil
            }
        }

+        func client() throws -> RDKafkaClient {
+            switch self.state {
+            case .uninitialized:
+                fatalError("\(#function) invoked while still in state \(self.state)")
+            case .initializing(let client, _):
+                return client
+            case .consuming(let client, _):
+                return client
+            case .consumptionStopped(let client):
+                return client
+            case .finishing(let client):
+                return client
+            case .finished:
+                throw KafkaError.client(reason: "Client is stopped")
+            }
+        }
    }
}
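For context, a hedged sketch of how this new accessor might be consumed — for example, by transactional code that needs the consumer's underlying client to tie consumed offsets to a transaction. The call site below is hypothetical and not part of this diff:

func commitWithinTransaction(consumer: KafkaConsumer) throws {
    // Throws KafkaError.client once the consumer has finished.
    let client: RDKafkaClient = try consumer.client()
    // ... hand `client` to the transactional APIs, e.g. to attach
    // the consumer's offsets to the in-flight transaction ...
    _ = client
}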