Skip to content

Commit

Permalink
Expose ack_fun through the init_info
Browse files Browse the repository at this point in the history
`commit_fun` is already exposed, but `ack_fun` is not. This is useful
when acking does not happen inside the worker process.

E.g. We divide messages between multiple processes and later in a
separate process we create a transaction and commit a batch. Now we need
to let the consumer know that it's okay to fetch more messages.
  • Loading branch information
indrekj committed Jul 2, 2024
1 parent f8aec1a commit d21e32c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
9 changes: 6 additions & 3 deletions src/brod_group_subscriber_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
-export_type([ init_info/0
, subscriber_config/0
, commit_fun/0
, ack_fun/0
]).

-include("brod_int.hrl").
Expand All @@ -71,12 +72,14 @@
}.

-type commit_fun() :: fun((brod:offset()) -> ok).
-type ack_fun() :: fun((brod:offset()) -> ok).

-type init_info() ::
#{ group_id := brod:group_id()
, topic := brod:topic()
, partition := brod:partition()
, commit_fun := commit_fun()
, ack_fun := ack_fun()
}.

-type member_id() :: brod:group_member_id().
Expand Down Expand Up @@ -472,15 +475,15 @@ maybe_start_worker( _MemberId
State;
_ ->
Self = self(),
CommitFun = fun(Offset) ->
commit(Self, Topic, Partition, Offset)
end,
CommitFun = fun(Offset) -> commit(Self, Topic, Partition, Offset) end,
AckFun = fun(Offset) -> ack(Self, Topic, Partition, Offset) end,
StartOptions = #{ cb_module => CbModule
, cb_config => CbConfig
, partition => Partition
, begin_offset => BeginOffset
, group_id => GroupId
, commit_fun => CommitFun
, ack_fun => AckFun
, topic => Topic
},
{ok, Pid} = start_worker( Client
Expand Down
3 changes: 2 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
, cb_module := module()
, cb_config := term()
, commit_fun := brod_group_subscriber_v2:commit_fun()
, ack_fun := brod_group_subscriber_v2:ack_fun()
}.

-record(state,
Expand All @@ -54,7 +55,7 @@ init(Topic, StartOpts) ->
, begin_offset := BeginOffset
, commit_fun := CommitFun
} = StartOpts,
InitInfo = maps:with( [topic, partition, group_id, commit_fun]
InitInfo = maps:with( [topic, partition, group_id, commit_fun, ack_fun]
, StartOpts
),
?BROD_LOG_INFO("Starting group_subscriber_worker: ~p~n"
Expand Down

0 comments on commit d21e32c

Please sign in to comment.