diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index 1fb84014..c1e43fdd 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -55,6 +55,7 @@ -export_type([ init_info/0 , subscriber_config/0 , commit_fun/0 + , ack_fun/0 ]). -include("brod_int.hrl"). @@ -71,12 +72,14 @@ }. -type commit_fun() :: fun((brod:offset()) -> ok). +-type ack_fun() :: fun((brod:offset()) -> ok). -type init_info() :: #{ group_id := brod:group_id() , topic := brod:topic() , partition := brod:partition() , commit_fun := commit_fun() + , ack_fun := ack_fun() }. -type member_id() :: brod:group_member_id(). @@ -472,15 +475,15 @@ maybe_start_worker( _MemberId State; _ -> Self = self(), - CommitFun = fun(Offset) -> - commit(Self, Topic, Partition, Offset) - end, + CommitFun = fun(Offset) -> commit(Self, Topic, Partition, Offset) end, + AckFun = fun(Offset) -> ack(Self, Topic, Partition, Offset) end, StartOptions = #{ cb_module => CbModule , cb_config => CbConfig , partition => Partition , begin_offset => BeginOffset , group_id => GroupId , commit_fun => CommitFun + , ack_fun => AckFun , topic => Topic }, {ok, Pid} = start_worker( Client diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 7dd69f9f..ef448046 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -32,6 +32,7 @@ , cb_module := module() , cb_config := term() , commit_fun := brod_group_subscriber_v2:commit_fun() + , ack_fun := brod_group_subscriber_v2:ack_fun() }. -record(state, @@ -54,7 +55,7 @@ init(Topic, StartOpts) -> , begin_offset := BeginOffset , commit_fun := CommitFun } = StartOpts, - InitInfo = maps:with( [topic, partition, group_id, commit_fun] + InitInfo = maps:with( [topic, partition, group_id, commit_fun, ack_fun] , StartOpts ), ?BROD_LOG_INFO("Starting group_subscriber_worker: ~p~n"