diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ec601f..2a560ac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,8 +11,8 @@ jobs: fail-fast: false matrix: otp: - - '24.1' - - '23.3.4.7' + - '26.2' + - '25.3' kafka: - '2.4' - '1.1' diff --git a/changelog.md b/changelog.md index 9bb785b..d1971be 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,7 @@ +* 1.9.1 + - Use ETS (named `wolff_clients_global`) for client ID registration. + When there are thousands of clients, `supervisor:which_children` becomes quite expensive. + * 1.9.0 - No global stats collection by default. There is a ets table based stats collector to record the number of sent bytes and messages. Consider this feature deprecated. diff --git a/include/wolff.hrl b/include/wolff.hrl index 0bfe9ae..68d67b4 100644 --- a/include/wolff.hrl +++ b/include/wolff.hrl @@ -11,5 +11,18 @@ %% Since Kafka 2.4, it has been extended to 1048588. %% We keep it backward compatible here. -define(WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES, 1000000). + +%% Table to register ClientID -> Pid mapping. +%% Applications may often need to find the client pid and run +%% some ad-hoc requests e.g. for health check purposes. +%% This talbe helps to avoid calling supervisor:which_children intensively. +-define(WOLFF_CLIENTS_GLOBAL_TABLE, wolff_clients_global). + +%% Table to register {ClientID, Topic, Partition} -> Pid mapping. +%% This allows all producers to share this one ETS table for quick +%% partition-worker lookup. +%% A special record {{ClientId, Topic, partition_count}, Count} +%% is inserted to cache the partition count. -define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global). + -endif. diff --git a/src/wolff.app.src b/src/wolff.app.src index 1a81b38..def3ed5 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,13 +1,14 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "1.8.0"}, + {vsn, "1.9.1"}, {registered, []}, {applications, [kernel, stdlib, kafka_protocol, replayq, - telemetry + telemetry, + lc ]}, {env,[]}, {mod, {wolff_app, start}}, diff --git a/src/wolff.appup.src b/src/wolff.appup.src index ab8a733..312eb38 100644 --- a/src/wolff.appup.src +++ b/src/wolff.appup.src @@ -1,5 +1,5 @@ %% -*- mode: erlang; -*- -{"1.8.0", +{"1.9.1", [ ], [ diff --git a/src/wolff.erl b/src/wolff.erl index ec37f79..bd0a62f 100644 --- a/src/wolff.erl +++ b/src/wolff.erl @@ -27,6 +27,8 @@ %% Supervised producer management APIs -export([ensure_supervised_producers/3, stop_and_delete_supervised_producers/1, + stop_and_delete_supervised_producers/2, + %% /3 is deprecated, call /2 instead stop_and_delete_supervised_producers/3 ]). @@ -44,7 +46,9 @@ -export([get_producer/2]). -export_type([client_id/0, host/0, producers/0, msg/0, ack_fun/0, partitioner/0, - name/0, offset_reply/0]). + name/0, offset_reply/0, topic/0]). + +-deprecated({stop_and_delete_supervised_producers, 3}). -type client_id() :: binary(). -type host() :: kpro:endpoint(). @@ -92,10 +96,14 @@ stop_producers(Producers) -> ensure_supervised_producers(ClientId, Topic, ProducerCfg) -> wolff_producers:start_supervised(ClientId, Topic, ProducerCfg). -%% @doc Ensure supervised producers are stopped then deleted. +%% @hidden Deprecated. -spec stop_and_delete_supervised_producers(client_id(), topic(), name()) -> ok. -stop_and_delete_supervised_producers(ClientId, Topic, Name) -> - wolff_producers:stop_supervised(ClientId, Topic, Name). +stop_and_delete_supervised_producers(ClientId, Topic, _Name) -> + stop_and_delete_supervised_producers(ClientId, Topic). + +%% @doc Ensure supervised producers are stopped then deleted. +stop_and_delete_supervised_producers(ClientId, Topic) -> + wolff_producers:stop_supervised(ClientId, Topic). %% @doc Ensure supervised producers are stopped then deleted. -spec stop_and_delete_supervised_producers(wolff_producers:producers()) -> ok. diff --git a/src/wolff_client.erl b/src/wolff_client.erl index ae35bc0..768b8f6 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -146,8 +146,9 @@ recv_leader_connection(Client, Topic, Partition, Pid) -> delete_producers_metadata(Client, Topic) -> gen_server:cast(Client, {delete_producers_metadata, Topic}). -init(St) -> +init(#{client_id := ClientID} = St) -> erlang:process_flag(trap_exit, true), + ok = wolff_client_sup:register_client(ClientID), {ok, St}. handle_call(Call, From, #{connect := _Fun} = St) -> @@ -203,7 +204,8 @@ handle_cast(_Cast, St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. -terminate(_, #{conns := Conns} = St) -> +terminate(_, #{client_id := ClientID, conns := Conns} = St) -> + ok = wolff_client_sup:deregister_client(ClientID), ok = close_connections(Conns), {ok, St#{conns := #{}}}. diff --git a/src/wolff_client_sup.erl b/src/wolff_client_sup.erl index 61b481c..c16beef 100644 --- a/src/wolff_client_sup.erl +++ b/src/wolff_client_sup.erl @@ -19,6 +19,9 @@ -export([start_link/0, init/1]). -export([ensure_present/3, ensure_absence/1, find_client/1]). +-export([register_client/1, deregister_client/1]). + +-include("wolff.hrl"). -define(SUPERVISOR, ?MODULE). @@ -29,10 +32,11 @@ init([]) -> intensity => 10, period => 5 }, + ok = create_clients_table(), Children = [], %% dynamically added/stopped {ok, {SupFlags, Children}}. -%% ensure a client started under supervisor +%% @doc Ensure a client started under supervisor. -spec ensure_present(wolff:client_id(), [wolff:host()], wolff_client:config()) -> {ok, pid()} | {error, client_not_running}. ensure_present(ClientId, Hosts, Config) -> @@ -43,27 +47,34 @@ ensure_present(ClientId, Hosts, Config) -> {error, already_present} -> {error, client_not_running} end. -%% ensure client stopped and deleted under supervisor +%% @doc Ensure client stopped and deleted under supervisor. -spec ensure_absence(wolff:client_id()) -> ok. ensure_absence(ClientId) -> case supervisor:terminate_child(?SUPERVISOR, ClientId) of ok -> ok = supervisor:delete_child(?SUPERVISOR, ClientId); {error, not_found} -> ok - end. + end, + %% wolff_client process' terminate callback deregisters itself + %% but we make sure it's deregistered in case the client is killed + ok = deregister_client(ClientId). -%% find client pid from client id +%% @doc Find client pid from client id. -spec find_client(wolff:client_id()) -> {ok, pid()} | {error, any()}. -find_client(ClientId) -> - Children = supervisor:which_children(?SUPERVISOR), - case lists:keyfind(ClientId, 1, Children) of - {ClientId, Client, _, _} when is_pid(Client) -> - {ok, Client}; - {ClientId, Restarting, _, _} -> - {error, Restarting}; - false -> - {error, no_such_client} - end. +find_client(ClientID) -> + try + case ets:lookup(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientID) of + [{ClientID, Pid}] -> + {ok, Pid}; + [] -> + {error, no_such_client} + end + catch + error : badarg -> + {error, client_supervisor_not_initialized} + end. + +%% @private Make supervisor child spec. child_spec(ClientId, Hosts, Config) -> #{id => ClientId, start => {wolff_client, start_link, [ClientId, Hosts, Config]}, @@ -71,3 +82,22 @@ child_spec(ClientId, Hosts, Config) -> type => worker, modules => [wolff_client] }. + +%% @doc Create a ets table which is used for client registration. +%% Records are of format: {ClientId, Pid} +create_clients_table() -> + EtsName = ?WOLFF_CLIENTS_GLOBAL_TABLE, + EtsOpts = [named_table, public, ordered_set, {read_concurrency, true}], + EtsName = ets:new(EtsName, EtsOpts), + ok. + +%% @doc Insert the client in the registration table. +register_client(ClientId) -> + Pid = self(), + true = ets:insert(?WOLFF_CLIENTS_GLOBAL_TABLE, {ClientId, Pid}), + ok. + +%% @doc Delete the client from the registration table. +deregister_client(ClientId) -> + _ = ets:delete(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientId), + ok. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 8b65503..372ba77 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -77,7 +77,7 @@ -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -type queue_item() :: {kpro:req(), replayq:ack_ref(), [{_CallId, _MsgCount}]}. --type state() :: #{ call_id_base := timer:time() +-type state() :: #{ call_id_base := pos_integer() , client_id := wolff:client_id() , config := config() , conn := undefined | _ diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl index d3fe1dc..ed3c7f5 100644 --- a/src/wolff_producers.erl +++ b/src/wolff_producers.erl @@ -18,22 +18,22 @@ %% APIs -export([start_link/3]). -export([start_linked_producers/3, stop_linked/1]). --export([start_supervised/3, stop_supervised/1, stop_supervised/3]). --export([pick_producer/2, lookup_producer/2, cleanup_workers_table/1]). +-export([start_supervised/3, stop_supervised/1, stop_supervised/2]). +-export([pick_producer/2, lookup_producer/2, cleanup_workers_table/2]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). %% tests --export([find_producer_by_partition/2]). +-export([find_producer_by_partition/3]). --export_type([producers/0, config/0]). +-export_type([producers/0, config/0, partitioner/0]). -include("wolff.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). -opaque producers() :: - #{workers := #{partition() => pid()} | wolff:name(), + #{workers => #{partition() => pid()}, partitioner := partitioner(), client_id := wolff:client_id(), topic := kpro:topic() @@ -63,7 +63,7 @@ %% @doc Called by wolff_producers_sup to start wolff_producers process. start_link(ClientId, Topic, Config) -> - Name = get_name(Config), + Name = maps:get(name, Config, <<>>), case is_atom(Name) of true -> gen_server:start_link({local, Name}, ?MODULE, {ClientId, Topic, Config}, []); @@ -111,12 +111,11 @@ start_supervised(ClientId, Topic, ProducerCfg) -> ?not_initialized -> %% This means wolff_client failed to fetch metadata %% for this topic. - _ = wolff_producers_sup:ensure_absence(ClientId, get_name(ProducerCfg)), + _ = wolff_producers_sup:ensure_absence(ClientId, Topic), {error, failed_to_initialize_producers_in_time}; _ -> {ok, #{client_id => ClientId, topic => Topic, - workers => get_name(ProducerCfg), partitioner => maps:get(partitioner, ProducerCfg, random) }} end; @@ -126,13 +125,13 @@ start_supervised(ClientId, Topic, ProducerCfg) -> %% @doc Ensure workers and clean up meta data. -spec stop_supervised(producers()) -> ok. -stop_supervised(#{client_id := ClientId, workers := Name, topic := Topic}) -> - stop_supervised(ClientId, Topic, Name). +stop_supervised(#{client_id := ClientId, topic := Topic}) -> + stop_supervised(ClientId, Topic). %% @doc Ensure workers and clean up meta data. --spec stop_supervised(wolff:client_id(), topic(), wolff:name()) -> ok. -stop_supervised(ClientId, Topic, Name) -> - wolff_producers_sup:ensure_absence(ClientId, Name), +-spec stop_supervised(wolff:client_id(), topic()) -> ok. +stop_supervised(ClientId, Topic) -> + wolff_producers_sup:ensure_absence(ClientId, Topic), case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> ok = wolff_client:delete_producers_metadata(Pid, Topic); @@ -141,53 +140,62 @@ stop_supervised(ClientId, Topic, Name) -> ok end. +%% @doc Lookup producer pid. +lookup_producer(#{workers := Workers}, Partition) -> + maps:get(Partition, Workers); +lookup_producer(#{client_id := ClientId, topic := Topic}, Partition) -> + {ok, Pid} = find_producer_by_partition(ClientId, Topic, Partition), + Pid. + %% @doc Retrieve the per-partition producer pid. -spec pick_producer(producers(), [wolff:msg()]) -> {partition(), pid()}. pick_producer(#{workers := Workers, - partitioner := Partitioner, + partitioner := Partitioner + }, Batch) -> + Count = maps:size(Workers), + Partition = pick_partition(Count, Partitioner, Batch), + LookupFn = fun(P) -> maps:get(P, Workers) end, + do_pick_producer(Partitioner, Partition, Count, LookupFn); +pick_producer(#{partitioner := Partitioner, client_id := ClientId, topic := Topic }, Batch) -> - Count = partition_cnt(ClientId, Topic), + Count = get_partition_cnt(ClientId, Topic), Partition = pick_partition(Count, Partitioner, Batch), - do_pick_producer(Partitioner, Partition, Count, Workers). + LookupFn = fun(P) -> + {ok, Pid} = find_producer_by_partition(ClientId, Topic, P), + Pid + end, + do_pick_producer(Partitioner, Partition, Count, LookupFn). -do_pick_producer(Partitioner, Partition0, Count, Workers) -> - Pid0 = lookup_producer(Workers, Partition0), +do_pick_producer(Partitioner, Partition0, Count, LookupFn) -> + Pid0 = LookupFn(Partition0), case is_pid(Pid0) andalso is_process_alive(Pid0) of true -> {Partition0, Pid0}; false when Partitioner =:= random -> - pick_next_alive(Workers, Partition0, Count); + pick_next_alive(LookupFn, Partition0, Count); false when Partitioner =:= roundrobin -> - R = {Partition1, _Pid1} = pick_next_alive(Workers, Partition0, Count), + R = {Partition1, _Pid1} = pick_next_alive(LookupFn, Partition0, Count), _ = put(wolff_roundrobin, (Partition1 + 1) rem Count), R; false -> erlang:error({producer_down, Pid0}) end. -pick_next_alive(Workers, Partition, Count) -> - pick_next_alive(Workers, (Partition + 1) rem Count, Count, _Tried = 1). +pick_next_alive(LookupFn, Partition, Count) -> + pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1). -pick_next_alive(_Workers, _Partition, Count, Count) -> +pick_next_alive(_LookupFn, _Partition, Count, Count) -> erlang:error(all_producers_down); -pick_next_alive(Workers, Partition, Count, Tried) -> - Pid = lookup_producer(Workers, Partition), +pick_next_alive(LookupFn, Partition, Count, Tried) -> + Pid = LookupFn(Partition), case is_alive(Pid) of true -> {Partition, Pid}; - false -> pick_next_alive(Workers, (Partition + 1) rem Count, Count, Tried + 1) + false -> pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, Tried + 1) end. is_alive(Pid) -> is_pid(Pid) andalso is_process_alive(Pid). -lookup_producer(#{workers := Workers}, Partition) -> - lookup_producer(Workers, Partition); -lookup_producer(Workers, Partition) when is_map(Workers) -> - maps:get(Partition, Workers); -lookup_producer(Name, Partition) -> - {ok, Pid} = find_producer_by_partition(Name, Partition), - Pid. - pick_partition(_Count, Partition, _) when is_integer(Partition) -> Partition; pick_partition(Count, F, Batch) when is_function(F) -> @@ -263,8 +271,7 @@ handle_info({'EXIT', Pid, Reason}, client_pid := ClientPid, config := Config } = St) -> - Name = get_name(Config), - case find_producer_by_pid(Name, Pid) of + case find_partition_by_pid(Pid) of [] -> %% this should not happen, hence error level log_error("unknown_EXIT_message", #{pid => Pid, reason => Reason}); @@ -279,7 +286,7 @@ handle_info({'EXIT', Pid, Reason}, ok = start_producer_and_insert_pid(ClientId, Topic, Partition, Config); false -> %% no client, restart will be triggered when client connection is back. - insert_producers(get_name(Config), #{Partition => ?down(Reason)}) + insert_producers(ClientId, Topic, #{Partition => ?down(Reason)}) end end, {noreply, St}; @@ -300,8 +307,8 @@ handle_cast(Cast, St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. -terminate(_, #{config := Config}) -> - ok = cleanup_workers_table(get_name(Config)). +terminate(_, #{client_id := ClientId, topic := Topic}) -> + ok = cleanup_workers_table(ClientId, Topic). ensure_rediscover_client_timer(#{?rediscover_client_tref := false} = St) -> Tref = erlang:send_after(?rediscover_client_delay, self(), ?rediscover_client), @@ -331,7 +338,7 @@ maybe_init_producers(#{producers_status := ?not_initialized, } = St) -> case start_linked_producers(ClientId, Topic, Config) of {ok, #{workers := Workers}} -> - ok = insert_producers(get_name(Config), Workers), + ok = insert_producers(ClientId, Topic, Workers), St#{producers_status := ?initialized}; {error, Reason} -> log_error("failed_to_init_producers", #{topic => Topic, reason => Reason}), @@ -346,7 +353,7 @@ maybe_restart_producers(#{client_id := ClientId, topic := Topic, config := Config } = St) -> - Producers = find_producers_by_name(get_name(Config)), + Producers = find_producers_by_client_topic(ClientId, Topic), lists:foreach( fun({Partition, Pid}) -> case is_alive(Pid) of @@ -356,41 +363,39 @@ maybe_restart_producers(#{client_id := ClientId, end, Producers), St. --spec cleanup_workers_table(wolff:name()) -> ok. -cleanup_workers_table(Name) -> - Ms = ets:fun2ms(fun({{N, _Partition}, _Pid}) when N =:= Name -> true end), +-spec cleanup_workers_table(wolff:client_id(), wolff:topic()) -> ok. +cleanup_workers_table(ClientId, Topic) -> + Ms = ets:fun2ms(fun({{C, T, _}, _}) when C =:= ClientId andalso T =:= Topic -> true end), ets:select_delete(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms), ok. -find_producer_by_partition(Name, Partition) -> - case ets:lookup(?WOLFF_PRODUCERS_GLOBAL_TABLE, {Name, Partition}) of - [{_, Pid}] -> +find_producer_by_partition(ClientId, Topic, Partition) when is_integer(Partition) -> + case ets:lookup(?WOLFF_PRODUCERS_GLOBAL_TABLE, {ClientId, Topic, Partition}) of + [{{_, _, _}, Pid}] -> {ok, Pid}; [] -> {error, not_found} end. -find_producers_by_name(Name) -> - Ms = ets:fun2ms(fun({{N, Partition}, Pid}) when N =:= Name -> {Partition, Pid} end), +find_producers_by_client_topic(ClientId, Topic) -> + Ms = ets:fun2ms(fun({{C, T, P}, Pid}) when C =:= ClientId andalso T =:= Topic andalso is_integer(P)-> {P, Pid} end), ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms). -find_producer_by_pid(Name, Pid) -> - Ms = ets:fun2ms(fun({{N, Partition}, P}) when N =:= Name andalso P =:= Pid -> Partition end), +find_partition_by_pid(Pid) -> + Ms = ets:fun2ms(fun({{_, _, Partition}, P}) when P =:= Pid -> Partition end), ets:select(?WOLFF_PRODUCERS_GLOBAL_TABLE, Ms). -insert_producers(Name, Workers0) -> +insert_producers(ClientId, Topic, Workers0) -> Workers = lists:map(fun({Partition, Pid}) -> - {{Name, Partition}, Pid} + {{ClientId, Topic, Partition}, Pid} end, maps:to_list(Workers0)), true = ets:insert(?WOLFF_PRODUCERS_GLOBAL_TABLE, Workers), ok. -get_name(Config) -> maps:get(name, Config, ?MODULE). - start_producer_and_insert_pid(ClientId, Topic, Partition, Config) -> {ok, Pid} = wolff_producer:start_link(ClientId, Topic, Partition, ?conn_down(to_be_discovered), Config), - ok = insert_producers(get_name(Config), #{Partition => Pid}). + ok = insert_producers(ClientId, Topic, #{Partition => Pid}). %% Config is not used so far. start_partition_refresh_timer(Config) -> @@ -426,13 +431,17 @@ start_new_producers(#{client_id := ClientId, } = St, Connections0) -> NowCount = length(Connections0), %% process only the newly discovered connections - F = fun({Partition, _MaybeConnPid}) -> - {error, not_found} =:= find_producer_by_partition(get_name(Config), Partition) - end, - Connections = lists:filter(F, Connections0), + F = fun({Partition, _MaybeConnPid} = New, {OldCnt, NewAcc}) -> + case find_producer_by_partition(ClientId, Topic, Partition) of + {ok, _} -> + {OldCnt + 1, NewAcc}; + {error, not_found} -> + {OldCnt, [New | NewAcc]} + end + end, + {OldCount, Connections} = lists:foldl(F, {0, []}, Connections0), Workers = start_link_producers(ClientId, Topic, Connections, Config), - ok = insert_producers(get_name(Config), Workers), - OldCount = partition_cnt(ClientId, Topic), + ok = insert_producers(ClientId, Topic, Workers), case OldCount < NowCount of true -> log_info("started_producers_for_newly_discovered_partitions", @@ -443,14 +452,16 @@ start_new_producers(#{client_id := ClientId, end, St. -partition_cnt(ClientId, Topic) -> - persistent_term:get({?MODULE, ClientId, Topic}). +get_partition_cnt(ClientId, Topic) -> + [{_, Count}] = ets:lookup(?WOLFF_PRODUCERS_GLOBAL_TABLE, {ClientId, Topic, partition_count}), + Count. put_partition_cnt(ClientId, Topic, Count) -> - persistent_term:put({?MODULE, ClientId, Topic}, Count). + _ = ets:insert(?WOLFF_PRODUCERS_GLOBAL_TABLE, {{ClientId, Topic, partition_count}, Count}), + ok. ensure_timer_cancelled(Tref) when is_reference(Tref) -> - _ = erlang:cancel_timer(Tref), - ok; + _ = erlang:cancel_timer(Tref), + ok; ensure_timer_cancelled(_) -> - ok. + ok. diff --git a/src/wolff_producers_sup.erl b/src/wolff_producers_sup.erl index e043298..1614713 100644 --- a/src/wolff_producers_sup.erl +++ b/src/wolff_producers_sup.erl @@ -45,21 +45,24 @@ ensure_present(ClientId, Topic, Config) -> end. %% ensure client stopped and deleted under supervisor --spec ensure_absence(wolff:client_id(), wolff:name()) -> ok. -ensure_absence(_ClientId, Name) -> - Id = Name, +-spec ensure_absence(wolff:client_id(), wolff:topic()) -> ok. +ensure_absence(ClientId, Topic) -> + Id = worker_id(ClientId, Topic), case supervisor:terminate_child(?SUPERVISOR, Id) of ok -> - ok = wolff_producers:cleanup_workers_table(Name), + ok = wolff_producers:cleanup_workers_table(ClientId, Topic), ok = supervisor:delete_child(?SUPERVISOR, Id); {error, not_found} -> ok end. -child_spec(ClientId, Topic, #{name := Name} = Config) -> - #{id => Name, +child_spec(ClientId, Topic, Config) -> + #{id => worker_id(ClientId, Topic), start => {wolff_producers, start_link, [ClientId, Topic, Config]}, restart => transient, type => worker, modules => [wolff_producers] }. + +worker_id(ClientId, Topic) -> + {ClientId, Topic}. diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index 693439b..f3a339d 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -151,8 +151,7 @@ producer_restart_test() -> name => ?FUNCTION_NAME }, {ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg), - #{workers := Name} = Producers, - GetPid = fun() -> {ok, Pid} = wolff_producers:find_producer_by_partition(Name, Partition), Pid end, + GetPid = fun() -> {ok, Pid} = wolff_producers:find_producer_by_partition(ClientId, Topic, Partition), Pid end, Producer0 = GetPid(), Msg0 = #{key => ?KEY, value => <<"0">>}, {0, Offset0} = wolff_producer:send_sync(Producer0, [Msg0], 2000), @@ -197,7 +196,7 @@ stop_with_name_test() -> }, {ok, _} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerCfg), %% cleanup - ok = wolff:stop_and_delete_supervised_producers(ClientId, Topic, Name), + ok = wolff:stop_and_delete_supervised_producers(ClientId, Topic), ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok = wolff:stop_and_delete_supervised_client(ClientId), ?assertEqual([], supervisor:which_children(wolff_client_sup)),