Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize disk buffer writes #77

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun).
* `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection
is idle (as in no pending acks from kafka). Default=0 (as in send immediately).

* `max_linger_bytes`: Number of bytes to collect before sending it to Kafka. If set to 0, `max_batch_bytes` is taken for mem-only mode, otherwise it's 10 times `max_batch_bytes` (but never exceeds 10MB) to optimize disk write.

* `max_send_ahead`: Number of batches to be sent ahead without receiving ack for
the last request. Must be 0 if messages must be delivered in strict order.

Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
* 4.0.0
- Delete global stats (deprecated since 1.9).
- Move linger delay to front of the buffer queue.
The default value for `max_linger_ms` is `0` as before.
Setting `max_linger_ms=10` will make the disk write batch larger when buffer is configured to disk mode or disk-offload mode.

* 3.0.4
- Upgrade to kafka_protocol-4.1.8
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "4.0.0"},
{vsn, "git"},
{registered, []},
{applications,
[kernel,
Expand Down
24 changes: 21 additions & 3 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@

%% Messaging APIs
-export([send/3,
send_sync/3
send_sync/3,
cast/3
]).

%% Messaging APIs of dynamic producer.
-export([send2/4,
cast2/4,
send_sync2/4,
add_topic/2,
remove_topic/2
Expand Down Expand Up @@ -120,8 +122,8 @@ stop_and_delete_supervised_producers(Producers) ->
%% In case `required_acks' is configured to `none', the callback is evaluated immediately after send.
%% The partition number and the per-partition worker pid are returned in a tuple to caller,
%% so it may use them to correlate the future `AckFun' evaluation.
%% NOTE: This API has no backpressure,
%% high produce rate may cause execussive ram and disk usage.
%% NOTE: This API is blocked until the batch is enqueued to the producer buffer, otherwise no backpressure.
%% High produce rate may cause excessive ram and disk usage.
%% NOTE: In case producers are configured with `required_acks = none',
%% the second arg for callback function will always be `?UNKNOWN_OFFSET' (`-1').
-spec send(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
Expand All @@ -137,6 +139,22 @@ send2(Producers, Topic, Batch, AckFun) ->
ok = wolff_producer:send(ProducerPid, Batch, AckFun),
{Partition, ProducerPid}.

%% @doc Cast a batch to a partition producer.
%% Even less backpressure than `send/3'.
%% It does not wait for the batch to be enqueued to the producer buffer.
-spec cast(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
cast(Producers, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer(Producers, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Topic as argument for dynamic producers, otherwise equivalent to `cast/3'.
-spec cast2(producers(), topic(), [msg()], ack_fun()) -> {partition(), pid()}.
cast2(Producers, Topic, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer2(Producers, Topic, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Pick a partition producer and send a batch synchronously.
%% Raise error exception in case produce pid is down or when timed out.
%% NOTE: In case producers are configured with `required_acks => none',
Expand Down
Loading
Loading