diff --git a/README.md b/README.md index 5cd8e94..57a012b 100644 --- a/README.md +++ b/README.md @@ -165,10 +165,10 @@ wolff:send(Producers, [Msg], AckFun). `fun((PartitionCount, [msg()]) -> partition())`: Caller defined callback. `partition()`: Caller specified exact partition. -* `name`: default=`wolff_producers` - Atom used to register producer manager process when starting producers - under wolff's supervision tree. It is also used as the ets table name - for producer worker lookup. +* `name`: `atom() | binary()`, Mandatory when started under wolff's supervision tree. + The name, (eg. `{clientid}-{topicname}`) should be globally unique as it + is used as the namespace for partition-producder process registration. + An atom name is also used to register `wolff_producers` process. * `partition_count_refresh_interval_seconds`: default=`300` (5 minutes) Non-negative integer to refresh topic metadata in order to auto-discover newly added partitions. diff --git a/changelog.md b/changelog.md index 9e0c0b1..a88c051 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,9 @@ - No global stats collection by default. There is a ets table based stats collector to record the number of sent bytes and messages. Consider this feature deprecated. Since 1.7.0, there there is a better integration for metrics. + - For supervised producers, use a global ets table (named `wolff_producers_global`) to store producer workers. + This should avoid having to create an atom for each supervised topic producer. + * 1.8.0 - Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52) - Improved logs when reporting connection errors. (merged 1.5.12) diff --git a/docker-compose.yml b/docker-compose.yml index 2bfdadb..d9adeb1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,8 +2,6 @@ version: '2' services: zookeeper: image: wurstmeister/zookeeper - ports: - - "2181:2181" container_name: wolff-zk kafka_1: depends_on: @@ -35,4 +33,3 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT,INTRA:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTRA KAFKA_CREATE_TOPICS: test-topic-r2:1:2 - diff --git a/include/wolff.hrl b/include/wolff.hrl index aee6331..0bfe9ae 100644 --- a/include/wolff.hrl +++ b/include/wolff.hrl @@ -11,4 +11,5 @@ %% Since Kafka 2.4, it has been extended to 1048588. %% We keep it backward compatible here. -define(WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES, 1000000). +-define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global). -endif. diff --git a/src/wolff.erl b/src/wolff.erl index 91908aa..ec37f79 100644 --- a/src/wolff.erl +++ b/src/wolff.erl @@ -50,7 +50,7 @@ -type host() :: kpro:endpoint(). -type topic() :: kpro:topic(). -type partition() :: kpro:partition(). --type name() :: atom(). +-type name() :: atom() | binary(). -type offset() :: kpro:offset(). -type offset_reply() :: offset() | buffer_overflow_discarded. -type producers_cfg() :: wolff_producers:config(). diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index 930f32e..d3fe1dc 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -19,20 +19,24 @@ -export([start_link/3]). -export([start_linked_producers/3, stop_linked/1]). -export([start_supervised/3, stop_supervised/1, stop_supervised/3]). --export([pick_producer/2, lookup_producer/2]). +-export([pick_producer/2, lookup_producer/2, cleanup_workers_table/1]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). +%% tests +-export([find_producer_by_partition/2]). + -export_type([producers/0, config/0]). -include("wolff.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -opaque producers() :: - #{workers := #{partition() => pid()} | ets:tab(), + #{workers := #{partition() => pid()} | wolff:name(), partitioner := partitioner(), - client_id => wolff:client_id(), - topic => kpro:topic() + client_id := wolff:client_id(), + topic := kpro:topic() }. -type topic() :: kpro:topic(). @@ -53,15 +57,21 @@ -define(init_producers, init_producers). -define(init_producers_delay, 1000). -define(not_initialized, not_initialized). +-define(initialized, initialized). -define(partition_count_refresh_interval_seconds, 300). -define(refresh_partition_count, refresh_partition_count). -%% @doc start wolff_producdrs gen_server +%% @doc Called by wolff_producers_sup to start wolff_producers process. start_link(ClientId, Topic, Config) -> Name = get_name(Config), - gen_server:start_link({local, Name}, ?MODULE, {ClientId, Topic, Config}, []). + case is_atom(Name) of + true -> + gen_server:start_link({local, Name}, ?MODULE, {ClientId, Topic, Config}, []); + false -> + gen_server:start_link(?MODULE, {ClientId, Topic, Config}, []) + end. -%% @doc start wolff_producdrs gen_server +%% @doc Start wolff_producer processes linked to caller. -spec start_linked_producers(wolff:client_id() | pid(), topic(), config()) -> {ok, producers()} | {error, any()}. start_linked_producers(ClientId, Topic, ProducerCfg) when is_binary(ClientId) -> @@ -80,7 +90,8 @@ start_linked_producers(ClientId, ClientPid, Topic, ProducerCfg) -> {ok, #{client_id => ClientId, topic => Topic, workers => Workers, - partitioner => Partitioner}}; + partitioner => Partitioner + }}; {error, Reason} -> {error, Reason} end. @@ -102,11 +113,11 @@ start_supervised(ClientId, Topic, ProducerCfg) -> %% for this topic. _ = wolff_producers_sup:ensure_absence(ClientId, get_name(ProducerCfg)), {error, failed_to_initialize_producers_in_time}; - Ets -> + _ -> {ok, #{client_id => ClientId, - topic => Topic, - workers => Ets, - partitioner => maps:get(partitioner, ProducerCfg, random) + topic => Topic, + workers => get_name(ProducerCfg), + partitioner => maps:get(partitioner, ProducerCfg, random) }} end; {error, Reason} -> @@ -115,13 +126,13 @@ start_supervised(ClientId, Topic, ProducerCfg) -> %% @doc Ensure workers and clean up meta data. -spec stop_supervised(producers()) -> ok. -stop_supervised(#{client_id := ClientId, workers := NamedEts, topic := Topic}) -> - stop_supervised(ClientId, Topic, NamedEts). +stop_supervised(#{client_id := ClientId, workers := Name, topic := Topic}) -> + stop_supervised(ClientId, Topic, Name). %% @doc Ensure workers and clean up meta data. -spec stop_supervised(wolff:client_id(), topic(), wolff:name()) -> ok. -stop_supervised(ClientId, Topic, NamedEts) -> - wolff_producers_sup:ensure_absence(ClientId, NamedEts), +stop_supervised(ClientId, Topic, Name) -> + wolff_producers_sup:ensure_absence(ClientId, Name), case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> ok = wolff_client:delete_producers_metadata(Pid, Topic); @@ -173,8 +184,8 @@ lookup_producer(#{workers := Workers}, Partition) -> lookup_producer(Workers, Partition); lookup_producer(Workers, Partition) when is_map(Workers) -> maps:get(Partition, Workers); -lookup_producer(Workers, Partition) -> - [{Partition, Pid}] = ets:lookup(Workers, Partition), +lookup_producer(Name, Partition) -> + {ok, Pid} = find_producer_by_partition(Name, Partition), Pid. pick_partition(_Count, Partition, _) when is_integer(Partition) -> @@ -204,7 +215,7 @@ init({ClientId, Topic, Config}) -> client_pid => false, topic => Topic, config => Config, - ets => ?not_initialized, + producers_status => ?not_initialized, refresh_tref => start_partition_refresh_timer(Config) }}. @@ -247,17 +258,17 @@ handle_info({'DOWN', _, process, Pid, Reason}, #{client_id := ClientId, %% expect their 'EXIT' signals soon {noreply, ensure_rediscover_client_timer(St#{client_pid := false})}; handle_info({'EXIT', Pid, Reason}, - #{ets := Ets, - topic := Topic, + #{topic := Topic, client_id := ClientId, client_pid := ClientPid, config := Config } = St) -> - case ets:match(Ets, {'$1', Pid}) of + Name = get_name(Config), + case find_producer_by_pid(Name, Pid) of [] -> %% this should not happen, hence error level log_error("unknown_EXIT_message", #{pid => Pid, reason => Reason}); - [[Partition]] -> + [Partition] -> case is_alive(ClientPid) of true -> %% wolff_producer is not designed to crash & restart @@ -265,10 +276,10 @@ handle_info({'EXIT', Pid, Reason}, log_error("producer_down", #{topic => Topic, partition => Partition, partition_worker => Pid, reason => Reason}), - ok = start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config); + ok = start_producer_and_insert_pid(ClientId, Topic, Partition, Config); false -> %% no client, restart will be triggered when client connection is back. - ets:insert(Ets, {Partition, ?down(Reason)}) + insert_producers(get_name(Config), #{Partition => ?down(Reason)}) end end, {noreply, St}; @@ -276,8 +287,8 @@ handle_info(Info, St) -> log_error("unknown_info", #{info => Info}), {noreply, St}. -handle_call(get_workers, _From, #{ets := Ets} = St) -> - {reply, Ets, St}; +handle_call(get_workers, _From, #{producers_status := Status} = St) -> + {reply, Status, St}; handle_call(Call, From, St) -> log_error("unknown_call", #{call => Call, from => From}), {reply, {error, unknown_call}, St}. @@ -289,7 +300,8 @@ handle_cast(Cast, St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. -terminate(_, _St) -> ok. +terminate(_, #{config := Config}) -> + ok = cleanup_workers_table(get_name(Config)). ensure_rediscover_client_timer(#{?rediscover_client_tref := false} = St) -> Tref = erlang:send_after(?rediscover_client_delay, self(), ?rediscover_client), @@ -312,16 +324,15 @@ start_link_producers(ClientId, Topic, Connections, Config) -> Acc#{Partition => WorkerPid} end, #{}, Connections). -maybe_init_producers(#{ets := ?not_initialized, +maybe_init_producers(#{producers_status := ?not_initialized, topic := Topic, client_id := ClientId, config := Config } = St) -> case start_linked_producers(ClientId, Topic, Config) of {ok, #{workers := Workers}} -> - Ets = ets:new(get_name(Config), [protected, named_table, {read_concurrency, true}]), - true = ets:insert(Ets, maps:to_list(Workers)), - St#{ets := Ets}; + ok = insert_producers(get_name(Config), Workers), + St#{producers_status := ?initialized}; {error, Reason} -> log_error("failed_to_init_producers", #{topic => Topic, reason => Reason}), erlang:send_after(?init_producers_delay, self(), ?init_producers), @@ -330,28 +341,56 @@ maybe_init_producers(#{ets := ?not_initialized, maybe_init_producers(St) -> St. -maybe_restart_producers(#{ets := ?not_initialized} = St) -> St; -maybe_restart_producers(#{ets := Ets, - client_id := ClientId, +maybe_restart_producers(#{producers_status := ?not_initialized} = St) -> St; +maybe_restart_producers(#{client_id := ClientId, topic := Topic, config := Config } = St) -> + Producers = find_producers_by_name(get_name(Config)), lists:foreach( fun({Partition, Pid}) -> case is_alive(Pid) of true -> ok; - false -> start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config) + false -> start_producer_and_insert_pid(ClientId, Topic, Partition, Config) end - end, ets:tab2list(Ets)), + end, Producers), St. +-spec cleanup_workers_table(wolff:name()) -> ok. +cleanup_workers_table(Name) -> + Ms = ets:fun2ms(fun({{N, _Partition}, _Pid}) when N =:= Name -> true end), + ets:select_delete(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms), + ok. + +find_producer_by_partition(Name, Partition) -> + case ets:lookup(?WOLFF_PRODUCERS_GLOBAL_TABLE, {Name, Partition}) of + [{_, Pid}] -> + {ok, Pid}; + [] -> + {error, not_found} + end. + +find_producers_by_name(Name) -> + Ms = ets:fun2ms(fun({{N, Partition}, Pid}) when N =:= Name -> {Partition, Pid} end), + ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms). + +find_producer_by_pid(Name, Pid) -> + Ms = ets:fun2ms(fun({{N, Partition}, P}) when N =:= Name andalso P =:= Pid -> Partition end), + ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms). + +insert_producers(Name, Workers0) -> + Workers = lists:map(fun({Partition, Pid}) -> + {{Name, Partition}, Pid} + end, maps:to_list(Workers0)), + true = ets:insert(?WOLFF_PRODUCERS_GLOBAL_TABLE, Workers), + ok. + get_name(Config) -> maps:get(name, Config, ?MODULE). -start_producer_and_insert_pid(Ets, ClientId, Topic, Partition, Config) -> +start_producer_and_insert_pid(ClientId, Topic, Partition, Config) -> {ok, Pid} = wolff_producer:start_link(ClientId, Topic, Partition, ?conn_down(to_be_discovered), Config), - ets:insert(Ets, {Partition, Pid}), - ok. + ok = insert_producers(get_name(Config), #{Partition => Pid}). %% Config is not used so far. start_partition_refresh_timer(Config) -> @@ -368,7 +407,7 @@ start_partition_refresh_timer(Config) -> refresh_partition_count(#{client_pid := Pid} = St) when not is_pid(Pid) -> %% client is to be (re)discovered St; -refresh_partition_count(#{ets := ?not_initialized} = St) -> +refresh_partition_count(#{producers_status := ?not_initialized} = St) -> %% to be initialized St; refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) -> @@ -383,15 +422,16 @@ refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) -> start_new_producers(#{client_id := ClientId, topic := Topic, - config := Config, - ets := Ets + config := Config } = St, Connections0) -> NowCount = length(Connections0), %% process only the newly discovered connections - F = fun({Partition, _MaybeConnPid}) -> [] =:= ets:lookup(Ets, Partition) end, + F = fun({Partition, _MaybeConnPid}) -> + {error, not_found} =:= find_producer_by_partition(get_name(Config), Partition) + end, Connections = lists:filter(F, Connections0), Workers = start_link_producers(ClientId, Topic, Connections, Config), - true = ets:insert(Ets, maps:to_list(Workers)), + ok = insert_producers(get_name(Config), Workers), OldCount = partition_cnt(ClientId, Topic), case OldCount < NowCount of true -> diff --git a/src/wolff_producers_sup.erl b/src/wolff_producers_sup.erl index 86d9da8..e043298 100644 --- a/src/wolff_producers_sup.erl +++ b/src/wolff_producers_sup.erl @@ -21,7 +21,6 @@ -export([ensure_present/3, ensure_absence/2]). -define(SUPERVISOR, ?MODULE). --define(WORKER_ID(ClientId, Topic), {ClientId, Topic}). start_link() -> supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). @@ -47,19 +46,20 @@ ensure_present(ClientId, Topic, Config) -> %% ensure client stopped and deleted under supervisor -spec ensure_absence(wolff:client_id(), wolff:name()) -> ok. -ensure_absence(ClientId, Name) -> - Id = ?WORKER_ID(ClientId, Name), +ensure_absence(_ClientId, Name) -> + Id = Name, case supervisor:terminate_child(?SUPERVISOR, Id) of - ok -> ok = supervisor:delete_child(?SUPERVISOR, Id); - {error, not_found} -> ok + ok -> + ok = wolff_producers:cleanup_workers_table(Name), + ok = supervisor:delete_child(?SUPERVISOR, Id); + {error, not_found} -> + ok end. -child_spec(ClientId, Topic, Config) -> - #{id => ?WORKER_ID(ClientId, get_name(Config)), +child_spec(ClientId, Topic, #{name := Name} = Config) -> + #{id => Name, start => {wolff_producers, start_link, [ClientId, Topic, Config]}, restart => transient, type => worker, modules => [wolff_producers] }. - -get_name(Config) -> maps:get(name, Config, wolff_producers). diff --git a/src/wolff_sup.erl b/src/wolff_sup.erl index 4813044..b561702 100644 --- a/src/wolff_sup.erl +++ b/src/wolff_sup.erl @@ -16,6 +16,8 @@ -behaviour(supervisor). +-include("wolff.hrl"). + -export([start_link/0, init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -25,6 +27,7 @@ init([]) -> intensity => 10, period => 5}, Children = [stats_worker(), client_sup(), producers_sup()], + ets:new(?WOLFF_PRODUCERS_GLOBAL_TABLE, [named_table, public, ordered_set, {read_concurrency, true}]), {ok, {SupFlags, Children}}. stats_worker() -> diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index eabd95a..693439b 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -17,7 +17,7 @@ supervised_client_test() -> %% start it again should result in the same client pid ?assertEqual({ok, Client}, wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg)), - ProducerCfg0 = producer_config(), + ProducerCfg0 = producer_config(?FUNCTION_NAME), ProducerCfg = ProducerCfg0#{required_acks => leader_only}, {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), Msg = #{key => ?KEY, value => <<"value">>}, @@ -41,7 +41,12 @@ supervised_client_test() -> wolff_tests:deinstall_event_logging(?FUNCTION_NAME), ok. -supervised_producers_test() -> +supervised_producers_test_() -> + [{"atom-name", fun() -> test_supervised_producers(test_producers) end}, + {"binary-name", fun() -> test_supervised_producers(<<"test-producers">>) end} + ]. + +test_supervised_producers(Name) -> CntrEventsTable = ets:new(cntr_events, [public]), wolff_tests:install_event_logging(?FUNCTION_NAME, CntrEventsTable, false), ClientId = <<"supervised-producers">>, @@ -49,10 +54,10 @@ supervised_producers_test() -> {ok, _} = application:ensure_all_started(wolff), ClientCfg = client_config(), {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg), - ProducerCfg0 = producer_config(), + ProducerCfg0 = producer_config(Name), ProducerCfg = ProducerCfg0#{required_acks => all_isr}, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg), - {ok, Producers} = wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg), %% assert + ?assertEqual({ok, Producers}, wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg)), Msg = #{key => ?KEY, value => <<"value">>}, Self = self(), AckFun = fun(_Partition, _BaseOffset) -> Self ! acked, ok end, @@ -93,7 +98,8 @@ test_client_restart(ClientId, Topic, Partition) -> ProducerCfg = #{replayq_dir => "test-data/client-restart-test", required_acks => all_isr, partitioner => Partition, %% always send to the same partition - partition_count_refresh_interval_seconds => 0 + partition_count_refresh_interval_seconds => 0, + name => ?FUNCTION_NAME }, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg), Msg1 = #{key => ?KEY, value => <<"1">>}, @@ -125,7 +131,7 @@ bad_host_test() -> _ = application:stop(wolff), %% ensure stopped {ok, _} = application:ensure_all_started(wolff), {ok, _} = wolff:ensure_supervised_client(ClientId, [{"badhost", 9092}], #{}), - ?assertMatch({error, _}, wolff:ensure_supervised_producers(ClientId, <<"t">>, #{})), + ?assertMatch({error, _}, wolff:ensure_supervised_producers(ClientId, <<"t">>, #{name => ?FUNCTION_NAME})), ok = wolff:stop_and_delete_supervised_client(ClientId). producer_restart_test() -> @@ -141,11 +147,12 @@ producer_restart_test() -> ProducerCfg = #{replayq_dir => "test-data/producer-restart-test", required_acks => all_isr, partitioner => Partition, - reconnect_delay_ms => 0 + reconnect_delay_ms => 0, + name => ?FUNCTION_NAME }, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg), - #{workers := Ets} = Producers, - GetPid = fun() -> [{Partition, Pid}] = ets:lookup(Ets, Partition), Pid end, + #{workers := Name} = Producers, + GetPid = fun() -> {ok, Pid} = wolff_producers:find_producer_by_partition(Name, Partition), Pid end, Producer0 = GetPid(), Msg0 = #{key => ?KEY, value => <<"0">>}, {0, Offset0} = wolff_producer:send_sync(Producer0, [Msg0], 2000), @@ -272,7 +279,7 @@ fail_retry_success_test() -> {ok, _} = application:ensure_all_started(wolff), ClientCfg = client_config(), {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg), - ProducerCfg0 = producer_config(), + ProducerCfg0 = producer_config(?FUNCTION_NAME), ProducerCfg = ProducerCfg0#{required_acks => all_isr}, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg), Msg = #{key => ?KEY, value => <<"value">>}, @@ -322,7 +329,7 @@ fail_retry_failed_test() -> {ok, _} = application:ensure_all_started(wolff), ClientCfg = client_config(), {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg), - ProducerCfg0 = producer_config(), + ProducerCfg0 = producer_config(?FUNCTION_NAME), ProducerCfg = ProducerCfg0#{required_acks => all_isr}, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, <<"test-topic">>, ProducerCfg), Msg = #{key => ?KEY, value => <<"value">>}, @@ -439,9 +446,10 @@ fetch(Connection, Topic, Partition, Offset, MaxBytes) -> client_config() -> #{}. -producer_config() -> +producer_config(Name) -> #{replayq_dir => "test-data", - enable_global_stats => true + enable_global_stats => true, + name => Name }. key(Name) ->