feat: optimize disk buffer writes #77
71844d1 to 08330bd
Previously, the wolff_producer implementation was aggressive when trying to enqueue newly received calls, even when max_linger_ms was set to a non-zero value, because the linger was implemented at the popping end of the queue. This change moves the linger timer to the pushing end of the queue: the process delays the enqueue to allow a larger collection of concurrent calls, so the write batch towards disk can be larger.
08330bd to bcfceda
The cast APIs have no backpressure at all: they do not even wait for the messages to be queued.
de00654 to 2141f51
@@ -130,6 +136,9 @@
%% exact max allowed message size configured in kafka.
%% * `max_linger_ms': Age in milliseconds a batch can stay in queue when the connection
%% is idle (as in no pending acks). Default: 0 (as in send immediately)
%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka.
Q: what is the difference between max_linger_bytes and max_batch_bytes? Just looking at the descriptions, they seem the same to me. 🙈
From the code, it seems that max_linger_bytes is the number of bytes collected in the producer mailbox before the calls are enqueued (to disk or memory), that is, before anything is even sent to Kafka?
Yes. max_batch_bytes applies at the popping end of the queue.
max_linger_bytes applies at the pushing end.
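To make the distinction concrete, here is a small config fragment. Only the three key names come from this PR; the values are made up for illustration and are not recommended defaults.

```erlang
%% Hypothetical producer config; the values are illustrative assumptions.
ProducerConfig = #{
    %% Pushing end: how long the producer may keep collecting calls
    %% from its mailbox before enqueueing them (to disk or memory).
    max_linger_ms => 10,
    %% Pushing end: stop collecting early once this many bytes
    %% have been gathered.
    max_linger_bytes => 1048576,
    %% Popping end: upper bound on the bytes taken from the queue
    %% for one batch sent to Kafka.
    max_batch_bytes => 1048576
}.
```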
src/wolff_producer.erl
Outdated
 receive
     ?SEND_REQ(_, Batch, _) = Call ->
-        collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
+        Bytes = batch_bytes(Batch),
+        collect_send_calls(Call, Bytes, Calls)
Should we add a count or byte limit before recursing, to avoid getting blocked for a long time collecting calls that keep arriving when multiple clients are sending requests at the same time?
collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) ->
    Sum = Bytes0 + Bytes,
    R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]},
    case Sum < Max of
Nit: suggested change from
case Sum < Max of
to
case Sum =< Max of
?
If the sum is less than max, then continue collecting.
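The byte-limited collection discussed in this thread can be sketched as a standalone module. This is not the wolff_producer code: the module name, the {send, Batch} message shape, and the use of iolist_size/1 in place of batch_bytes/1 are assumptions made so the example runs on its own.

```erlang
-module(linger_sketch).
-export([collect/2, demo/0]).

%% Drain {send, Batch} messages that are already in the mailbox.
%% Stops when the mailbox has no more matches or when the
%% accumulated size is no longer below Max.
collect(Calls, Max) ->
    receive
        {send, Batch} = Call ->
            %% iolist_size/1 stands in for the PR's batch_bytes/1.
            add_call(Call, iolist_size(Batch), Calls, Max)
    after 0 ->
        Calls
    end.

add_call(Call, Bytes, #{bytes := Bytes0, batch_r := BatchR} = Calls, Max) ->
    Sum = Bytes0 + Bytes,
    R = Calls#{bytes => Sum, batch_r => [Call | BatchR]},
    case Sum < Max of
        true -> collect(R, Max);   % below the limit: keep collecting
        false -> R                 % limit reached: stop recursing
    end.

%% Five 10-byte messages with a 25-byte limit: collection stops
%% after the third message (30 bytes), leaving two in the mailbox.
demo() ->
    [self() ! {send, <<"0123456789">>} || _ <- lists:seq(1, 5)],
    #{bytes := Bytes, batch_r := BatchR} =
        collect(#{bytes => 0, batch_r => []}, 25),
    {Bytes, length(BatchR)}.   % returns {30, 3}
```

With `<` the collection stops as soon as the sum reaches the limit; with `=<` a sum exactly equal to the limit would trigger one more receive. Either way the last call can push the total past the limit, since the check happens after the call is added.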
is_linger_continue(#{calls := Calls, config := Config}) ->
    #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config,
    #{ts := Ts, bytes := Bytes} = Calls,
    case Bytes < MaxLingerBytes of
Suggested change from
case Bytes < MaxLingerBytes of
to
case Bytes =< MaxLingerBytes of
?
If the bytes are less than max, then continue lingering.
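A minimal sketch of how a linger check might combine the byte limit with the time limit. The diff above only shows the byte comparison, so the time branch, the module and function names, the explicit Now argument, and the return shape are all assumptions for illustration.

```erlang
-module(linger_check_sketch).
-export([should_linger/3]).

%% Calls:  #{ts := first-call timestamp in ms, bytes := bytes collected}
%% Config: #{max_linger_ms := ..., max_linger_bytes := ...}
%% Now:    current time in ms, passed in so the function stays pure.
%% Returns {true, RemainingMs} to keep lingering, or false to enqueue now.
should_linger(#{ts := Ts, bytes := Bytes},
              #{max_linger_ms := MaxLingerMs,
                max_linger_bytes := MaxLingerBytes},
              Now) ->
    case Bytes < MaxLingerBytes of
        true ->
            %% Assumed time branch: linger only while time remains.
            Remaining = MaxLingerMs - (Now - Ts),
            case Remaining > 0 of
                true -> {true, Remaining};
                false -> false
            end;
        false ->
            %% Byte limit reached: stop lingering regardless of time.
            false
    end.
```

In a running process Now would typically come from erlang:monotonic_time(millisecond); taking it as an argument keeps the sketch easy to test.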
Previously, the wolff_producer implementation was aggressive when trying to enqueue newly received calls, even when max_linger_ms was set to a non-zero value, because the linger was implemented at the popping end of the queue. This change moves the linger timer to the pushing end of the queue: the process delays the enqueue to allow a larger collection of concurrent calls, so the write batch towards disk can be larger.
This also means that when max_linger_ms is not zero, concurrent callers are blocked while lingering. For a completely non-blocking API, call cast/3 or cast/4 instead.