From 71844d188b103526fe05a63d05af9c12a43a809b Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 6 Sep 2024 22:11:01 +0200 Subject: [PATCH] feat: optimize disk buffer writes Previously, the wolff_producer implementation is agreesive when try to enqueue newly received calls, even when max_linger_ms is set to non-zero because the linger was implemented from the poping end of the queue. This change moves the linger timer to the pushing end of the queue, that is, the process will delay enqueue to allow a larger collection of concurrent calls so the write batch towards disk can be larger --- README.md | 2 + src/wolff_producer.erl | 145 +++++++++++++++++++++++++++------------- src/wolff_producers.erl | 2 +- test/wolff_tests.erl | 21 ++++-- 4 files changed, 115 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 54698b9..a783b56 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun). * `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection is idle (as in no pending acks from kafka). Default=0 (as in send immediately). +* `max_linger_bytes`: Number of bytes to collect before sending it to Kafka. If set to 0, `max_batch_bytes` is taken for mem-only mode, otherwise it's 10 times `max_batch_bytes` (but never exceeds 10MB) to optimize disk write. + * `max_send_ahead`: Number of batches to be sent ahead without receiving ack for the last request. Must be 0 if messages must be delivered in strict order. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index e3bb5f3..f0db5f9 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -45,6 +45,7 @@ ack_timeout | max_batch_bytes | max_linger_ms | + max_linger_bytes | max_send_ahead | compression | drop_if_highmem | @@ -60,6 +61,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -75,6 +77,7 @@ ack_timeout => timeout(), max_batch_bytes => pos_integer(), max_linger_ms => non_neg_integer(), + max_linger_bytes => non_neg_integer(), max_send_ahead => non_neg_integer(), compression => kpro:compress_option(), drop_if_highmem => boolean(), @@ -95,7 +98,9 @@ -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -define(no_queue_ack, no_queue_ack). -define(no_caller_ack, no_caller_ack). - +-define(MAX_LINGER_BYTES, (10 bsl 20)). +-type ack_fun() :: wolff:ack_fun(). +-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()). -type sent() :: #{req_ref := reference(), q_items := [?Q_ITEM(_CallId, _Ts, _Batch)], q_ack_ref := replayq:ack_ref(), @@ -106,7 +111,7 @@ , client_id := wolff:client_id() , config := config_state() , conn := undefined | _ - , ?linger_expire_timer := false | timer:tref() + , ?linger_expire_timer := false | reference() , partition := partition() , pending_acks := #{} % CallId => AckCb , produce_api_vsn := undefined | _ @@ -115,6 +120,7 @@ , sent_reqs_count := non_neg_integer() , inflight_calls := non_neg_integer() , topic := topic() + , calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]} }. %% @doc Start a per-partition producer worker. @@ -130,6 +136,9 @@ %% exact max allowed message size configured in kafka. %% * `max_linger_ms': Age in milliseconds a baatch can stay in queue when the connection %% is idle (as in no pending acks). Default: 0 (as in send immediately) +%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka. +%% If set to 0, `max_batch_bytes' is taken for mem-only mode, otherwise it's 10 times +%% `max_batch_bytes' (but never exceeds 10MB) to optimize disk write. %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for %% the last request. Must be 0 if messages must be delivered in strict order. %% * `compression': `no_compression', `snappy' or `gzip'. @@ -251,7 +260,8 @@ do_init(#{client_id := ClientId, fun(Meta) -> Meta#{partition_id => Partition} end, #{partition_id => Partition}, Config0), - Config = maps:without([replayq_dir, replayq_seg_bytes], Config1), + Config2 = resolve_max_linger_bytes(Config1, Q), + Config = maps:without([replayq_dir, replayq_seg_bytes], Config2), wolff_metrics:queuing_set(Config, replayq:count(Q)), wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, @@ -263,7 +273,8 @@ do_init(#{client_id := ClientId, sent_reqs_count => 0, inflight_calls => 0, conn := undefined, - client_id => ClientId + client_id => ClientId, + calls => empty }. handle_call(stop, From, St) -> @@ -275,11 +286,14 @@ handle_call(_Call, _From, St) -> handle_info({do_init, St0}, _) -> St = do_init(St0), {noreply, St}; -handle_info(?linger_expire, St) -> - {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})}; -handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) -> - {Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit), - St1 = enqueue_calls(Calls, St0), +handle_info(?linger_expire, St0) -> + St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger), + St = maybe_send_to_kafka(St1), + {noreply, St}; +handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0} = St0) -> + Bytes = batch_bytes(Batch), + Calls = collect_send_calls(Call, Bytes, Calls0), + St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger), St = maybe_send_to_kafka(St1), {noreply, St}; handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) -> @@ -366,11 +380,26 @@ ensure_ts(Batch) -> make_call_id(Base) -> Base + erlang:unique_integer([positive]). +resolve_max_linger_bytes(#{max_linger_bytes := 0, + max_batch_bytes := Mb + } = Config, Q) -> + case replayq:is_mem_only(Q) of + true -> + Config#{max_linger_bytes => Mb}; + false -> + %% when disk or offload mode, try to linger with more bytes + %% to optimize disk write performance + Config#{max_linger_bytes => min(?MAX_LINGER_BYTES, Mb * 10)} + end; +resolve_max_linger_bytes(Config, _Q) -> + Config. + use_defaults(Config) -> use_defaults(Config, [{required_acks, all_isr}, {ack_timeout, 10000}, {max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES}, {max_linger_ms, 0}, + {max_linger_bytes, 0}, {max_send_ahead, 0}, {compression, no_compression}, {reconnect_delay_ms, 2000} @@ -451,38 +480,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) -> maybe_send_to_kafka_has_pending(St) -> case is_send_ahead_allowed(St) of - true -> maybe_send_to_kafka_now(St); + true -> send_to_kafka(St); false -> St %% reached max inflight limit end. -maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer, - replayq := Q, - config := #{max_linger_ms := Max}} = St) -> - First = replayq:peek(Q), - LingerTimeout = Max - (now_ts() - get_item_ts(First)), - case LingerTimeout =< 0 of - true -> - %% the oldest item is too old, send now - send_to_kafka(St); %% send now - false when is_reference(LTimer) -> - %% timer already started - St; - false -> - %% delay send, try to accumulate more into the batch - Ref = erlang:send_after(LingerTimeout, self(), ?linger_expire), - St#{?linger_expire_timer := Ref} - end. - send_to_kafka(#{sent_reqs := SentReqs, sent_reqs_count := SentReqsCount, inflight_calls := InflightCalls, replayq := Q, config := #{max_batch_bytes := BytesLimit} = Config, - conn := Conn, - ?linger_expire_timer := LTimer + conn := Conn } = St0) -> - %% timer might have already expired, but should do no harm - is_reference(LTimer) andalso erlang:cancel_timer(LTimer), {NewQ, QAckRef, Items} = replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), @@ -491,7 +499,7 @@ send_to_kafka(#{sent_reqs := SentReqs, NewInflightCalls = InflightCalls + NrOfCalls, _ = wolff_metrics:inflight_set(Config, NewInflightCalls), #kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0), - St1 = St0#{replayq := NewQ, ?linger_expire_timer := false}, + St1 = St0#{replayq := NewQ}, Sent = #{req_ref => Ref, q_items => Items, q_ack_ref => QAckRef, @@ -534,8 +542,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) -> queue_item_marshaller(Bin) when is_binary(Bin) -> binary_to_term(Bin). -get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts. - get_produce_version(#{conn := Conn} = St) when is_pid(Conn) -> Vsn = case kpro:get_api_vsn_range(Conn, produce) of {ok, {_Min, Max}} -> Max; @@ -825,24 +831,70 @@ zz(I) -> (I bsl 1) bxor (I bsr 63). request_async(Conn, Req) when is_pid(Conn) -> ok = kpro:send(Conn, Req). +collect_send_calls(Call, Bytes, empty) -> + Init = #{ts => now_ts(), bytes => 0, batch_r => []}, + collect_send_calls(Call, Bytes, Init); +collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR}) -> + _ = maybe_reply_queued(Call), + collect_send_calls2(#{ts => Ts, bytes => Bytes0 + Bytes, batch_r => [Call | BatchR]}). + %% Collect all send requests which are already in process mailbox -collect_send_calls(Calls, Size, Limit) when Size >= Limit -> - {lists:reverse(Calls), Size}; -collect_send_calls(Calls, Size, Limit) -> +collect_send_calls2(Calls) -> receive ?SEND_REQ(_, Batch, _) = Call -> - collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit) + Bytes = batch_bytes(Batch), + collect_send_calls(Call, Bytes, Calls) after 0 -> - {lists:reverse(Calls), Size} + Calls end. -enqueue_calls(Calls, #{replayq := Q, - pending_acks := PendingAcks0, - call_id_base := CallIdBase, - partition := Partition, - config := Config0 - } = St0) -> +ensure_linger_expire_timer_start(#{?linger_expire_timer := false} = St, Timeout) -> + %% delay enqueue, try to accumulate more into the batch + Ref = erlang:send_after(Timeout, self(), ?linger_expire), + St#{?linger_expire_timer := Ref}; +ensure_linger_expire_timer_start(St, _Timeout) -> + %% timer is already started + St. + +ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) -> + _ = is_reference(LTimer) andalso erlang:cancel_timer(LTimer), + St#{?linger_expire_timer => false}. + +%% check if the call collection should continue to linger before enqueue +is_linger_continue(#{calls := Calls, config := Config}) -> + #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config, + #{ts := Ts, bytes := Bytes} = Calls, + TimeLeft = MaxLingerMs - (now_ts() - Ts), + case TimeLeft =< 0 of + true -> + false; + false -> + (Bytes < MaxLingerBytes) andalso {true, TimeLeft} + end. + +enqueue_calls(#{calls := empty} = St, _) -> + %% no call to enqueue + St; +enqueue_calls(St, maybe_linger) -> + case is_linger_continue(St) of + {true, Timeout} -> + ensure_linger_expire_timer_start(St, Timeout); + false -> + enqueue_calls(St, no_linger) + end; +enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) -> + Calls = lists:reverse(CallsR), + St = ensure_linger_expire_timer_cancel(St0), + enqueue_calls2(Calls, St#{calls => empty}). + +enqueue_calls2(Calls, + #{replayq := Q, + pending_acks := PendingAcks0, + call_id_base := CallIdBase, + partition := Partition, + config := Config0 + } = St0) -> {QueueItems, PendingAcks, CallByteSize} = lists:foldl( fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) -> @@ -856,7 +908,6 @@ enqueue_calls(Calls, #{replayq := Q, end, {[], PendingAcks0, 0}, Calls), NewQ = replayq:append(Q, lists:reverse(QueueItems)), wolff_metrics:queuing_set(Config0, replayq:count(NewQ)), - lists:foreach(fun maybe_reply_queued/1, Calls), Overflow = case maps:get(drop_if_highmem, Config0, false) andalso replayq:is_mem_only(NewQ) andalso load_ctl:is_high_mem() of diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 068d8b2..0c1edd0 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) -> pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1). pick_next_alive(_LookupFn, _Partition, Count, Count) -> - throw(#{cause => all_producers_down}); + throw(#{cause => all_producers_down, count => Count}); pick_next_alive(LookupFn, Partition, Count, Tried) -> Pid = LookupFn(Partition), case is_alive(Pid) of diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index d394a04..75c50a9 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -324,20 +324,25 @@ replayq_overflow_test() -> Msg = #{key => <<>>, value => <<"12345">>}, Batch = [Msg, Msg], BatchSize = wolff_producer:batch_bytes(Batch), - ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch + LingerMs = 100, + ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time replayq_max_total_bytes => BatchSize, required_acks => all_isr, - max_linger_ms => 1000 %% do not send to kafka immediately + max_linger_ms => LingerMs, %% delay enqueue + max_linger_bytes => BatchSize + 1 %% delay enqueue }, {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + Pid = wolff_producers:lookup_producer(Producers, 0), + ?assert(is_process_alive(Pid)), TesterPid = self(), AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end, AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end, SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end, %% send two batches to overflow one - spawn(fun() -> SendF(AckFun1) end), + proc_lib:spawn_link(fun() -> SendF(AckFun1) end), timer:sleep(1), %% ensure order - spawn(fun() -> SendF(AckFun2) end), + proc_lib:spawn_link(fun() -> SendF(AckFun2) end), + timer:sleep(LingerMs * 2), try %% pushed out of replayq due to overflow receive @@ -363,7 +368,7 @@ replayq_overflow_test() -> [1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), - [0,1,2,1,0]), + [0,2,1,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -610,8 +615,10 @@ test_message_too_large() -> %% then it should retry sending one message at a time ProducerCfg = #{partitioner => fun(_, _) -> 0 end, max_batch_bytes => MaxMessageBytes * 3, - %% ensure batching - max_linger_ms => 100 + %% ensure batching by delay enqueue by 100 seconds + max_linger_ms => 100, + %% ensure linger is not expired by reaching size + max_linger_bytes => MaxMessageBytes * 100 }, {ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg), MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7,