Skip to content

Commit

Permalink
Merge pull request #63 from zmstone/0507-log-batch-size-in-failure-wa…
Browse files Browse the repository at this point in the history
…rning

chore: log attempts and batch size for failed produce requests
  • Loading branch information
zmstone authored May 16, 2024
2 parents dfb1f5c + 530dcd0 commit 8a81c45
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 1.10.4
- Log produce failure warning message with number of attempts and batch size.

* 1.10.3
- Fixed typespec for `wolff_client:get_leader_connections/3`.

Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.10.3"},
{vsn, "1.10.4"},
{registered, []},
{applications,
[kernel,
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.10.3",
{"1.10.4",
[
],
[
Expand Down
13 changes: 10 additions & 3 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ send_to_kafka(#{sent_reqs := Sent,
NewSent = #{request => Req,
q_ack_ref => QAckRef,
calls => Calls,
attempts => 1},
attempts => 1,
batch_size => length(FlatBatch)
},
St2 = St1#{sent_reqs := queue:in(NewSent, Sent),
sent_reqs_count := NewSentReqsCount,
inflight_calls := NewInflightCalls
Expand Down Expand Up @@ -518,7 +520,9 @@ handle_kafka_ack(#kpro_rsp{api = produce,
case queue:peek(SentReqs) of
{value, #{request := #kpro_req{ref = Ref},
calls := Calls,
attempts := Attempts}} ->
attempts := Attempts,
batch_size := BatchSize
}} ->
case ErrorCode =:= ?no_error of
true ->
do_handle_kafka_ack(BaseOffset, St);
Expand All @@ -530,7 +534,10 @@ handle_kafka_ack(#kpro_rsp{api = produce,
inc_sent_failed(Config, length(Calls), AttemptedBefore),
log_warn(Topic, Partition,
"error_in_produce_response",
#{error_code => ErrorCode}),
#{error_code => ErrorCode,
batch_size => BatchSize,
attempts => Attempts
}),
erlang:throw(ErrorCode)
end;
_ ->
Expand Down

0 comments on commit 8a81c45

Please sign in to comment.