From fdd0bd929bfe342c9b3e8795ed49ad8321ffad72 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Mon, 23 Oct 2023 20:39:57 +0200 Subject: [PATCH] remarks --- lib/waterdrop/instrumentation/callbacks/delivery.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/waterdrop/instrumentation/callbacks/delivery.rb b/lib/waterdrop/instrumentation/callbacks/delivery.rb index 19b0f951..16bb5196 100644 --- a/lib/waterdrop/instrumentation/callbacks/delivery.rb +++ b/lib/waterdrop/instrumentation/callbacks/delivery.rb @@ -17,7 +17,10 @@ class Delivery # Error emitted when a message was purged while it was dispatched RD_KAFKA_RESP_PURGE_INFLIGHT = -151 - private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT + # Errors related to queue purging that is expected in transactions + PURGE_ERRORS = [RD_KAFKA_RESP_PURGE_INFLIGHT, RD_KAFKA_RESP_PURGE_QUEUE].freeze + + private_constant :RD_KAFKA_RESP_PURGE_QUEUE, :RD_KAFKA_RESP_PURGE_INFLIGHT, :PURGE_ERRORS # @param producer_id [String] id of the current producer # @param transactional [Boolean] is this handle for a transactional or regular producer @@ -36,9 +39,7 @@ def call(delivery_report) if error_code.zero? instrument_acknowledged(delivery_report) - elsif error_code == RD_KAFKA_RESP_PURGE_QUEUE && @transactional - instrument_purged(delivery_report) - elsif error_code == RD_KAFKA_RESP_PURGE_INFLIGHT && @transactional + elsif @transactional && PURGE_ERRORS.include?(error_code) instrument_purged(delivery_report) else instrument_error(delivery_report)