From 881e7610d99f075ddbf5fac10e45126818e8cc99 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 30 Sep 2024 11:51:15 -0300 Subject: [PATCH 01/17] initial test --- Makefile | 28 ++++++++++++++++--- .../beacon/checkpoint_sync.ex | 13 ++++++++- network_params.yaml | 8 +++--- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index dcea8f9f3..c01e9b78e 100644 --- a/Makefile +++ b/Makefile @@ -162,21 +162,41 @@ iex: compile-all test-iex: MIX_ENV=test iex -S mix run -- --mode db +################## +# NODE RUNNERS +DYSCOVERY_PORT ?= 30303 + #▶️ checkpoint-sync: @ Run an interactive terminal using checkpoint sync. checkpoint-sync: compile-all - iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics + iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --discover-port $(DYSCOVERY_PORT) + +#▶️ checkpoint-sync.logfile: @ Run an interactive terminal using checkpoint sync with a log file. +checkpoint-sync.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --log-file ./logs/mainnet.log #▶️ sepolia: @ Run an interactive terminal using sepolia network sepolia: compile-all - iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics + iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --discover-port $(DYSCOVERY_PORT) + +#▶️ sepolia.logfile: @ Run an interactive terminal using sepolia network with a log file +sepolia.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --log-file ./logs/sepolia.log #▶️ holesky: @ Run an interactive terminal using holesky network holesky: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky + iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --discover-port $(DYSCOVERY_PORT) + +#▶️ holesky.logfile: @ Run an interactive terminal using holesky network with a log file +holesky.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --log-file ./logs/holesky.log #▶️ gnosis: @ Run an interactive terminal using gnosis network gnosis: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --discover-port $(DYSCOVERY_PORT) + +#▶️ gnosis.logfile: @ Run an interactive terminal using gnosis network with a log file +gnosis.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --log-file ./logs/gnosis.log --discover-port $(DYSCOVERY_PORT) #🔴 test: @ Run tests test: compile-all diff --git a/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex b/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex index 0e27b2c3f..b22cacfaa 100644 --- a/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex +++ b/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex @@ -19,7 +19,7 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do def get_finalized_block_and_state(url, genesis_validators_root) do tasks = [Task.async(__MODULE__, :get_state, [url]), Task.async(__MODULE__, :get_block, [url])] - case Task.await_many(tasks, 60_000) do + case Task.await_many(tasks, 600_000) do [{:ok, state}, {:ok, block}] -> if state.genesis_validators_root == genesis_validators_root do check_match(url, state, block) @@ -47,11 +47,17 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do """ @spec get_state(String.t()) :: {:ok, BeaconState.t()} | {:error, any()} def get_state(url) do + start_time = System.monotonic_time() with {:error, err} <- get_ssz_from_url(url, "/eth/v2/debug/beacon/states/finalized", BeaconState) do Logger.error("There has been an error retrieving the last finalized state") {:error, err} + else + {:ok, state} -> + Logger.info("Retrieved the last finalized state in #{(System.monotonic_time() - start_time) / 1_000_000_000} s") + {:ok, state} end + end @doc """ @@ -59,10 +65,15 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do """ @spec get_block(String.t()) :: {:ok, SignedBeaconBlock.t()} | {:error, any()} def get_block(url, id \\ "finalized") do + start_time = System.monotonic_time() with {:error, err} <- get_ssz_from_url(url, "/eth/v2/beacon/blocks/#{id}", SignedBeaconBlock) do Logger.error("There has been an error retrieving the last finalized block") {:error, err} + else + {:ok, block} -> + Logger.info("Retrieved the last finalized block in #{(System.monotonic_time() - start_time) / 1_000_000_000} s") + {:ok, block} end end diff --git a/network_params.yaml b/network_params.yaml index 76f0325e1..5303f6e21 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -2,14 +2,14 @@ participants: - el_type: geth cl_type: lighthouse count: 2 - validator_count: 32 + validator_count: 5 - el_type: geth cl_type: lambda cl_image: lambda_ethereum_consensus:latest use_separate_vc: false count: 1 - validator_count: 32 + validator_count: 54 cl_max_mem: 4096 keymanager_enabled: true -network_params: - preset: minimal +# network_params: +# preset: minimal From d073ba9e32a060fd660fdfa6bf59b3393eb41d43 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 30 Sep 2024 20:31:44 -0300 Subject: [PATCH 02/17] Fixed an issue regarding discovery port --- Makefile | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 84fd0c15c..5e073d07f 100644 --- a/Makefile +++ b/Makefile @@ -168,7 +168,7 @@ DYSCOVERY_PORT ?= 30303 #▶️ checkpoint-sync: @ Run an interactive terminal using checkpoint sync. checkpoint-sync: compile-all - iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --discover-port $(DYSCOVERY_PORT) + iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --discovery-port $(DYSCOVERY_PORT) #▶️ checkpoint-sync.logfile: @ Run an interactive terminal using checkpoint sync with a log file. checkpoint-sync.logfile: compile-all @@ -180,7 +180,7 @@ checkpoint-sync.logfile: compile-all #▶️ sepolia: @ Run an interactive terminal using sepolia network sepolia: compile-all - iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --discover-port $(DYSCOVERY_PORT) + iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --discovery-port $(DYSCOVERY_PORT) #▶️ sepolia.logfile: @ Run an interactive terminal using sepolia network with a log file sepolia.logfile: compile-all @@ -192,7 +192,7 @@ sepolia.logfile: compile-all #▶️ holesky: @ Run an interactive terminal using holesky network holesky: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --discover-port $(DYSCOVERY_PORT) + iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --discovery-port $(DYSCOVERY_PORT) #▶️ holesky.logfile: @ Run an interactive terminal using holesky network with a log file holesky.logfile: compile-all @@ -204,11 +204,11 @@ holesky.logfile: compile-all #▶️ gnosis: @ Run an interactive terminal using gnosis network gnosis: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --discover-port $(DYSCOVERY_PORT) + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --discovery-port $(DYSCOVERY_PORT) #▶️ gnosis.logfile: @ Run an interactive terminal using gnosis network with a log file gnosis.logfile: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --log-file ./logs/gnosis.log --discover-port $(DYSCOVERY_PORT) + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --log-file ./logs/gnosis.log --discovery-port $(DYSCOVERY_PORT) #🔴 test: @ Run tests test: compile-all From d6df99d05928b61f5e021c5d40dc0ed591365a61 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 30 Sep 2024 20:33:24 -0300 Subject: [PATCH 03/17] least amount of changes that appear to stop the pruning errors --- .../fork_choice/fork_choice.ex | 11 +++++++---- lib/lambda_ethereum_consensus/store/state_db.ex | 1 + 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index ad0ee36ad..ad3486af1 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -104,10 +104,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do @spec on_tick(Store.t(), Types.uint64()) :: Store.t() def on_tick(store, time) do - %Store{finalized_checkpoint: last_finalized_checkpoint} = store + %Store{finalized_checkpoint: _last_finalized_checkpoint} = store Handlers.on_tick(store, time) - |> prune_old_states(last_finalized_checkpoint.epoch) + # |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(&StoreDb.persist_store/1) end @@ -173,6 +173,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do new_finalized_epoch = store.finalized_checkpoint.epoch if last_finalized_epoch < new_finalized_epoch do + Logger.info("Pruning states before slot #{new_finalized_epoch}") new_finalized_slot = @@ -192,9 +193,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do PruneBlobsSupervisor, fn -> BlobDb.prune_old_blobs(new_finalized_slot) end ) - end - Store.prune(store) + Store.prune(store) + else + store + end end def apply_handler(iter, state, handler) do diff --git a/lib/lambda_ethereum_consensus/store/state_db.ex b/lib/lambda_ethereum_consensus/store/state_db.ex index e13cab075..bd07d9118 100644 --- a/lib/lambda_ethereum_consensus/store/state_db.ex +++ b/lib/lambda_ethereum_consensus/store/state_db.ex @@ -62,6 +62,7 @@ defmodule LambdaEthereumConsensus.Store.StateDb do result = BlockRootBySlot.fold_keys(slot, 0, fn slot, acc -> + Logger.info("[StateDb] Pruning state for slot #{slot}.") case BlockRootBySlot.get(slot) do {:ok, _block_root} -> remove_state_by_slot(slot) From 26f41cb427560063dada1db61dfd915dbf47b67e Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 30 Sep 2024 20:55:06 -0300 Subject: [PATCH 04/17] removed the pruning completely for testing --- lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index ad3486af1..217d00763 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -46,7 +46,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do Logger.info("[Fork choice] Adding new block", root: block_info.root, slot: slot) - %Store{finalized_checkpoint: last_finalized_checkpoint} = store + %Store{finalized_checkpoint: _last_finalized_checkpoint} = store result = :telemetry.span([:sync, :on_block], %{}, fn -> @@ -61,7 +61,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do :telemetry.span([:fork_choice, :recompute_head], %{}, fn -> {recompute_head(new_store), %{}} end) - |> prune_old_states(last_finalized_checkpoint.epoch) + # |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(fn store -> StoreDb.persist_store(store) Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) From 61d92bff4ac6ef30276fb98f0388f390c1513ffc Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 30 Sep 2024 21:38:42 -0300 Subject: [PATCH 05/17] final test for the day --- lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex | 8 ++++---- lib/lambda_ethereum_consensus/fork_choice/head.ex | 2 +- lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex | 2 +- lib/types/store.ex | 7 +++++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 217d00763..e0cfc97b5 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -46,7 +46,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do Logger.info("[Fork choice] Adding new block", root: block_info.root, slot: slot) - %Store{finalized_checkpoint: _last_finalized_checkpoint} = store + %Store{finalized_checkpoint: last_finalized_checkpoint} = store result = :telemetry.span([:sync, :on_block], %{}, fn -> @@ -61,7 +61,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do :telemetry.span([:fork_choice, :recompute_head], %{}, fn -> {recompute_head(new_store), %{}} end) - # |> prune_old_states(last_finalized_checkpoint.epoch) + |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(fn store -> StoreDb.persist_store(store) Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) @@ -104,10 +104,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do @spec on_tick(Store.t(), Types.uint64()) :: Store.t() def on_tick(store, time) do - %Store{finalized_checkpoint: _last_finalized_checkpoint} = store + %Store{finalized_checkpoint: last_finalized_checkpoint} = store Handlers.on_tick(store, time) - # |> prune_old_states(last_finalized_checkpoint.epoch) + |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(&StoreDb.persist_store/1) end diff --git a/lib/lambda_ethereum_consensus/fork_choice/head.ex b/lib/lambda_ethereum_consensus/fork_choice/head.ex index 9114bc0e8..62d0e2836 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/head.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/head.ex @@ -101,7 +101,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do {true, Map.put(new_blocks, block_root, block)} not Enum.empty?(children) -> - {false, new_blocks} + {false, blocks} true -> filter_leaf_block(store, block_root, block, blocks) diff --git a/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex b/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex index 569414541..5fedf768a 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex @@ -82,7 +82,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Simple.Tree do def get_children!(tree, parent_id) do case get_children(tree, parent_id) do {:error, :not_found} -> raise "Parent #{Base.encode16(parent_id)} not found in tree" - {:ok, res} -> res + {:ok, res} -> res |> IO.inspect(label: "Children of #{Base.encode16(parent_id)}") end end diff --git a/lib/types/store.ex b/lib/types/store.ex index 736684034..00276ee62 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -144,8 +144,11 @@ defmodule Types.Store do @spec get_children(t(), Types.root()) :: [BeaconBlock.t()] def get_children(%__MODULE__{tree_cache: tree}, parent_root) do - Tree.get_children!(tree, parent_root) - |> Enum.map(&{&1, Blocks.get_block!(&1)}) + Tree.get_children(tree, parent_root) + |> case do + {:ok, children} -> Enum.map(children, &{&1, Blocks.get_block!(&1)}) + {:error, :not_found} -> [] + end end @spec store_block_info(t(), BlockInfo.t()) :: t() From e9fc927e40e9c4513c1fa0f87e9d7c51f7872ba4 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Tue, 1 Oct 2024 11:59:03 -0300 Subject: [PATCH 06/17] Initial workarround for the hang of the sync --- .../p2p/blob_downloader.ex | 8 +++++-- .../p2p/block_downloader.ex | 8 +++++-- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 22 ++++++++++++++++++- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 4493a4fc0..81877f045 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -32,7 +32,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do def request_blobs_by_range(slot, count, on_blobs, retries) do Logger.debug("Requesting blobs", slot: slot) - # TODO: handle no-peers asynchronously? + # FIXME: handle no-peers asynchronously! this is hanging Libp2pPort when there are no peers peer_id = get_some_peer() # NOTE: BeaconBlocksByRangeRequest == BlobSidecarsByRangeRequest @@ -62,7 +62,8 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do P2P.Peerbook.penalize_peer(peer_id) if retries > 0 do - Logger.debug("Retrying request for #{count} blobs", slot: slot) + Logger.info("Retrying request for #{count} blobs, reason: #{inspect(reason)} in 2 second", slot: slot) + Process.sleep(2000) request_blobs_by_range(slot, count, on_blobs, retries - 1) {:ok, store} else @@ -123,6 +124,9 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> + stacktrace = Process.info(self(), :current_stacktrace) + IO.inspect(stacktrace, label: "Current stacktrace") + Process.sleep(100) get_some_peer() diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 6522fa84a..6438ccfd1 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -66,7 +66,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do def request_blocks_by_range(slot, count, on_blocks, retries) do Logger.debug("Requesting block", slot: slot) - # TODO: handle no-peers asynchronously? + # FIXME: handle no-peers asynchronously! this is hanging Libp2pPort when there are no peers peer_id = get_some_peer() request = @@ -173,7 +173,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do if retries > 0 do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) - Logger.debug("Retrying request for blocks with roots #{pretty_roots}") + Logger.debug("Retrying request (reason: #{inspect(reason)}) for blocks with roots #{pretty_roots}, in 2 second") + Process.sleep(2000) request_blocks_by_root(roots, on_blocks, retries - 1) {:ok, store} else @@ -186,6 +187,9 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> + stacktrace = Process.info(self(), :current_stacktrace) + IO.inspect(stacktrace, label: "Current stacktrace") + Process.sleep(100) get_some_peer() diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 96b9c34a9..785e198fb 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -2,10 +2,12 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do @moduledoc """ General peer bookkeeping. """ + require Logger alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.Store.KvSchema @initial_score 100 + @penalize 2 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -53,11 +55,29 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do end def penalize_peer(peer_id) do - fetch_peerbook!() |> Map.delete(peer_id) |> store_peerbook() + Logger.debug("Penalizing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") + peer_score = fetch_peerbook!() |> Map.get(peer_id) + + case peer_score do + nil -> + :ok + + score when score - @penalize <= 0 -> + Logger.info("Removing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") + fetch_peerbook!() + |> Map.delete(peer_id) + |> store_peerbook() + + score -> + fetch_peerbook!() + |> Map.put(peer_id, score - @penalize) + |> store_peerbook() + end end def handle_new_peer(peer_id) do peerbook = fetch_peerbook!() + Logger.debug("New peer connected: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") if not Map.has_key?(peerbook, peer_id) do :telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"}) From 95ed55b1ccf2ebc4eb5f8fb55a5e7f951710d50a Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Tue, 1 Oct 2024 17:08:25 -0300 Subject: [PATCH 07/17] Unified current slot calculation and minor fixes --- .../fork_choice/fork_choice.ex | 39 ++++++++++++++++--- .../fork_choice/handlers.ex | 18 +++++---- .../p2p/gossip/beacon_block.ex | 11 +++--- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 11 ++++-- lib/types/store.ex | 17 ++++---- 5 files changed, 65 insertions(+), 31 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index e0cfc97b5..a4fdd322c 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -31,7 +31,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do store = Handlers.on_tick(store, time) - :telemetry.execute([:sync, :store], %{slot: Store.get_current_slot(store)}) + :telemetry.execute([:sync, :store], %{slot: get_current_slot(store)}) :telemetry.execute([:sync, :on_block], %{slot: head_slot}) Metrics.block_status(head_root, head_slot, :transitioned) @@ -111,6 +111,17 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> tap(&StoreDb.persist_store/1) end + @spec get_current_slot(Types.Store.t()) :: Types.slot() + def get_current_slot(%Types.Store{} = store), + do: compute_current_slot(store.time, store.genesis_time) + + @spec get_current_slot(Types.uint64(), Types.uint64()) :: Types.slot() + def get_current_slot(time, genesis_time), + do: compute_current_slot(time, genesis_time) + + # TODO: Some parts of the code calculate the current slot using the previous function given a time + # specifically from the store (this was previously in the Store type module). This one calculates + # it using the system time, we might need to unify. @spec get_current_chain_slot() :: Types.slot() def get_current_chain_slot() do time = :os.system_time(:second) @@ -118,6 +129,19 @@ defmodule LambdaEthereumConsensus.ForkChoice do compute_current_slot(time, genesis_time) end + @doc """ + Check if a slot is in the future with respect to the current time. + """ + @spec future_slot?(Types.slot()) :: boolean() + def future_slot?(slot) do + time = :os.system_time(:second) + genesis_time = StoreDb.fetch_genesis_time!() + + time + |> compute_currents_slots_within_disparity(genesis_time) + |> Enum.all?(fn possible_slot -> possible_slot < slot end) + end + @spec get_finalized_checkpoint() :: Types.Checkpoint.t() def get_finalized_checkpoint() do %{finalized_checkpoint: finalized} = fetch_store!() @@ -282,11 +306,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do Logger.debug("[Fork choice] Updated fork choice cache", slot: slot) - %{ - store - | head_root: head_root, - head_slot: slot - } + Store.update_head_info(store, slot, head_root) end defp fetch_store!() do @@ -297,6 +317,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do defp compute_current_slot(time, genesis_time), do: div(time - genesis_time, ChainSpec.get("SECONDS_PER_SLOT")) + defp compute_currents_slots_within_disparity(time, genesis_time) do + [ + compute_current_slot(time - ChainSpec.get("MAXIMUM_GOSSIP_CLOCK_DISPARITY"), genesis_time), + compute_current_slot(time + ChainSpec.get("MAXIMUM_GOSSIP_CLOCK_DISPARITY"), genesis_time) + ] + end + defp compute_fork_digest(slot, genesis_validators_root) do Misc.compute_epoch_at_slot(slot) |> ChainSpec.get_fork_version_for_epoch() diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 275f8c7df..d35237a31 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ require Logger + alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.Execution.ExecutionClient alias LambdaEthereumConsensus.StateTransition alias LambdaEthereumConsensus.StateTransition.Accessors @@ -38,7 +39,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do # to ensure that every previous slot is processed with ``on_tick_per_slot`` seconds_per_slot = ChainSpec.get("SECONDS_PER_SLOT") tick_slot = div(time - store.genesis_time, seconds_per_slot) - current_slot = Store.get_current_slot(store) + current_slot = ForkChoice.get_current_slot(store) next_slot_start = (current_slot + 1) * seconds_per_slot last_slot_start = tick_slot * seconds_per_slot @@ -69,7 +70,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do # Blocks cannot be in the future. If they are, their # consideration must be delayed until they are in the past. - Store.get_current_slot(store) < block.slot -> + ForkChoice.future_slot?(block.slot) -> # TODO: handle this error somehow {:error, "block is from the future"} @@ -235,8 +236,8 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do ) is_first_block = new_store.proposer_boost_root == <<0::256>> - # TODO: store block timeliness data? - is_timely = Store.get_current_slot(new_store) == block.slot and is_before_attesting_interval + # TODO: store block timeliness data? we might need to take MAXIMUM_GOSSIP_CLOCK_DISPARITY into account + is_timely = ForkChoice.get_current_slot(new_store) == block.slot and is_before_attesting_interval state = new_state_info.beacon_state @@ -283,12 +284,13 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do end defp on_tick_per_slot(%Store{} = store, time) do - previous_slot = Store.get_current_slot(store) + previous_slot = ForkChoice.get_current_slot(store) # Update store time store = %Store{store | time: time} - current_slot = Store.get_current_slot(store) + # Why is this needed? the previous line shoud be immediate. + current_slot = ForkChoice.get_current_slot(store) store # If this is a new slot, reset store.proposer_boost_root @@ -394,10 +396,10 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do target.root != Store.get_checkpoint_block(store, block_root, target.epoch) -> {:error, "mismatched head and target blocks"} - # Attestations can only affect the fork choice of subsequent slots. + # Attestations can only affect the fork choice of subsequent slots (that's why the - 1). # Delay consideration in the fork choice until their slot is in the past. # TODO: delay consideration - Store.get_current_slot(store) <= attestation.data.slot -> + ForkChoice.future_slot?(attestation.data.slot - 1) -> {:error, "attestation is for a future slot"} true -> diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index a1accbab1..b66414de5 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -17,11 +17,9 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do @impl true def handle_gossip_message(store, _topic, msg_id, message) do - slot = ForkChoice.get_current_chain_slot() - with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), - :ok <- validate(signed_block, slot) do + :ok <- validate(signed_block) do Logger.info("[Gossip] Block received, block.slot: #{signed_block.message.slot}.") Libp2pPort.validate_message(msg_id, :accept) PendingBlocks.add_block(store, signed_block) @@ -63,8 +61,9 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do ### Private functions ########################## - @spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:ignore, String.t()} - defp validate(%SignedBeaconBlock{message: block}, current_slot) do + @spec validate(SignedBeaconBlock.t()) :: :ok | {:ignore, String.t()} + defp validate(%SignedBeaconBlock{message: block}) do + current_slot = ForkChoice.get_current_chain_slot() min_slot = current_slot - ChainSpec.get("SLOTS_PER_EPOCH") cond do @@ -73,7 +72,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do {:ignore, "Block too old: block.slot=#{block.slot}. Current slot: #{current_slot}. Minimum expected slot: #{min_slot}"} - block.slot > current_slot -> + ForkChoice.future_slot?(block.slot) -> {:ignore, "Block is from the future: block.slot=#{block.slot}. Current slot: #{current_slot}."} diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 785e198fb..62e579774 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -7,7 +7,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do alias LambdaEthereumConsensus.Store.KvSchema @initial_score 100 - @penalize 2 + @penalize 5 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -43,14 +43,17 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do Get some peer from the peerbook. """ def get_some_peer() do - # TODO: use some algorithm to pick a good peer, for now it's random + # TODO: This is a very naive implementation of a peer selection algorithm. peerbook = fetch_peerbook!() if peerbook == %{} do nil else - {peer_id, _score} = Enum.random(peerbook) - peer_id + peerbook + |> Enum.sort_by(fn {_peer_id, score} -> score end) + |> Enum.take(4) + |> Enum.random() + |> elem(0) end end diff --git a/lib/types/store.ex b/lib/types/store.ex index 00276ee62..686a7e123 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -3,6 +3,7 @@ defmodule Types.Store do The Store struct is used to track information required for the fork choice algorithm. """ + alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.ForkChoice.Head alias LambdaEthereumConsensus.ForkChoice.Simple.Tree alias LambdaEthereumConsensus.StateTransition @@ -110,13 +111,9 @@ defmodule Types.Store do end end - def get_current_slot(%__MODULE__{time: time, genesis_time: genesis_time}) do - # NOTE: this assumes GENESIS_SLOT == 0 - div(time - genesis_time, ChainSpec.get("SECONDS_PER_SLOT")) - end - + # We probably want to move this to a more appropriate module def get_current_epoch(store) do - store |> get_current_slot() |> Misc.compute_epoch_at_slot() + store |> ForkChoice.get_current_slot() |> Misc.compute_epoch_at_slot() end def get_ancestor(%__MODULE__{} = store, root, slot) do @@ -245,9 +242,15 @@ defmodule Types.Store do end end - defp update_head_info(store) do + @spec update_head_info(t()) :: t() + def update_head_info(store) do {:ok, head_root} = Head.get_head(store) %{slot: head_slot} = Blocks.get_block!(head_root) + update_head_info(store, head_slot, head_root) + end + + @spec update_head_info(t(), Types.slot(), Types.root()) :: t() + def update_head_info(store, head_slot, head_root) do %{store | head_root: head_root, head_slot: head_slot} end From 0458c17a3d7258e1edd8b567b3b3802d89700550 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Tue, 1 Oct 2024 17:26:22 -0300 Subject: [PATCH 08/17] remove the peers on 5 failures --- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 62e579774..500a21e3d 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -7,7 +7,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do alias LambdaEthereumConsensus.Store.KvSchema @initial_score 100 - @penalize 5 + @penalize 20 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 From 4d4bc34574ba65c574fca6fcb3253d5cae7f1b36 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 16:32:57 -0300 Subject: [PATCH 09/17] Separated changes not related to the peerbook --- .../beacon/checkpoint_sync.ex | 13 +----- .../fork_choice/fork_choice.ex | 46 ++++--------------- .../fork_choice/handlers.ex | 18 ++++---- .../fork_choice/head.ex | 2 +- .../fork_choice/simple_tree.ex | 2 +- .../p2p/gossip/beacon_block.ex | 11 +++-- .../store/state_db.ex | 1 - lib/types/store.ex | 8 +++- network_params.yaml | 8 ++-- 9 files changed, 35 insertions(+), 74 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex b/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex index b22cacfaa..0e27b2c3f 100644 --- a/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex +++ b/lib/lambda_ethereum_consensus/beacon/checkpoint_sync.ex @@ -19,7 +19,7 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do def get_finalized_block_and_state(url, genesis_validators_root) do tasks = [Task.async(__MODULE__, :get_state, [url]), Task.async(__MODULE__, :get_block, [url])] - case Task.await_many(tasks, 600_000) do + case Task.await_many(tasks, 60_000) do [{:ok, state}, {:ok, block}] -> if state.genesis_validators_root == genesis_validators_root do check_match(url, state, block) @@ -47,17 +47,11 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do """ @spec get_state(String.t()) :: {:ok, BeaconState.t()} | {:error, any()} def get_state(url) do - start_time = System.monotonic_time() with {:error, err} <- get_ssz_from_url(url, "/eth/v2/debug/beacon/states/finalized", BeaconState) do Logger.error("There has been an error retrieving the last finalized state") {:error, err} - else - {:ok, state} -> - Logger.info("Retrieved the last finalized state in #{(System.monotonic_time() - start_time) / 1_000_000_000} s") - {:ok, state} end - end @doc """ @@ -65,15 +59,10 @@ defmodule LambdaEthereumConsensus.Beacon.CheckpointSync do """ @spec get_block(String.t()) :: {:ok, SignedBeaconBlock.t()} | {:error, any()} def get_block(url, id \\ "finalized") do - start_time = System.monotonic_time() with {:error, err} <- get_ssz_from_url(url, "/eth/v2/beacon/blocks/#{id}", SignedBeaconBlock) do Logger.error("There has been an error retrieving the last finalized block") {:error, err} - else - {:ok, block} -> - Logger.info("Retrieved the last finalized block in #{(System.monotonic_time() - start_time) / 1_000_000_000} s") - {:ok, block} end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index a4fdd322c..ad0ee36ad 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -31,7 +31,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do store = Handlers.on_tick(store, time) - :telemetry.execute([:sync, :store], %{slot: get_current_slot(store)}) + :telemetry.execute([:sync, :store], %{slot: Store.get_current_slot(store)}) :telemetry.execute([:sync, :on_block], %{slot: head_slot}) Metrics.block_status(head_root, head_slot, :transitioned) @@ -111,17 +111,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> tap(&StoreDb.persist_store/1) end - @spec get_current_slot(Types.Store.t()) :: Types.slot() - def get_current_slot(%Types.Store{} = store), - do: compute_current_slot(store.time, store.genesis_time) - - @spec get_current_slot(Types.uint64(), Types.uint64()) :: Types.slot() - def get_current_slot(time, genesis_time), - do: compute_current_slot(time, genesis_time) - - # TODO: Some parts of the code calculate the current slot using the previous function given a time - # specifically from the store (this was previously in the Store type module). This one calculates - # it using the system time, we might need to unify. @spec get_current_chain_slot() :: Types.slot() def get_current_chain_slot() do time = :os.system_time(:second) @@ -129,19 +118,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do compute_current_slot(time, genesis_time) end - @doc """ - Check if a slot is in the future with respect to the current time. - """ - @spec future_slot?(Types.slot()) :: boolean() - def future_slot?(slot) do - time = :os.system_time(:second) - genesis_time = StoreDb.fetch_genesis_time!() - - time - |> compute_currents_slots_within_disparity(genesis_time) - |> Enum.all?(fn possible_slot -> possible_slot < slot end) - end - @spec get_finalized_checkpoint() :: Types.Checkpoint.t() def get_finalized_checkpoint() do %{finalized_checkpoint: finalized} = fetch_store!() @@ -197,7 +173,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do new_finalized_epoch = store.finalized_checkpoint.epoch if last_finalized_epoch < new_finalized_epoch do - Logger.info("Pruning states before slot #{new_finalized_epoch}") new_finalized_slot = @@ -217,11 +192,9 @@ defmodule LambdaEthereumConsensus.ForkChoice do PruneBlobsSupervisor, fn -> BlobDb.prune_old_blobs(new_finalized_slot) end ) - - Store.prune(store) - else - store end + + Store.prune(store) end def apply_handler(iter, state, handler) do @@ -306,7 +279,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do Logger.debug("[Fork choice] Updated fork choice cache", slot: slot) - Store.update_head_info(store, slot, head_root) + %{ + store + | head_root: head_root, + head_slot: slot + } end defp fetch_store!() do @@ -317,13 +294,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do defp compute_current_slot(time, genesis_time), do: div(time - genesis_time, ChainSpec.get("SECONDS_PER_SLOT")) - defp compute_currents_slots_within_disparity(time, genesis_time) do - [ - compute_current_slot(time - ChainSpec.get("MAXIMUM_GOSSIP_CLOCK_DISPARITY"), genesis_time), - compute_current_slot(time + ChainSpec.get("MAXIMUM_GOSSIP_CLOCK_DISPARITY"), genesis_time) - ] - end - defp compute_fork_digest(slot, genesis_validators_root) do Misc.compute_epoch_at_slot(slot) |> ChainSpec.get_fork_version_for_epoch() diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index d35237a31..275f8c7df 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -4,7 +4,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ require Logger - alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.Execution.ExecutionClient alias LambdaEthereumConsensus.StateTransition alias LambdaEthereumConsensus.StateTransition.Accessors @@ -39,7 +38,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do # to ensure that every previous slot is processed with ``on_tick_per_slot`` seconds_per_slot = ChainSpec.get("SECONDS_PER_SLOT") tick_slot = div(time - store.genesis_time, seconds_per_slot) - current_slot = ForkChoice.get_current_slot(store) + current_slot = Store.get_current_slot(store) next_slot_start = (current_slot + 1) * seconds_per_slot last_slot_start = tick_slot * seconds_per_slot @@ -70,7 +69,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do # Blocks cannot be in the future. If they are, their # consideration must be delayed until they are in the past. - ForkChoice.future_slot?(block.slot) -> + Store.get_current_slot(store) < block.slot -> # TODO: handle this error somehow {:error, "block is from the future"} @@ -236,8 +235,8 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do ) is_first_block = new_store.proposer_boost_root == <<0::256>> - # TODO: store block timeliness data? we might need to take MAXIMUM_GOSSIP_CLOCK_DISPARITY into account - is_timely = ForkChoice.get_current_slot(new_store) == block.slot and is_before_attesting_interval + # TODO: store block timeliness data? + is_timely = Store.get_current_slot(new_store) == block.slot and is_before_attesting_interval state = new_state_info.beacon_state @@ -284,13 +283,12 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do end defp on_tick_per_slot(%Store{} = store, time) do - previous_slot = ForkChoice.get_current_slot(store) + previous_slot = Store.get_current_slot(store) # Update store time store = %Store{store | time: time} - # Why is this needed? the previous line shoud be immediate. - current_slot = ForkChoice.get_current_slot(store) + current_slot = Store.get_current_slot(store) store # If this is a new slot, reset store.proposer_boost_root @@ -396,10 +394,10 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do target.root != Store.get_checkpoint_block(store, block_root, target.epoch) -> {:error, "mismatched head and target blocks"} - # Attestations can only affect the fork choice of subsequent slots (that's why the - 1). + # Attestations can only affect the fork choice of subsequent slots. # Delay consideration in the fork choice until their slot is in the past. # TODO: delay consideration - ForkChoice.future_slot?(attestation.data.slot - 1) -> + Store.get_current_slot(store) <= attestation.data.slot -> {:error, "attestation is for a future slot"} true -> diff --git a/lib/lambda_ethereum_consensus/fork_choice/head.ex b/lib/lambda_ethereum_consensus/fork_choice/head.ex index 62d0e2836..9114bc0e8 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/head.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/head.ex @@ -101,7 +101,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do {true, Map.put(new_blocks, block_root, block)} not Enum.empty?(children) -> - {false, blocks} + {false, new_blocks} true -> filter_leaf_block(store, block_root, block, blocks) diff --git a/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex b/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex index 5fedf768a..569414541 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/simple_tree.ex @@ -82,7 +82,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Simple.Tree do def get_children!(tree, parent_id) do case get_children(tree, parent_id) do {:error, :not_found} -> raise "Parent #{Base.encode16(parent_id)} not found in tree" - {:ok, res} -> res |> IO.inspect(label: "Children of #{Base.encode16(parent_id)}") + {:ok, res} -> res end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index b66414de5..a1accbab1 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -17,9 +17,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do @impl true def handle_gossip_message(store, _topic, msg_id, message) do + slot = ForkChoice.get_current_chain_slot() + with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), - :ok <- validate(signed_block) do + :ok <- validate(signed_block, slot) do Logger.info("[Gossip] Block received, block.slot: #{signed_block.message.slot}.") Libp2pPort.validate_message(msg_id, :accept) PendingBlocks.add_block(store, signed_block) @@ -61,9 +63,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do ### Private functions ########################## - @spec validate(SignedBeaconBlock.t()) :: :ok | {:ignore, String.t()} - defp validate(%SignedBeaconBlock{message: block}) do - current_slot = ForkChoice.get_current_chain_slot() + @spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:ignore, String.t()} + defp validate(%SignedBeaconBlock{message: block}, current_slot) do min_slot = current_slot - ChainSpec.get("SLOTS_PER_EPOCH") cond do @@ -72,7 +73,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do {:ignore, "Block too old: block.slot=#{block.slot}. Current slot: #{current_slot}. Minimum expected slot: #{min_slot}"} - ForkChoice.future_slot?(block.slot) -> + block.slot > current_slot -> {:ignore, "Block is from the future: block.slot=#{block.slot}. Current slot: #{current_slot}."} diff --git a/lib/lambda_ethereum_consensus/store/state_db.ex b/lib/lambda_ethereum_consensus/store/state_db.ex index bd07d9118..e13cab075 100644 --- a/lib/lambda_ethereum_consensus/store/state_db.ex +++ b/lib/lambda_ethereum_consensus/store/state_db.ex @@ -62,7 +62,6 @@ defmodule LambdaEthereumConsensus.Store.StateDb do result = BlockRootBySlot.fold_keys(slot, 0, fn slot, acc -> - Logger.info("[StateDb] Pruning state for slot #{slot}.") case BlockRootBySlot.get(slot) do {:ok, _block_root} -> remove_state_by_slot(slot) diff --git a/lib/types/store.ex b/lib/types/store.ex index 686a7e123..0641db94d 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -111,9 +111,13 @@ defmodule Types.Store do end end - # We probably want to move this to a more appropriate module + def get_current_slot(%__MODULE__{time: time, genesis_time: genesis_time}) do + # NOTE: this assumes GENESIS_SLOT == 0 + div(time - genesis_time, ChainSpec.get("SECONDS_PER_SLOT")) + end + def get_current_epoch(store) do - store |> ForkChoice.get_current_slot() |> Misc.compute_epoch_at_slot() + store |> get_current_slot() |> Misc.compute_epoch_at_slot() end def get_ancestor(%__MODULE__{} = store, root, slot) do diff --git a/network_params.yaml b/network_params.yaml index 5303f6e21..76f0325e1 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -2,14 +2,14 @@ participants: - el_type: geth cl_type: lighthouse count: 2 - validator_count: 5 + validator_count: 32 - el_type: geth cl_type: lambda cl_image: lambda_ethereum_consensus:latest use_separate_vc: false count: 1 - validator_count: 54 + validator_count: 32 cl_max_mem: 4096 keymanager_enabled: true -# network_params: -# preset: minimal +network_params: + preset: minimal From d1846dd2ebe85498f4e506aa6e9b651260561202 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 17:00:46 -0300 Subject: [PATCH 10/17] format and removal of store changes --- .../p2p/blob_downloader.ex | 9 +++++---- .../p2p/block_downloader.ex | 9 +++++---- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 15 ++++++++++++--- lib/types/store.ex | 9 +-------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 81877f045..3b761c0ec 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -62,7 +62,11 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do P2P.Peerbook.penalize_peer(peer_id) if retries > 0 do - Logger.info("Retrying request for #{count} blobs, reason: #{inspect(reason)} in 2 second", slot: slot) + Logger.info( + "Retrying request for #{count} blobs, reason: #{inspect(reason)} in 2 second", + slot: slot + ) + Process.sleep(2000) request_blobs_by_range(slot, count, on_blobs, retries - 1) {:ok, store} @@ -124,9 +128,6 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - stacktrace = Process.info(self(), :current_stacktrace) - IO.inspect(stacktrace, label: "Current stacktrace") - Process.sleep(100) get_some_peer() diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 6438ccfd1..6910d4a77 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -173,7 +173,11 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do if retries > 0 do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) - Logger.debug("Retrying request (reason: #{inspect(reason)}) for blocks with roots #{pretty_roots}, in 2 second") + + Logger.debug( + "Retrying request (reason: #{inspect(reason)}) for blocks with roots #{pretty_roots}, in 2 second" + ) + Process.sleep(2000) request_blocks_by_root(roots, on_blocks, retries - 1) {:ok, store} @@ -187,9 +191,6 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - stacktrace = Process.info(self(), :current_stacktrace) - IO.inspect(stacktrace, label: "Current stacktrace") - Process.sleep(100) get_some_peer() diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 500a21e3d..d0dd89bcb 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -58,7 +58,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do end def penalize_peer(peer_id) do - Logger.debug("Penalizing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") + Logger.debug( + "Penalizing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" + ) + peer_score = fetch_peerbook!() |> Map.get(peer_id) case peer_score do @@ -66,7 +69,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do :ok score when score - @penalize <= 0 -> - Logger.info("Removing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") + Logger.info( + "Removing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" + ) + fetch_peerbook!() |> Map.delete(peer_id) |> store_peerbook() @@ -80,7 +86,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do def handle_new_peer(peer_id) do peerbook = fetch_peerbook!() - Logger.debug("New peer connected: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}") + + Logger.debug( + "New peer connected: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" + ) if not Map.has_key?(peerbook, peer_id) do :telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"}) diff --git a/lib/types/store.ex b/lib/types/store.ex index 0641db94d..00276ee62 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -3,7 +3,6 @@ defmodule Types.Store do The Store struct is used to track information required for the fork choice algorithm. """ - alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.ForkChoice.Head alias LambdaEthereumConsensus.ForkChoice.Simple.Tree alias LambdaEthereumConsensus.StateTransition @@ -246,15 +245,9 @@ defmodule Types.Store do end end - @spec update_head_info(t()) :: t() - def update_head_info(store) do + defp update_head_info(store) do {:ok, head_root} = Head.get_head(store) %{slot: head_slot} = Blocks.get_block!(head_root) - update_head_info(store, head_slot, head_root) - end - - @spec update_head_info(t(), Types.slot(), Types.root()) :: t() - def update_head_info(store, head_slot, head_root) do %{store | head_root: head_root, head_slot: head_slot} end From 8dd552253f8d70ffbb0d3f6831849121cfe1f2a1 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 18:13:56 -0300 Subject: [PATCH 11/17] Fix latest changes trying to find a good balance between penalization + initial sync delay --- .../p2p/blob_downloader.ex | 11 +++-------- .../p2p/block_downloader.ex | 10 ++++------ lib/lambda_ethereum_consensus/p2p/peerbook.ex | 13 +++++-------- lib/libp2p_port.ex | 5 ++--- 4 files changed, 14 insertions(+), 25 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 3b761c0ec..f68a4a864 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -32,7 +32,6 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do def request_blobs_by_range(slot, count, on_blobs, retries) do Logger.debug("Requesting blobs", slot: slot) - # FIXME: handle no-peers asynchronously! this is hanging Libp2pPort when there are no peers peer_id = get_some_peer() # NOTE: BeaconBlocksByRangeRequest == BlobSidecarsByRangeRequest @@ -62,12 +61,8 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do P2P.Peerbook.penalize_peer(peer_id) if retries > 0 do - Logger.info( - "Retrying request for #{count} blobs, reason: #{inspect(reason)} in 2 second", - slot: slot - ) + Logger.info("Retrying request for #{count} blobs: #{inspect(reason)}", slot: slot) - Process.sleep(2000) request_blobs_by_range(slot, count, on_blobs, retries - 1) {:ok, store} else @@ -128,8 +123,8 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - Process.sleep(100) - get_some_peer() + # TODO: handle no-peers asynchronously + raise "No peers available to request blobs from." peer_id -> peer_id diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 6910d4a77..92ef93510 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -66,7 +66,6 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do def request_blocks_by_range(slot, count, on_blocks, retries) do Logger.debug("Requesting block", slot: slot) - # FIXME: handle no-peers asynchronously! this is hanging Libp2pPort when there are no peers peer_id = get_some_peer() request = @@ -174,11 +173,10 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) - Logger.debug( - "Retrying request (reason: #{inspect(reason)}) for blocks with roots #{pretty_roots}, in 2 second" + Logger.info( + "Retrying request for blocks with roots #{pretty_roots}: #{inspect(reason)}" ) - Process.sleep(2000) request_blocks_by_root(roots, on_blocks, retries - 1) {:ok, store} else @@ -191,8 +189,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - Process.sleep(100) - get_some_peer() + # TODO: handle no-peers asynchronously + raise "No peers available to request blocks from." peer_id -> peer_id diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index d0dd89bcb..a3e755129 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -5,9 +5,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do require Logger alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.Store.KvSchema + alias LambdaEthereumConsensus.Utils @initial_score 100 - @penalize 20 + @penalize 35 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -58,9 +59,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do end def penalize_peer(peer_id) do - Logger.debug( - "Penalizing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" - ) + Logger.debug("[Peerbook] Penalizing peer: #{inspect(Utils.format_shorten_binary(peer_id))}") peer_score = fetch_peerbook!() |> Map.get(peer_id) @@ -69,9 +68,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do :ok score when score - @penalize <= 0 -> - Logger.info( - "Removing peer: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" - ) + Logger.debug("[Peerbook] Removing peer: #{inspect(Utils.format_shorten_binary(peer_id))}") fetch_peerbook!() |> Map.delete(peer_id) @@ -88,7 +85,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do peerbook = fetch_peerbook!() Logger.debug( - "New peer connected: #{inspect(LambdaEthereumConsensus.Utils.format_shorten_binary(peer_id))}" + "[Peerbook] New peer connected: #{inspect(Utils.format_shorten_binary(peer_id))}" ) if not Map.has_key?(peerbook, peer_id) do diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 52f51f8ed..48f5a7758 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -9,8 +9,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do use GenServer - @tick_time 1000 - alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.Beacon.SyncBlocks alias LambdaEthereumConsensus.ForkChoice @@ -84,7 +82,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do discovery_addresses: [String.t()] } - @sync_delay_millis 10_000 + @tick_time 1000 + @sync_delay_millis 15_000 ###################### ### API From 850f50fb807f1a711f4a1c97053691d0f5016a65 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 18:56:46 -0300 Subject: [PATCH 12/17] final tweaks to the peer selection and pruning algo --- .../p2p/blob_downloader.ex | 3 +-- .../p2p/block_downloader.ex | 5 +--- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 26 +++++++++---------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index f68a4a864..d5e9de110 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -61,8 +61,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do P2P.Peerbook.penalize_peer(peer_id) if retries > 0 do - Logger.info("Retrying request for #{count} blobs: #{inspect(reason)}", slot: slot) - + Logger.debug("Retrying request for #{count} blobs: #{inspect(reason)}", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) {:ok, store} else diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 92ef93510..aa9ae0395 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -173,10 +173,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) - Logger.info( - "Retrying request for blocks with roots #{pretty_roots}: #{inspect(reason)}" - ) - + Logger.debug("Retrying request for block roots #{pretty_roots}: #{inspect(reason)}") request_blocks_by_root(roots, on_blocks, retries - 1) {:ok, store} else diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index a3e755129..d04d3a1dd 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do alias LambdaEthereumConsensus.Utils @initial_score 100 - @penalize 35 + @penalizing_score 25 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -44,15 +44,16 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do Get some peer from the peerbook. """ def get_some_peer() do - # TODO: This is a very naive implementation of a peer selection algorithm. + # TODO: This is a very naive implementation of a peer selection algorithm, + # this sorts the peers every time. peerbook = fetch_peerbook!() if peerbook == %{} do nil else peerbook - |> Enum.sort_by(fn {_peer_id, score} -> score end) - |> Enum.take(4) + |> Enum.sort_by(fn {_peer_id, score} -> -score end) + |> Enum.take(5) |> Enum.random() |> elem(0) end @@ -67,7 +68,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do nil -> :ok - score when score - @penalize <= 0 -> + score when score - @penalizing_score <= 0 -> Logger.debug("[Peerbook] Removing peer: #{inspect(Utils.format_shorten_binary(peer_id))}") fetch_peerbook!() @@ -76,7 +77,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do score -> fetch_peerbook!() - |> Map.put(peer_id, score - @penalize) + |> Map.put(peer_id, score - @penalizing_score) |> store_peerbook() end end @@ -119,13 +120,12 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do |> min(len - @target_peers) |> max(0) - n = :rand.uniform(len) - - peerbook - |> Map.keys() - |> Stream.drop(n) - |> Stream.take(prune_size) - |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) + if prune_size > 0 do + peerbook + |> Enum.sort_by(fn {_peer_id, score} -> -score end) + |> Enum.take(prune_size) + |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) + end end end From 9d6eecf6e17f44a1bc997c3d672a0b57640ecd0d Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 18:57:53 -0300 Subject: [PATCH 13/17] Removed a store change --- lib/types/store.ex | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/types/store.ex b/lib/types/store.ex index 00276ee62..736684034 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -144,11 +144,8 @@ defmodule Types.Store do @spec get_children(t(), Types.root()) :: [BeaconBlock.t()] def get_children(%__MODULE__{tree_cache: tree}, parent_root) do - Tree.get_children(tree, parent_root) - |> case do - {:ok, children} -> Enum.map(children, &{&1, Blocks.get_block!(&1)}) - {:error, :not_found} -> [] - end + Tree.get_children!(tree, parent_root) + |> Enum.map(&{&1, Blocks.get_block!(&1)}) end @spec store_block_info(t(), BlockInfo.t()) :: t() From d007aab930f006fd6859286b561ce6d33633dfe3 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 19:02:47 -0300 Subject: [PATCH 14/17] Small enhancement in code clarity --- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index d04d3a1dd..f42653ffc 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -111,24 +111,26 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do defp prune() do peerbook = fetch_peerbook!() len = map_size(peerbook) + prune_size = if len > 0, do: calculate_prune_size(peerbook, len), else: 0 - if len != 0 do - prune_size = - (len * @prune_percentage) - |> round() - |> min(@max_prune_size) - |> min(len - @target_peers) - |> max(0) - - if prune_size > 0 do - peerbook - |> Enum.sort_by(fn {_peer_id, score} -> -score end) - |> Enum.take(prune_size) - |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) - end + if prune_size > 0 do + Logger.debug("[Peerbook] Pruning #{prune_size} peers by challenge") + + peerbook + |> Enum.sort_by(fn {_peer_id, score} -> -score end) + |> Enum.take(prune_size) + |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) end end + defp calculate_prune_size(peerbook, len) do + (len * @prune_percentage) + |> round() + |> min(@max_prune_size) + |> min(len - @target_peers) + |> max(0) + end + defp store_peerbook(peerbook), do: put("", peerbook) defp fetch_peerbook(), do: get("") From cca6989b07b39b2b0f3065940263422128683812 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 19:53:08 -0300 Subject: [PATCH 15/17] Fixing an issue with peers challenge on pruning --- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index f42653ffc..406b24298 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -45,7 +45,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do """ def get_some_peer() do # TODO: This is a very naive implementation of a peer selection algorithm, - # this sorts the peers every time. + # this sorts the peers every time. The same is true for the pruning. peerbook = fetch_peerbook!() if peerbook == %{} do @@ -111,7 +111,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do defp prune() do peerbook = fetch_peerbook!() len = map_size(peerbook) - prune_size = if len > 0, do: calculate_prune_size(peerbook, len), else: 0 + prune_size = if len > 0, do: calculate_prune_size(len), else: 0 if prune_size > 0 do Logger.debug("[Peerbook] Pruning #{prune_size} peers by challenge") @@ -119,11 +119,11 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do peerbook |> Enum.sort_by(fn {_peer_id, score} -> -score end) |> Enum.take(prune_size) - |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) + |> Enum.each(fn {peer_id, _score} -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) end end - defp calculate_prune_size(peerbook, len) do + defp calculate_prune_size(len) do (len * @prune_percentage) |> round() |> min(@max_prune_size) From 877b43f9a7e67383e3a3cf4f8966f45d660c0977 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Wed, 2 Oct 2024 20:25:36 -0300 Subject: [PATCH 16/17] rolledback the prune algo --- lib/lambda_ethereum_consensus/p2p/peerbook.ex | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 406b24298..f417304a0 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do alias LambdaEthereumConsensus.Utils @initial_score 100 - @penalizing_score 25 + @penalizing_score 15 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -116,10 +116,13 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do if prune_size > 0 do Logger.debug("[Peerbook] Pruning #{prune_size} peers by challenge") + n = :rand.uniform(len) + peerbook - |> Enum.sort_by(fn {_peer_id, score} -> -score end) - |> Enum.take(prune_size) - |> Enum.each(fn {peer_id, _score} -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) + |> Map.keys() + |> Stream.drop(n) + |> Stream.take(prune_size) + |> Enum.each(fn peer_id -> Task.start(__MODULE__, :challenge_peer, [peer_id]) end) end end From cce111e2579f733705eeb852b2d62238e565f37e Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 7 Oct 2024 17:06:30 -0300 Subject: [PATCH 17/17] Remove an old TODO --- lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 9caa8ef77..44b046ebd 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -118,7 +118,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do @doc """ Get the current chain slot based on the system time. - TODO: There are just 2 uses of this function outside this module: + There are just 2 uses of this function outside this module: - At the begining of SyncBlocks.run/1 function, to get the head slot - In the Helpers.block_root_by_block_id/1 function """