Skip to content

Commit

Permalink
refactor: use shared ets table for partition-producers
Browse files Browse the repository at this point in the history
Prior to this change, a unique atom is required to start
supervised producers.
This PR changes to use a shared ETS table for partition-producer.
To be backward compatible, the old 'name' option in producer
config is still required; it is used as the namespace for
partitions in the shared table.
  • Loading branch information
zmstone committed Nov 6, 2023
1 parent c40a731 commit c9c5b6b
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 75 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ wolff:send(Producers, [Msg], AckFun).
`fun((PartitionCount, [msg()]) -> partition())`: Caller defined callback.
`partition()`: Caller specified exact partition.

* `name`: default=`wolff_producers`
Atom used to register producer manager process when starting producers
under wolff's supervision tree. It is also used as the ets table name
for producer worker lookup.
* `name`: `atom() | binary()`, Mandatory when started under wolff's supervision tree.
The name (e.g. `{clientid}-{topicname}`) should be globally unique as it
is used as the namespace for partition-producer process registration.
An atom name is also used to register the `wolff_producers` process.

* `partition_count_refresh_interval_seconds`: default=`300` (5 minutes)
Non-negative integer to refresh topic metadata in order to auto-discover newly added partitions.
Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
- No global stats collection by default.
There is an ETS-table-based stats collector to record the number of sent bytes and messages. Consider this feature deprecated.
Since 1.7.0, there is better integration for metrics.
- For supervised producers, use a global ets table (named `wolff_producers_global`) to store producer workers.
This should avoid having to create an atom for each supervised topic producer.

* 1.8.0
- Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52)
- Improved logs when reporting connection errors. (merged 1.5.12)
Expand Down
3 changes: 0 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
container_name: wolff-zk
kafka_1:
depends_on:
Expand Down Expand Up @@ -35,4 +33,3 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT,INTRA:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTRA
KAFKA_CREATE_TOPICS: test-topic-r2:1:2

1 change: 1 addition & 0 deletions include/wolff.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
%% Since Kafka 2.4, it has been extended to 1048588.
%% We keep it backward compatible here.
-define(WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES, 1000000).
-define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global).
-endif.
2 changes: 1 addition & 1 deletion src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
-type host() :: kpro:endpoint().
-type topic() :: kpro:topic().
-type partition() :: kpro:partition().
-type name() :: atom().
-type name() :: atom() | binary().
-type offset() :: kpro:offset().
-type offset_reply() :: offset() | buffer_overflow_discarded.
-type producers_cfg() :: wolff_producers:config().
Expand Down
130 changes: 85 additions & 45 deletions src/wolff_producers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@
-export([start_link/3]).
-export([start_linked_producers/3, stop_linked/1]).
-export([start_supervised/3, stop_supervised/1, stop_supervised/3]).
-export([pick_producer/2, lookup_producer/2]).
-export([pick_producer/2, lookup_producer/2, cleanup_workers_table/1]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).

%% tests
-export([find_producer_by_partition/2]).

-export_type([producers/0, config/0]).

-include("wolff.hrl").
-include_lib("stdlib/include/ms_transform.hrl").

-opaque producers() ::
#{workers := #{partition() => pid()} | ets:tab(),
#{workers := #{partition() => pid()} | wolff:name(),
partitioner := partitioner(),
client_id => wolff:client_id(),
topic => kpro:topic()
client_id := wolff:client_id(),
topic := kpro:topic()
}.

-type topic() :: kpro:topic().
Expand All @@ -53,15 +57,21 @@
-define(init_producers, init_producers).
-define(init_producers_delay, 1000).
-define(not_initialized, not_initialized).
-define(initialized, initialized).
-define(partition_count_refresh_interval_seconds, 300).
-define(refresh_partition_count, refresh_partition_count).

%% @doc start wolff_producdrs gen_server
%% @doc Called by wolff_producers_sup to start wolff_producers process.
start_link(ClientId, Topic, Config) ->
Name = get_name(Config),
gen_server:start_link({local, Name}, ?MODULE, {ClientId, Topic, Config}, []).
case is_atom(Name) of
true ->
gen_server:start_link({local, Name}, ?MODULE, {ClientId, Topic, Config}, []);
false ->
gen_server:start_link(?MODULE, {ClientId, Topic, Config}, [])
end.

%% @doc start wolff_producdrs gen_server
%% @doc Start wolff_producer processes linked to caller.
-spec start_linked_producers(wolff:client_id() | pid(), topic(), config()) ->
{ok, producers()} | {error, any()}.
start_linked_producers(ClientId, Topic, ProducerCfg) when is_binary(ClientId) ->
Expand All @@ -80,7 +90,8 @@ start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg) ->
{ok, #{client_id => ClientId,
topic => Topic,
workers => Workers,
partitioner => Partitioner}};
partitioner => Partitioner
}};
{error, Reason} ->
{error, Reason}
end.
Expand All @@ -102,11 +113,11 @@ start_supervised(ClientId, Topic, ProducerCfg) ->
%% for this topic.
_ = wolff_producers_sup:ensure_absence(ClientId, get_name(ProducerCfg)),
{error, failed_to_initialize_producers_in_time};
Ets ->
_ ->
{ok, #{client_id => ClientId,
topic => Topic,
workers => Ets,
partitioner => maps:get(partitioner, ProducerCfg, random)
topic => Topic,
workers => get_name(ProducerCfg),
partitioner => maps:get(partitioner, ProducerCfg, random)
}}
end;
{error, Reason} ->
Expand All @@ -115,13 +126,13 @@ start_supervised(ClientId, Topic, ProducerCfg) ->

%% @doc Ensure workers and clean up meta data.
-spec stop_supervised(producers()) -> ok.
stop_supervised(#{client_id := ClientId, workers := NamedEts, topic := Topic}) ->
stop_supervised(ClientId, Topic, NamedEts).
stop_supervised(#{client_id := ClientId, workers := Name, topic := Topic}) ->
stop_supervised(ClientId, Topic, Name).

%% @doc Ensure workers and clean up meta data.
-spec stop_supervised(wolff:client_id(), topic(), wolff:name()) -> ok.
stop_supervised(ClientId, Topic, NamedEts) ->
wolff_producers_sup:ensure_absence(ClientId, NamedEts),
stop_supervised(ClientId, Topic, Name) ->
wolff_producers_sup:ensure_absence(ClientId, Name),
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
ok = wolff_client:delete_producers_metadata(Pid, Topic);
Expand Down Expand Up @@ -173,8 +184,8 @@ lookup_producer(#{workers := Workers}, Partition) ->
lookup_producer(Workers, Partition);
lookup_producer(Workers, Partition) when is_map(Workers) ->
maps:get(Partition, Workers);
lookup_producer(Workers, Partition) ->
[{Partition, Pid}] = ets:lookup(Workers, Partition),
lookup_producer(Name, Partition) ->
{ok, Pid} = find_producer_by_partition(Name, Partition),
Pid.

pick_partition(_Count, Partition, _) when is_integer(Partition) ->
Expand Down Expand Up @@ -204,7 +215,7 @@ init({ClientId, Topic, Config}) ->
client_pid => false,
topic => Topic,
config => Config,
ets => ?not_initialized,
producers_status => ?not_initialized,
refresh_tref => start_partition_refresh_timer(Config)
}}.

Expand Down Expand Up @@ -247,37 +258,37 @@ handle_info({'DOWN', _, process, Pid, Reason}, #{client_id := ClientId,
%% expect their 'EXIT' signals soon
{noreply, ensure_rediscover_client_timer(St#{client_pid := false})};
handle_info({'EXIT', Pid, Reason},
#{ets := Ets,
topic := Topic,
#{topic := Topic,
client_id := ClientId,
client_pid := ClientPid,
config := Config
} = St) ->
case ets:match(Ets, {'$1', Pid}) of
Name = get_name(Config),
case find_producer_by_pid(Name, Pid) of
[] ->
%% this should not happen, hence error level
log_error("unknown_EXIT_message", #{pid => Pid, reason => Reason});
[[Partition]] ->
[Partition] ->
case is_alive(ClientPid) of
true ->
%% wolff_producer is not designed to crash & restart
%% if this happens, it's likely a bug in wolff_producer module
log_error("producer_down",
#{topic => Topic, partition => Partition,
partition_worker => Pid, reason => Reason}),
ok = start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config);
ok = start_producer_and_insert_pid(ClientId, Topic, Partition, Config);
false ->
%% no client, restart will be triggered when client connection is back.
ets:insert(Ets, {Partition, ?down(Reason)})
insert_producers(get_name(Config), #{Partition => ?down(Reason)})
end
end,
{noreply, St};
handle_info(Info, St) ->
log_error("unknown_info", #{info => Info}),
{noreply, St}.

handle_call(get_workers, _From, #{ets := Ets} = St) ->
{reply, Ets, St};
handle_call(get_workers, _From, #{producers_status := Status} = St) ->
{reply, Status, St};
handle_call(Call, From, St) ->
log_error("unknown_call", #{call => Call, from => From}),
{reply, {error, unknown_call}, St}.
Expand All @@ -289,7 +300,8 @@ handle_cast(Cast, St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.

terminate(_, _St) -> ok.
terminate(_, #{config := Config}) ->
ok = cleanup_workers_table(get_name(Config)).

ensure_rediscover_client_timer(#{?rediscover_client_tref := false} = St) ->
Tref = erlang:send_after(?rediscover_client_delay, self(), ?rediscover_client),
Expand All @@ -312,16 +324,15 @@ start_link_producers(ClientId, Topic, Connections, Config) ->
Acc#{Partition => WorkerPid}
end, #{}, Connections).

maybe_init_producers(#{ets := ?not_initialized,
maybe_init_producers(#{producers_status := ?not_initialized,
topic := Topic,
client_id := ClientId,
config := Config
} = St) ->
case start_linked_producers(ClientId, Topic, Config) of
{ok, #{workers := Workers}} ->
Ets = ets:new(get_name(Config), [protected, named_table, {read_concurrency, true}]),
true = ets:insert(Ets, maps:to_list(Workers)),
St#{ets := Ets};
ok = insert_producers(get_name(Config), Workers),
St#{producers_status := ?initialized};
{error, Reason} ->
log_error("failed_to_init_producers", #{topic => Topic, reason => Reason}),
erlang:send_after(?init_producers_delay, self(), ?init_producers),
Expand All @@ -330,28 +341,56 @@ maybe_init_producers(#{ets := ?not_initialized,
maybe_init_producers(St) ->
St.

maybe_restart_producers(#{ets := ?not_initialized} = St) -> St;
maybe_restart_producers(#{ets := Ets,
client_id := ClientId,
maybe_restart_producers(#{producers_status := ?not_initialized} = St) -> St;
maybe_restart_producers(#{client_id := ClientId,
topic := Topic,
config := Config
} = St) ->
Producers = find_producers_by_name(get_name(Config)),
lists:foreach(
fun({Partition, Pid}) ->
case is_alive(Pid) of
true -> ok;
false -> start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config)
false -> start_producer_and_insert_pid(ClientId, Topic, Partition, Config)
end
end, ets:tab2list(Ets)),
end, Producers),
St.

%% @doc Remove all partition-producer entries registered under Name
%% from the shared global workers table.
-spec cleanup_workers_table(wolff:name()) -> ok.
cleanup_workers_table(Name) ->
    %% Keys are {Name, Partition}; delete every object in this namespace.
    true = ets:match_delete(?WOLFF_PRODUCERS_GLOBAL_TABLE, {{Name, '_'}, '_'}),
    ok.

%% Look up the producer pid registered for {Name, Partition} in the
%% shared global workers table.
find_producer_by_partition(Name, Partition) ->
    Key = {Name, Partition},
    case ets:lookup(?WOLFF_PRODUCERS_GLOBAL_TABLE, Key) of
        [{Key, Pid}] ->
            {ok, Pid};
        [] ->
            {error, not_found}
    end.

%% Collect all {Partition, Pid} pairs registered under Name.
find_producers_by_name(Name) ->
    %% Match objects keyed {Name, Partition}, returning {Partition, Pid}.
    MatchSpec = [{{{Name, '$1'}, '$2'}, [], [{{'$1', '$2'}}]}],
    ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, MatchSpec).

%% Reverse lookup: find the partition(s) under Name whose registered
%% worker is Pid. Returns a (possibly empty) list of partitions.
find_producer_by_pid(Name, Pid) ->
    %% Pid is a ground term, so it can be matched directly in the head
    %% instead of via a guard.
    MatchSpec = [{{{Name, '$1'}, Pid}, [], ['$1']}],
    ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, MatchSpec).

%% Register a map of #{Partition => Pid} workers under Name in the
%% shared global workers table.
insert_producers(Name, Workers0) ->
    Objects = [{{Name, Partition}, Pid}
               || {Partition, Pid} <- maps:to_list(Workers0)],
    true = ets:insert(?WOLFF_PRODUCERS_GLOBAL_TABLE, Objects),
    ok.

get_name(Config) -> maps:get(name, Config, ?MODULE).

start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config) ->
start_producer_and_insert_pid(ClientId, Topic, Partition, Config) ->
{ok, Pid} = wolff_producer:start_link(ClientId, Topic, Partition,
?conn_down(to_be_discovered), Config),
ets:insert(Ets, {Partition, Pid}),
ok.
ok = insert_producers(get_name(Config), #{Partition => Pid}).

%% Config is not used so far.
start_partition_refresh_timer(Config) ->
Expand All @@ -368,7 +407,7 @@ start_partition_refresh_timer(Config) ->
refresh_partition_count(#{client_pid := Pid} = St) when not is_pid(Pid) ->
%% client is to be (re)discovered
St;
refresh_partition_count(#{ets := ?not_initialized} = St) ->
refresh_partition_count(#{producers_status := ?not_initialized} = St) ->
%% to be initialized
St;
refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) ->
Expand All @@ -383,15 +422,16 @@ refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) ->

start_new_producers(#{client_id := ClientId,
topic := Topic,
config := Config,
ets := Ets
config := Config
} = St, Connections0) ->
NowCount = length(Connections0),
%% process only the newly discovered connections
F = fun({Partition, _MaybeConnPid}) -> [] =:= ets:lookup(Ets, Partition) end,
F = fun({Partition, _MaybeConnPid}) ->
{error, not_found} =:= find_producer_by_partition(get_name(Config), Partition)
end,
Connections = lists:filter(F, Connections0),
Workers = start_link_producers(ClientId, Topic, Connections, Config),
true = ets:insert(Ets, maps:to_list(Workers)),
ok = insert_producers(get_name(Config), Workers),
OldCount = partition_cnt(ClientId, Topic),
case OldCount < NowCount of
true ->
Expand Down
18 changes: 9 additions & 9 deletions src/wolff_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
-export([ensure_present/3, ensure_absence/2]).

-define(SUPERVISOR, ?MODULE).
-define(WORKER_ID(ClientId, Topic), {ClientId, Topic}).

start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
Expand All @@ -47,19 +46,20 @@ ensure_present(ClientId, Topic, Config) ->

%% ensure client stopped and deleted under supervisor
-spec ensure_absence(wolff:client_id(), wolff:name()) -> ok.
ensure_absence(ClientId, Name) ->
Id = ?WORKER_ID(ClientId, Name),
ensure_absence(_ClientId, Name) ->
Id = Name,
case supervisor:terminate_child(?SUPERVISOR, Id) of
ok -> ok = supervisor:delete_child(?SUPERVISOR, Id);
{error, not_found} -> ok
ok ->
ok = wolff_producers:cleanup_workers_table(Name),
ok = supervisor:delete_child(?SUPERVISOR, Id);
{error, not_found} ->
ok
end.

child_spec(ClientId, Topic, Config) ->
#{id => ?WORKER_ID(ClientId, get_name(Config)),
child_spec(ClientId, Topic, #{name := Name} = Config) ->
#{id => Name,
start => {wolff_producers, start_link, [ClientId, Topic, Config]},
restart => transient,
type => worker,
modules => [wolff_producers]
}.

get_name(Config) -> maps:get(name, Config, wolff_producers).
3 changes: 3 additions & 0 deletions src/wolff_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

-behaviour(supervisor).

-include("wolff.hrl").

-export([start_link/0, init/1]).

start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
Expand All @@ -25,6 +27,7 @@ init([]) ->
intensity => 10,
period => 5},
Children = [stats_worker(), client_sup(), producers_sup()],
ets:new(?WOLFF_PRODUCERS_GLOBAL_TABLE, [named_table, public, ordered_set, {read_concurrency, true}]),
{ok, {SupFlags, Children}}.

stats_worker() ->
Expand Down
Loading

0 comments on commit c9c5b6b

Please sign in to comment.