Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main-1.5' into sync-1.5.13
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Feb 7, 2024
2 parents f5a4f22 + c1ebe49 commit f92a905
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 56 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
* 1.10.2 (merge 1.5.13)
- Use a long-lived metadata connection.
This avoids repeatedly re-establishing the connection when there are many concurrent connectivity checks.
- Fix connection error reason translation; the error log is now more compact when, e.g., a connect timeout happens.

* 1.10.1
- Add `max_partitions` producer config to limit the number of partition producers, so that the client side can also control resource utilization.

Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.10.1"},
{vsn, "1.10.2"},
{registered, []},
{applications,
[kernel,
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.9.1",
{"1.10.2",
[
],
[
Expand Down
3 changes: 2 additions & 1 deletion src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
-export_type([client_id/0, host/0, producers/0, msg/0, ack_fun/0, partitioner/0,
name/0, offset_reply/0, topic/0]).

-deprecated({check_if_topic_exists, 3}).
-deprecated({stop_and_delete_supervised_producers, 3}).

-type client_id() :: binary().
Expand Down Expand Up @@ -158,7 +159,7 @@ check_connectivity(ClientId) ->
check_connectivity(Hosts, ConnConfig) ->
wolff_client:check_connectivity(Hosts, ConnConfig).

%% @doc Check if the cluster is reachable and the topic is created.
%% @hidden Deprecated. Check if the cluster is reachable and the topic is created.
-spec check_if_topic_exists([host()], wolff_client:config(), topic()) ->
ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}.
check_if_topic_exists(Hosts, ConnConfig, Topic) ->
Expand Down
157 changes: 104 additions & 53 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
get_id/1,
delete_producers_metadata/2]).
-export([check_connectivity/1, check_connectivity/2]).
-export([check_if_topic_exists/2, check_if_topic_exists/3, check_topic_exists_with_client_pid/2]).
-export([check_topic_exists_with_client_pid/2]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).

-export_type([config/0]).

-deprecated({check_if_topic_exists, 3}).
-export([check_if_topic_exists/3]).

-type config() :: map().
-type topic() :: kpro:topic().
-type partition() :: kpro:partition().
Expand All @@ -45,6 +48,7 @@
config := config(),
conn_config := kpro:conn_config(),
conns := #{conn_id() => connection()},
metadata_conn := pid() | not_initialized | down,
metadata_ts := #{topic() => erlang:timestamp()},
%% only applicable when connection strategy is per_broker
%% because in this case the connections are keyed by host()
Expand Down Expand Up @@ -75,6 +79,7 @@ start_link(ClientId, Hosts, Config) ->
config => MyCfg,
conn_config => ConnCfg,
conns => #{},
metadata_conn => not_initialized,
metadata_ts => #{},
leaders => #{}
},
Expand All @@ -99,39 +104,29 @@ get_leader_connections(Client, Topic) ->
get_leader_connections(Client, Topic, MaxPartitions) ->
safe_call(Client, {get_leader_connections, Topic, MaxPartitions}).

%% @doc Check that the client has a live metadata connection.
%% The request is forwarded to the client process, which keeps its current
%% metadata connection when it is still alive and otherwise tries to connect
%% to one of the seed hosts (i.e. a reconnect is triggered if the connection
%% is down for whatever reason).
%% Returns `ok' when a metadata connection is available, or `{error, Reason}'
%% when a new connection cannot be established.
%% NOTE(review): `safe_call/2' is not visible here; it may produce additional
%% `{error, _}' results (e.g. when the client process is down) -- confirm.
-spec check_connectivity(pid()) -> ok | {error, any()}.
check_connectivity(Pid) ->
safe_call(Pid, check_connectivity).

%% @doc Connect to any host in the list and immediately disconnect.
-spec check_connectivity([host()], kpro:conn_config()) -> ok | {error, any()}.
check_connectivity(Hosts, ConnConfig) when Hosts =/= [] ->
case kpro:connect_any(Hosts, ConnConfig) of
{ok, Conn} ->
close_connection(Conn),
ok;
{error, Reasons} ->
{error, tr_reasons(Reasons)}
end.
case kpro:connect_any(Hosts, ConnConfig) of
{ok, Conn} ->
ok = close_connection(Conn);
{error, Reasons} ->
{error, tr_reasons(Reasons)}
end.

%% @doc Check if a topic exists by creating a temp connecton to any of the seed hosts.
%% @hidden Deprecated. Check if a topic exists by creating a temporary connection to any of the seed hosts.
-spec check_if_topic_exists([host()], kpro:conn_config(), topic()) ->
ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}.
check_if_topic_exists(Hosts, ConnConfig, Topic) when Hosts =/= [] ->
case get_metadata(Hosts, ConnConfig, Topic) of
{ok, _} ->
ok;
{error, Errors} ->
{error, Errors}
end.

-spec check_if_topic_exists(pid(), topic()) ->
ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}.
check_if_topic_exists(Pid, Topic) when is_pid(Pid) ->
{ok, Vsns} = kpro:get_api_versions(Pid),
{_, Vsn} = maps:get(metadata, Vsns),
case do_get_metadata(Vsn, Pid, Topic) of
{ok, _} ->
ok;
{ok, {Pid, _}} ->
ok = close_connection(Pid);
{error, Errors} ->
{error, Errors}
end.
Expand Down Expand Up @@ -164,8 +159,14 @@ handle_call(Call, From, #{connect := _Fun} = St) ->
handle_call(Call, From, upgrade(St));
handle_call(get_id, _From, #{client_id := Id} = St) ->
{reply, Id, St};
handle_call({check_if_topic_exists, Topic}, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
{reply, check_if_topic_exists(Hosts, ConnConfig, Topic), St};
handle_call({check_if_topic_exists, Topic}, _From, #{conn_config := ConnConfig} = St0) ->
case ensure_metadata_conn(St0) of
{ok, #{metadata_conn := ConnPid} = St} ->
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
{reply, check_if_topic_exists2(ConnPid, Topic, Timeout), St};
{error, Reason} ->
{reply, {error, Reason}, St0}
end;
handle_call({get_leader_connections, Topic, MaxPartitions}, _From, St0) ->
case ensure_leader_connections(St0, Topic, MaxPartitions) of
{ok, St} ->
Expand All @@ -178,9 +179,13 @@ handle_call(stop, From, #{conns := Conns} = St) ->
ok = close_connections(Conns),
gen_server:reply(From, ok),
{stop, normal, St#{conns := #{}}};
handle_call(check_connectivity, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
Res = check_connectivity(Hosts, ConnConfig),
{reply, Res, St};
handle_call(check_connectivity, _From, St0) ->
case ensure_metadata_conn(St0) of
{ok, St} ->
{reply, ok, St};
{error, Reason} ->
{reply, {error, Reason}, St0}
end;
handle_call(Call, _From, St) ->
{reply, {error, {unknown_call, Call}}, St}.

Expand Down Expand Up @@ -215,11 +220,34 @@ code_change(_OldVsn, St, _Extra) ->

%% gen_server terminate callback.
%% Deregisters the client from wolff_client_sup, then closes every connection
%% held in `conns' as well as the metadata connection.
terminate(_, #{client_id := ClientID, conns := Conns} = St) ->
ok = wolff_client_sup:deregister_client(ClientID),
%% Read with a default so a state map without the `metadata_conn' key does not crash here.
%% NOTE(review): presumably this covers states created before the key was introduced -- confirm.
MetadataConn = maps:get(metadata_conn, St, none),
ok = close_connections(Conns),
%% NOTE(review): MetadataConn may be a non-pid here (`none' from the default above, or
%% `not_initialized' / `down' as set elsewhere in this module); this relies on
%% close_connection/1 accepting such values and returning `ok' -- confirm.
ok = close_connection(MetadataConn),
{ok, St#{conns := #{}}}.

%% == internals ======================================================

%% Make sure the state holds a usable metadata connection.
%% The current `metadata_conn' is kept as-is when is_alive/1 reports it alive;
%% otherwise a connection to any of the seed hosts is attempted and, on
%% success, stored in the returned state.
%% Returns `{ok, State}' or `{error, Reasons}' where the reasons have been
%% passed through tr_reasons/1.
ensure_metadata_conn(#{seed_hosts := SeedHosts,
                       conn_config := ConnCfg,
                       metadata_conn := CurrentConn} = State) ->
    case is_alive(CurrentConn) of
        false ->
            %% no usable connection: try to establish a fresh one
            case kpro:connect_any(SeedHosts, ConnCfg) of
                {error, Reasons} ->
                    {error, tr_reasons(Reasons)};
                {ok, FreshConn} ->
                    {ok, State#{metadata_conn => FreshConn}}
            end;
        true ->
            {ok, State}
    end.

%% Check whether Topic exists by fetching its metadata over the given
%% connection process. The metadata itself is discarded: the result is
%% `ok' when the fetch succeeds and `{error, Reason}' otherwise.
check_if_topic_exists2(Pid, Topic, Timeout) when is_pid(Pid) ->
    case do_get_metadata(Pid, Topic, Timeout) of
        {error, Reason} ->
            {error, Reason};
        {ok, _Metadata} ->
            ok
    end.

%% Close every connection stored as a value in the Conns map.
%% Always returns `ok'.
close_connections(Conns) ->
    ConnPids = [Pid || {_, Pid} <- maps:to_list(Conns)],
    lists:foreach(fun close_connection/1, ConnPids).

Expand Down Expand Up @@ -272,25 +300,40 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) ->
ensure_leader_connections(St, Topic, MaxPartitions) ->
case is_metadata_fresh(St, Topic) of
true -> {ok, St};
false -> do_ensure_leader_connections(St, Topic, MaxPartitions)
false -> ensure_leader_connections2(St, Topic, MaxPartitions)
end.

do_ensure_leader_connections(#{conn_config := ConnConfig,
seed_hosts := SeedHosts,
metadata_ts := MetadataTs
} = St0, Topic, MaxPartitions) ->
case get_metadata(SeedHosts, ConnConfig, Topic) of
{ok, {Brokers, PartitionMetaList0}} ->
PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions),
St = lists:foldl(fun(PartitionMeta, StIn) ->
ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta)
end, St0, PartitionMetaList),
{ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}}};
%% Fetch metadata for Topic and make sure partition-leader connections exist.
%%
%% First clause: a metadata connection pid is held in the state, so the
%% metadata is requested through it. On any error that connection is killed
%% and the call falls through to the second clause (the state is marked
%% `down', so the `is_pid' guard no longer matches).
%%
%% Second clause: no metadata connection pid is held; a new connection is
%% made via get_metadata/4 against the seed hosts. A failure here is logged
%% and reported as `{error, failed_to_fetch_metadata}'.
ensure_leader_connections2(#{metadata_conn := MetaConn, conn_config := ConnCfg} = State,
                           Topic, MaxPartitions) when is_pid(MetaConn) ->
    Timeout = maps:get(request_timeout, ConnCfg, ?DEFAULT_METADATA_TIMEOUT),
    case do_get_metadata(MetaConn, Topic, Timeout) of
        {error, _Discarded} ->
            %% The reason is intentionally dropped: if the immediate retry in
            %% the next clause also fails, that clause logs the error.
            %% Kill the connection so that it is certainly down before retrying.
            exit(MetaConn, kill),
            ensure_leader_connections2(State#{metadata_conn => down}, Topic, MaxPartitions);
        {ok, {Brokers, Partitions}} ->
            ensure_leader_connections3(State, Topic, MetaConn, Brokers, Partitions, MaxPartitions)
    end;
ensure_leader_connections2(#{seed_hosts := Seeds, conn_config := ConnCfg} = State,
                           Topic, MaxPartitions) ->
    case get_metadata(Seeds, ConnCfg, Topic, []) of
        {error, Errors} ->
            log_warn(failed_to_fetch_metadata, #{topic => Topic, errors => Errors}),
            {error, failed_to_fetch_metadata};
        {ok, {NewConn, {Brokers, Partitions}}} ->
            ensure_leader_connections3(State, Topic, NewConn, Brokers, Partitions, MaxPartitions)
    end.

%% Apply freshly fetched metadata to the state:
%%  1. cap the partition list with limit_partitions_count/2,
%%  2. run ensure_leader_connection/4 for each remaining partition,
%%  3. record the refresh time for Topic and remember ConnPid as the
%%     metadata connection.
%% Note that the timestamp map is taken from the incoming state (State0),
%% not from the state produced by step 2.
ensure_leader_connections3(#{metadata_ts := MetadataTs} = State0, Topic,
                           ConnPid, Brokers, PartitionMetaList0, MaxPartitions) ->
    Limited = limit_partitions_count(PartitionMetaList0, MaxPartitions),
    ConnectLeader =
        fun(PartitionMeta, Acc) ->
            ensure_leader_connection(Acc, Brokers, Topic, PartitionMeta)
        end,
    State1 = lists:foldl(ConnectLeader, State0, Limited),
    RefreshedTs = MetadataTs#{Topic => erlang:timestamp()},
    {ok, State1#{metadata_ts := RefreshedTs, metadata_conn => ConnPid}}.

limit_partitions_count(PartitionMetaList, Max) when is_integer(Max) andalso Max < length(PartitionMetaList) ->
lists:sublist(PartitionMetaList, Max);
limit_partitions_count(PartitionMetaList, _) ->
Expand Down Expand Up @@ -409,27 +452,33 @@ get_metadata(Hosts, ConnectFun, Topic) ->
get_metadata(Hosts, ConnectFun, Topic, []).

get_metadata([], _ConnectFun, _Topic, Errors) ->
%% failed to connect to ALL seed hosts, crash instead of return {error, Reason}
{error, Errors};
get_metadata([Host | Rest], ConnConfig, Topic, Errors) ->
case do_connect(Host, ConnConfig) of
{ok, Pid} ->
try
{ok, Vsns} = kpro:get_api_versions(Pid),
{_, Vsn} = maps:get(metadata, Vsns),
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
do_get_metadata(Vsn, Pid, Topic, Timeout)
after
_ = close_connection(Pid)
Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT),
case do_get_metadata(Pid, Topic, Timeout) of
{ok, Result} ->
{ok, {Pid, Result}};
{error, Reason} ->
%% failed to fetch metadata, make sure this connection is closed
ok = close_connection(Pid),
{error, Reason}
end;
{error, Reason} ->
get_metadata(Rest, ConnConfig, Topic, [Reason | Errors])
end.

do_get_metadata(Vsn, Connection, Topic) ->
do_get_metadata(Vsn, Connection, Topic, ?DEFAULT_METADATA_TIMEOUT).
%% Fetch metadata for Topic over an established connection.
%% The API versions are queried from the connection first, and the second
%% element of the `metadata' entry is used as the request version for
%% do_get_metadata2/4. An error from the version query is returned unchanged.
do_get_metadata(Connection, Topic, Timeout) ->
    case kpro:get_api_versions(Connection) of
        {error, Reason} ->
            {error, Reason};
        {ok, ApiVersions} ->
            {_, MetadataVsn} = maps:get(metadata, ApiVersions),
            do_get_metadata2(MetadataVsn, Connection, Topic, Timeout)
    end.

do_get_metadata(Vsn, Connection, Topic, Timeout) ->
do_get_metadata2(Vsn, Connection, Topic, Timeout) ->
Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false),
case kpro:request_sync(Connection, Req, Timeout) of
{ok, #kpro_rsp{msg = Meta}} ->
Expand All @@ -439,8 +488,10 @@ do_get_metadata(Vsn, Connection, Topic, Timeout) ->
ErrorCode = kpro:find(error_code, TopicMeta),
Partitions = kpro:find(partitions, TopicMeta),
case ErrorCode =:= ?no_error of
true -> {ok, {Brokers, Partitions}};
false -> {error, ErrorCode} %% no such topic ?
true ->
{ok, {Brokers, Partitions}};
false ->
{error, ErrorCode} %% no such topic ?
end;
{error, Reason} ->
{error, Reason}
Expand Down
36 changes: 36 additions & 0 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,42 @@ ack_cb(Partition, Offset, Self, Ref) ->
Self ! {ack, Ref, Partition, Offset},
ok.

%% Verify that the client re-establishes its metadata connection after the
%% connection process is killed, and that the connection is gone once the
%% client has been stopped. Connectivity is checked through both
%% wolff:check_connectivity/1 and wolff_client:check_connectivity/1.
metadata_connection_restart_test() ->
    ClientCfg = client_config(),
    ClientId = <<"client-1">>,
    {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg),
    %% Run the connectivity checks, then read the metadata connection pid
    %% from the client state and assert that it is alive.
    GetMetadataConn = fun() ->
        ok = wolff:check_connectivity(ClientId),
        ok = wolff_client:check_connectivity(Client),
        State = sys:get_state(Client),
        Pid = maps:get(metadata_conn, State),
        ?assert(is_process_alive(Pid)),
        Pid
    end,
    %% Block until the given process has terminated. A monitor is used rather
    %% than an immediate is_process_alive/1 check because termination is
    %% asynchronous: the kill signal below is not ordered with respect to the
    %% client's own liveness check, and the client may reply to a stop request
    %% before its terminate callback has closed the connection.
    AwaitDown = fun(P) ->
        MRef = erlang:monitor(process, P),
        receive
            {'DOWN', MRef, process, P, _Reason} ->
                ok
        after 5000 ->
            erlang:demonitor(MRef, [flush]),
            erlang:error({process_still_alive, P})
        end
    end,
    Pid1 = GetMetadataConn(),
    exit(Pid1, kill),
    ok = AwaitDown(Pid1),
    Pid2 = GetMetadataConn(),
    ok = stop_client(Client),
    ok = AwaitDown(Pid2),
    ?assertNot(is_process_alive(Pid2)).

%% Verify that fetching leader connections re-establishes the metadata
%% connection after the connection process is killed, and that the connection
%% is gone once the client has been stopped.
metadata_connection_restart2_test() ->
    ClientCfg0 = client_config(),
    %% NOTE(review): presumably an interval of 0 makes every
    %% get_leader_connections call refresh the metadata -- confirm.
    ClientCfg = ClientCfg0#{min_metadata_refresh_interval => 0},
    ClientId = <<"client-1">>,
    {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg),
    %% Fetch leader connections, then read the metadata connection pid from
    %% the client state and assert that it is alive.
    GetMetadataConn = fun() ->
        ?assertMatch({ok, _}, wolff_client:get_leader_connections(Client, <<"test-topic">>)),
        State = sys:get_state(Client),
        Pid = maps:get(metadata_conn, State),
        ?assert(is_process_alive(Pid)),
        Pid
    end,
    %% Block until the given process has terminated. A monitor is used rather
    %% than an immediate is_process_alive/1 check because termination is
    %% asynchronous: the kill signal below is not ordered with respect to
    %% requests handled by the client, and the client may reply to a stop
    %% request before its terminate callback has closed the connection.
    AwaitDown = fun(P) ->
        MRef = erlang:monitor(process, P),
        receive
            {'DOWN', MRef, process, P, _Reason} ->
                ok
        after 5000 ->
            erlang:demonitor(MRef, [flush]),
            erlang:error({process_still_alive, P})
        end
    end,
    Pid1 = GetMetadataConn(),
    exit(Pid1, kill),
    ok = AwaitDown(Pid1),
    Pid2 = GetMetadataConn(),
    ok = stop_client(Client),
    ok = AwaitDown(Pid2),
    ?assertNot(is_process_alive(Pid2)).

send_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
Expand Down

0 comments on commit f92a905

Please sign in to comment.