Skip to content

Commit

Permalink
feat: add new api wolff:cast/3 and wolff:cast/4
Browse files Browse the repository at this point in the history
cast APIs have no backpressure at all.
not even wait for the messages getting queued.
  • Loading branch information
zmstone committed Sep 8, 2024
1 parent bcfceda commit 2141f51
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
24 changes: 21 additions & 3 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@

%% Messaging APIs
-export([send/3,
send_sync/3
send_sync/3,
cast/3
]).

%% Messaging APIs of dynamic producer.
-export([send2/4,
cast2/4,
send_sync2/4,
add_topic/2,
remove_topic/2
Expand Down Expand Up @@ -120,8 +122,8 @@ stop_and_delete_supervised_producers(Producers) ->
%% In case `required_acks' is configured to `none', the callback is evaluated immediately after send.
%% The partition number and the per-partition worker pid are returned in a tuple to caller,
%% so it may use them to correlate the future `AckFun' evaluation.
%% NOTE: This API has no backpressure,
%% high produce rate may cause execussive ram and disk usage.
%% NOTE: This API is blocked until the batch is enqueued to the producer buffer, otherwise no backpressure.
%% High produce rate may cause execussive ram and disk usage.
%% NOTE: In case producers are configured with `required_acks = none',
%% the second arg for callback function will always be `?UNKNOWN_OFFSET' (`-1').
-spec send(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
Expand All @@ -137,6 +139,22 @@ send2(Producers, Topic, Batch, AckFun) ->
ok = wolff_producer:send(ProducerPid, Batch, AckFun),
{Partition, ProducerPid}.

%% @doc Cast a batch to a partition producer.
%% Even less backpressure than `send/3'.
%% It does not wait for the batch to be enqueued to the producer buffer.
-spec cast(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
cast(Producers, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer(Producers, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Topic as argument for dynamic producers, otherwise equivalent to `cast/3'.
-spec cast2(producers(), topic(), [msg()], ack_fun()) -> {partition(), pid()}.
cast2(Producers, Topic, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer2(Producers, Topic, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Pick a partition producer and send a batch synchronously.
%% Raise error exception in case produce pid is down or when timed out.
%% NOTE: In case producers are configured with `required_acks => none',
Expand Down
23 changes: 16 additions & 7 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
-define(MIN_DISCARD_LOG_INTERVAL, 5000).

%% APIs
-export([start_link/5, stop/1, send/3, send_sync/3]).
-export([start_link/5, stop/1, send/3, send/4, send_sync/3]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
Expand Down Expand Up @@ -100,7 +100,7 @@
-define(no_caller_ack, no_caller_ack).
-define(MAX_LINGER_BYTES, (10 bsl 20)).
-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()).
-type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()).
-type sent() :: #{req_ref := reference(),
q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
q_ack_ref := replayq:ack_ref(),
Expand Down Expand Up @@ -159,16 +159,22 @@ start_link(ClientId, Topic, Partition, MaybeConnPid, Config) ->
stop(Pid) ->
gen_server:call(Pid, stop, infinity).

%% @equiv send(Pid, Batch, AckFun, wait_for_queued)
-spec send(pid(), [wolff:msg()], wolff:ack_fun()) -> ok.
send(Pid, Batch, AckFun) ->
send(Pid, Batch, AckFun, wait_for_queued).

%% @doc Send a batch asynchronously.
%% The callback function is evaluated by producer process when ack is received from kafka.
%% In case `required_acks' is configured to `none', the callback is evaluated immediately after send.
%% NOTE: This API has no backpressure,
%% high produce rate may cause execussive ram and disk usage.
%% Even less backpressure when the 4th arg is `no_linger'.
%% NOTE: It's possible that two or more batches get included into one produce request.
%% But a batch is never split into produce requests.
%% Make sure it will not exceed the `max_batch_bytes' limit when sending a batch.
-spec send(pid(), [wolff:msg()], wolff:ack_fun()) -> ok.
send(Pid, [_ | _] = Batch0, AckFun) ->
-spec send(pid(), [wolff:msg()], wolff:ack_fun(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok.
send(Pid, [_ | _] = Batch0, AckFun, wait_for_queued) ->
Caller = self(),
Mref = erlang:monitor(process, Pid),
Batch = ensure_ts(Batch0),
Expand All @@ -179,7 +185,11 @@ send(Pid, [_ | _] = Batch0, AckFun) ->
ok;
{'DOWN', Mref, _, _, Reason} ->
erlang:error({producer_down, Reason})
end.
end;
send(Pid, [_ | _] = Batch0, AckFun, no_wait_for_queued) ->
Batch = ensure_ts(Batch0),
erlang:send(Pid, ?SEND_REQ(?no_queued_reply, Batch, AckFun)),
ok.

%% @doc Send a batch synchronously.
%% Raise error exception in case produce pid is down or when timed out.
Expand All @@ -192,8 +202,7 @@ send_sync(Pid, Batch0, Timeout) ->
_ = erlang:send(Caller, {Mref, Partition, BaseOffset}),
ok
end,
Batch = ensure_ts(Batch0),
erlang:send(Pid, ?SEND_REQ(?no_queued_reply, Batch, AckFun)),
ok = send(Pid, Batch0, AckFun, no_wait_for_queued),
receive
{Mref, Partition, BaseOffset} ->
erlang:demonitor(Mref, [flush]),
Expand Down
22 changes: 19 additions & 3 deletions test/wolff_dynamic_topics_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

-include("wolff.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("kafka_protocol/include/kpro.hrl").

-define(KEY, key(?FUNCTION_NAME)).
-define(HOSTS, [{"localhost", 9092}]).
Expand Down Expand Up @@ -31,8 +30,8 @@ dynamic_topics_test() ->
T3 = <<"test-topic-3">>,
?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun)),
receive acked -> ok end,
?assertMatch({0, Offset} when is_integer(Offset), wolff:send_sync2(Producers, T2, [Msg], 10_000)),
?assertMatch({0, Offset} when is_integer(Offset), wolff:send_sync2(Producers, T3, [Msg], 10_000)),
?assertMatch({0, Offset} when is_integer(Offset), send(Producers, T2, Msg)),
?assertMatch({0, Offset} when is_integer(Offset), cast(Producers, T3, Msg)),
?assertMatch(#{metadata_ts := #{T1 := _, T2 := _, T3 := _},
known_topics := #{T1 := _, T2 := _, T3 := _},
conns := #{{T1, 0} := _, {T2, 0} := _, {T3, 0} := _}
Expand All @@ -52,6 +51,23 @@ dynamic_topics_test() ->
ok = application:stop(wolff),
ok.

send(Producers, Topic, Message) ->
wolff:send_sync2(Producers, Topic, [Message], 10_000).

cast(Producers, Topic, Message) ->
Self = self(),
Ref = make_ref(),
AckFun = fun(Partition, BaseOffset) ->
Self ! {Ref, Partition, BaseOffset},
ok
end,
{_, _} = wolff:cast2(Producers, Topic, [Message], AckFun),
receive
{Ref, Partition, BaseOffset} -> {Partition, BaseOffset}
after 10_000 ->
error(timeout)
end.

unknown_topic_expire_test() ->
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
Expand Down
2 changes: 1 addition & 1 deletion test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ test_message_too_large() ->
Ref = make_ref(),
Self = self(),
AckFun = {fun ?MODULE:ack_cb/4, [Self, Ref]},
spawn(fun() -> wolff:send(Producers, Batch, AckFun) end),
{_, _} = wolff:cast(Producers, Batch, AckFun),
fun() -> ?WAIT(5000, {ack, Ref, _Partition, BaseOffset}, BaseOffset) end
end,
%% Must be ok to send one message
Expand Down

0 comments on commit 2141f51

Please sign in to comment.