From f35d281bb93de16023a5d8a3397f392471fa0a33 Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Mon, 27 May 2024 23:33:37 +0300 Subject: [PATCH 1/2] Forward unhandled messages to optional subscriber handle_info callback The motivation for adding handle_info callbacks is to allow subscriber worker processes which are spawned by brod to participate in message passing, supporting a variety of use cases utilizing async acking and committing. An example use case: * Start a group subscriber using `brod_group_subscriber_v2` * In a partition worker spawn a new process for every message under a supervisor specific to the worker's topic-partition * When the supervisor has <= N processes, ack last seen offset to fetch new messages. When the supervisor has > N processes, messages are not acked to apply backpressure * When all processes up to offset O1 have completed, commit offset O1 Allowing arbitrary message passing in the topic and group subscriber workers supports not only that use case but many others. --- src/brod_group_subscriber_worker.erl | 11 +++++- src/brod_topic_subscriber.erl | 17 +++++++-- test/brod_topic_subscriber_SUITE.erl | 55 ++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 4f0c40ae..7dd69f9f 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -22,7 +22,7 @@ -include("brod_int.hrl"). %% brod_topic_subscriber callbacks --export([init/2, handle_message/3, terminate/2]). +-export([init/2, handle_message/3, handle_info/2, terminate/2]). -type start_options() :: #{ group_id := brod:group_id() @@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) -> {ok, NewState} end. +handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. + terminate(Reason, #state{cb_module = CbModule, cb_state = State}) -> brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok). diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index 08f6512d..9b8add98 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -108,7 +108,12 @@ %% This callback is called before stopping the subscriber -callback terminate(_Reason, cb_state()) -> _. --optional_callbacks([terminate/2]). +%% This callback is called when the subscriber receives a message unrelated to +%% the subscription. +%% The callback must return `{noreply, NewCallbackState}'. +-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}. + +-optional_callbacks([terminate/2, handle_info/2]). %%%_* Types and macros ========================================================= @@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason}, %% not a consumer pid {noreply, State} end; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. %% @private handle_call(Call, _From, State) -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 9c022a9d..49f72e91 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -30,6 +30,7 @@ -export([ init/2 , terminate/2 , handle_message/3 + , handle_info/2 ]). %% Test cases @@ -40,6 +41,7 @@ , t_callback_crash/1 , t_begin_offset/1 , t_cb_fun/1 + , t_consumer_ack_via_message_passing/1 ]). -include("brod_test_setup.hrl"). @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck false -> {ok, ack, State} end. +handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter + , worker_id = Ref + } = State0) -> + %% Participate in state continuity checks + ?tp(topic_subscriber_seen_info, + #{ partition => Partition + , offset => Offset + , msg => Msg + , state => Counter + , worker_id => Ref + }), + State = State0#state{counter = Counter + 1}, + ok = brod_topic_subscriber:ack(self(), Partition, Offset), + {noreply, State}. + terminate(Reason, #state{worker_id = Ref, counter = Counter}) -> ?tp(topic_subscriber_terminate, #{ worker_id => Ref @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) -> check_init_terminate(Trace) end). +t_consumer_ack_via_message_passing(Config) when is_list(Config) -> + %% Process messages one by one with no prefetch + ConsumerConfig = [ {prefetch_count, 0} + , {prefetch_bytes, 0} + , {sleep_timeout, 0} + , {max_bytes, 0} + ], + Partition = 0, + SendFun = + fun(I) -> + produce({?topic, Partition}, <>) + end, + ?check_trace( + %% Run stage: + begin + O0 = SendFun(0), + %% Send two messages + Offset0 = SendFun(1), + _Offset1 = SendFun(2), + InitArgs = {_IsAsyncAck = true, + _ConsumerOffsets = [{0, O0}]}, + {ok, SubscriberPid} = + brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, + ?MODULE, InitArgs), + {ok, _} = wait_message(<<1>>), + %% ack_offset allows consumer to proceed to message 2 + SubscriberPid ! {ack_offset, 0, Offset0}, + {ok, _} = wait_message(<<2>>), + ok = brod_topic_subscriber:stop(SubscriberPid), + _Expected = [<<1>>, <<2>>] + end, + %% Check stage: + fun(Expected, Trace) -> + check_received_messages(Expected, Trace), + check_state_continuity(Trace), + check_init_terminate(Trace) + end). + t_begin_offset(Config) when is_list(Config) -> ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard From f32caafc49e6582e6a3c0c05676d2fb4bd5fb13d Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Fri, 7 Jun 2024 17:35:27 +0300 Subject: [PATCH 2/2] Add changelog for version 3.19.0 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1feefd57..820e4874 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +- 3.19.0 + - Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks + in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580) + - 3.18.0 - Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549) - Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578)