Skip to content

Commit

Permalink
Merge pull request #611 from fmcgeough/exit_group_coordinator_if_memb…
Browse files Browse the repository at this point in the history
…er_pid_gone

Exit group coordinator if member pid gone
  • Loading branch information
zmstone authored Dec 12, 2024
2 parents f3deba6 + 18c0dab commit e18151c
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/brod_group_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@
{lo_cmd_stabilize, AttemptCount, Reason}).

-define(INITIAL_MEMBER_ID, <<>>).
-define(CALL_MEMBER(MemberPid, EXPR),
try
EXPR
catch
exit:{noproc, {gen_server, call, [MemberPid | _]}} ->
exit({shutdown, member_down})
end).

-type config() :: brod:group_config().
-type ts() :: erlang:timestamp().
Expand Down Expand Up @@ -499,7 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds
log(State0, info, "re-joining group, reason:~p", [Reason]),

%% 1. unsubscribe all currently assigned partitions
ok = MemberModule:assignments_revoked(MemberPid),
?CALL_MEMBER(MemberPid, MemberModule:assignments_revoked(MemberPid)),

%% 2. some brod_group_member implementations may wait for messages
%% to finish processing when assignments_revoked is called.
Expand Down Expand Up @@ -674,8 +681,8 @@ sync_group(#state{ groupId = GroupId
%% get my partition assignments
Assignment = kpro:find(assignment, RspBody),
TopicAssignments = get_topic_assignments(State, Assignment),
ok = MemberModule:assignments_received(MemberPid, MemberId,
GenerationId, TopicAssignments),
?CALL_MEMBER(MemberPid,
MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments)),
NewState = State#state{is_in_group = true},
log(NewState, info, "assignments received:~s",
[format_assignments(TopicAssignments)]),
Expand Down Expand Up @@ -834,7 +841,7 @@ assign_partitions(State) when ?IS_LEADER(State) ->
Assignments =
case Strategy =:= callback_implemented of
true ->
MemberModule:assign_partitions(MemberPid, Members, AllPartitions);
?CALL_MEMBER(MemberPid, MemberModule:assign_partitions(MemberPid, Members, AllPartitions));
false ->
do_assign_partitions(Strategy, Members, AllPartitions)
end,
Expand Down

0 comments on commit e18151c

Please sign in to comment.