Skip to content

Commit

Permalink
don't leaks clients on errors
Browse files Browse the repository at this point in the history
When an error happened the notification wasn't send to the pool when the
default pool was used by default and not set in the options. This patch also
makes sure that all error are triggerieng a cancel event. Also make sure that
all requests are monitoreThis patch also makes sure that all error are
triggerieng a cancel event. Also make sure that all requests are monitored.d

Fix benoitc/couchbeam#120
  • Loading branch information
benoitc committed Dec 8, 2014
1 parent 354096d commit b659de8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/hackney_client/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ send_request(Client0, {Method, Path, Headers, Body}=Req) ->
Reply = maybe_redirect(Resp, Req, 0),
reply_response(Reply, Client);
_ ->
{error, invalide_state}
reply_response({error, invalide_state}, Client)
end;
Error ->
Error
reply_response(Error, Client0)
end.

%% @doc send the request body until eob. It's issued after sending a request using
Expand Down
10 changes: 5 additions & 5 deletions src/hackney_client/hackney_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ close_request(#client{}=Client) ->
state=Status,
request_ref=Ref} = Client,


%% remove the request
erase(Ref),
ets:delete(?MODULE, Ref),
Expand Down Expand Up @@ -259,13 +258,14 @@ handle_error(#client{request_ref=Ref, dynamic=true}) ->

handle_error(#client{request_ref=Ref, transport=Transport,
socket=Socket}=Client) ->

case get_state(Ref) of
req_not_found -> ok;
_ ->
catch Transport:controlling_process(Socket, self()),
catch Transport:close(Socket),
NClient = Client#client{socket=nil, state=closed},
put(Ref, NClient),
update_state(NClient),
ok
end.

Expand Down Expand Up @@ -304,7 +304,7 @@ init(_) ->

handle_call({new_request, Pid, Ref, Client}, _From, #mstate{pids=Pids}=State) ->
%% get pool name
Pool = proplists:get_value(pool, Client#client.options),
Pool = proplists:get_value(pool, Client#client.options, default),
%% set requInfo
StartTime = os:timestamp(),
ReqInfo = #request_info{pool=Pool,
Expand Down Expand Up @@ -414,12 +414,12 @@ handle_call({cancel_request, Ref}, _From, State) ->
Pids2 = dict:erase(Owner, State#mstate.pids),
%% notify the pool that the request have been canceled
PoolHandler:notify(Pool, {'DOWN', Ref, request, Owner, cancel}),

%% update metrics
finish_request(Info, State),

{reply, ok, State#mstate{pids=Pids2}};
[{Ref, {Owner, Stream, #request_info{pool=Pool}=Info}}] when is_pid(Stream) ->
[{Ref, {Owner, Stream, #request_info{pool=Pool}=Info}}]
when is_pid(Stream) ->
unlink(Owner),
unlink(Stream),
Pids2 = dict:erase(Stream, dict:erase(Owner, State#mstate.pids)),
Expand Down
16 changes: 8 additions & 8 deletions src/hackney_connect/hackney_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ stop_pool(Name) ->
notify(Pool, Msg) ->
case find_pool(Pool) of
undefined -> ok;
Pid ->
Pid ! Msg
Pid -> Pid ! Msg
end.


Expand Down Expand Up @@ -386,10 +385,10 @@ remove_socket(Socket, #state{connections=Conns, sockets=Sockets}=State) ->


store_socket({_Host, _Port, Transport} = Dest, Socket,
#state{timeout=Timeout, connections=Conns,
sockets=Sockets}=State) ->
#state{timeout=Timeout, connections=Conns,
sockets=Sockets}=State) ->
Timer = erlang:send_after(Timeout, self(), {timeout, Socket}),
ok = Transport:setopts(Socket, [{active, once}]),
Transport:setopts(Socket, [{active, once}]),
ConnSockets = case dict:find(Dest, Conns) of
{ok, OldSockets} ->
[Socket | OldSockets];
Expand Down Expand Up @@ -449,9 +448,9 @@ queue_out({_Host, _Port, _Transport} = Dest, Queues) ->
deliver_socket(Socket, {_, _, Transport} = Dest, State) ->
Mod = State#state.mod_metrics,


case queue_out(Dest, State#state.queues) of
empty ->

store_socket(Dest, Socket, State);
{ok, {{PidWaiter, _} = FromWaiter, Ref}, Queues2} ->
NbWaiters = State#state.nb_waiters - 1,
Expand All @@ -472,8 +471,9 @@ deliver_socket(Socket, {_, _, Transport} = Dest, State) ->
_Error -> % Something wrong with the socket; just remove it
catch Transport:close(Socket),
gen_server:reply(FromWaiter, {error, no_socket, self()}),
State#state{queues = Queues2,
nb_waiters = NbWaiters}
monitor_client(Dest, Ref,
State#state{queues = Queues2,
nb_waiters = NbWaiters})
end
end.

Expand Down

0 comments on commit b659de8

Please sign in to comment.