diff --git a/Makefile b/Makefile index 617e55619..5e073d07f 100644 --- a/Makefile +++ b/Makefile @@ -162,9 +162,17 @@ iex: compile-all test-iex: MIX_ENV=test iex -S mix run -- --mode db +################## +# NODE RUNNERS +DYSCOVERY_PORT ?= 30303 + #▶️ checkpoint-sync: @ Run an interactive terminal using checkpoint sync. checkpoint-sync: compile-all - iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics + iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --discovery-port $(DYSCOVERY_PORT) + +#▶️ checkpoint-sync.logfile: @ Run an interactive terminal using checkpoint sync with a log file. +checkpoint-sync.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --log-file ./logs/mainnet.log #▶️ checkpoint-sync.logfile: @ Run an interactive terminal using checkpoint sync with a log file. checkpoint-sync.logfile: compile-all @@ -172,7 +180,11 @@ checkpoint-sync.logfile: compile-all #▶️ sepolia: @ Run an interactive terminal using sepolia network sepolia: compile-all - iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics + iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --discovery-port $(DYSCOVERY_PORT) + +#▶️ sepolia.logfile: @ Run an interactive terminal using sepolia network with a log file +sepolia.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --log-file ./logs/sepolia.log #▶️ sepolia.logfile: @ Run an interactive terminal using sepolia network with a log file sepolia.logfile: compile-all @@ -180,7 +192,11 @@ sepolia.logfile: compile-all #▶️ holesky: @ Run an interactive terminal using holesky network holesky: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky + iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --discovery-port $(DYSCOVERY_PORT) + +#▶️ holesky.logfile: @ Run an interactive terminal using holesky network with a log file +holesky.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --log-file ./logs/holesky.log #▶️ holesky.logfile: @ Run an interactive terminal using holesky network with a log file holesky.logfile: compile-all @@ -188,7 +204,11 @@ holesky.logfile: compile-all #▶️ gnosis: @ Run an interactive terminal using gnosis network gnosis: compile-all - iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --discovery-port $(DYSCOVERY_PORT) + +#▶️ gnosis.logfile: @ Run an interactive terminal using gnosis network with a log file +gnosis.logfile: compile-all + iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --log-file ./logs/gnosis.log --discovery-port $(DYSCOVERY_PORT) #🔴 test: @ Run tests test: compile-all diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 9caa8ef77..44b046ebd 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -118,7 +118,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do @doc """ Get the current chain slot based on the system time. - TODO: There are just 2 uses of this function outside this module: + There are just 2 uses of this function outside this module: - At the begining of SyncBlocks.run/1 function, to get the head slot - In the Helpers.block_root_by_block_id/1 function """ diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 4493a4fc0..d5e9de110 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -32,7 +32,6 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do def request_blobs_by_range(slot, count, on_blobs, retries) do Logger.debug("Requesting blobs", slot: slot) - # TODO: handle no-peers asynchronously? peer_id = get_some_peer() # NOTE: BeaconBlocksByRangeRequest == BlobSidecarsByRangeRequest @@ -62,7 +61,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do P2P.Peerbook.penalize_peer(peer_id) if retries > 0 do - Logger.debug("Retrying request for #{count} blobs", slot: slot) + Logger.debug("Retrying request for #{count} blobs: #{inspect(reason)}", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) {:ok, store} else @@ -123,8 +122,8 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - Process.sleep(100) - get_some_peer() + # TODO: handle no-peers asynchronously + raise "No peers available to request blobs from." peer_id -> peer_id diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 6522fa84a..aa9ae0395 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -66,7 +66,6 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do def request_blocks_by_range(slot, count, on_blocks, retries) do Logger.debug("Requesting block", slot: slot) - # TODO: handle no-peers asynchronously? peer_id = get_some_peer() request = @@ -173,7 +172,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do if retries > 0 do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) - Logger.debug("Retrying request for blocks with roots #{pretty_roots}") + + Logger.debug("Retrying request for block roots #{pretty_roots}: #{inspect(reason)}") request_blocks_by_root(roots, on_blocks, retries - 1) {:ok, store} else @@ -186,8 +186,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do defp get_some_peer() do case P2P.Peerbook.get_some_peer() do nil -> - Process.sleep(100) - get_some_peer() + # TODO: handle no-peers asynchronously + raise "No peers available to request blocks from." peer_id -> peer_id diff --git a/lib/lambda_ethereum_consensus/p2p/peerbook.ex b/lib/lambda_ethereum_consensus/p2p/peerbook.ex index 96b9c34a9..f417304a0 100644 --- a/lib/lambda_ethereum_consensus/p2p/peerbook.ex +++ b/lib/lambda_ethereum_consensus/p2p/peerbook.ex @@ -2,10 +2,13 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do @moduledoc """ General peer bookkeeping. """ + require Logger alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.Store.KvSchema + alias LambdaEthereumConsensus.Utils @initial_score 100 + @penalizing_score 15 @target_peers 128 @max_prune_size 8 @prune_percentage 0.05 @@ -41,24 +44,51 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do Get some peer from the peerbook. """ def get_some_peer() do - # TODO: use some algorithm to pick a good peer, for now it's random + # TODO: This is a very naive implementation of a peer selection algorithm, + # this sorts the peers every time. The same is true for the pruning. peerbook = fetch_peerbook!() if peerbook == %{} do nil else - {peer_id, _score} = Enum.random(peerbook) - peer_id + peerbook + |> Enum.sort_by(fn {_peer_id, score} -> -score end) + |> Enum.take(5) + |> Enum.random() + |> elem(0) end end def penalize_peer(peer_id) do - fetch_peerbook!() |> Map.delete(peer_id) |> store_peerbook() + Logger.debug("[Peerbook] Penalizing peer: #{inspect(Utils.format_shorten_binary(peer_id))}") + + peer_score = fetch_peerbook!() |> Map.get(peer_id) + + case peer_score do + nil -> + :ok + + score when score - @penalizing_score <= 0 -> + Logger.debug("[Peerbook] Removing peer: #{inspect(Utils.format_shorten_binary(peer_id))}") + + fetch_peerbook!() + |> Map.delete(peer_id) + |> store_peerbook() + + score -> + fetch_peerbook!() + |> Map.put(peer_id, score - @penalizing_score) + |> store_peerbook() + end end def handle_new_peer(peer_id) do peerbook = fetch_peerbook!() + Logger.debug( + "[Peerbook] New peer connected: #{inspect(Utils.format_shorten_binary(peer_id))}" + ) + if not Map.has_key?(peerbook, peer_id) do :telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"}) Map.put(peerbook, peer_id, @initial_score) |> store_peerbook() @@ -81,14 +111,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do defp prune() do peerbook = fetch_peerbook!() len = map_size(peerbook) + prune_size = if len > 0, do: calculate_prune_size(len), else: 0 - if len != 0 do - prune_size = - (len * @prune_percentage) - |> round() - |> min(@max_prune_size) - |> min(len - @target_peers) - |> max(0) + if prune_size > 0 do + Logger.debug("[Peerbook] Pruning #{prune_size} peers by challenge") n = :rand.uniform(len) @@ -100,6 +126,14 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do end end + defp calculate_prune_size(len) do + (len * @prune_percentage) + |> round() + |> min(@max_prune_size) + |> min(len - @target_peers) + |> max(0) + end + defp store_peerbook(peerbook), do: put("", peerbook) defp fetch_peerbook(), do: get("") diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 52f51f8ed..48f5a7758 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -9,8 +9,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do use GenServer - @tick_time 1000 - alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.Beacon.SyncBlocks alias LambdaEthereumConsensus.ForkChoice @@ -84,7 +82,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do discovery_addresses: [String.t()] } - @sync_delay_millis 10_000 + @tick_time 1000 + @sync_delay_millis 15_000 ###################### ### API