Skip to content

Commit

Permalink
add/change some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
blindspotbounty committed Jul 31, 2023
1 parent ce702f3 commit 8c77ea1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions Sources/SwiftKafka/KafkaTransactionalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public final class KafkaTransactionalProducer: Service, Sendable {
///
/// This creates a producer without listening for events.
///
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter config: The ``KafkaTransactionalProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
Expand Down Expand Up @@ -57,15 +57,18 @@ public final class KafkaTransactionalProducer: Service, Sendable {
return (transactionalProducer, events)
}

//
public func withTransaction(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws {
/// Begins Kafka transaction, provides it to given closure
/// Commits transaction unless closure throws
/// - Parameters:
/// - function: revieve KafkaTransaction and use fills it with produced messages and offsets
public func withTransaction(_ function: @Sendable (KafkaTransaction) async throws -> Void) async throws {
let transaction = try KafkaTransaction(
client: try producer.client(),
producer: self.producer
)

do { // need to think here a little bit how to abort transaction
try await body(transaction)
try await function(transaction)
try await transaction.commit()
} catch { // FIXME: maybe catch AbortTransaction?
do {
Expand Down
2 changes: 1 addition & 1 deletion Tests/IntegrationTests/Utilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Logging
extension Logger {
static var kafkaTest: Logger {
var logger = Logger(label: "kafka.test")
logger.logLevel = .debug
logger.logLevel = .debug // TODO: revert
return logger
}
}
Expand Down

0 comments on commit 8c77ea1

Please sign in to comment.