Skip to content

Commit

Permalink
Fix transfer req loop when a node was replaced
Browse files Browse the repository at this point in the history
Scenario how Node2 is replaced by Node3 (this is also basically a
rolling update):
1. Node1 and Node2 are up and synced
2. Kill Node2 (node1 will start permdown grace period for Node2)
3. Spawn Node3
4. Node1 sends a heartbeat that includes clocks for Node1 & Node2
5. Node3 receives the heartbeat. It sees node1 clock is dominating
  because there's Node2 clock. It requests transfer from Node1.
6. Node1 sends transfer ack to Node3
7. Node3 uses `State#extract` to process the transfer payload which
   discards Node2 values.
8. It all starts again from step 4 on the next heartbeat.

This loop between steps 4 and 8 lasts until Node1 permdown period for
Node2 triggers and it doesn't put it to the heartbeat clocks any more.

The solution here is not to include down replicas in the heartbeat
notifications.

This fixes phoenixframework#135
  • Loading branch information
indrekj committed Dec 4, 2019
1 parent 8e49784 commit 9ffdef6
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
5 changes: 4 additions & 1 deletion lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ defmodule Phoenix.Tracker.State do
Returns the causal context for the set.
"""
@spec clocks(t) :: {name, context}
def clocks(%State{replica: rep, context: ctx}), do: {rep, ctx}
def clocks(%State{replica: rep, context: ctx, replicas: replicas}) do
down_replicas = for {key, state} <- replicas, state == :down, do: key
{rep, Map.drop(ctx, down_replicas)}
end

@doc """
Adds a new element to the set.
Expand Down
67 changes: 62 additions & 5 deletions test/phoenix/tracker/shard_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
@primary :"[email protected]"
@node1 :"[email protected]"
@node2 :"[email protected]"
@node3 :"[email protected]"

setup config do
tracker = config.test
{:ok, shard_pid} = start_shard(name: tracker)
{:ok, topic: to_string(config.test),
shard: shard_name(tracker),
shard_pid: shard_pid,
tracker: tracker}

conf = [
topic: to_string(tracker),
shard: shard_name(tracker),
tracker: tracker
]

conf =
if config[:primary] != false do
{:ok, shard_pid} = start_shard(name: tracker)
Keyword.put(conf, :shard_pid, shard_pid)
else
conf
end

{:ok, conf}
end

test "heartbeats", %{shard: shard} do
Expand Down Expand Up @@ -342,6 +354,51 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert replicas(shard) == %{}
end

@tag primary: false
test "rolling node update", %{topic: topic, tracker: tracker, shard: shard_name} do
# By default permdown period is 1.5 seconds in the tests. This however is
# not enough to test this case.
permdown_period = 2_000
tracker_opts = [name: tracker, permdown_period: permdown_period]

# Add 2 online shards
spy_on_server(@node1, self(), shard_name)
{node1_node, {:ok, node1_shard}} = start_shard(@node1, tracker_opts)

spy_on_server(@node2, self(), shard_name)
{node2_node, {:ok, node2_shard}} = start_shard(@node2, tracker_opts)

assert_receive {{:replica_up, @node1}, @node2}, @timeout
assert_receive {{:replica_up, @node2}, @node1}, @timeout

# Add one user to node1
track_presence(@node1, shard_name, spawn_pid(), topic, "node1", %{})

assert_heartbeat to: @node1, from: @node2
assert_heartbeat to: @node2, from: @node1

# Remove node 2 (starts permdown grace on node1)
Process.unlink(node2_node)
Process.exit(node2_shard, :kill)
assert_receive {{:replica_down, @node2}, @node1}, @timeout

# Start node 3 (has no knowledge of node2)
spy_on_server(@node3, self(), shard_name)
{node3_node, {:ok, node3_shard}} = start_shard(@node3, tracker_opts)

assert_receive {{:replica_up, @node3}, @node1}, @timeout
assert_receive {{:replica_up, @node1}, @node3}, @timeout

# Sends transfer request once
assert_transfer_req(from: @node3, to: @node1)

# Does not send more transfer requests
refute_transfer_req(from: @node3, to: @node1)

# Wait until node1 is permanently down
assert_receive {{:replica_permdown, @node2}, @node1}, permdown_period * 2
end

## Helpers

def spawn_pid, do: spawn(fn -> :timer.sleep(:infinity) end)
Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Supervisor.start_link(
)

unless :clustered in exclude do
Phoenix.PubSub.Cluster.spawn([:"[email protected]", :"[email protected]"])
Phoenix.PubSub.Cluster.spawn([:"[email protected]", :"[email protected]", :"[email protected]"])
end

ExUnit.start()

0 comments on commit 9ffdef6

Please sign in to comment.