diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 50884d1a..cd7d7e86 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -75,6 +75,13 @@ {lo_cmd_stabilize, AttemptCount, Reason}). -define(INITIAL_MEMBER_ID, <<>>). +-define(CALL_MEMBER(MemberPid, EXPR), + try + EXPR + catch + exit:{noproc, {gen_server, call, [MemberPid | _]}} -> + exit({shutdown, member_down}) + end). -type config() :: brod:group_config(). -type ts() :: erlang:timestamp(). @@ -499,7 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds log(State0, info, "re-joining group, reason:~p", [Reason]), %% 1. unsubscribe all currently assigned partitions - ok = MemberModule:assignments_revoked(MemberPid), + ?CALL_MEMBER(MemberPid, MemberModule:assignments_revoked(MemberPid)), %% 2. some brod_group_member implementations may wait for messages %% to finish processing when assignments_revoked is called. @@ -674,8 +681,8 @@ sync_group(#state{ groupId = GroupId %% get my partition assignments Assignment = kpro:find(assignment, RspBody), TopicAssignments = get_topic_assignments(State, Assignment), - ok = MemberModule:assignments_received(MemberPid, MemberId, - GenerationId, TopicAssignments), + ?CALL_MEMBER(MemberPid, + MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments)), NewState = State#state{is_in_group = true}, log(NewState, info, "assignments received:~s", [format_assignments(TopicAssignments)]), @@ -834,7 +841,7 @@ assign_partitions(State) when ?IS_LEADER(State) -> Assignments = case Strategy =:= callback_implemented of true -> - MemberModule:assign_partitions(MemberPid, Members, AllPartitions); + ?CALL_MEMBER(MemberPid, MemberModule:assign_partitions(MemberPid, Members, AllPartitions)); false -> do_assign_partitions(Strategy, Members, AllPartitions) end,