From 84bd8186a08d9bba671c4077c27cc6d30f5986eb Mon Sep 17 00:00:00 2001 From: Indrek Juhkam Date: Mon, 15 Jul 2019 14:01:14 +0300 Subject: [PATCH] Optimize Shard.list and Shard.get_by_key Previous list and get_by_key had to go through GenServer to acquire values ets table and replicas information. In case GenServer was processing an update (e.g. heartbeat, track, untrack) then list and get_by_key functions were blocked until it was completed. We saw this behaviour in our cluster where simple list/get_by_key calls were sometimes taking over few hundred milliseconds. Storing down replicas information in an ets table allows us to avoid going through genserver and allows us to process list/get_by_key immediately. I removed dirty_list function which was not public / exposed and which was trying to resolve the same issue. dirty_list was called dirty because it didn't check for down_replicas. This solution checks down_replicas and doesn't change the api interface. This should also resolve #124 --- lib/phoenix/tracker/shard.ex | 13 +++-- lib/phoenix/tracker/state.ex | 58 +++++++++++-------- .../tracker/shard_replication_test.exs | 15 ++--- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 4c5272f8c..d4033cb0d 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -68,23 +68,24 @@ defmodule Phoenix.Tracker.Shard do end @spec list(pid | atom, topic) :: [presence] - def list(server_pid, topic) do + def list(server_pid, topic) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_topic(topic) end - - @doc false - def dirty_list(shard_name, topic) do - State.tracked_values(shard_name, topic, []) + def list(shard_name, topic) when is_atom(shard_name) do + State.get_by_topic(shard_name, topic) end @spec get_by_key(pid | atom, topic, term) :: [presence] - def get_by_key(server_pid, topic, key) do + def get_by_key(server_pid, topic, key) when is_pid(server_pid) do server_pid |> GenServer.call({:list, topic}) |> State.get_by_key(topic, key) end + def get_by_key(shard_name, topic, key) when is_atom(shard_name) do + State.get_by_key(shard_name, topic, key) + end @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 4072f3b67..370a1611b 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -21,15 +21,15 @@ defmodule Phoenix.Tracker.State do @type pid_lookup :: {pid, topic, key} @type t :: %State{ - replica: name, - context: context, - clouds: clouds, - values: values, - pids: ets_id, - mode: :unset | :delta | :normal, - delta: :unset | delta, - replicas: %{name => :up | :down}, - range: {context, context} + replica: name, + context: context, + clouds: clouds, + values: values, + pids: ets_id, + mode: :unset | :delta | :normal, + delta: :unset | delta, + down_replicas: ets_id, + range: {context, context} } defstruct replica: nil, @@ -39,7 +39,7 @@ defmodule Phoenix.Tracker.State do pids: nil, mode: :unset, delta: :unset, - replicas: %{}, + down_replicas: nil, range: {%{}, %{}} @compile {:inline, tag: 1, clock: 1, put_tag: 2, delete_tag: 2, remove_delta_tag: 2} @@ -61,7 +61,7 @@ defmodule Phoenix.Tracker.State do mode: :normal, values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), - replicas: %{replica => :up}}) + down_replicas: :ets.new(down_replicas_table(shard_name), [:named_table, :protected, :bag])}) end @doc """ @@ -119,21 +119,30 @@ defmodule Phoenix.Tracker.State do @doc """ Returns a list of elements for the topic who belong to an online replica. """ - @spec get_by_topic(t, topic) :: [key_meta] + @spec get_by_topic(t | atom, topic) :: [key_meta] def get_by_topic(%State{values: values} = state, topic) do tracked_values(values, topic, down_replicas(state)) end + def get_by_topic(shard_name, topic) do + tracked_values(shard_name, topic, down_replicas(shard_name)) + end @doc """ Returns a list of elements for the topic who belong to an online replica. """ - @spec get_by_key(t, topic, key) :: [key_meta] + @spec get_by_key(t | atom, topic, key) :: [key_meta] def get_by_key(%State{values: values} = state, topic, key) do case tracked_key(values, topic, key, down_replicas(state)) do [] -> [] [_|_] = metas -> metas end end + def get_by_key(shard_name, topic, key) do + case tracked_key(shard_name, topic, key, down_replicas(shard_name)) do + [] -> [] + [_|_] = metas -> metas + end + end @doc """ Performs table lookup for tracked elements in the topic. @@ -396,18 +405,18 @@ defmodule Phoenix.Tracker.State do Marks a replica as up in the set and returns rejoined users. """ @spec replica_up(t, name) :: {t, joins :: [values], leaves :: []} - def replica_up(%State{replicas: replicas, context: ctx} = state, replica) do - {%State{state | - context: Map.put_new(ctx, replica, 0), - replicas: Map.put(replicas, replica, :up)}, replica_users(state, replica), []} + def replica_up(%State{down_replicas: down_replicas, context: ctx} = state, replica) do + :ets.delete_object(down_replicas, replica) + {%State{state | context: Map.put_new(ctx, replica, 0)}, replica_users(state, replica), []} end @doc """ Marks a replica as down in the set and returns left users. """ @spec replica_down(t, name) :: {t, joins:: [], leaves :: [values]} - def replica_down(%State{replicas: replicas} = state, replica) do - {%State{state | replicas: Map.put(replicas, replica, :down)}, [], replica_users(state, replica)} + def replica_down(%State{down_replicas: down_replicas} = state, replica) do + :ets.insert(down_replicas, replica) + {state, [], replica_users(state, replica)} end @doc """ @@ -558,10 +567,9 @@ defmodule Phoenix.Tracker.State do delta: %State{delta | range: {start_clock, new_end}}} end - @spec down_replicas(t) :: [name] - defp down_replicas(%State{replicas: replicas}) do - for {replica, :down} <- replicas, do: replica - end + @spec down_replicas(t | atom) :: [name] + defp down_replicas(%State{down_replicas: down_replicas}), do: :ets.tab2list(down_replicas) + defp down_replicas(shard_name), do: :ets.tab2list(down_replicas_table(shard_name)) @spec replica_users(t, name) :: [value] defp replica_users(%State{values: values}, replica) do @@ -578,4 +586,8 @@ defmodule Phoenix.Tracker.State do defp foldl({objects, cont}, acc, func) do foldl(:ets.select(cont), Enum.reduce(objects, acc, func), func) end + + defp down_replicas_table(shard_name) do + :"#{shard_name}.down_replicas" + end end diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 5481c1cc6..c44831035 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -54,6 +54,9 @@ defmodule Phoenix.Tracker.ShardReplicationTest do # node1 fulfills tranfer request and sends transfer_ack to primary assert_transfer_ack ref, from: @node1 assert_heartbeat to: @node1, from: @primary + + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}] = list(shard, topic) end @@ -99,6 +102,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_heartbeat from: @node1 assert_heartbeat from: @node2 + # small delay to ensure transfer_ack has been processed before calling list + :timer.sleep(10) assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic) end @@ -249,7 +254,6 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_join ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) # nodedown Process.unlink(node_pid) @@ -257,13 +261,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_leave ^topic, "node1", %{name: "s1"} assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) - assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) - - :timer.sleep(@permdown + 2*@heartbeat) - assert [{"local1", _}] = dirty_list(shard, topic) end - test "untrack with no tracked topic is a noop", %{shard: shard, topic: topic} do assert Shard.untrack(shard, self(), topic, "foo") == :ok @@ -439,8 +438,4 @@ defmodule Phoenix.Tracker.ShardReplicationTest do defp list(shard, topic) do Enum.sort(Shard.list(shard, topic)) end - - defp dirty_list(shard, topic) do - Enum.sort(Shard.dirty_list(shard, topic)) - end end