diff --git a/Package.swift b/Package.swift index 1f100f17..a60f8e73 100644 --- a/Package.swift +++ b/Package.swift @@ -51,6 +51,7 @@ let package = Package( // The zstd Swift package produces warnings that we cannot resolve: // https://github.com/facebook/zstd/issues/3328 .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"), + .package(url: "https://github.com/swift-extras/swift-extras-json.git", .upToNextMajor(from: "0.6.0")), ], targets: [ .target( @@ -80,6 +81,7 @@ let package = Package( .product(name: "NIOCore", package: "swift-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), .product(name: "Logging", package: "swift-log"), + .product(name: "ExtrasJSON", package: "swift-extras-json"), ] ), .target( diff --git a/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift index e78bd062..1be45a5a 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConfiguration.swift @@ -206,3 +206,10 @@ public enum KafkaConfiguration { public static let v6 = IPAddressFamily(description: "v6") } } + +extension Duration { + // Calculated total milliseconds + internal var totalMilliseconds: Int64 { + self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000 + } +} diff --git a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift index 3b63562a..d6fb84b9 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift @@ -23,6 +23,18 @@ public struct KafkaConsumerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) + /// Interval for librdkafka statistics reports + /// 0ms - disabled + /// >= 1ms - statistics provided every specified interval + public var statisticsInterval: Duration = .zero { + didSet { + precondition( + self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero /*self.statisticsInterval.canBeRepresentedAsMilliseconds*/, + "Lowest granularity is milliseconds" + ) + } + } + /// The strategy used for consuming messages. /// See ``KafkaConfiguration/ConsumptionStrategy`` for more information. public var consumptionStrategy: KafkaConfiguration.ConsumptionStrategy @@ -128,6 +140,7 @@ extension KafkaConsumerConfiguration { resultDict["group.id"] = groupID } + resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds) resultDict["session.timeout.ms"] = String(session.timeoutMilliseconds) resultDict["heartbeat.interval.ms"] = String(heartbeatIntervalMilliseconds) resultDict["max.poll.interval.ms"] = String(maxPollInvervalMilliseconds) diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift index 6130c03e..2213fe2a 100644 --- a/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift +++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift @@ -20,6 +20,18 @@ public struct KafkaProducerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) + /// Interval for librdkafka statistics reports + /// 0ms - disabled + /// >= 1ms - statistics provided every specified interval + public var statisticsInterval: Duration = .zero { + didSet { + precondition( + self.statisticsInterval.totalMilliseconds > 0 || self.statisticsInterval == .zero /*self.statisticsInterval.canBeRepresentedAsMilliseconds*/, + "Lowest granularity is milliseconds" + ) + } + } + /// Maximum timeout for flushing outstanding produce requests when the ``KakfaProducer`` is shutting down. /// Default: `10000` public var flushTimeoutMilliseconds: Int = 10000 { @@ -107,6 +119,7 @@ extension KafkaProducerConfiguration { internal var dictionary: [String: String] { var resultDict: [String: String] = [:] + resultDict["statistics.interval.ms"] = String(self.statisticsInterval.totalMilliseconds) resultDict["enable.idempotence"] = String(self.enableIdempotence) resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages) resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes) diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index 16b25679..f3faefd1 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -22,6 +22,7 @@ import ServiceLifecycle /// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when /// `didTerminate()` is invoked. internal struct KafkaConsumerCloseOnTerminate: Sendable { + let isMessageSequence: Bool let stateMachine: NIOLockedValueBox } @@ -31,7 +32,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { } func didTerminate() { - self.stateMachine.withLockedValue { $0.messageSequenceTerminated() } + self.stateMachine.withLockedValue { $0.messageSequenceTerminated(isMessageSequence: isMessageSequence) } } } @@ -121,6 +122,12 @@ public final class KafkaConsumer: Sendable, Service { NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, KafkaConsumerCloseOnTerminate > + typealias ProducerEvents = NIOAsyncSequenceProducer< + KafkaConsumerEvent, + NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure, + KafkaConsumerCloseOnTerminate + > + /// The configuration object of the consumer client. private let config: KafkaConsumerConfiguration /// A logger. @@ -146,7 +153,8 @@ public final class KafkaConsumer: Sendable, Service { client: RDKafkaClient, stateMachine: NIOLockedValueBox, config: KafkaConsumerConfiguration, - logger: Logger + logger: Logger, + eventSource: ProducerEvents.Source? = nil ) throws { self.config = config self.stateMachine = stateMachine @@ -155,7 +163,7 @@ public final class KafkaConsumer: Sendable, Service { let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerMessage.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine) + delegate: KafkaConsumerCloseOnTerminate(isMessageSequence: true, stateMachine: self.stateMachine) ) self.messages = KafkaConsumerMessages( @@ -166,7 +174,8 @@ public final class KafkaConsumer: Sendable, Service { self.stateMachine.withLockedValue { $0.initialize( client: client, - source: sourceAndSequence.source + source: sourceAndSequence.source, + eventSource: eventSource ) } @@ -242,6 +251,11 @@ public final class KafkaConsumer: Sendable, Service { if config.enableAutoCommit == false { subscribedEvents.append(.offsetCommit) } +// Don't listen to statistics even if configured +// As there are no events instantiated +// if config.statisticsInterval != .zero { +// subscribedEvents.append(.statistics) +// } let client = try RDKafkaClient.makeClient( type: .consumer, @@ -250,20 +264,22 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) - let consumer = try KafkaConsumer( - client: client, - stateMachine: stateMachine, - config: config, - logger: logger - ) - let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerEvent.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine) + delegate: KafkaConsumerCloseOnTerminate(isMessageSequence: false, stateMachine: stateMachine) ) let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence) + + let consumer = try KafkaConsumer( + client: client, + stateMachine: stateMachine, + config: config, + logger: logger, + eventSource: sourceAndSequence.source + ) + return (consumer, eventsSequence) } @@ -321,7 +337,7 @@ public final class KafkaConsumer: Sendable, Service { while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction { - case .pollForAndYieldMessage(let client, let source): + case .pollForAndYieldMessage(let client, let source, let eventSource): let events = client.eventPoll() for event in events { switch event { @@ -332,8 +348,11 @@ public final class KafkaConsumer: Sendable, Service { _ = source.yield(message) case .failure(let error): source.finish() + eventSource?.finish() throw error } + case .statistics(let statistics): + _ = eventSource?.yield(.statistics(statistics)) default: break // Ignore } @@ -383,8 +402,9 @@ public final class KafkaConsumer: Sendable, Service { client: client, logger: self.logger ) - case .triggerGracefulShutdownAndFinishSource(let client, let source): + case .triggerGracefulShutdownAndFinishSource(let client, let source, let eventSource): source.finish() + eventSource?.finish() self._triggerGracefulShutdown( client: client, logger: self.logger @@ -428,17 +448,20 @@ extension KafkaConsumer { /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new events. case initializing( client: RDKafkaClient, - source: Producer.Source + source: Producer.Source, + eventSource: ProducerEvents.Source? ) /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. + /// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. case consuming( client: RDKafkaClient, - source: Producer.Source + source: Producer.Source, + eventSource: ProducerEvents.Source? ) /// Consumer is still running but the messages asynchronous sequence was terminated. /// All incoming messages will be dropped. @@ -461,14 +484,16 @@ extension KafkaConsumer { /// not yet available when the normal initialization occurs. mutating func initialize( client: RDKafkaClient, - source: Producer.Source + source: Producer.Source, + eventSource: ProducerEvents.Source? ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") } self.state = .initializing( client: client, - source: source + source: source, + eventSource: eventSource ) } @@ -480,7 +505,8 @@ extension KafkaConsumer { /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. case pollForAndYieldMessage( client: RDKafkaClient, - source: Producer.Source + source: Producer.Source, + eventSource: ProducerEvents.Source? ) /// The ``KafkaConsumer`` stopped consuming messages or /// is in the process of shutting down. @@ -502,8 +528,8 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .consuming(let client, let source): - return .pollForAndYieldMessage(client: client, source: source) + case .consuming(let client, let source, let eventSource): + return .pollForAndYieldMessage(client: client, source: source, eventSource: eventSource) case .consumptionStopped(let client): return .pollWithoutYield(client: client) case .finishing(let client): @@ -532,10 +558,11 @@ extension KafkaConsumer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source): + case .initializing(let client, let source, let eventSource): self.state = .consuming( client: client, - source: source + source: source, + eventSource: eventSource ) return .setUpConnection(client: client) case .consuming, .consumptionStopped, .finishing, .finished: @@ -545,16 +572,30 @@ extension KafkaConsumer { /// The messages asynchronous sequence was terminated. /// All incoming messages will be dropped. - mutating func messageSequenceTerminated() { + mutating func messageSequenceTerminated(isMessageSequence: Bool) { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Call to \(#function) before setUpConnection() was invoked") case .consumptionStopped: - fatalError("messageSequenceTerminated() must not be invoked more than once") - case .consuming(let client, _): - self.state = .consumptionStopped(client: client) + if isMessageSequence { + fatalError("messageSequenceTerminated() must not be invoked more than once") + } + case .consuming(let client, let source, let eventSource): + // only move to stopping if messages sequence was finished + if isMessageSequence { + self.state = .consumptionStopped(client: client) + // If message sequence is being terminated, it means class deinit is called + // see `messages` field, it is last change to call finish for `eventSource` + eventSource?.finish() + } + else { + // Messages are still consuming, only event source was finished + // Ok, probably, noone wants to listen to events, + // though it might be very bad for rebalancing + self.state = .consuming(client: client, source: source, eventSource: nil) + } case .finishing, .finished: break } @@ -576,7 +617,7 @@ extension KafkaConsumer { fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") case .consumptionStopped: fatalError("Cannot store offset when consumption has been stopped") - case .consuming(let client, _): + case .consuming(let client, _, _): return .storeOffset(client: client) case .finishing, .finished: fatalError("\(#function) invoked while still in state \(self.state)") @@ -607,7 +648,7 @@ extension KafkaConsumer { fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets") case .consumptionStopped: fatalError("Cannot commit when consumption has been stopped") - case .consuming(let client, _): + case .consuming(let client, _, _): return .commitSync(client: client) case .finishing, .finished: return .throwClosedError @@ -628,7 +669,8 @@ extension KafkaConsumer { /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. case triggerGracefulShutdownAndFinishSource( client: RDKafkaClient, - source: Producer.Source + source: Producer.Source, + eventSource: ProducerEvents.Source? ) } @@ -642,11 +684,12 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("subscribe() / assign() should have been invoked before \(#function)") - case .consuming(let client, let source): + case .consuming(let client, let source, let eventSource): self.state = .finishing(client: client) return .triggerGracefulShutdownAndFinishSource( client: client, - source: source + source: source, + eventSource: eventSource ) case .consumptionStopped(let client): self.state = .finishing(client: client) diff --git a/Sources/SwiftKafka/KafkaConsumerEvent.swift b/Sources/SwiftKafka/KafkaConsumerEvent.swift index 287ddd33..75b5bf64 100644 --- a/Sources/SwiftKafka/KafkaConsumerEvent.swift +++ b/Sources/SwiftKafka/KafkaConsumerEvent.swift @@ -14,11 +14,15 @@ /// An enumeration representing events that can be received through the ``KafkaConsumerEvents`` asynchronous sequence. public enum KafkaConsumerEvent: Sendable, Hashable { + /// Statistics from librdkafka + case statistics(KafkaStatistics) /// - Important: Always provide a `default` case when switiching over this `enum`. case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY - internal init(_ event: RDKafkaClient.KafkaEvent) { + internal init?(_ event: RDKafkaClient.KafkaEvent) { switch event { + case .statistics(let stat): + self = .statistics(stat) case .deliveryReport: fatalError("Cannot cast \(event) to KafkaConsumerEvent") case .consumerMessages: diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 5af88d71..af18157d 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -118,10 +118,16 @@ public final class KafkaProducer: Service, Sendable { ) throws { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) + var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here! + // Listen to statistics events when statistics enabled + if config.statisticsInterval != .zero { + subscribedEvents.append(.statistics) + } + let client = try RDKafkaClient.makeClient( type: .producer, configDictionary: config.dictionary, - events: [.log], // No .deliveryReport here! + events: subscribedEvents, logger: logger ) @@ -165,11 +171,17 @@ public final class KafkaProducer: Service, Sendable { delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine) ) let source = sourceAndSequence.source + + var subscribedEvents: [RDKafkaEvent] = [.log, .deliveryReport] + // Listen to statistics events when statistics enabled + if config.statisticsInterval != .zero { + subscribedEvents.append(.statistics) + } let client = try RDKafkaClient.makeClient( type: .producer, configDictionary: config.dictionary, - events: [.log, .deliveryReport], + events: subscribedEvents, logger: logger ) diff --git a/Sources/SwiftKafka/KafkaProducerEvent.swift b/Sources/SwiftKafka/KafkaProducerEvent.swift index 8afbf8e8..f2b88706 100644 --- a/Sources/SwiftKafka/KafkaProducerEvent.swift +++ b/Sources/SwiftKafka/KafkaProducerEvent.swift @@ -16,6 +16,8 @@ public enum KafkaProducerEvent: Sendable, Hashable { /// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages. case deliveryReports([KafkaDeliveryReport]) + /// Statistics from librdkafka + case statistics(KafkaStatistics) /// - Important: Always provide a `default` case when switching over this `enum`. case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY @@ -23,6 +25,8 @@ public enum KafkaProducerEvent: Sendable, Hashable { switch event { case .deliveryReport(results: let results): self = .deliveryReports(results) + case .statistics(let stat): + self = .statistics(stat) case .consumerMessages: fatalError("Cannot cast \(event) to KafkaProducerEvent") } diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift index 45664d96..58c11094 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaClient.swift @@ -148,6 +148,7 @@ final class RDKafkaClient: Sendable { enum KafkaEvent { case deliveryReport(results: [KafkaDeliveryReport]) case consumerMessages(result: Result) + case statistics(KafkaStatistics) } /// Poll the event `rd_kafka_queue_t` for new events. @@ -178,6 +179,8 @@ final class RDKafkaClient: Sendable { self.handleLogEvent(event) case .offsetCommit: self.handleOffsetCommitEvent(event) + case .statistics: + events.append(self.handleStatistics(event)) case .none: // Finished reading events, return early return events @@ -229,6 +232,11 @@ final class RDKafkaClient: Sendable { // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy(). } + private func handleStatistics(_ event: OpaquePointer?) -> KafkaEvent { + let jsonStr = String(cString: rd_kafka_event_stats(event)) + return .statistics(KafkaStatistics(jsonString: jsonStr)) + } + /// Handle event of type `RDKafkaEvent.log`. /// /// - Parameter event: Pointer to underlying `rd_kafka_event_t`. diff --git a/Sources/SwiftKafka/Utilities/KafkaStatistics.swift b/Sources/SwiftKafka/Utilities/KafkaStatistics.swift new file mode 100644 index 00000000..2f9be9f2 --- /dev/null +++ b/Sources/SwiftKafka/Utilities/KafkaStatistics.swift @@ -0,0 +1,25 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ExtrasJSON + +public struct KafkaStatistics: Sendable, Hashable { + public let jsonString: String + + public var json: KafkaStatisticsJson { + get throws { + return try XJSONDecoder().decode(KafkaStatisticsJson.self, from: self.jsonString.utf8) + } + } +} diff --git a/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift b/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift new file mode 100644 index 00000000..23217a62 --- /dev/null +++ b/Sources/SwiftKafka/Utilities/KafkaStatisticsJsonModel.swift @@ -0,0 +1,177 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-gsoc open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// This file was generated from JSON Schema using quicktype, do not modify it directly. +// To parse the JSON, add this file to your project and do: +// +// let statistics = try? newJSONDecoder().decode(KafkaStatisticsJsonModel.self, from: jsonData) + +// MARK: - Statistics + +public struct KafkaStatisticsJson: Hashable, Codable { + let name, clientID, type: String? + let ts, time, age, replyq: Int? + let msgCnt, msgSize, msgMax, msgSizeMax: Int? + let simpleCnt, metadataCacheCnt: Int? + let brokers: [String: Broker]? + let topics: [String: Topic]? + let cgrp: Cgrp? + let tx, txBytes, rx, rxBytes: Int? + let txmsgs, txmsgBytes, rxmsgs, rxmsgBytes: Int? + + enum CodingKeys: String, CodingKey { + case name + case clientID = "client_id" + case type, ts, time, age, replyq + case msgCnt = "msg_cnt" + case msgSize = "msg_size" + case msgMax = "msg_max" + case msgSizeMax = "msg_size_max" + case simpleCnt = "simple_cnt" + case metadataCacheCnt = "metadata_cache_cnt" + case brokers, topics, cgrp, tx + case txBytes = "tx_bytes" + case rx + case rxBytes = "rx_bytes" + case txmsgs + case txmsgBytes = "txmsg_bytes" + case rxmsgs + case rxmsgBytes = "rxmsg_bytes" + } +} + +// MARK: - Broker + +public struct Broker: Hashable, Codable { + let name: String? + let nodeid: Int? + let nodename, source, state: String? + let stateage, outbufCnt, outbufMsgCnt, waitrespCnt: Int? + let waitrespMsgCnt, tx, txbytes, txerrs: Int? + let txretries, txidle, reqTimeouts, rx: Int? + let rxbytes, rxerrs, rxcorriderrs, rxpartial: Int? + let rxidle, zbufGrow, bufGrow, wakeups: Int? + let connects, disconnects: Int? + let intLatency, outbufLatency, rtt, throttle: [String: Int]? + let req: [String: Int]? + let toppars: [String: Toppar]? + + enum CodingKeys: String, CodingKey { + case name, nodeid, nodename, source, state, stateage + case outbufCnt = "outbuf_cnt" + case outbufMsgCnt = "outbuf_msg_cnt" + case waitrespCnt = "waitresp_cnt" + case waitrespMsgCnt = "waitresp_msg_cnt" + case tx, txbytes, txerrs, txretries, txidle + case reqTimeouts = "req_timeouts" + case rx, rxbytes, rxerrs, rxcorriderrs, rxpartial, rxidle + case zbufGrow = "zbuf_grow" + case bufGrow = "buf_grow" + case wakeups, connects, disconnects + case intLatency = "int_latency" + case outbufLatency = "outbuf_latency" + case rtt, throttle, req, toppars + } +} + +// MARK: - Toppars + +struct Toppar: Hashable, Codable { + let topic: String? + let partition: Int? + + enum CodingKeys: String, CodingKey { + case topic, partition + } +} + +// MARK: - Cgrp + +struct Cgrp: Hashable, Codable { + let state: String? + let stateage: Int? + let joinState: String? + let rebalanceAge, rebalanceCnt: Int? + let rebalanceReason: String? + let assignmentSize: Int? + + enum CodingKeys: String, CodingKey { + case state, stateage + case joinState = "join_state" + case rebalanceAge = "rebalance_age" + case rebalanceCnt = "rebalance_cnt" + case rebalanceReason = "rebalance_reason" + case assignmentSize = "assignment_size" + } +} + +// MARK: - Topic + +struct Topic: Hashable, Codable { + let topic: String? + let age, metadataAge: Int? + let batchsize, batchcnt: [String: Int]? + let partitions: [String: Partition]? + + enum CodingKeys: String, CodingKey { + case topic, age + case metadataAge = "metadata_age" + case batchsize, batchcnt, partitions + } +} + +// MARK: - Partition + +struct Partition: Hashable, Codable { + let partition, broker, leader: Int? + let desired, unknown: Bool? + let msgqCnt, msgqBytes, xmitMsgqCnt, xmitMsgqBytes: Int? + let fetchqCnt, fetchqSize: Int? + let fetchState: String? + let queryOffset, nextOffset, appOffset, storedOffset: Int? + let commitedOffset, committedOffset, eofOffset, loOffset: Int? + let hiOffset, lsOffset, consumerLag, consumerLagStored: Int? + let txmsgs, txbytes, rxmsgs, rxbytes: Int? + let msgs, rxVerDrops, msgsInflight, nextACKSeq: Int? + let nextErrSeq, ackedMsgid: Int? + + enum CodingKeys: String, CodingKey { + case partition, broker, leader, desired, unknown + case msgqCnt = "msgq_cnt" + case msgqBytes = "msgq_bytes" + case xmitMsgqCnt = "xmit_msgq_cnt" + case xmitMsgqBytes = "xmit_msgq_bytes" + case fetchqCnt = "fetchq_cnt" + case fetchqSize = "fetchq_size" + case fetchState = "fetch_state" + case queryOffset = "query_offset" + case nextOffset = "next_offset" + case appOffset = "app_offset" + case storedOffset = "stored_offset" + case commitedOffset = "commited_offset" + case committedOffset = "committed_offset" + case eofOffset = "eof_offset" + case loOffset = "lo_offset" + case hiOffset = "hi_offset" + case lsOffset = "ls_offset" + case consumerLag = "consumer_lag" + case consumerLagStored = "consumer_lag_stored" + case txmsgs, txbytes, rxmsgs, rxbytes, msgs + case rxVerDrops = "rx_ver_drops" + case msgsInflight = "msgs_inflight" + case nextACKSeq = "next_ack_seq" + case nextErrSeq = "next_err_seq" + case ackedMsgid = "acked_msgid" + } +} diff --git a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift index 6f2fab18..d42b4da1 100644 --- a/Tests/SwiftKafkaTests/KafkaConsumerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaConsumerTests.swift @@ -14,6 +14,7 @@ import struct Foundation.UUID import Logging +import NIOConcurrencyHelpers import ServiceLifecycle @testable import SwiftKafka import XCTest @@ -85,4 +86,53 @@ final class KafkaConsumerTests: XCTestCase { ) } } + + func testConsumerStatistics() async throws { + let uniqueGroupID = UUID().uuidString + var config = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]) + ) + config.statisticsInterval = Duration.milliseconds(10) + + let statistics = NIOLockedValueBox(nil) + let (consumer, events) = try KafkaConsumer.makeConsumerWithEvents(config: config, logger: .kafkaTest) + + let serviceGroup = ServiceGroup( + services: [consumer], + configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []), + logger: .kafkaTest + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // check for librdkafka statistics + group.addTask { + for try await event in events { + if case let .statistics(stat) = event { + statistics.withLockedValue { + $0 = stat + } + break + } + } + } + + try await group.next() + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + + let stats = statistics.withLockedValue { $0 } + guard let stats else { + XCTFail("stats are not occurred") + return + } + XCTAssertFalse(stats.jsonString.isEmpty) + XCTAssertNoThrow(try stats.json) + } } diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 5c664ecc..9e0be7a1 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Logging +import NIOConcurrencyHelpers import NIOCore import ServiceLifecycle @testable import SwiftKafka @@ -355,4 +356,51 @@ final class KafkaProducerTests: XCTestCase { XCTAssertNil(producerCopy) } + + func testProducerStatistics() async throws { + self.config.statisticsInterval = Duration.milliseconds(10) + self.config.debug = [.all] + + let statistics = NIOLockedValueBox(nil) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + config: self.config, + logger: .kafkaTest + ) + + let serviceGroup = ServiceGroup( + services: [producer], + configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []), + logger: .kafkaTest + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // check for librdkafka statistics + group.addTask { + for try await e in events { + if case let .statistics(stat) = e { + statistics.withLockedValue { + $0 = stat + } + break + } + } + } + + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + let stats = statistics.withLockedValue { $0 } + guard let stats else { + XCTFail("stats are not occurred") + return + } + XCTAssertFalse(stats.jsonString.isEmpty) + XCTAssertNoThrow(try stats.json) + } }