diff --git a/README.md b/README.md index 54698b9..a783b56 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun). * `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection is idle (as in no pending acks from kafka). Default=0 (as in send immediately). +* `max_linger_bytes`: Number of bytes to collect before sending it to Kafka. If set to 0, `max_batch_bytes` is taken for mem-only mode, otherwise it's 10 times `max_batch_bytes` (but never exceeds 10MB) to optimize disk write. + * `max_send_ahead`: Number of batches to be sent ahead without receiving ack for the last request. Must be 0 if messages must be delivered in strict order. diff --git a/changelog.md b/changelog.md index 71d5f8e..593256c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,8 @@ * 4.0.0 - Delete global stats (deprecated since 1.9). + - Move linger delay to front of the buffer queue. + The default value for `max_linger_ms` is `0` as before. + Setting `max_linger_ms=10` will make the disk write batch larger when buffer is configured to disk mode or disk-offload mode. * 3.0.4 - Upgrade to kafka_protocol-4.1.8 diff --git a/src/wolff.app.src b/src/wolff.app.src index efb0a11..6d487a1 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "4.0.0"}, + {vsn, "git"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff.erl b/src/wolff.erl index b57993d..5f1f034 100644 --- a/src/wolff.erl +++ b/src/wolff.erl @@ -34,11 +34,13 @@ %% Messaging APIs -export([send/3, - send_sync/3 + send_sync/3, + cast/3 ]). %% Messaging APIs of dynamic producer. -export([send2/4, + cast2/4, send_sync2/4, add_topic/2, remove_topic/2 @@ -120,8 +122,8 @@ stop_and_delete_supervised_producers(Producers) -> %% In case `required_acks' is configured to `none', the callback is evaluated immediately after send. %% The partition number and the per-partition worker pid are returned in a tuple to caller, %% so it may use them to correlate the future `AckFun' evaluation. -%% NOTE: This API has no backpressure, -%% high produce rate may cause execussive ram and disk usage. +%% NOTE: This API is blocked until the batch is enqueued to the producer buffer, otherwise no backpressure. +%% High produce rate may cause excessive ram and disk usage. %% NOTE: In case producers are configured with `required_acks = none', %% the second arg for callback function will always be `?UNKNOWN_OFFSET' (`-1'). -spec send(producers(), [msg()], ack_fun()) -> {partition(), pid()}. @@ -137,6 +139,22 @@ send2(Producers, Topic, Batch, AckFun) -> ok = wolff_producer:send(ProducerPid, Batch, AckFun), {Partition, ProducerPid}. +%% @doc Cast a batch to a partition producer. +%% Even less backpressure than `send/3'. +%% It does not wait for the batch to be enqueued to the producer buffer. +-spec cast(producers(), [msg()], ack_fun()) -> {partition(), pid()}. +cast(Producers, Batch, AckFun) -> + {Partition, ProducerPid} = wolff_producers:pick_producer(Producers, Batch), + ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued), + {Partition, ProducerPid}. + +%% @doc Topic as argument for dynamic producers, otherwise equivalent to `cast/3'. +-spec cast2(producers(), topic(), [msg()], ack_fun()) -> {partition(), pid()}. +cast2(Producers, Topic, Batch, AckFun) -> + {Partition, ProducerPid} = wolff_producers:pick_producer2(Producers, Topic, Batch), + ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued), + {Partition, ProducerPid}. + %% @doc Pick a partition producer and send a batch synchronously. %% Raise error exception in case produce pid is down or when timed out. %% NOTE: In case producers are configured with `required_acks => none', diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index e3bb5f3..3b37a4b 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -20,7 +20,7 @@ -define(MIN_DISCARD_LOG_INTERVAL, 5000). %% APIs --export([start_link/5, stop/1, send/3, send_sync/3]). +-export([start_link/5, stop/1, send/3, send/4, send_sync/3]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). @@ -45,6 +45,7 @@ ack_timeout | max_batch_bytes | max_linger_ms | + max_linger_bytes | max_send_ahead | compression | drop_if_highmem | @@ -60,6 +61,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -75,6 +77,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -95,7 +98,9 @@ -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -define(no_queue_ack, no_queue_ack). -define(no_caller_ack, no_caller_ack). - +-define(MAX_LINGER_BYTES, (10 bsl 20)). +-type ack_fun() :: wolff:ack_fun(). +-type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()). -type sent() :: #{req_ref := reference(), q_items := [?Q_ITEM(_CallId, _Ts, _Batch)], q_ack_ref := replayq:ack_ref(), @@ -106,7 +111,7 @@ , client_id := wolff:client_id() , config := config_state() , conn := undefined | _ - , ?linger_expire_timer := false | timer:tref() + , ?linger_expire_timer := false | reference() , partition := partition() , pending_acks := #{} % CallId => AckCb , produce_api_vsn := undefined | _ @@ -115,6 +120,7 @@ , sent_reqs_count := non_neg_integer() , inflight_calls := non_neg_integer() , topic := topic() + , calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]} }. %% @doc Start a per-partition producer worker. @@ -130,6 +136,9 @@ %% exact max allowed message size configured in kafka. %% * `max_linger_ms': Age in milliseconds a baatch can stay in queue when the connection %% is idle (as in no pending acks). Default: 0 (as in send immediately) +%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka. +%% If set to 0, `max_batch_bytes' is taken for mem-only mode, otherwise it's 10 times +%% `max_batch_bytes' (but never exceeds 10MB) to optimize disk write. %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for %% the last request. Must be 0 if messages must be delivered in strict order. %% * `compression': `no_compression', `snappy' or `gzip'. @@ -150,16 +159,22 @@ start_link(ClientId, Topic, Partition, MaybeConnPid, Config) -> stop(Pid) -> gen_server:call(Pid, stop, infinity). +%% @equiv send(Pid, Batch, AckFun, wait_for_queued) +-spec send(pid(), [wolff:msg()], wolff:ack_fun()) -> ok. +send(Pid, Batch, AckFun) -> + send(Pid, Batch, AckFun, wait_for_queued). + %% @doc Send a batch asynchronously. %% The callback function is evaluated by producer process when ack is received from kafka. %% In case `required_acks' is configured to `none', the callback is evaluated immediately after send. %% NOTE: This API has no backpressure, -%% high produce rate may cause execussive ram and disk usage. +%% high produce rate may cause excessive ram and disk usage. +%% Even less backpressure when the 4th arg is `no_linger'. %% NOTE: It's possible that two or more batches get included into one produce request. %% But a batch is never split into produce requests. %% Make sure it will not exceed the `max_batch_bytes' limit when sending a batch. --spec send(pid(), [wolff:msg()], wolff:ack_fun()) -> ok. -send(Pid, [_ | _] = Batch0, AckFun) -> +-spec send(pid(), [wolff:msg()], wolff:ack_fun(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok. +send(Pid, [_ | _] = Batch0, AckFun, wait_for_queued) -> Caller = self(), Mref = erlang:monitor(process, Pid), Batch = ensure_ts(Batch0), @@ -170,7 +185,11 @@ send(Pid, [_ | _] = Batch0, AckFun) -> ok; {'DOWN', Mref, _, _, Reason} -> erlang:error({producer_down, Reason}) - end. + end; +send(Pid, [_ | _] = Batch0, AckFun, no_wait_for_queued) -> + Batch = ensure_ts(Batch0), + erlang:send(Pid, ?SEND_REQ(?no_queued_reply, Batch, AckFun)), + ok. %% @doc Send a batch synchronously. %% Raise error exception in case produce pid is down or when timed out. @@ -183,8 +202,7 @@ send_sync(Pid, Batch0, Timeout) -> _ = erlang:send(Caller, {Mref, Partition, BaseOffset}), ok end, - Batch = ensure_ts(Batch0), - erlang:send(Pid, ?SEND_REQ(?no_queued_reply, Batch, AckFun)), + ok = send(Pid, Batch0, AckFun, no_wait_for_queued), receive {Mref, Partition, BaseOffset} -> erlang:demonitor(Mref, [flush]), @@ -251,7 +269,8 @@ do_init(#{client_id := ClientId, fun(Meta) -> Meta#{partition_id => Partition} end, #{partition_id => Partition}, Config0), - Config = maps:without([replayq_dir, replayq_seg_bytes], Config1), + Config2 = resolve_max_linger_bytes(Config1, Q), + Config = maps:without([replayq_dir, replayq_seg_bytes], Config2), wolff_metrics:queuing_set(Config, replayq:count(Q)), wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, @@ -263,7 +282,8 @@ do_init(#{client_id := ClientId, sent_reqs_count => 0, inflight_calls => 0, conn := undefined, - client_id => ClientId + client_id => ClientId, + calls => empty }. handle_call(stop, From, St) -> @@ -275,11 +295,14 @@ handle_call(_Call, _From, St) -> handle_info({do_init, St0}, _) -> St = do_init(St0), {noreply, St}; -handle_info(?linger_expire, St) -> - {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})}; -handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) -> - {Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit), - St1 = enqueue_calls(Calls, St0), +handle_info(?linger_expire, St0) -> + St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger), + St = maybe_send_to_kafka(St1), + {noreply, St}; +handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0, config := #{max_linger_bytes := Max}} = St0) -> + Bytes = batch_bytes(Batch), + Calls = collect_send_calls(Call, Bytes, Calls0, Max), + St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger), St = maybe_send_to_kafka(St1), {noreply, St}; handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) -> @@ -366,11 +389,26 @@ ensure_ts(Batch) -> make_call_id(Base) -> Base + erlang:unique_integer([positive]). +resolve_max_linger_bytes(#{max_linger_bytes := 0, + max_batch_bytes := Mb + } = Config, Q) -> + case replayq:is_mem_only(Q) of + true -> + Config#{max_linger_bytes => Mb}; + false -> + %% when disk or offload mode, try to linger with more bytes + %% to optimize disk write performance + Config#{max_linger_bytes => min(?MAX_LINGER_BYTES, Mb * 10)} + end; +resolve_max_linger_bytes(Config, _Q) -> + Config. + use_defaults(Config) -> use_defaults(Config, [{required_acks, all_isr}, {ack_timeout, 10000}, {max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES}, {max_linger_ms, 0}, + {max_linger_bytes, 0}, {max_send_ahead, 0}, {compression, no_compression}, {reconnect_delay_ms, 2000} @@ -451,38 +489,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) -> maybe_send_to_kafka_has_pending(St) -> case is_send_ahead_allowed(St) of - true -> maybe_send_to_kafka_now(St); + true -> send_to_kafka(St); false -> St %% reached max inflight limit end. -maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer, - replayq := Q, - config := #{max_linger_ms := Max}} = St) -> - First = replayq:peek(Q), - LingerTimeout = Max - (now_ts() - get_item_ts(First)), - case LingerTimeout =< 0 of - true -> - %% the oldest item is too old, send now - send_to_kafka(St); %% send now - false when is_reference(LTimer) -> - %% timer already started - St; - false -> - %% delay send, try to accumulate more into the batch - Ref = erlang:send_after(LingerTimeout, self(), ?linger_expire), - St#{?linger_expire_timer := Ref} - end. - send_to_kafka(#{sent_reqs := SentReqs, sent_reqs_count := SentReqsCount, inflight_calls := InflightCalls, replayq := Q, config := #{max_batch_bytes := BytesLimit} = Config, - conn := Conn, - ?linger_expire_timer := LTimer + conn := Conn } = St0) -> - %% timer might have already expired, but should do no harm - is_reference(LTimer) andalso erlang:cancel_timer(LTimer), {NewQ, QAckRef, Items} = replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), @@ -491,7 +508,7 @@ send_to_kafka(#{sent_reqs := SentReqs, NewInflightCalls = InflightCalls + NrOfCalls, _ = wolff_metrics:inflight_set(Config, NewInflightCalls), #kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0), - St1 = St0#{replayq := NewQ, ?linger_expire_timer := false}, + St1 = St0#{replayq := NewQ}, Sent = #{req_ref => Ref, q_items => Items, q_ack_ref => QAckRef, @@ -534,8 +551,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) -> queue_item_marshaller(Bin) when is_binary(Bin) -> binary_to_term(Bin). -get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts. - get_produce_version(#{conn := Conn} = St) when is_pid(Conn) -> Vsn = case kpro:get_api_vsn_range(Conn, produce) of {ok, {_Min, Max}} -> Max; @@ -825,24 +840,78 @@ zz(I) -> (I bsl 1) bxor (I bsr 63). request_async(Conn, Req) when is_pid(Conn) -> ok = kpro:send(Conn, Req). +%% collect send calls which are already sent to mailbox, +%% the collection is size-limited by the max_linger_bytes config. +collect_send_calls(Call, Bytes, empty, Max) -> + Init = #{ts => now_ts(), bytes => 0, batch_r => []}, + collect_send_calls(Call, Bytes, Init, Max); +collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) -> + Sum = Bytes0 + Bytes, + R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]}, + case Sum < Max of + true -> + collect_send_calls2(R, Max); + false -> + R + end. + %% Collect all send requests which are already in process mailbox -collect_send_calls(Calls, Size, Limit) when Size >= Limit -> - {lists:reverse(Calls), Size}; -collect_send_calls(Calls, Size, Limit) -> +collect_send_calls2(Calls, Max) -> receive ?SEND_REQ(_, Batch, _) = Call -> - collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit) + Bytes = batch_bytes(Batch), + collect_send_calls(Call, Bytes, Calls, Max) after 0 -> - {lists:reverse(Calls), Size} + Calls end. -enqueue_calls(Calls, #{replayq := Q, - pending_acks := PendingAcks0, - call_id_base := CallIdBase, - partition := Partition, - config := Config0 - } = St0) -> +ensure_linger_expire_timer_start(#{?linger_expire_timer := false} = St, Timeout) -> + %% delay enqueue, try to accumulate more into the batch + Ref = erlang:send_after(Timeout, self(), ?linger_expire), + St#{?linger_expire_timer := Ref}; +ensure_linger_expire_timer_start(St, _Timeout) -> + %% timer is already started + St. + +ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) -> + _ = is_reference(LTimer) andalso erlang:cancel_timer(LTimer), + St#{?linger_expire_timer => false}. + +%% check if the call collection should continue to linger before enqueue +is_linger_continue(#{calls := Calls, config := Config}) -> + #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config, + #{ts := Ts, bytes := Bytes} = Calls, + case Bytes < MaxLingerBytes of + true -> + TimeLeft = MaxLingerMs - (now_ts() - Ts), + (TimeLeft > 0) andalso {true, TimeLeft}; + false -> + false + end. + +enqueue_calls(#{calls := empty} = St, _) -> + %% no call to enqueue + St; +enqueue_calls(St, maybe_linger) -> + case is_linger_continue(St) of + {true, Timeout} -> + ensure_linger_expire_timer_start(St, Timeout); + false -> + enqueue_calls(St, no_linger) + end; +enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) -> + Calls = lists:reverse(CallsR), + St = ensure_linger_expire_timer_cancel(St0), + enqueue_calls2(Calls, St#{calls => empty}). + +enqueue_calls2(Calls, + #{replayq := Q, + pending_acks := PendingAcks0, + call_id_base := CallIdBase, + partition := Partition, + config := Config0 + } = St0) -> {QueueItems, PendingAcks, CallByteSize} = lists:foldl( fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) -> diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 068d8b2..0c1edd0 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) -> pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1). pick_next_alive(_LookupFn, _Partition, Count, Count) -> - throw(#{cause => all_producers_down}); + throw(#{cause => all_producers_down, count => Count}); pick_next_alive(LookupFn, Partition, Count, Tried) -> Pid = LookupFn(Partition), case is_alive(Pid) of diff --git a/test/wolff_dynamic_topics_tests.erl b/test/wolff_dynamic_topics_tests.erl index 376bf6a..1603e69 100644 --- a/test/wolff_dynamic_topics_tests.erl +++ b/test/wolff_dynamic_topics_tests.erl @@ -2,7 +2,6 @@ -include("wolff.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("kafka_protocol/include/kpro.hrl"). -define(KEY, key(?FUNCTION_NAME)). -define(HOSTS, [{"localhost", 9092}]). @@ -31,8 +30,8 @@ dynamic_topics_test() -> T3 = <<"test-topic-3">>, ?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun)), receive acked -> ok end, - ?assertMatch({0, Offset} when is_integer(Offset), wolff:send_sync2(Producers, T2, [Msg], 10_000)), - ?assertMatch({0, Offset} when is_integer(Offset), wolff:send_sync2(Producers, T3, [Msg], 10_000)), + ?assertMatch({0, Offset} when is_integer(Offset), send(Producers, T2, Msg)), + ?assertMatch({0, Offset} when is_integer(Offset), cast(Producers, T3, Msg)), ?assertMatch(#{metadata_ts := #{T1 := _, T2 := _, T3 := _}, known_topics := #{T1 := _, T2 := _, T3 := _}, conns := #{{T1, 0} := _, {T2, 0} := _, {T3, 0} := _} @@ -52,6 +51,23 @@ dynamic_topics_test() -> ok = application:stop(wolff), ok. +send(Producers, Topic, Message) -> + wolff:send_sync2(Producers, Topic, [Message], 10_000). + +cast(Producers, Topic, Message) -> + Self = self(), + Ref = make_ref(), + AckFun = fun(Partition, BaseOffset) -> + Self ! {Ref, Partition, BaseOffset}, + ok + end, + {_, _} = wolff:cast2(Producers, Topic, [Message], AckFun), + receive + {Ref, Partition, BaseOffset} -> {Partition, BaseOffset} + after 10_000 -> + error(timeout) + end. + unknown_topic_expire_test() -> _ = application:stop(wolff), %% ensure stopped {ok, _} = application:ensure_all_started(wolff), diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index d394a04..b80b965 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -324,20 +324,25 @@ replayq_overflow_test() -> Msg = #{key => <<>>, value => <<"12345">>}, Batch = [Msg, Msg], BatchSize = wolff_producer:batch_bytes(Batch), - ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch + LingerMs = 100, + ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time replayq_max_total_bytes => BatchSize, required_acks => all_isr, - max_linger_ms => 1000 %% do not send to kafka immediately + max_linger_ms => LingerMs, %% delay enqueue + max_linger_bytes => BatchSize + 1 %% delay enqueue }, {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + Pid = wolff_producers:lookup_producer(Producers, 0), + ?assert(is_process_alive(Pid)), TesterPid = self(), AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end, AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end, SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end, %% send two batches to overflow one - spawn(fun() -> SendF(AckFun1) end), + proc_lib:spawn_link(fun() -> SendF(AckFun1) end), timer:sleep(1), %% ensure order - spawn(fun() -> SendF(AckFun2) end), + proc_lib:spawn_link(fun() -> SendF(AckFun2) end), + timer:sleep(LingerMs * 2), try %% pushed out of replayq due to overflow receive @@ -363,7 +368,7 @@ replayq_overflow_test() -> [1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), - [0,1,2,1,0]), + [0,2,1,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -610,8 +615,10 @@ test_message_too_large() -> %% then it should retry sending one message at a time ProducerCfg = #{partitioner => fun(_, _) -> 0 end, max_batch_bytes => MaxMessageBytes * 3, - %% ensure batching - max_linger_ms => 100 + %% ensure batching by delay enqueue by 100 seconds + max_linger_ms => 100, + %% ensure linger is not expired by reaching size + max_linger_bytes => MaxMessageBytes * 100 }, {ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg), MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7, @@ -620,7 +627,7 @@ test_message_too_large() -> Ref = make_ref(), Self = self(), AckFun = {fun ?MODULE:ack_cb/4, [Self, Ref]}, - _ = wolff:send(Producers, Batch, AckFun), + {_, _} = wolff:cast(Producers, Batch, AckFun), fun() -> ?WAIT(5000, {ack, Ref, _Partition, BaseOffset}, BaseOffset) end end, %% Must be ok to send one message