Skip to content

Commit

Permalink
Merge pull request #19 from GlenWalker/master
Browse files Browse the repository at this point in the history
Improved handling of network or server failure
  • Loading branch information
zolazhou authored Jul 30, 2018
2 parents 902b8d6 + cfa9378 commit 3743c09
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Erlzk.MixFile do

def project do
[app: :erlzk,
version: "0.6.4",
version: "0.6.5",
description: "A Pure Erlang ZooKeeper Client (no C dependency)",
package: package]
end
Expand Down
2 changes: 1 addition & 1 deletion src/erlzk.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, erlzk, [
{description, "A Pure Erlang ZooKeeper Client (no C dependency)"},
{vsn, "0.6.4"},
{vsn, "0.6.5"},
{registered, [erlzk_sup,erlzk_conn_sup]},
{applications, [
kernel,
Expand Down
48 changes: 28 additions & 20 deletions src/erlzk_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
get_children/3, get_children/4, sync/2, get_children2/3, get_children2/4,
multi/2, create2/5, add_auth/3, no_heartbeat/1, kill_session/1]).

-define(ZK_SOCKET_OPTS, [binary, {active, true}, {packet, 4}, {reuseaddr, true}, {linger, {true, 5}}]).
-define(ZK_SOCKET_OPTS, [binary, {active, true}, {packet, 4}, {reuseaddr, true}, {linger, {false, 0}}]).
-define(ZK_SOCKET_OPTS_CLOSE, [{linger, {true, 1}}]).
-ifdef(zk_connect_timeout).
-define(ZK_CONNECT_TIMEOUT, ?zk_connect_timeout).
-else.
Expand Down Expand Up @@ -83,55 +84,55 @@ stop(Pid) ->
gen_server:call(Pid, stop).

create(Pid, Path, Data, Acl, CreateMode) ->
gen_server:call(Pid, {create, {Path, Data, Acl, CreateMode}}).
op_call(Pid, {create, {Path, Data, Acl, CreateMode}}).

delete(Pid, Path, Version) ->
gen_server:call(Pid, {delete, {Path, Version}}).
op_call(Pid, {delete, {Path, Version}}).

exists(Pid, Path, Watch) ->
gen_server:call(Pid, {exists, {Path, Watch}}).
op_call(Pid, {exists, {Path, Watch}}).

exists(Pid, Path, Watch, Watcher) ->
gen_server:call(Pid, {exists, {Path, Watch}, Watcher}).
op_call(Pid, {exists, {Path, Watch}, Watcher}).

get_data(Pid, Path, Watch) ->
gen_server:call(Pid, {get_data, {Path, Watch}}).
op_call(Pid, {get_data, {Path, Watch}}).

get_data(Pid, Path, Watch, Watcher) ->
gen_server:call(Pid, {get_data, {Path, Watch}, Watcher}).
op_call(Pid, {get_data, {Path, Watch}, Watcher}).

set_data(Pid, Path, Data, Version) ->
gen_server:call(Pid, {set_data, {Path, Data, Version}}).
op_call(Pid, {set_data, {Path, Data, Version}}).

get_acl(Pid, Path) ->
gen_server:call(Pid, {get_acl, {Path}}).
op_call(Pid, {get_acl, {Path}}).

set_acl(Pid, Path, Acl, Version) ->
gen_server:call(Pid, {set_acl, {Path, Acl, Version}}).
op_call(Pid, {set_acl, {Path, Acl, Version}}).

get_children(Pid, Path, Watch) ->
gen_server:call(Pid, {get_children, {Path, Watch}}).
op_call(Pid, {get_children, {Path, Watch}}).

get_children(Pid, Path, Watch, Watcher) ->
gen_server:call(Pid, {get_children, {Path, Watch}, Watcher}).
op_call(Pid, {get_children, {Path, Watch}, Watcher}).

sync(Pid, Path) ->
gen_server:call(Pid, {sync, {Path}}).
op_call(Pid, {sync, {Path}}).

get_children2(Pid, Path, Watch) ->
gen_server:call(Pid, {get_children2, {Path, Watch}}).
op_call(Pid, {get_children2, {Path, Watch}}).

get_children2(Pid, Path, Watch, Watcher) ->
gen_server:call(Pid, {get_children2, {Path, Watch}, Watcher}).
op_call(Pid, {get_children2, {Path, Watch}, Watcher}).

multi(Pid, Ops) ->
gen_server:call(Pid, {multi, Ops}).
op_call(Pid, {multi, Ops}).

create2(Pid, Path, Data, Acl, CreateMode) ->
gen_server:call(Pid, {create2, {Path, Data, Acl, CreateMode}}).
op_call(Pid, {create2, {Path, Data, Acl, CreateMode}}).

add_auth(Pid, Scheme, Auth) ->
gen_server:call(Pid, {add_auth, {Scheme, Auth}}).
op_call(Pid, {add_auth, {Scheme, Auth}}).

no_heartbeat(Pid) ->
gen_server:cast(Pid, no_heartbeat).
Expand Down Expand Up @@ -193,6 +194,8 @@ init([ServerList, Timeout, Options]) ->
{stop, Reason}
end.

handle_call(get_timeout, _From, State=#state{timeout=Timeout, ping_interval=PingIntv}) ->
{reply, {ok, Timeout}, State, PingIntv};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_, _From, State=#state{socket=undefined, ping_interval=PingIntv}) ->
Expand Down Expand Up @@ -336,12 +339,12 @@ handle_info(_Info, State=#state{ping_interval=PingIntv}) ->
terminate(normal, #state{socket=Socket, heartbeat_watcher=HeartbeatWatcher}) ->
stop_heartbeat(HeartbeatWatcher),
close_connection(Socket),
error_logger:warning_msg("Server is closed~n"),
error_logger:info_msg("Server is closed~n"),
ok;
terminate(shutdown, #state{socket=Socket, heartbeat_watcher=HeartbeatWatcher}) ->
stop_heartbeat(HeartbeatWatcher),
close_connection(Socket),
error_logger:warning_msg("Server is shutdown~n"),
error_logger:info_msg("Server is shutdown~n"),
ok;
terminate(Reason, #state{socket=Socket, heartbeat_watcher=HeartbeatWatcher}) ->
error_logger:error_msg("Connection terminating with reason: ~p~n", [Reason]),
Expand Down Expand Up @@ -484,6 +487,10 @@ reconnect_after_session_expired(State=#state{servers=ServerList, auth_data=AuthD
{noreply, State}
end.

op_call(Pid, Message) ->
{ok, Timeout} = gen_server:call(Pid, get_timeout),
gen_server:call(Pid, Message, Timeout).

add_init_auths([], _State) ->
ok;
add_init_auths([AuthData|Left], State) ->
Expand Down Expand Up @@ -584,6 +591,7 @@ close_connection(Socket) ->
close_connection(undefined, _) ->
ok;
close_connection(Socket, true) ->
inet:setopts(Socket, ?ZK_SOCKET_OPTS_CLOSE),
gen_tcp:send(Socket, <<1:32, -11:32>>),
gen_tcp:close(Socket);
close_connection(Socket, false) ->
Expand Down

0 comments on commit 3743c09

Please sign in to comment.