Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PeerDAS subnet decoupling (aka custody groups) #6736

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<E: EthSpec> RpcBlock<E> {
let inner = if !custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(custody_columns, spec.number_of_columns)?,
RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)?,
)
} else {
RpcBlockInner::Block(block)
Expand Down
18 changes: 5 additions & 13 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
spec: Arc<ChainSpec>,
log: Logger,
) -> Result<Self, AvailabilityCheckError> {
let custody_subnet_count = if import_all_data_columns {
spec.data_column_sidecar_subnet_count as usize
} else {
spec.custody_requirement as usize
};

let subnet_sampling_size =
std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize);
let sampling_column_count =
subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet());
let custody_group_count = spec.custody_group_count(import_all_data_columns);
let sampling_size = spec.sampling_size(custody_group_count);

let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
sampling_column_count,
sampling_size as usize,
spec.clone(),
)?;
Ok(Self {
Expand All @@ -147,7 +139,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}

pub(crate) fn is_supernode(&self) -> bool {
self.get_sampling_column_count() == self.spec.number_of_columns
self.get_sampling_column_count() == self.spec.number_of_columns as usize
}

/// Checks if the block root is currenlty in the availability cache awaiting import because
Expand Down Expand Up @@ -424,7 +416,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(CustodyDataColumn::into_inner)
.collect::<Vec<_>>();
let all_data_columns =
RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns);
RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns as usize);

// verify kzg for all data columns at once
if !all_data_columns.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {

// If we're sampling all columns, it means we must be custodying all columns.
let custody_column_count = self.sampling_column_count();
let total_column_count = self.spec.number_of_columns;
let total_column_count = self.spec.number_of_columns as usize;
let received_column_count = pending_components.verified_data_columns.len();

if pending_components.reconstruction_started {
Expand All @@ -555,7 +555,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
if custody_column_count != total_column_count {
return ReconstructColumnsDecision::No("not required for full node");
}
if received_column_count == self.spec.number_of_columns {
if received_column_count == total_column_count {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not >=?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's dangerous to allow for unexpected columns, because it means we either have a duplicate or we have a column that we don't expect - which may unintentionally satisfy the custody column count. I think we need to make sure it's an exact match, given that we're only checking the count.

The code here validates for duplicate:

if !self.data_column_exists(data_column.index()) {
self.verified_data_columns.push(data_column);
}

Gossip validation should ensure we only get the columns we subscribe to.

Do you see any valid scenario where this would be>?

return ReconstructColumnsDecision::No("all columns received");
}
if received_column_count < total_column_count / 2 {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ fn verify_data_column_sidecar<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
if data_column.index >= spec.number_of_columns as u64 {
if data_column.index >= spec.number_of_columns {
return Err(GossipDataColumnError::InvalidColumnIndex(data_column.index));
}
if data_column.kzg_commitments.is_empty() {
Expand Down Expand Up @@ -611,7 +611,7 @@ fn verify_index_matches_subnet<E: EthSpec>(
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
let expected_subnet: u64 =
DataColumnSubnetId::from_column_index::<E>(data_column.index as usize, spec).into();
DataColumnSubnetId::from_column_index(data_column.index, spec).into();
if expected_subnet != subnet {
return Err(GossipDataColumnError::InvalidSubnetId {
received: subnet,
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/kzg_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ fn build_data_column_sidecars<E: EthSpec>(
blob_cells_and_proofs_vec: Vec<CellsAndKzgProofs>,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, String> {
let number_of_columns = spec.number_of_columns;
let number_of_columns = spec.number_of_columns as usize;
let mut columns = vec![Vec::with_capacity(E::max_blobs_per_block()); number_of_columns];
let mut column_kzg_proofs =
vec![Vec::with_capacity(E::max_blobs_per_block()); number_of_columns];
Expand Down Expand Up @@ -341,7 +341,7 @@ mod test {
.kzg_commitments_merkle_proof()
.unwrap();

assert_eq!(column_sidecars.len(), spec.number_of_columns);
assert_eq!(column_sidecars.len(), spec.number_of_columns as usize);
for (idx, col_sidecar) in column_sidecars.iter().enumerate() {
assert_eq!(col_sidecar.index, idx as u64);

Expand Down Expand Up @@ -374,7 +374,7 @@ mod test {
)
.unwrap();

for i in 0..spec.number_of_columns {
for i in 0..spec.number_of_columns as usize {
assert_eq!(reconstructed_columns.get(i), column_sidecars.get(i), "{i}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/observed_data_sidecars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<E: EthSpec> ObservableDataSidecar for DataColumnSidecar<E> {
}

fn max_num_of_items(spec: &ChainSpec) -> usize {
spec.number_of_columns
spec.number_of_columns as usize
}
}

Expand Down
10 changes: 3 additions & 7 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,8 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
let gossip_verified_data_columns = data_column_sidecars
.into_iter()
.map(|data_column_sidecar| {
let column_index = data_column_sidecar.index as usize;
let subnet =
DataColumnSubnetId::from_column_index::<T::EthSpec>(column_index, &chain.spec);
let column_index = data_column_sidecar.index;
let subnet = DataColumnSubnetId::from_column_index(column_index, &chain.spec);
let gossip_verified_column =
GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), chain);

Expand Down Expand Up @@ -520,10 +519,7 @@ fn publish_column_sidecars<T: BeaconChainTypes>(
let pubsub_messages = data_column_sidecars
.into_iter()
.map(|data_col| {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
data_col.index as usize,
&chain.spec,
);
let subnet = DataColumnSubnetId::from_column_index(data_col.index, &chain.spec);
PubsubMessage::DataColumnSidecar(Box::new((subnet, data_col)))
})
.collect::<Vec<_>>();
Expand Down
46 changes: 23 additions & 23 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub const ETH2_ENR_KEY: &str = "eth2";
pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets";
/// The ENR field specifying the sync committee subnet bitfield.
pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets";
/// The ENR field specifying the peerdas custody subnet count.
pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc";
/// The ENR field specifying the peerdas custody group count.
pub const PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY: &str = "cgc";

/// Extension trait for ENR's within Eth2.
pub trait Eth2Enr {
Expand All @@ -38,8 +38,8 @@ pub trait Eth2Enr {
&self,
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;

/// The peerdas custody subnet count associated with the ENR.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
/// The peerdas custody group count associated with the ENR.
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;

fn eth2(&self) -> Result<EnrForkId, &'static str>;
}
Expand Down Expand Up @@ -67,16 +67,16 @@ impl Eth2Enr for Enr {
.map_err(|_| "Could not decode the ENR syncnets bitfield")
}

fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let csc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
.ok_or("ENR custody subnet count non-existent")?
.map_err(|_| "Could not decode the ENR custody subnet count")?;
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let cgc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
.ok_or("ENR custody group count non-existent")?
.map_err(|_| "Could not decode the ENR custody group count")?;

if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count {
Ok(csc)
if (spec.custody_requirement..=spec.number_of_custody_groups).contains(&cgc) {
Ok(cgc)
} else {
Err("Invalid custody subnet count in ENR")
Err("Invalid custody group count in ENR")
}
}

Expand Down Expand Up @@ -253,14 +253,14 @@ pub fn build_enr<E: EthSpec>(
&bitfield.as_ssz_bytes().into(),
);

// only set `csc` if PeerDAS fork epoch has been scheduled
// only set `cgc` if PeerDAS fork epoch has been scheduled
if spec.is_peer_das_scheduled() {
let custody_subnet_count = if config.subscribe_all_data_column_subnets {
spec.data_column_sidecar_subnet_count
let custody_group_count = if config.subscribe_all_data_column_subnets {
spec.number_of_custody_groups
} else {
spec.custody_requirement
};
builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count);
builder.add_value(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count);
}

builder
Expand All @@ -287,11 +287,11 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
&& (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
&& (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
// PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// likely only be true for non-validating nodes.
&& local_enr.get_decodable::<Bytes>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
}

/// Loads enr from the given directory
Expand Down Expand Up @@ -348,7 +348,7 @@ mod test {
}

#[test]
fn custody_subnet_count_default() {
fn custody_group_count_default() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: false,
..NetworkConfig::default()
Expand All @@ -358,13 +358,13 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec).unwrap(),
enr.custody_group_count::<E>(&spec).unwrap(),
spec.custody_requirement,
);
}

#[test]
fn custody_subnet_count_all() {
fn custody_group_count_all() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: true,
..NetworkConfig::default()
Expand All @@ -373,8 +373,8 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec).unwrap(),
spec.data_column_sidecar_subnet_count,
enr.custody_group_count::<E>(&spec).unwrap(),
spec.number_of_custody_groups,
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The subnet predicate used for searching for a particular subnet.
use super::*;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use itertools::Itertools;
use slog::trace;
use std::ops::Deref;
use types::{ChainSpec, DataColumnSubnetId};
use types::data_column_custody_group::compute_subnets_for_node;
use types::ChainSpec;

/// Returns the predicate for a given subnet.
pub fn subnet_predicate<E>(
Expand Down Expand Up @@ -37,13 +37,9 @@ where
.as_ref()
.map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)),
Subnet::DataColumn(s) => {
if let Ok(custody_subnet_count) = enr.custody_subnet_count::<E>(&spec) {
DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,
&spec,
)
.map_or(false, |mut subnets| subnets.contains(s))
if let Ok(custody_group_count) = enr.custody_group_count::<E>(&spec) {
compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec)
.map_or(false, |subnets| subnets.contains(s))
} else {
false
}
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ pub static PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
)
});

pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
pub static PEERS_PER_CUSTODY_GROUP_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"peers_per_custody_subnet_count",
"The current count of peers by custody subnet count",
&["custody_subnet_count"],
"peers_per_custody_group_count",
"The current count of peers by custody group count",
&["custody_group_count"],
)
});

Expand Down
Loading
Loading