From 7353dc1b5de7bf47c63f98001e3a4b27fcd0c000 Mon Sep 17 00:00:00 2001 From: Indrek Juhkam Date: Tue, 2 Jul 2024 19:45:13 +0300 Subject: [PATCH] Expose ack_fun through the init_info `commit_fun` is already exposed, but `ack_fun` is not. This is useful when acking does not happen inside the worker process. E.g. We divide messages between multiple processes and later in a separate process we create a transaction and commit a batch. Now we need to let the consumer know that it's okay to fetch more messages. --- src/brod_group_subscriber_v2.erl | 9 ++++++--- src/brod_group_subscriber_worker.erl | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index 1fb84014..c1e43fdd 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -55,6 +55,7 @@ -export_type([ init_info/0 , subscriber_config/0 , commit_fun/0 + , ack_fun/0 ]). -include("brod_int.hrl"). @@ -71,12 +72,14 @@ }. -type commit_fun() :: fun((brod:offset()) -> ok). +-type ack_fun() :: fun((brod:offset()) -> ok). -type init_info() :: #{ group_id := brod:group_id() , topic := brod:topic() , partition := brod:partition() , commit_fun := commit_fun() + , ack_fun := ack_fun() }. -type member_id() :: brod:group_member_id(). @@ -472,15 +475,15 @@ maybe_start_worker( _MemberId State; _ -> Self = self(), - CommitFun = fun(Offset) -> - commit(Self, Topic, Partition, Offset) - end, + CommitFun = fun(Offset) -> commit(Self, Topic, Partition, Offset) end, + AckFun = fun(Offset) -> ack(Self, Topic, Partition, Offset) end, StartOptions = #{ cb_module => CbModule , cb_config => CbConfig , partition => Partition , begin_offset => BeginOffset , group_id => GroupId , commit_fun => CommitFun + , ack_fun => AckFun , topic => Topic }, {ok, Pid} = start_worker( Client diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 7dd69f9f..b93eee52 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -54,7 +54,7 @@ init(Topic, StartOpts) -> , begin_offset := BeginOffset , commit_fun := CommitFun } = StartOpts, - InitInfo = maps:with( [topic, partition, group_id, commit_fun] + InitInfo = maps:with( [topic, partition, group_id, commit_fun, ack_fun] , StartOpts ), ?BROD_LOG_INFO("Starting group_subscriber_worker: ~p~n"