Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/statistics+transactions+rebalanc…
Browse files Browse the repository at this point in the history
…e' into feature/sc-2764/gsoc-support-for-transactions
  • Loading branch information
blindspotbounty committed Aug 2, 2023
2 parents 8c77ea1 + c9aef12 commit 63ffb51
Show file tree
Hide file tree
Showing 17 changed files with 497 additions and 53 deletions.
12 changes: 12 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ let package = Package(
name: "SwiftKafka",
targets: ["SwiftKafka"]
),
.library(
name: "KafkaFoundationCompat",
targets: ["KafkaFoundationCompat"]
),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
Expand All @@ -47,6 +51,7 @@ let package = Package(
// The zstd Swift package produces warnings that we cannot resolve:
// https://github.com/facebook/zstd/issues/3328
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
.package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")),
],
targets: [
.target(
Expand Down Expand Up @@ -76,6 +81,13 @@ let package = Package(
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "Logging", package: "swift-log"),
.product(name: "ExtrasJSON", package: "swift-extras-json"),
]
),
.target(
name: "KafkaFoundationCompat",
dependencies: [
"SwiftKafka",
]
),
.systemLibrary(
Expand Down
18 changes: 18 additions & 0 deletions Sources/KafkaFoundationCompat/Data+KafkaContiguousBytes.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import SwiftKafka

extension Data: KafkaContiguousBytes {}
14 changes: 14 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,17 @@ public enum KafkaConfiguration {
public static let v6 = IPAddressFamily(description: "v6")
}
}

extension Duration {
// Internal usage only: librdkafka accepts Int32 as timeouts
internal var totalMilliseconds: Int32 {
return Int32(self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000)
}

internal var totalMillisecondsOrMinusOne: Int32 {
return max(self.totalMilliseconds, -1)
}

public static var kafkaUntilEndOfTransactionTimeout: Duration = .milliseconds(-1)
public static var kafkaNoWaitTransaction: Duration = .zero
}
13 changes: 13 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ public struct KafkaConsumerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// Interval for librdkafka statistics reports
/// 0ms - disabled
/// >= 1ms - statistics provided every specified interval
public var statisticsInterval: Duration = .zero {
didSet {
precondition(
self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero /*self.statisticsInterval.canBeRepresentedAsMilliseconds*/,
"Lowest granularity is milliseconds"
)
}
}

/// The strategy used for consuming messages.
/// See ``KafkaConfiguration/ConsumptionStrategy`` for more information.
public var consumptionStrategy: KafkaConfiguration.ConsumptionStrategy
Expand Down Expand Up @@ -128,6 +140,7 @@ extension KafkaConsumerConfiguration {
resultDict["group.id"] = groupID
}

resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)
resultDict["session.timeout.ms"] = String(session.timeoutMilliseconds)
resultDict["heartbeat.interval.ms"] = String(heartbeatIntervalMilliseconds)
resultDict["max.poll.interval.ms"] = String(maxPollInvervalMilliseconds)
Expand Down
12 changes: 12 additions & 0 deletions Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ public struct KafkaProducerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// Interval for librdkafka statistics reports
/// 0ms - disabled
/// >= 1ms - statistics provided every specified interval
public var statisticsInterval: Duration = .zero {
didSet {
precondition(
self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero /*self.statisticsInterval.canBeRepresentedAsMilliseconds*/,
"Lowest granularity is milliseconds"
)
}
}

/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
/// Default: `10000`
public var flushTimeoutMilliseconds: Int = 10000 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ internal protocol KafkaProducerSharedProperties: Sendable, Hashable {
/// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down.
/// Default: `10000`
var flushTimeoutMilliseconds: Int { get }

/// Interval for librdkafka statistics reports
/// 0ms - disabled
/// >= 1ms - statistics provided every specified interval
var statisticsInterval: Duration { get }

// MARK: - Producer-specific Config Properties

Expand Down Expand Up @@ -98,6 +103,7 @@ extension KafkaProducerSharedProperties {
internal var sharedPropsDictionary: [String: String] {
var resultDict: [String: String] = [:]

resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds)
resultDict["enable.idempotence"] = String(self.enableIdempotence)
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ public struct KafkaTransactionalProducerConfiguration {
}
}

/// Interval for librdkafka statistics reports
/// 0ms - disabled
/// >= 1ms - statistics provided every specified interval
public var statisticsInterval: Duration = .zero {
didSet {
precondition(
self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero /*self.statisticsInterval.canBeRepresentedAsMilliseconds*/,
"Lowest granularity is milliseconds"
)
}
}

// MARK: - Producer-specific Config Properties

/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
Expand Down
2 changes: 1 addition & 1 deletion Sources/SwiftKafka/Data/String+KafkaContiguousBytes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ extension String: KafkaContiguousBytes {
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
if let read = try self.utf8.withContiguousStorageIfAvailable({ unsafePointer in
// Fast Path
let unsafeRawBufferPointer = UnsafeRawBufferPointer(start: unsafePointer.baseAddress, count: self.count)
let unsafeRawBufferPointer = UnsafeRawBufferPointer(start: unsafePointer.baseAddress, count: self.utf8.count)
return try body(unsafeRawBufferPointer)
}) {
return read
Expand Down
Loading

0 comments on commit 63ffb51

Please sign in to comment.