From bca1aeeab2be93071d695e81d12fc2b3dadca9d8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 7 May 2024 10:58:01 +0200 Subject: [PATCH] chore: log attempts and batch size for failed produce requests --- changelog.md | 3 +++ src/wolff.app.src | 2 +- src/wolff_producer.erl | 13 ++++++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/changelog.md b/changelog.md index 504a7a6..137be9a 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 1.10.4 + - Log produce failure warning message with number of attempts and batch size. + * 1.10.3 - Fixed typespec for `wolff_client:get_leader_connections/3`. diff --git a/src/wolff.app.src b/src/wolff.app.src index 498e7bb..bed31e2 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "1.10.3"}, + {vsn, "1.10.4"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index dc507ec..63588fa 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -451,7 +451,9 @@ send_to_kafka(#{sent_reqs := Sent, NewSent = #{request => Req, q_ack_ref => QAckRef, calls => Calls, - attempts => 1}, + attempts => 1, + batch_size => length(FlatBatch) + }, St2 = St1#{sent_reqs := queue:in(NewSent, Sent), sent_reqs_count := NewSentReqsCount, inflight_calls := NewInflightCalls @@ -518,7 +520,9 @@ handle_kafka_ack(#kpro_rsp{api = produce, case queue:peek(SentReqs) of {value, #{request := #kpro_req{ref = Ref}, calls := Calls, - attempts := Attempts}} -> + attempts := Attempts, + batch_size := BatchSize + }} -> case ErrorCode =:= ?no_error of true -> do_handle_kafka_ack(BaseOffset, St); @@ -530,7 +534,10 @@ handle_kafka_ack(#kpro_rsp{api = produce, inc_sent_failed(Config, length(Calls), AttemptedBefore), log_warn(Topic, Partition, "error_in_produce_response", - #{error_code => ErrorCode}), + #{error_code => ErrorCode, + batch_size => BatchSize, + attempts => Attempts + }), erlang:throw(ErrorCode) end; _ ->