diff --git a/.iex.exs b/.iex.exs new file mode 100644 index 000000000..e69de29bb diff --git a/config/runtime.exs b/config/runtime.exs index c187a1329..197cb9612 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -185,7 +185,7 @@ if keystore_pass_dir != nil and not File.dir?(keystore_pass_dir) do System.halt(2) end -config :lambda_ethereum_consensus, LambdaEthereumConsensus.Validator.Setup, +config :lambda_ethereum_consensus, LambdaEthereumConsensus.ValidatorSet, keystore_dir: keystore_dir, keystore_pass_dir: keystore_pass_dir diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 491ba2b07..52ff97d63 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.StateTransition.Cache alias LambdaEthereumConsensus.Store.BlockStates - alias LambdaEthereumConsensus.Validator + alias LambdaEthereumConsensus.ValidatorSet alias Types.BeaconState def start_link(opts) do @@ -27,13 +27,13 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do store = ForkChoice.init_store(store, time) - validators = Validator.Setup.init(store.head_slot, store.head_root) + validator_set = ValidatorSet.init(store.head_slot, store.head_root) StoreSetup.get_deposit_snapshot!() |> init_execution_chain(store.head_root) libp2p_args = - [genesis_time: store.genesis_time, validators: validators, store: store] ++ + [genesis_time: store.genesis_time, validator_set: validator_set, store: store] ++ get_libp2p_args() children = diff --git a/lib/lambda_ethereum_consensus/validator/duties.ex b/lib/lambda_ethereum_consensus/validator/duties.ex index 5e590fd7b..83f0b09ce 100644 --- a/lib/lambda_ethereum_consensus/validator/duties.ex +++ b/lib/lambda_ethereum_consensus/validator/duties.ex @@ -5,171 +5,139 @@ defmodule LambdaEthereumConsensus.Validator.Duties do alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Validator.Utils + alias LambdaEthereumConsensus.ValidatorSet alias Types.BeaconState require Logger @type attester_duty :: %{ attested?: boolean(), + # should_aggregate? is used to check if aggregation is needed for this attestation. + # and also to avoid double aggregation. should_aggregate?: boolean(), selection_proof: Bls.signature(), signing_domain: Types.domain(), subnet_id: Types.uint64(), - slot: Types.slot(), + validator_index: Types.validator_index(), committee_index: Types.uint64(), committee_length: Types.uint64(), index_in_committee: Types.uint64() } + @type proposer_duty :: Types.slot() - @type attester_duties :: list(:not_computed | attester_duty()) - @type proposer_duties :: :not_computed | list(Types.slot()) + @type attester_duties :: [attester_duty()] + @type proposer_duties :: [proposer_duty()] - @type duties :: %{ - attester: attester_duties(), - proposer: proposer_duties() - } + @type attester_duties_per_slot :: %{Types.slot() => attester_duties()} + @type proposer_duties_per_slot :: %{Types.slot() => proposer_duties()} - @spec empty_duties() :: duties() - def empty_duties() do - %{ - # Order is: previous epoch, current epoch, next epoch - attester: [:not_computed, :not_computed, :not_computed], - proposer: :not_computed - } - end + @type kind :: :proposers | :attesters + @type duties :: %{kind() => attester_duties_per_slot() | proposer_duties_per_slot()} - @spec get_current_attester_duty(duties :: duties(), current_slot :: Types.slot()) :: - attester_duty() - def get_current_attester_duty(%{attester: attester_duties}, current_slot) do - Enum.find(attester_duties, fn - :not_computed -> false - duty -> duty.slot == current_slot - end) - end + ############################ + # Accessors - @spec replace_attester_duty( - duties :: duties(), - duty :: attester_duty(), - new_duty :: attester_duty() - ) :: duties() - def replace_attester_duty(duties, duty, new_duty) do - attester_duties = - Enum.map(duties.attester, fn - ^duty -> new_duty - d -> d - end) - - %{duties | attester: attester_duties} - end + @spec current_proposer(duties(), Types.epoch(), Types.slot()) :: proposer_duty() | nil + def current_proposer(duties, epoch, slot), + do: get_in(duties, [epoch, :proposers, slot]) - @spec log_duties(duties :: duties(), validator_index :: Types.validator_index()) :: :ok - def log_duties(%{attester: attester_duties, proposer: proposer_duties}, validator_index) do - attester_duties - # Drop the first element, which is the previous epoch's duty - |> Stream.drop(1) - |> Enum.each(fn %{ - index_in_committee: i, - committee_index: ci, - slot: slot, - should_aggregate?: sa - } -> - Logger.info( - "[Validator] #{validator_index} has to attest in committee #{ci} of slot #{slot} with index #{i}, and should_aggregate?: #{sa}" - ) - end) - - Enum.each(proposer_duties, fn slot -> - Logger.info("[Validator] #{validator_index} has to propose a block in slot #{slot}!") - end) + @spec current_attesters(duties(), Types.epoch(), Types.slot()) :: attester_duties() + def current_attesters(duties, epoch, slot) do + for %{attested?: false} = duty <- attesters(duties, epoch, slot) do + duty + end end - @spec compute_proposer_duties( - beacon_state :: BeaconState.t(), - epoch :: Types.epoch(), - validator_index :: Types.validator_index() - ) :: proposer_duties() - def compute_proposer_duties(beacon_state, epoch, validator_index) do - start_slot = Misc.compute_start_slot_at_epoch(epoch) - - start_slot..(start_slot + ChainSpec.get("SLOTS_PER_EPOCH") - 1) - |> Enum.flat_map(fn slot -> - # Can't fail - {:ok, proposer_index} = Accessors.get_beacon_proposer_index(beacon_state, slot) - if proposer_index == validator_index, do: [slot], else: [] - end) + @spec current_aggregators(duties(), Types.epoch(), Types.slot()) :: attester_duties() + def current_aggregators(duties, epoch, slot) do + for %{should_aggregate?: true} = duty <- attesters(duties, epoch, slot) do + duty + end end - def maybe_update_duties(duties, beacon_state, epoch, validator_index, privkey) do - attester_duties = - maybe_update_attester_duties(duties.attester, beacon_state, epoch, validator_index, privkey) + defp attesters(duties, epoch, slot), do: get_in(duties, [epoch, :attesters, slot]) || [] - proposer_duties = compute_proposer_duties(beacon_state, epoch, validator_index) - # To avoid edge-cases - old_duty = - case duties.proposer do - :not_computed -> [] - old -> old |> Enum.reverse() |> Enum.take(1) - end + ############################ + # Update functions - %{duties | attester: attester_duties, proposer: old_duty ++ proposer_duties} + @spec update_duties!( + duties(), + kind(), + Types.epoch(), + Types.slot(), + attester_duties() | proposer_duties() + ) :: duties() + def update_duties!(duties, kind, epoch, slot, updated), + do: put_in(duties, [epoch, kind, slot], updated) + + @spec attested(attester_duty()) :: attester_duty() + def attested(duty), do: Map.put(duty, :attested?, true) + + @spec aggregated(attester_duty()) :: attester_duty() + # should_aggregate? is set to false to avoid double aggregation. + def aggregated(duty), do: Map.put(duty, :should_aggregate?, false) + + ############################ + # Main functions + + @spec compute_proposers_for_epoch(BeaconState.t(), Types.epoch(), ValidatorSet.validators()) :: + proposer_duties_per_slot() + def compute_proposers_for_epoch(%BeaconState{} = state, epoch, validators) do + with {:ok, epoch} <- check_valid_epoch(state, epoch), + {start_slot, end_slot} <- boundary_slots(epoch) do + for slot <- start_slot..end_slot, + {:ok, proposer_index} = Accessors.get_beacon_proposer_index(state, slot), + Map.has_key?(validators, proposer_index), + into: %{} do + {slot, proposer_index} + end + end end - defp maybe_update_attester_duties( - [epp, ep0, ep1], - beacon_state, - epoch, - validator_index, - privkey - ) do - duties = - Stream.with_index([ep0, ep1]) - |> Enum.map(fn - {:not_computed, i} -> - compute_attester_duties(beacon_state, epoch + i, validator_index, privkey) - - {d, _} -> - d - end) - - [epp | duties] - end + @spec compute_attesters_for_epoch(BeaconState.t(), Types.epoch(), ValidatorSet.validators()) :: + attester_duties_per_slot() + def compute_attesters_for_epoch(%BeaconState{} = state, epoch, validators) do + with {:ok, epoch} <- check_valid_epoch(state, epoch), + {start_slot, end_slot} <- boundary_slots(epoch) do + committee_count_per_slot = Accessors.get_committee_count_per_slot(state, epoch) - def shift_duties(%{attester: [_ep0, ep1, ep2]} = duties, epoch, current_epoch) do - case current_epoch - epoch do - 1 -> %{duties | attester: [ep1, ep2, :not_computed]} - 2 -> %{duties | attester: [ep2, :not_computed, :not_computed]} - _ -> %{duties | attester: [:not_computed, :not_computed, :not_computed]} + for slot <- start_slot..end_slot, + committee_i <- 0..(committee_count_per_slot - 1), + reduce: %{} do + acc -> + new_duties = compute_duties_per_committee(state, epoch, slot, validators, committee_i) + Map.update(acc, slot, new_duties, &(new_duties ++ &1)) + end end end - @spec compute_attester_duties( - beacon_state :: BeaconState.t(), - epoch :: Types.epoch(), - validator_index :: non_neg_integer(), - privkey :: Bls.privkey() - ) :: attester_duty() | nil - defp compute_attester_duties(beacon_state, epoch, validator_index, privkey) do - # Can't fail - {:ok, duty} = get_committee_assignment(beacon_state, epoch, validator_index) - - case duty do - nil -> - nil - - duty -> - duty - |> Map.put(:attested?, false) - |> update_with_aggregation_duty(beacon_state, privkey) - |> update_with_subnet_id(beacon_state, epoch) + defp compute_duties_per_committee(state, epoch, slot, validators, committee_index) do + case Accessors.get_beacon_committee(state, slot, committee_index) do + {:ok, committee} -> + for {validator_index, index_in_committee} <- Enum.with_index(committee), + validator = Map.get(validators, validator_index) do + %{ + validator_index: validator_index, + index_in_committee: index_in_committee, + committee_length: length(committee), + committee_index: committee_index, + attested?: false + } + |> update_with_aggregation_duty(state, slot, validator.keystore.privkey) + |> update_with_subnet_id(state, epoch, slot) + end + + {:error, _} -> + [] end end - defp update_with_aggregation_duty(duty, beacon_state, privkey) do - proof = Utils.get_slot_signature(beacon_state, duty.slot, privkey) + defp update_with_aggregation_duty(duty, beacon_state, slot, privkey) do + proof = Utils.get_slot_signature(beacon_state, slot, privkey) if Utils.aggregator?(proof, duty.committee_length) do - epoch = Misc.compute_epoch_at_slot(duty.slot) + epoch = Misc.compute_epoch_at_slot(slot) domain = Accessors.get_domain(beacon_state, Constants.domain_aggregate_and_proof(), epoch) Map.put(duty, :should_aggregate?, true) @@ -180,70 +148,60 @@ defmodule LambdaEthereumConsensus.Validator.Duties do end end - defp update_with_subnet_id(duty, beacon_state, epoch) do + defp update_with_subnet_id(duty, beacon_state, epoch, slot) do committees_per_slot = Accessors.get_committee_count_per_slot(beacon_state, epoch) subnet_id = - Utils.compute_subnet_for_attestation(committees_per_slot, duty.slot, duty.committee_index) + Utils.compute_subnet_for_attestation(committees_per_slot, slot, duty.committee_index) Map.put(duty, :subnet_id, subnet_id) end - @doc """ - Return the committee assignment in the ``epoch`` for ``validator_index``. - ``assignment`` returned is a tuple of the following form: - * ``assignment[0]`` is the index of the validator in the committee - * ``assignment[1]`` is the index to which the committee is assigned - * ``assignment[2]`` is the slot at which the committee is assigned - Return `nil` if no assignment. - """ - @spec get_committee_assignment(BeaconState.t(), Types.epoch(), Types.validator_index()) :: - {:ok, nil | attester_duty()} | {:error, String.t()} - def get_committee_assignment(%BeaconState{} = state, epoch, validator_index) do + ############################ + # Helpers + + @spec log_duties_for_epoch(duties(), Types.epoch()) :: :ok + def log_duties_for_epoch(%{proposers: proposers, attesters: attesters}, epoch) do + Logger.info("[Duties] Proposers for epoch #{epoch} (slot=>validator): #{inspect(proposers)}") + + for {slot, att_duties} <- attesters do + Logger.info("[Duties] Attesters for epoch: #{epoch}, slot #{slot}:") + + for %{ + index_in_committee: ic, + committee_index: ci, + committee_length: cl, + subnet_id: si, + should_aggregate?: agg, + validator_index: vi + } <- att_duties do + Logger.info([ + "[Duties] Validator: #{vi}, will attest in committee #{ci} ", + "as #{ic}/#{cl - 1} in subnet: #{si}#{if agg, do: " and should Aggregate"}." + ]) + end + end + + :ok + end + + def log_duties_for_epoch(_duties, epoch), + do: Logger.info("[Duties] No duties for epoch: #{epoch}.") + + defp check_valid_epoch(state, epoch) do next_epoch = Accessors.get_current_epoch(state) + 1 if epoch > next_epoch do {:error, "epoch must be <= next_epoch"} else - start_slot = Misc.compute_start_slot_at_epoch(epoch) - committee_count_per_slot = Accessors.get_committee_count_per_slot(state, epoch) - end_slot = start_slot + ChainSpec.get("SLOTS_PER_EPOCH") - - start_slot..end_slot - |> Stream.map(fn slot -> - 0..(committee_count_per_slot - 1) - |> Stream.map(&compute_attester_duty(state, slot, validator_index, &1)) - |> Enum.find(&(not is_nil(&1))) - end) - |> Enum.find(&(not is_nil(&1))) - |> then(&{:ok, &1}) + {:ok, epoch} end end - @spec compute_attester_duty( - state :: BeaconState.t(), - slot :: Types.slot(), - validator_index :: Types.validator_index(), - committee_index :: Types.uint64() - ) :: attester_duty() | nil - defp compute_attester_duty(state, slot, validator_index, committee_index) do - case Accessors.get_beacon_committee(state, slot, committee_index) do - {:ok, committee} -> - case Enum.find_index(committee, &(&1 == validator_index)) do - nil -> - nil - - index -> - %{ - index_in_committee: index, - committee_length: length(committee), - committee_index: committee_index, - slot: slot - } - end + defp boundary_slots(epoch) do + start_slot = Misc.compute_start_slot_at_epoch(epoch) + end_slot = start_slot + ChainSpec.get("SLOTS_PER_EPOCH") - 1 - {:error, _} -> - nil - end + {start_slot, end_slot} end end diff --git a/lib/lambda_ethereum_consensus/validator/setup.ex b/lib/lambda_ethereum_consensus/validator/setup.ex deleted file mode 100644 index 7f88ddeb8..000000000 --- a/lib/lambda_ethereum_consensus/validator/setup.ex +++ /dev/null @@ -1,105 +0,0 @@ -defmodule LambdaEthereumConsensus.Validator.Setup do - @moduledoc """ - Module that setups the initial validators state - """ - - require Logger - alias LambdaEthereumConsensus.Validator - - @spec init(Types.slot(), Types.root()) :: %{Bls.pubkey() => Validator.t()} - def init(slot, head_root) do - config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, []) - keystore_dir = Keyword.get(config, :keystore_dir) - keystore_pass_dir = Keyword.get(config, :keystore_pass_dir) - - setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) - end - - defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir) - when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do - Logger.warning( - "[Validator] No keystore_dir or keystore_pass_dir provided. Validator will not start." - ) - - %{} - end - - defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do - validator_keystores = decode_validator_keystores(keystore_dir, keystore_pass_dir) - - validators = - validator_keystores - |> Enum.map(fn keystore -> - {keystore.pubkey, Validator.new({slot, head_root, keystore})} - end) - |> Map.new() - - Logger.info("[Validator] Initialized #{Enum.count(validators)} validators") - - validators - end - - @doc """ - Get validator keystores from the keystore directory. - This function expects two files for each validator: - - /.json - - /.txt - """ - @spec decode_validator_keystores(binary(), binary()) :: - list(Keystore.t()) - def decode_validator_keystores(keystore_dir, keystore_pass_dir) - when is_binary(keystore_dir) and is_binary(keystore_pass_dir) do - File.ls!(keystore_dir) - |> Enum.map(fn filename -> - if String.ends_with?(filename, ".json") do - base_name = String.trim_trailing(filename, ".json") - - keystore_file = Path.join(keystore_dir, "#{base_name}.json") - keystore_pass_file = Path.join(keystore_pass_dir, "#{base_name}.txt") - - {keystore_file, keystore_pass_file} - else - Logger.warning("[Validator] Skipping file: #{filename}. Not a keystore file.") - nil - end - end) - |> Enum.reject(&is_nil/1) - |> Enum.map(fn {keystore_file, keystore_pass_file} -> - # TODO: remove `try` and handle errors properly - try do - Keystore.decode_from_files!(keystore_file, keystore_pass_file) - rescue - error -> - Logger.error( - "[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}" - ) - - nil - end - end) - |> Enum.reject(&is_nil/1) - end - - @spec notify_validators(map(), tuple()) :: map() - def notify_validators(validators, msg) do - start_time = System.monotonic_time(:millisecond) - - Logger.debug("[Validator] Notifying all Validators with message: #{inspect(msg)}") - - updated_validators = Map.new(validators, ¬ify_validator(&1, msg)) - - end_time = System.monotonic_time(:millisecond) - - Logger.debug( - "[Validator] #{inspect(msg)} notified to all Validators after #{end_time - start_time} ms" - ) - - updated_validators - end - - defp notify_validator({pubkey, validator}, {:on_tick, slot_data}), - do: {pubkey, Validator.handle_tick(slot_data, validator)} - - defp notify_validator({pubkey, validator}, {:new_head, slot, head_root}), - do: {pubkey, Validator.handle_new_head(slot, head_root, validator)} -end diff --git a/lib/lambda_ethereum_consensus/validator/validator.ex b/lib/lambda_ethereum_consensus/validator/validator.ex index 50a468607..07fbaa661 100644 --- a/lib/lambda_ethereum_consensus/validator/validator.ex +++ b/lib/lambda_ethereum_consensus/validator/validator.ex @@ -5,10 +5,6 @@ defmodule LambdaEthereumConsensus.Validator do require Logger defstruct [ - :slot, - :root, - :epoch, - :duties, :index, :keystore, :payload_builder @@ -32,220 +28,76 @@ defmodule LambdaEthereumConsensus.Validator do @default_graffiti_message "Lambda, so gentle, so good" - # TODO: Slot and Root are redundant, we should also have the duties separated and calculated - # just at the begining of every epoch, and then just update them as needed. + @type index :: non_neg_integer() + @type t :: %__MODULE__{ - slot: Types.slot(), - epoch: Types.epoch(), - root: Types.root(), - duties: Duties.duties(), - index: non_neg_integer() | nil, + index: index() | nil, keystore: Keystore.t(), payload_builder: {Types.slot(), Types.root(), BlockBuilder.payload_id()} | nil } - @spec new({Types.slot(), Types.root(), Keystore.t()}) :: t() - def new({head_slot, head_root, keystore}) do + @spec new(Keystore.t(), Types.slot(), Types.root()) :: t() + def new(keystore, head_slot, head_root) do + epoch = Misc.compute_epoch_at_slot(head_slot) + beacon = fetch_target_state_and_go_to_slot(epoch, head_slot, head_root) + + new(keystore, beacon) + end + + @spec new(Keystore.t(), Types.BeaconState.t()) :: t() + def new(keystore, beacon) do state = %__MODULE__{ - slot: head_slot, - epoch: Misc.compute_epoch_at_slot(head_slot), - root: head_root, - duties: Duties.empty_duties(), index: nil, keystore: keystore, payload_builder: nil } - case try_setup_validator(state, head_slot, head_root) do - nil -> - # TODO: Previously this was handled by the validator continously trying to setup itself, - # but now that they are processed syncronously, we should handle this case different. - # Right now it's just omitted and logged. - Logger.error("[Validator] Public key not found in the validator set") - state - - new_state -> - new_state - end - end - - @spec try_setup_validator(t(), Types.slot(), Types.root()) :: t() | nil - defp try_setup_validator(state, slot, root) do - epoch = Misc.compute_epoch_at_slot(slot) - beacon = fetch_target_state(epoch, root) - case fetch_validator_index(beacon, state.keystore.pubkey) do nil -> - nil + Logger.warning( + "[Validator] Public key #{state.keystore.pubkey} not found in the validator set" + ) + + state validator_index -> - log_info(validator_index, "setup validator", slot: slot, root: root) - - duties = - Duties.maybe_update_duties( - state.duties, - beacon, - epoch, - validator_index, - state.keystore.privkey - ) - - join_subnets_for_duties(duties) - Duties.log_duties(duties, validator_index) - %{state | duties: duties, index: validator_index} + log_debug(validator_index, "Setup completed") + %{state | index: validator_index} end end - @spec handle_new_head(Types.slot(), Types.root(), t()) :: t() - def handle_new_head(slot, head_root, %{index: nil} = state) do - log_error("-1", "setup validator", "index not present handle block", - slot: slot, - root: head_root - ) - - state - end - - def handle_new_head(slot, head_root, state) do - log_debug(state.index, "recieved new head", slot: slot, root: head_root) - - # TODO: this doesn't take into account reorgs - state - |> update_state(slot, head_root) - |> maybe_attest(slot) - |> maybe_build_payload(slot + 1) - end - - @spec handle_tick({Types.slot(), atom()}, t()) :: t() - def handle_tick(_logical_time, %{index: nil} = state) do - log_error("-1", "setup validator", "index not present for handle tick") - state - end - - def handle_tick({slot, :first_third}, state) do - log_debug(state.index, "started first third", slot: slot) - # Here we may: - # 1. propose our blocks - # 2. (TODO) start collecting attestations for aggregation - maybe_propose(state, slot) - |> update_state(slot, state.root) - end - - def handle_tick({slot, :second_third}, state) do - log_debug(state.index, "started second third", slot: slot) - # Here we may: - # 1. send our attestation for an empty slot - # 2. start building a payload - state - |> maybe_attest(slot) - |> maybe_build_payload(slot + 1) - end - - def handle_tick({slot, :last_third}, state) do - log_debug(state.index, "started last third", slot: slot) - # Here we may publish our attestation aggregate - maybe_publish_aggregate(state, slot) - end - - ########################## - ### Private Functions ########################## + # Target State - @spec update_state(t(), Types.slot(), Types.root()) :: t() - - defp update_state(%{slot: slot, root: root} = state, slot, root), do: state - - # Epoch as part of the state now avoids recomputing the duties at every block - defp update_state(%{epoch: last_epoch} = state, slot, head_root) do - epoch = Misc.compute_epoch_at_slot(slot + 1) - - if last_epoch == epoch do - %{state | slot: slot, root: head_root} - else - recompute_duties(state, last_epoch, epoch, slot, head_root) - end + @spec fetch_target_state_and_go_to_slot(Types.epoch(), Types.slot(), Types.root()) :: + Types.BeaconState.t() + def fetch_target_state_and_go_to_slot(epoch, slot, root) do + epoch |> fetch_target_state(root) |> go_to_slot(slot) end - @spec recompute_duties(t(), Types.epoch(), Types.epoch(), Types.slot(), Types.root()) :: t() - defp recompute_duties(%{root: last_root} = state, last_epoch, epoch, slot, head_root) do - start_slot = Misc.compute_start_slot_at_epoch(epoch) - target_root = if slot == start_slot, do: head_root, else: last_root - - # Process the start of the new epoch - new_beacon = fetch_target_state(epoch, target_root) |> go_to_slot(start_slot) - - new_duties = - Duties.shift_duties(state.duties, epoch, last_epoch) - |> Duties.maybe_update_duties(new_beacon, epoch, state.index, state.keystore.privkey) - - move_subnets(state.duties, new_duties) - Duties.log_duties(new_duties, state.index) - - %{state | slot: slot, root: head_root, duties: new_duties, epoch: epoch} - end - - @spec fetch_target_state(Types.epoch(), Types.root()) :: Types.BeaconState.t() defp fetch_target_state(epoch, root) do {:ok, state} = CheckpointStates.compute_target_checkpoint_state(epoch, root) state end - defp get_subnet_ids(duties), - do: duties |> Stream.reject(&(&1 == :not_computed)) |> Enum.map(& &1.subnet_id) - - defp move_subnets(%{attester: old_duties}, %{attester: new_duties}) do - old_subnets = old_duties |> get_subnet_ids() |> MapSet.new() - new_subnets = new_duties |> get_subnet_ids() |> MapSet.new() - - # leave old subnets (except for recurring ones) - MapSet.difference(old_subnets, new_subnets) |> leave() - - # join new subnets (except for recurring ones) - MapSet.difference(new_subnets, old_subnets) |> join() - end - - defp join_subnets_for_duties(%{attester: duties}) do - duties |> get_subnet_ids() |> join() - end - - defp join(subnets) do - if not Enum.empty?(subnets) do - Logger.debug("Joining subnets: #{Enum.join(subnets, ", ")}") - Enum.each(subnets, &Gossip.Attestation.join/1) - end - end + defp go_to_slot(%{slot: old_slot} = state, slot) when old_slot == slot, do: state - defp leave(subnets) do - if not Enum.empty?(subnets) do - Logger.debug("Leaving subnets: #{Enum.join(subnets, ", ")}") - Enum.each(subnets, &Gossip.Attestation.leave/1) - end + defp go_to_slot(%{slot: old_slot} = state, slot) when old_slot < slot do + {:ok, st} = StateTransition.process_slots(state, slot) + st end - @spec maybe_attest(t(), Types.slot()) :: t() - defp maybe_attest(state, slot) do - case Duties.get_current_attester_duty(state.duties, slot) do - %{attested?: false} = duty -> - attest(state, duty) - - new_duties = - Duties.replace_attester_duty(state.duties, duty, %{duty | attested?: true}) - - %{state | duties: new_duties} - - _ -> - state - end - end + ########################## + # Attestations - @spec attest(t(), Duties.attester_duty()) :: :ok - defp attest(%{index: validator_index, keystore: keystore} = state, current_duty) do + @spec attest(t(), Duties.attester_duty(), Types.slot(), Types.root()) :: :ok + def attest(%{index: validator_index, keystore: keystore}, current_duty, slot, head_root) do subnet_id = current_duty.subnet_id - log_debug(validator_index, "attesting", slot: current_duty.slot, subnet_id: subnet_id) + log_debug(validator_index, "attesting", slot: slot, subnet_id: subnet_id) - attestation = produce_attestation(current_duty, state.root, keystore.privkey) + attestation = produce_attestation(current_duty, slot, head_root, keystore.privkey) - log_md = [slot: attestation.data.slot, attestation: attestation, subnet_id: subnet_id] + log_md = [slot: slot, attestation: attestation, subnet_id: subnet_id] debug_log_msg = "publishing attestation on committee index: #{current_duty.committee_index} | as #{current_duty.index_in_committee}/#{current_duty.committee_length - 1} and pubkey: #{LambdaEthereumConsensus.Utils.format_shorten_binary(keystore.pubkey)}" @@ -263,26 +115,12 @@ defmodule LambdaEthereumConsensus.Validator do end end - # We publish our aggregate on the next slot, and when we're an aggregator - defp maybe_publish_aggregate(%{index: validator_index, keystore: keystore} = state, slot) do - case Duties.get_current_attester_duty(state.duties, slot) do - %{should_aggregate?: true} = duty -> - publish_aggregate(duty, validator_index, keystore) - - new_duties = - Duties.replace_attester_duty(state.duties, duty, %{duty | should_aggregate?: false}) - - %{state | duties: new_duties} - - _ -> - state - end - end - - defp publish_aggregate(duty, validator_index, keystore) do + @spec publish_aggregate(t(), Duties.attester_duty(), Types.slot()) :: + :ok + def publish_aggregate(%{index: validator_index, keystore: keystore}, duty, slot) do case Gossip.Attestation.stop_collecting(duty.subnet_id) do {:ok, attestations} -> - log_md = [slot: duty.slot, attestations: attestations] + log_md = [slot: slot, attestations: attestations] log_debug(validator_index, "publishing aggregate", log_md) aggregate_attestations(attestations) @@ -326,12 +164,11 @@ defmodule LambdaEthereumConsensus.Validator do %Types.SignedAggregateAndProof{message: aggregate_and_proof, signature: signature} end - defp produce_attestation(duty, head_root, privkey) do + defp produce_attestation(duty, slot, head_root, privkey) do %{ index_in_committee: index_in_committee, committee_length: committee_length, - committee_index: committee_index, - slot: slot + committee_index: committee_index } = duty head_state = BlockStates.get_state_info!(head_root).beacon_state |> go_to_slot(slot) @@ -368,41 +205,23 @@ defmodule LambdaEthereumConsensus.Validator do } end - defp go_to_slot(%{slot: old_slot} = state, slot) when old_slot == slot, do: state - - defp go_to_slot(%{slot: old_slot} = state, slot) when old_slot < slot do - {:ok, st} = StateTransition.process_slots(state, slot) - st - end - - defp go_to_slot(%{latest_block_header: %{parent_root: parent_root}}, slot) do - BlockStates.get_state_info!(parent_root).beacon_state |> go_to_slot(slot) - end - @spec fetch_validator_index(Types.BeaconState.t(), Bls.pubkey()) :: non_neg_integer() | nil defp fetch_validator_index(beacon, pubkey) do Enum.find_index(beacon.validators, &(&1.pubkey == pubkey)) end - defp proposer?(%{duties: %{proposer: slots}}, slot), do: Enum.member?(slots, slot) - - @spec maybe_build_payload(t(), Types.slot()) :: t() - defp maybe_build_payload(%{root: head_root} = state, proposed_slot) do - if proposer?(state, proposed_slot) do - start_payload_builder(state, proposed_slot, head_root) - else - state - end - end + ################################ + # Payload building and proposing @spec start_payload_builder(t(), Types.slot(), Types.root()) :: t() + def start_payload_builder(%{payload_builder: {slot, root, _}} = state, slot, root), do: state - defp start_payload_builder(%{payload_builder: {slot, root, _}} = state, slot, root), do: state - - defp start_payload_builder(%{index: validator_index} = state, proposed_slot, head_root) do + def start_payload_builder(%{index: validator_index} = state, proposed_slot, head_root) do # TODO: handle reorgs and late blocks - log_debug(validator_index, "starting building payload for slot #{proposed_slot}") + log_debug(validator_index, "starting building payload for slot #{proposed_slot}", + root: head_root + ) case BlockBuilder.start_building_payload(proposed_slot, head_root) do {:ok, payload_id} -> @@ -417,23 +236,16 @@ defmodule LambdaEthereumConsensus.Validator do end end - defp maybe_propose(state, slot) do - if proposer?(state, slot) do - propose(state, slot) - else - state - end - end - - defp propose( - %{ - root: head_root, - index: validator_index, - payload_builder: {proposed_slot, head_root, payload_id}, - keystore: keystore - } = state, - proposed_slot - ) do + @spec propose(t(), Types.slot(), Types.root()) :: t() + def propose( + %{ + index: validator_index, + payload_builder: {proposed_slot, head_root, payload_id}, + keystore: keystore + } = state, + proposed_slot, + head_root + ) do log_debug(validator_index, "building block", slot: proposed_slot) build_result = @@ -460,13 +272,12 @@ defmodule LambdaEthereumConsensus.Validator do %{state | payload_builder: nil} end - # TODO: at least in kurtosis there are blocks that are proposed without a payload apparently, must investigate. - defp propose(%{payload_builder: nil} = state, _proposed_slot) do + def propose(%{payload_builder: nil} = state, _proposed_slot, _head_root) do log_error(state.index, "propose block", "lack of execution payload") state end - defp propose(state, proposed_slot) do + def propose(state, proposed_slot, _head_root) do Logger.error( "[Validator] Skipping block proposal for slot #{proposed_slot} due to missing validator data" ) @@ -506,7 +317,8 @@ defmodule LambdaEthereumConsensus.Validator do rem(blob_index, ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT")) end - # Some Log Helpers to avoid repetition + ################################ + # Log Helpers defp log_info_result(result, index, message, metadata), do: log_result(result, :info, index, message, metadata) diff --git a/lib/lambda_ethereum_consensus/validator/validator_set.ex b/lib/lambda_ethereum_consensus/validator/validator_set.ex new file mode 100644 index 000000000..c06abb172 --- /dev/null +++ b/lib/lambda_ethereum_consensus/validator/validator_set.ex @@ -0,0 +1,295 @@ +defmodule LambdaEthereumConsensus.ValidatorSet do + @moduledoc """ + Module that holds the set of validators and their states, + it also manages the validator's duties as bitmaps to + simplify the delegation of work. + """ + + defstruct head_root: nil, duties: %{}, validators: %{} + + require Logger + + alias LambdaEthereumConsensus.StateTransition.Accessors + alias LambdaEthereumConsensus.StateTransition.Misc + alias LambdaEthereumConsensus.Validator + alias LambdaEthereumConsensus.Validator.Duties + + @type validators :: %{Validator.index() => Validator.t()} + + @type t :: %__MODULE__{ + head_root: Types.root() | nil, + duties: %{ + Types.epoch() => %{ + proposers: Duties.proposer_duties(), + attesters: Duties.attester_duties() + } + }, + validators: validators() + } + + @doc "Check if the duties for the given epoch are already computed." + defguard is_duties_computed(set, epoch) + when is_map(set.duties) and not is_nil(:erlang.map_get(epoch, set.duties)) + + @doc """ + Initiate the set of validators, given the slot and head root. + """ + @spec init(Types.slot(), Types.root()) :: t() + def init(slot, head_root) do + config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, []) + keystore_dir = Keyword.get(config, :keystore_dir) + keystore_pass_dir = Keyword.get(config, :keystore_pass_dir) + + setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) + end + + defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir) + when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do + Logger.warning( + "[Validator] No keystore_dir or keystore_pass_dir provided. Validators won't start." + ) + + %__MODULE__{} + end + + defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do + validator_keystores = decode_validator_keystores(keystore_dir, keystore_pass_dir) + epoch = Misc.compute_epoch_at_slot(slot) + beacon = Validator.fetch_target_state_and_go_to_slot(epoch, slot, head_root) + + validators = + Map.new(validator_keystores, fn keystore -> + validator = Validator.new(keystore, beacon) + {validator.index, validator} + end) + + Logger.info("[Validator] Initialized #{Enum.count(validators)} validators") + + %__MODULE__{validators: validators} + |> update_state(epoch, slot, head_root) + end + + @doc """ + Notify all validators of a new head. + """ + @spec notify_head(t(), Types.slot(), Types.root()) :: t() + def notify_head(%{validators: validators} = state, _slot, _head_root) when validators == %{}, + do: state + + def notify_head(set, slot, head_root) do + Logger.debug("[ValidatorSet] New Head", root: head_root, slot: slot) + epoch = Misc.compute_epoch_at_slot(slot) + + # TODO: this doesn't take into account reorgs + set + |> update_state(epoch, slot, head_root) + |> maybe_attests(epoch, slot, head_root) + |> maybe_build_payload(slot + 1, head_root) + end + + @doc """ + Notify all validators of a new tick. + """ + @spec notify_tick(t(), tuple()) :: t() + def notify_tick(%{validators: validators} = state, _slot_data) when validators == %{}, + do: state + + def notify_tick(%{head_root: head_root} = set, {slot, third} = slot_data) do + Logger.debug("[ValidatorSet] Tick #{inspect(third)}", root: head_root, slot: slot) + epoch = Misc.compute_epoch_at_slot(slot) + + set + |> update_state(epoch, slot, head_root) + |> process_tick(epoch, slot_data) + end + + defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}) do + maybe_propose(set, epoch, slot, head_root) + end + + defp process_tick(%{head_root: head_root} = set, epoch, {slot, :second_third}) do + set + |> maybe_attests(epoch, slot, head_root) + |> maybe_build_payload(slot + 1, head_root) + end + + defp process_tick(set, epoch, {slot, :last_third}) do + maybe_publish_aggregates(set, epoch, slot) + end + + ############################## + # State update + + defp update_state(set, epoch, slot, head_root) do + set + |> update_head(head_root) + |> compute_duties(epoch, slot, head_root) + end + + defp update_head(%{head_root: head_root} = set, head_root), do: set + defp update_head(set, head_root), do: %{set | head_root: head_root} + + defp compute_duties(set, epoch, _slot, _head_root) + when is_duties_computed(set, epoch) and is_duties_computed(set, epoch + 1), + do: set + + defp compute_duties(set, epoch, slot, head_root) do + epochs_to_calculate = + [{epoch, slot}, {epoch + 1, Misc.compute_start_slot_at_epoch(epoch + 1)}] + |> Enum.reject(&Map.has_key?(set.duties, elem(&1, 0))) + + epochs_to_calculate + |> Map.new(&compute_duties_for_epoch!(set, &1, head_root)) + |> merge_duties_and_prune(epoch, set) + end + + defp compute_duties_for_epoch!(set, {epoch, slot}, head_root) do + beacon = Validator.fetch_target_state_and_go_to_slot(epoch, slot, head_root) + # If committees are not already calculated for the epoch, this is way faster than + # calculating them on the fly. + Accessors.maybe_prefetch_committees(beacon, epoch) + + duties = %{ + proposers: Duties.compute_proposers_for_epoch(beacon, epoch, set.validators), + attesters: Duties.compute_attesters_for_epoch(beacon, epoch, set.validators) + } + + Duties.log_duties_for_epoch(duties, epoch) + + {epoch, duties} + end + + defp merge_duties_and_prune(new_duties, current_epoch, set) do + set.duties + # Remove duties from epoch - 2 or older + |> Map.reject(fn {old_epoch, _} -> old_epoch < current_epoch - 1 end) + |> Map.merge(new_duties) + |> then(fn current_duties -> %{set | duties: current_duties} end) + end + + ############################## + # Block proposal + + defp maybe_build_payload(%{validators: validators} = set, slot, head_root) do + # We calculate payloads from a previous slot, we need to recompute the epoch + epoch = Misc.compute_epoch_at_slot(slot) + + case Duties.current_proposer(set.duties, epoch, slot) do + nil -> + set + + validator_index -> + validators + |> Map.update!(validator_index, &Validator.start_payload_builder(&1, slot, head_root)) + |> update_validators(set) + end + end + + defp maybe_propose(%{validators: validators} = set, epoch, slot, head_root) do + case Duties.current_proposer(set.duties, epoch, slot) do + nil -> + set + + validator_index -> + validators + |> Map.update!(validator_index, &Validator.propose(&1, slot, head_root)) + |> update_validators(set) + end + end + + defp update_validators(new_validators, set), do: %{set | validators: new_validators} + + ############################## + # Attestation + + defp maybe_attests(set, epoch, slot, head_root) do + case Duties.current_attesters(set.duties, epoch, slot) do + [] -> + set + + attester_duties -> + attester_duties + |> Enum.map(&attest(&1, slot, head_root, set.validators)) + |> update_duties(set, epoch, :attesters, slot) + end + end + + defp maybe_publish_aggregates(set, epoch, slot) do + case Duties.current_aggregators(set.duties, epoch, slot) do + [] -> + set + + aggregator_duties -> + aggregator_duties + |> Enum.map(&publish_aggregate(&1, slot, set.validators)) + |> update_duties(set, epoch, :attesters, slot) + end + end + + defp attest(duty, slot, head_root, validators) do + validators + |> Map.get(duty.validator_index) + |> Validator.attest(duty, slot, head_root) + + Duties.attested(duty) + end + + defp publish_aggregate(duty, slot, validators) do + validators + |> Map.get(duty.validator_index) + |> Validator.publish_aggregate(duty, slot) + + Duties.aggregated(duty) + end + + defp update_duties(new_duties, set, epoch, kind, slot) do + set.duties + |> Duties.update_duties!(kind, epoch, slot, new_duties) + |> then(&%{set | duties: &1}) + end + + ############################## + # Key management + + @doc """ + Get validator keystores from the keystore directory. + This function expects two files for each validator: + - /.json + - /.txt + """ + @spec decode_validator_keystores(binary(), binary()) :: + list(Keystore.t()) + def decode_validator_keystores(keystore_dir, keystore_pass_dir) + when is_binary(keystore_dir) and is_binary(keystore_pass_dir) do + keystore_dir + |> File.ls!() + |> Enum.flat_map(&paths_from_filename(keystore_dir, keystore_pass_dir, &1, Path.extname(&1))) + |> Enum.flat_map(&decode_key/1) + end + + defp paths_from_filename(keystore_dir, keystore_pass_dir, filename, ".json") do + basename = Path.basename(filename, ".json") + + keystore_file = Path.join(keystore_dir, "#{basename}.json") + keystore_pass_file = Path.join(keystore_pass_dir, "#{basename}.txt") + + [{keystore_file, keystore_pass_file}] + end + + defp paths_from_filename(_keystore_dir, _keystore_pass_dir, basename, _ext) do + Logger.warning("[Validator] Skipping file: #{basename}. Not a json keystore file.") + [] + end + + defp decode_key({keystore_file, keystore_pass_file}) do + # TODO: remove `try` and handle errors properly + [Keystore.decode_from_files!(keystore_file, keystore_pass_file)] + rescue + error -> + Logger.error( + "[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}" + ) + + [] + end +end diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 48fb52651..75b4f5a74 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -23,6 +23,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Utils.BitVector alias LambdaEthereumConsensus.Validator + alias LambdaEthereumConsensus.ValidatorSet alias Libp2pProto.AddPeer alias Libp2pProto.Command alias Libp2pProto.Enr @@ -65,7 +66,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @type init_arg :: {:genesis_time, Types.uint64()} - | {:validators, %{}} + | {:validator_set, ValidatorSet.t()} | {:listen_addr, [String.t()]} | {:enable_discovery, boolean()} | {:discovery_addr, String.t()} @@ -391,7 +392,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @impl GenServer def init(args) do {genesis_time, args} = Keyword.pop!(args, :genesis_time) - {validators, args} = Keyword.pop(args, :validators, %{}) + {validator_set, args} = Keyword.pop(args, :validator_set, %{}) {join_init_topics, args} = Keyword.pop(args, :join_init_topics, false) {enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false) {store, args} = Keyword.pop!(args, :store) @@ -420,7 +421,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {:ok, %{ genesis_time: genesis_time, - validators: validators, + validator_set: validator_set, slot_data: nil, port: port, subscribers: %{}, @@ -523,11 +524,11 @@ defmodule LambdaEthereumConsensus.Libp2pPort do end @impl GenServer - def handle_info({:new_head, slot, head_root}, %{validators: validators} = state) do - updated_validators = - Validator.Setup.notify_validators(validators, {:new_head, slot, head_root}) + def handle_info({:new_head, slot, head_root}, %{validator_set: validator_set} = state) do + updated_validator_set = + ValidatorSet.notify_head(validator_set, slot, head_root) - {:noreply, %{state | validators: updated_validators}} + {:noreply, %{state | validator_set: updated_validator_set}} end @impl GenServer @@ -549,46 +550,42 @@ defmodule LambdaEthereumConsensus.Libp2pPort do end @impl GenServer - def handle_call(:get_keystores, _from, %{validators: validators} = state), - do: {:reply, Enum.map(validators, fn {_pubkey, validator} -> validator.keystore end), state} + def handle_call(:get_keystores, _from, %{validator_set: validator_set} = state), + do: + {:reply, + Enum.map(validator_set.validators, fn {_index, validator} -> validator.keystore end), + state} @impl GenServer - def handle_call({:delete_validator, pubkey}, _from, %{validators: validators} = state) do - case Map.fetch(validators, pubkey) do - {:ok, validator} -> - Logger.warning("[Libp2pPort] Deleting validator with index #{inspect(validator.index)}.") - - {:reply, :ok, %{state | validators: Map.delete(validators, pubkey)}} - - :error -> + def handle_call({:delete_validator, pubkey}, _from, %{validator_set: validator_set} = state) do + validator_set.validators + |> Enum.find(fn {_index, validator} -> validator.keystore.pubkey == pubkey end) + |> case do + {index, _validator} -> + Logger.warning("[Libp2pPort] Deleting validator with index #{inspect(index)}.") + updated_validators = Map.delete(validator_set.validators, index) + {:reply, :ok, Map.put(state.validator_set, :validators, updated_validators)} + + _ -> {:error, "Pubkey #{inspect(pubkey)} not found."} end end @impl GenServer def handle_call( - {:add_validator, %Keystore{pubkey: pubkey} = keystore}, + {:add_validator, keystore}, _from, - %{validators: validators} = state + %{validator_set: %{head_root: head_root}, slot_data: {slot, _third}} = + state ) do # TODO (#1263): handle 0 validators - first_validator = validators |> Map.values() |> List.first() - validator = Validator.new({first_validator.slot, first_validator.root, keystore}) + validator = Validator.new(keystore, slot, head_root) Logger.warning( - "[Libp2pPort] Adding validator with index #{inspect(validator.index)}. head_slot: #{inspect(validator.slot)}." + "[Libp2pPort] Adding validator with index #{inspect(validator.index)}. head_slot: #{inspect(slot)}." ) - {:reply, :ok, - %{ - state - | validators: - Map.put( - validators, - pubkey, - validator - ) - }} + {:reply, :ok, put_in(state.validator_set, [:validators, validator.index], validator)} end ###################### @@ -801,10 +798,10 @@ defmodule LambdaEthereumConsensus.Libp2pPort do if slot_data == new_slot_data do state else - updated_validators = - Validator.Setup.notify_validators(state.validators, {:on_tick, new_slot_data}) + updated_validator_set = + ValidatorSet.notify_tick(state.validator_set, new_slot_data) - %{state | slot_data: new_slot_data, validators: updated_validators} + %{state | slot_data: new_slot_data, validator_set: updated_validator_set} end maybe_log_new_slot(slot_data, new_slot_data) diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index e3b5d9a40..3a9fbfe50 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -160,7 +160,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.P2P.Metadata patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end) - start_link_supervised!({Libp2pPort, genesis_time: 42, store: %Store{}}) + start_link_supervised!({Libp2pPort, genesis_time: :os.system_time(:second), store: %Store{}}) Metadata.init() identity = Libp2pPort.get_node_identity() metadata = Metadata.get_metadata() diff --git a/test/unit/libp2p_port_test.exs b/test/unit/libp2p_port_test.exs index e039f1c5e..0ebc920c6 100644 --- a/test/unit/libp2p_port_test.exs +++ b/test/unit/libp2p_port_test.exs @@ -19,7 +19,8 @@ defmodule Unit.Libp2pPortTest do defp start_port(name \\ Libp2pPort, init_args \\ []) do start_link_supervised!( - {Libp2pPort, [opts: [name: name], store: %Store{}, genesis_time: 42] ++ init_args}, + {Libp2pPort, + [opts: [name: name], store: %Store{}, genesis_time: :os.system_time(:second)] ++ init_args}, id: name ) end