Skip to content

Commit

Permalink
Merge pull request #16 from GlenWalker/master
Browse files Browse the repository at this point in the history
Improve handling of server name resolving errors (fixes #15)
  • Loading branch information
zolazhou authored Aug 14, 2017
2 parents a1432aa + 30e2810 commit 100084c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 28 deletions.
2 changes: 1 addition & 1 deletion package.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Erlzk.MixFile do

def project do
[app: :erlzk,
version: "0.6.3",
version: "0.6.4",
description: "A Pure Erlang ZooKeeper Client (no C dependency)",
package: package]
end
Expand Down
2 changes: 1 addition & 1 deletion src/erlzk.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, erlzk, [
{description, "A Pure Erlang ZooKeeper Client (no C dependency)"},
{vsn, "0.6.3"},
{vsn, "0.6.4"},
{registered, [erlzk_sup,erlzk_conn_sup]},
{applications, [
kernel,
Expand Down
66 changes: 40 additions & 26 deletions src/erlzk_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,35 @@ init([ServerList, Timeout, Options]) ->
CrValue -> get_chroot_path(CrValue)
end,
process_flag(trap_exit, true),
ResolvedServerList = resolve_servers(ServerList),
DedupedServerList = lists:usort(ResolvedServerList),
ShuffledServerList = shuffle(DedupedServerList),
ProtocolVersion = 0,
Zxid = 0,
SessionId = 0,
Password = <<0:128>>,
case connect(ShuffledServerList, ProtocolVersion, Zxid, Timeout, SessionId, Password) of
{ok, State=#state{host=Host, port=Port, ping_interval=PingIntv,
heartbeat_watcher=HeartbeatWatcher}} ->
NewState = State#state{auth_data=AuthData, chroot=Chroot,
reset_watch=ResetWatch, reconnect_expired=ReconnectExpired,
monitor=Monitor, heartbeat_watcher=HeartbeatWatcher},
add_init_auths(AuthData, NewState),
notify_monitor_server_state(Monitor, connected, Host, Port),
{ok, NewState, PingIntv};
case resolve_servers(ServerList) of
{ok, []} ->
{stop, no_servers};
{ok, ResolvedServerList} ->
DedupedServerList = lists:usort(ResolvedServerList),
ShuffledServerList = shuffle(DedupedServerList),

ProtocolVersion = 0,
Zxid = 0,
SessionId = 0,
Password = <<0:128>>,
case connect(ShuffledServerList, ProtocolVersion, Zxid, Timeout, SessionId, Password) of
{ok, State=#state{host=Host, port=Port, ping_interval=PingIntv}} ->
NewState = State#state{auth_data=AuthData, chroot=Chroot,
reset_watch=ResetWatch, reconnect_expired=ReconnectExpired,
monitor=Monitor},
add_init_auths(AuthData, NewState),
notify_monitor_server_state(Monitor, connected, Host, Port),
{ok, NewState, PingIntv};
{error, Reason} ->
error_logger:error_msg("Connect failed: ~p, will try again after ~pms~n", [Reason, ?ZK_RECONNECT_INTERVAL]),
erlang:send_after(?ZK_RECONNECT_INTERVAL, self(), reconnect),
State = #state{servers=ShuffledServerList, auth_data=AuthData, chroot=Chroot,
proto_ver=ProtocolVersion, timeout=Timeout, session_id=SessionId, password=Password,
reset_watch=ResetWatch, reconnect_expired=ReconnectExpired, monitor=Monitor},
{ok, State}
end;
{error, Reason} ->
error_logger:error_msg("Connect failed: ~p, will try again after ~pms~n", [Reason, ?ZK_RECONNECT_INTERVAL]),
erlang:send_after(?ZK_RECONNECT_INTERVAL, self(), reconnect),
State = #state{servers=ShuffledServerList, auth_data=AuthData, chroot=Chroot,
proto_ver=ProtocolVersion, timeout=Timeout, session_id=SessionId, password=Password,
reset_watch=ResetWatch, reconnect_expired=ReconnectExpired, monitor=Monitor},
{ok, State}
{stop, Reason}
end.

handle_call(stop, _From, State) ->
Expand Down Expand Up @@ -354,17 +360,25 @@ resolve_servers(ServerList) ->
resolve_servers(ServerList, []).

resolve_servers([], ResolvedServerAcc) ->
lists:flatten(ResolvedServerAcc);
{ok, lists:flatten(ResolvedServerAcc)};
resolve_servers([Server|Left], ResolvedServerAcc) ->
resolve_servers(Left, [resolve_server(Server) | ResolvedServerAcc]).
case resolve_server(Server) of
{ok, Servers} ->
resolve_servers(Left, [Servers | ResolvedServerAcc]);
{error, Reason} ->
{error, Reason}
end.

resolve_server({Host, Port}) ->
case inet:gethostbyname(Host) of
{ok, #hostent{h_addr_list=Addresses}} ->
[{Address, Port} || Address <- Addresses];
{ok, [{Address, Port} || Address <- Addresses]};
{error, nxdomain} ->
error_logger:error_msg("Resolving ~p:~p encountered an error: nxdomain~n", [Host, Port]),
{ok, []};
{error, Reason} ->
error_logger:error_msg("Resolving ~p:~p encountered an error: ~p~n", [Host, Port, Reason]),
[]
{error, Reason}
end.

shuffle(L) ->
Expand Down

0 comments on commit 100084c

Please sign in to comment.