Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: base not found in tree #1318

Merged
merged 14 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Debugging helpers loaded into the IEx session (aliases plus anonymous
# functions bound as session variables).
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.ForkChoice.Head
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.StoreDb
alias LambdaEthereumConsensus.Utils

# Some convenience functions for debugging
# Fetch the persisted fork-choice store. elem(1) assumes an {:ok, store}
# tuple and will crash on an error tuple — acceptable for a debug helper.
store = fn -> StoreDb.fetch_store() |> elem(1) end

# Current head root, formatted for display (assumes Head.get_head/1 returns
# {:ok, root} — TODO confirm against Head's contract).
head_root = fn -> store.() |> Head.get_head() |> elem(1) |> Utils.format_binary() end
# Slot of the current head block, read from its stored block info.
head_slot = fn -> store.() |> Head.get_head() |> elem(1) |> Blocks.get_block_info() |> then(& &1.signed_block.message.slot) end

# Root and slot fields as persisted in the store struct itself.
store_root = fn -> store.().root end
store_slot = fn -> store.().slot end
# Slot as computed by the fork-choice module (may differ from store.slot).
store_calculated_slot = fn -> store.() |> ForkChoice.get_current_slot() end

# Epoch containing the given slot.
epoch = fn slot -> slot |> Misc.compute_epoch_at_slot() end

# Look up block info from a "0x"-prefixed, lower-case hex root string.
# Base.decode16/2 returns {:ok, binary}; elem(1) unwraps and crashes on bad hex.
block_info = fn "0x"<>root -> root |> Base.decode16(case: :lower) |> elem(1) |> Blocks.get_block_info() end

# All blocks currently in the given status, and a count of them.
blocks_by_status = fn status -> Blocks.get_blocks_with_status(status) |> elem(1) end
blocks_by_status_count = fn status -> blocks_by_status.(status) |> Enum.count() end
22 changes: 15 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -162,33 +162,41 @@ iex: compile-all
test-iex:
MIX_ENV=test iex -S mix run -- --mode db

##################
# NODE RUNNERS
DYSCOVERY_PORT ?= 30303

#▶️ checkpoint-sync: @ Run an interactive terminal using checkpoint sync.
checkpoint-sync: compile-all
iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics
iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --discovery-port $(DYSCOVERY_PORT)

#▶️ checkpoint-sync.logfile: @ Run an interactive terminal using checkpoint sync with a log file.
checkpoint-sync.logfile: compile-all
iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --log-file ./logs/mainnet.log
iex -S mix run -- --checkpoint-sync-url https://mainnet-checkpoint-sync.stakely.io/ --metrics --log-file ./logs/mainnet.log --discovery-port $(DYSCOVERY_PORT)

#▶️ sepolia: @ Run an interactive terminal using sepolia network
sepolia: compile-all
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --discovery-port $(DYSCOVERY_PORT)

#▶️ sepolia.logfile: @ Run an interactive terminal using sepolia network with a log file
sepolia.logfile: compile-all
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --log-file ./logs/sepolia.log
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics --log-file ./logs/sepolia.log --discovery-port $(DYSCOVERY_PORT)

#▶️ holesky: @ Run an interactive terminal using holesky network
holesky: compile-all
iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky
iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --discovery-port $(DYSCOVERY_PORT)

#▶️ holesky.logfile: @ Run an interactive terminal using holesky network with a log file
holesky.logfile: compile-all
iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --log-file ./logs/holesky.log
iex -S mix run -- --checkpoint-sync-url https://checkpoint-sync.holesky.ethpandaops.io --network holesky --log-file ./logs/holesky.log --discovery-port $(DYSCOVERY_PORT)

#▶️ gnosis: @ Run an interactive terminal using gnosis network
gnosis: compile-all
iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis
iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --discovery-port $(DYSCOVERY_PORT)

#▶️ gnosis.logfile: @ Run an interactive terminal using gnosis network with a log file
gnosis.logfile: compile-all
iex -S mix run -- --checkpoint-sync-url https://checkpoint.gnosischain.com --network gnosis --log-file ./logs/gnosis.log --discovery-port $(DYSCOVERY_PORT)

#🔴 test: @ Run tests
test: compile-all
Expand Down
55 changes: 40 additions & 15 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Utils
alias Types.BlockInfo
alias Types.SignedBeaconBlock
alias Types.Store

@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
@type block_status ::
:transitioned | :pending | :invalid | :download | :download_blobs | :unknown
@type block_info ::
{SignedBeaconBlock.t(), :pending | :download_blobs}
| {nil, :invalid | :download}
Expand All @@ -40,15 +42,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
def add_block(store, signed_block) do
block_info = BlockInfo.from_block(signed_block)
loaded_block = Blocks.get_block_info(block_info.root)
log_md = [slot: signed_block.message.slot, root: block_info.root]

# If the block is new or was to be downloaded, we store it.
if is_nil(loaded_block) or loaded_block.status == :download do
missing_blobs = missing_blobs(block_info)

if Enum.empty?(missing_blobs) do
Logger.debug("[PendingBlocks] No missing blobs for block, process it", log_md)
Blocks.new_block_info(block_info)
process_block_and_check_children(store, block_info)
else
Logger.debug("[PendingBlocks] Missing blobs for block, scheduling download", log_md)
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)

block_info
Expand All @@ -72,6 +77,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
{:ok, blocks} ->
blocks
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
# Could we process just one/a small amount of blocks at a time? would it make more sense?
|> Enum.reduce(store, fn block_info, store ->
{store, _state} = process_block(store, block_info)
store
Expand Down Expand Up @@ -101,16 +107,28 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end
end

defp process_block(store, block_info) do
defp process_block(store, %BlockInfo{signed_block: %{message: message}} = block_info) do
if block_info.status != :pending do
Logger.error("Called process block for a block that's not ready: #{block_info}")
Logger.error(
"[PendingBlocks] Called process block for a block that's not ready: #{block_info}"
)
end

parent_root = block_info.signed_block.message.parent_root
log_md = [slot: message.slot, root: block_info.root]
parent_root = message.parent_root

Logger.debug(
"[PendingBlocks] Processing block, parent: #{Utils.format_binary(parent_root)}",
log_md
)

case Blocks.get_block_info(parent_root) do
nil ->
Logger.debug("[PendingBlocks] Add parent to download #{inspect(parent_root)}")
Logger.debug(
"[PendingBlocks] Add parent with root: #{Utils.format_shorten_binary(parent_root)} to download",
log_md
)

Blocks.add_block_to_download(parent_root)

BlockDownloader.request_blocks_by_root(
Expand All @@ -127,19 +145,25 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
{store, :download_pending}

%BlockInfo{status: :invalid} ->
Logger.warning(
"[PendingBlocks] Parent block with root:#{Utils.format_shorten_binary(parent_root)} is invalid, making this block also invalid",
log_md
)

Blocks.change_status(block_info, :invalid)
{store, :invalid}

%BlockInfo{status: :transitioned} ->
case ForkChoice.on_block(store, block_info) do
{:ok, store} ->
Logger.debug("[PendingBlocks] Block transitioned after ForkChoice.on_block/2", log_md)
Blocks.change_status(block_info, :transitioned)
{store, :transitioned}

{:error, reason, store} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
slot: block_info.signed_block.message.slot,
root: block_info.root
Logger.error(
"[PendingBlocks] Saving block as invalid after ForkChoice.on_block/2 error: #{reason}",
log_md
)

Blocks.change_status(block_info, :invalid)
Expand All @@ -157,15 +181,15 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

defp process_downloaded_block(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading block: #{inspect(reason)}")
Logger.error("[PendingBlocks] Error downloading block: #{inspect(reason)}")
{:ok, store}
end

defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}
def process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}

defp process_blobs(store, {:error, reason}) do
def process_blobs(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading blobs: #{inspect(reason)}")
Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}")
{:ok, store}
end

Expand All @@ -178,19 +202,20 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
|> Enum.map(&BlobDb.store_blob/1)
|> Enum.uniq()
|> Enum.reduce(store, fn root, store ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root),
with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root),
[] <- missing_blobs(block_info) do
block_info
|> Blocks.change_status(:pending)
|> then(&process_block_and_check_children(store, &1))
else
_ -> store
_ ->
store
end
end)
end

@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
defp missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
def missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
signed_block.message.body.blob_kzg_commitments
|> Stream.with_index()
|> Enum.filter(&blob_needs_download?(&1, root))
Expand Down
5 changes: 5 additions & 0 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do
|> tap(fn store ->
StoreDb.persist_store(store)
Logger.info("[Fork choice] Added new block", slot: slot, root: block_root)

Logger.info("[Fork choice] Recomputed head",
slot: store.head_slot,
root: store.head_root
)
end)
|> then(&{:ok, &1})

Expand Down
7 changes: 3 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/blob_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
def request_blobs_by_range(slot, count, on_blobs, retries) do
Logger.debug("Requesting blobs", slot: slot)

# TODO: handle no-peers asynchronously?
peer_id = get_some_peer()

# NOTE: BeaconBlocksByRangeRequest == BlobSidecarsByRangeRequest
Expand Down Expand Up @@ -62,7 +61,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
P2P.Peerbook.penalize_peer(peer_id)

if retries > 0 do
Logger.debug("Retrying request for #{count} blobs", slot: slot)
Logger.debug("Retrying request for #{count} blobs: #{inspect(reason)}", slot: slot)
request_blobs_by_range(slot, count, on_blobs, retries - 1)
{:ok, store}
else
Expand Down Expand Up @@ -123,8 +122,8 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
defp get_some_peer() do
case P2P.Peerbook.get_some_peer() do
nil ->
Process.sleep(100)
get_some_peer()
# TODO: handle no-peers asynchronously
raise "No peers available to request blobs from."

peer_id ->
peer_id
Expand Down
8 changes: 4 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/block_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
def request_blocks_by_range(slot, count, on_blocks, retries) do
Logger.debug("Requesting block", slot: slot)

# TODO: handle no-peers asynchronously?
peer_id = get_some_peer()

request =
Expand Down Expand Up @@ -173,7 +172,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
if retries > 0 do
:telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry"))
pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1)
Logger.debug("Retrying request for blocks with roots #{pretty_roots}")

Logger.debug("Retrying request for block roots #{pretty_roots}: #{inspect(reason)}")
request_blocks_by_root(roots, on_blocks, retries - 1)
{:ok, store}
else
Expand All @@ -186,8 +186,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
defp get_some_peer() do
case P2P.Peerbook.get_some_peer() do
nil ->
Process.sleep(100)
get_some_peer()
# TODO: handle no-peers asynchronously
raise "No peers available to request blocks from."

peer_id ->
peer_id
Expand Down
56 changes: 45 additions & 11 deletions lib/lambda_ethereum_consensus/p2p/peerbook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do
@moduledoc """
General peer bookkeeping.
"""
require Logger
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.Store.KvSchema
alias LambdaEthereumConsensus.Utils

@initial_score 100
@penalizing_score 15
@target_peers 128
@max_prune_size 8
@prune_percentage 0.05
Expand Down Expand Up @@ -41,24 +44,51 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do
Get some peer from the peerbook.
"""
def get_some_peer() do
# TODO: use some algorithm to pick a good peer, for now it's random
# TODO: This is a very naive implementation of a peer selection algorithm,
# this sorts the peers every time. The same is true for the pruning.
peerbook = fetch_peerbook!()

if peerbook == %{} do
nil
else
{peer_id, _score} = Enum.random(peerbook)
peer_id
peerbook
|> Enum.sort_by(fn {_peer_id, score} -> -score end)
|> Enum.take(5)
|> Enum.random()
|> elem(0)
end
end

def penalize_peer(peer_id) do
fetch_peerbook!() |> Map.delete(peer_id) |> store_peerbook()
Logger.debug("[Peerbook] Penalizing peer: #{inspect(Utils.format_shorten_binary(peer_id))}")

peer_score = fetch_peerbook!() |> Map.get(peer_id)

case peer_score do
nil ->
:ok

score when score - @penalizing_score <= 0 ->
Logger.debug("[Peerbook] Removing peer: #{inspect(Utils.format_shorten_binary(peer_id))}")

fetch_peerbook!()
|> Map.delete(peer_id)
|> store_peerbook()

score ->
fetch_peerbook!()
|> Map.put(peer_id, score - @penalizing_score)
|> store_peerbook()
end
end

def handle_new_peer(peer_id) do
peerbook = fetch_peerbook!()

Logger.debug(
"[Peerbook] New peer connected: #{inspect(Utils.format_shorten_binary(peer_id))}"
)

if not Map.has_key?(peerbook, peer_id) do
:telemetry.execute([:peers, :connection], %{id: peer_id}, %{result: "success"})
Map.put(peerbook, peer_id, @initial_score) |> store_peerbook()
Expand All @@ -81,14 +111,10 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do
defp prune() do
peerbook = fetch_peerbook!()
len = map_size(peerbook)
prune_size = if len > 0, do: calculate_prune_size(len), else: 0

if len != 0 do
prune_size =
(len * @prune_percentage)
|> round()
|> min(@max_prune_size)
|> min(len - @target_peers)
|> max(0)
if prune_size > 0 do
Logger.debug("[Peerbook] Pruning #{prune_size} peers by challenge")

n = :rand.uniform(len)

Expand All @@ -100,6 +126,14 @@ defmodule LambdaEthereumConsensus.P2P.Peerbook do
end
end

# Number of peers to prune this round: a fixed percentage of the current
# peerbook size, capped by the per-round maximum and by how far we sit above
# the target pool size, and clamped at zero so we never return a negative
# count when at or below @target_peers.
defp calculate_prune_size(len) do
  desired = round(len * @prune_percentage)

  [desired, @max_prune_size, len - @target_peers]
  |> Enum.min()
  |> max(0)
end

defp store_peerbook(peerbook), do: put("", peerbook)

defp fetch_peerbook(), do: get("")
Expand Down
Loading