reduces std::mem::size_of::<gossip::CrdsData>() #4391

Open
wants to merge 6 commits into base: master
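Note (not part of the diff): the changes below replace the `F: FnOnce(&ContactInfo) -> Y` closure bounds with an `impl ContactInfoQuery<R>` bound, drop the duplicated `LegacyContactInfo` CRDS entries, and construct CRDS values via `CrdsData::from(...)`. The actual `ContactInfoQuery` definition lives in gossip/src/contact_info.rs and is not shown in this diff; a minimal sketch, assuming it is simply a blanket-implemented alias over `Fn(&ContactInfo) -> R`, might look like:

    // Hypothetical sketch only; the real trait in contact_info.rs may differ,
    // e.g. it may abstract over the CRDS-side node representation.
    pub trait ContactInfoQuery<R>: Fn(&ContactInfo) -> R {}
    impl<R, F: Fn(&ContactInfo) -> R> ContactInfoQuery<R> for F {}

Callers keep passing plain closures; only the bound on the receiving functions changes.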
7 changes: 2 additions & 5 deletions core/src/banking_simulation.rs
@@ -19,7 +19,7 @@ use {
solana_client::connection_cache::ConnectionCache,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
contact_info::ContactInfoQuery,
},
solana_ledger::{
blockstore::{Blockstore, PurgeType},
@@ -248,10 +248,7 @@ impl LikeClusterInfo for Arc<DummyClusterInfo> {
*self.id.read().unwrap()
}

fn lookup_contact_info<F, Y>(&self, _id: &Pubkey, _map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y,
{
fn lookup_contact_info<R>(&self, _: &Pubkey, _: impl ContactInfoQuery<R>) -> Option<R> {
None
}
}
13 changes: 4 additions & 9 deletions core/src/banking_stage.rs
@@ -31,7 +31,7 @@ use {
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery},
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
@@ -328,21 +328,16 @@ pub struct FilterForwardingResults {
pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
fn id(&self) -> Pubkey;

fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y;
fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R>;
}

impl LikeClusterInfo for Arc<ClusterInfo> {
fn id(&self) -> Pubkey {
self.deref().id()
}

fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y,
{
self.deref().lookup_contact_info(id, map)
fn lookup_contact_info<R>(&self, id: &Pubkey, query: impl ContactInfoQuery<R>) -> Option<R> {
self.deref().lookup_contact_info(id, query)
}
}

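Call sites stay ergonomic under the new bound: any closure over a node still satisfies it. A hedged usage sketch (the `leader_pubkey` binding is illustrative; the `tpu(Protocol::UDP)` accessor mirrors calls made elsewhere in this diff):

    // Illustrative only: resolve a peer's UDP TPU address through the new bound.
    let tpu_addr: Option<SocketAddr> = cluster_info
        .lookup_contact_info(&leader_pubkey, |node| node.tpu(Protocol::UDP))
        .flatten();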
11 changes: 4 additions & 7 deletions core/src/next_leader.rs
@@ -3,7 +3,7 @@ use {
itertools::Itertools,
solana_gossip::{
cluster_info::ClusterInfo,
contact_info::{ContactInfo, Protocol},
contact_info::{ContactInfoQuery, Protocol},
},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey},
@@ -45,14 +45,11 @@ pub(crate) fn next_leader_tpu_vote(
})
}

pub(crate) fn next_leader<F>(
pub(crate) fn next_leader(
cluster_info: &impl LikeClusterInfo,
poh_recorder: &RwLock<PohRecorder>,
port_selector: F,
) -> Option<(Pubkey, SocketAddr)>
where
F: FnOnce(&ContactInfo) -> Option<SocketAddr>,
{
port_selector: impl ContactInfoQuery<Option<SocketAddr>>,
) -> Option<(Pubkey, SocketAddr)> {
let leader_pubkey = poh_recorder
.read()
.unwrap()
3 changes: 2 additions & 1 deletion core/src/repair/serve_repair.rs
@@ -1274,7 +1274,8 @@ impl ServeRepair {
.iter()
.filter_map(|key| {
if *key != self.my_id() {
self.cluster_info.lookup_contact_info(key, |ci| ci.clone())
self.cluster_info
.lookup_contact_info(key, |node| ContactInfo::from(node))
} else {
None
}
4 changes: 2 additions & 2 deletions core/src/warm_quic_cache_service.rs
@@ -5,7 +5,7 @@ use {
rand::{thread_rng, Rng},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery},
solana_poh::poh_recorder::PohRecorder,
solana_pubkey::Pubkey,
std::{
@@ -32,7 +32,7 @@ impl WarmQuicCacheService {
cache: Option<&ConnectionCache>,
cluster_info: &ClusterInfo,
leader_pubkey: &Pubkey,
contact_info_selector: impl Fn(&ContactInfo) -> Option<SocketAddr>,
contact_info_selector: impl ContactInfoQuery<Option<SocketAddr>>,
log_context: &str,
) {
if let Some(connection_cache) = cache {
84 changes: 37 additions & 47 deletions gossip/src/cluster_info.rs
@@ -18,7 +18,7 @@ use {
cluster_info_metrics::{
submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard,
},
contact_info::{self, ContactInfo, Error as ContactInfoError},
contact_info::{self, ContactInfo, ContactInfoQuery, Error as ContactInfoError},
crds::{Crds, Cursor, GossipRoute},
crds_data::{
self, CrdsData, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHashes, Version,
@@ -34,7 +34,6 @@ use {
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
gossip_error::GossipError,
legacy_contact_info::LegacyContactInfo,
ping_pong::Pong,
protocol::{
split_gossip_messages, Ping, PingCache, Protocol, PruneData,
@@ -283,19 +282,16 @@ impl ClusterInfo {
}

// TODO kill insert_info, only used by tests
pub fn insert_info(&self, node: ContactInfo) {
let entries: Vec<_> = [
LegacyContactInfo::try_from(&node)
.map(CrdsData::LegacyContactInfo)
.expect("Operator must spin up node with valid contact-info"),
CrdsData::ContactInfo(node),
]
.into_iter()
.map(|entry| CrdsValue::new(entry, &self.keypair()))
.collect();
let mut gossip_crds = self.gossip.crds.write().unwrap();
for entry in entries {
let _ = gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage);
pub fn insert_info<T>(&self, data: T)
where
CrdsData: From<T>,
{
let entry = CrdsValue::new(CrdsData::from(data), &self.keypair());
if let Err(err) = {
let mut gossip_crds = self.gossip.crds.write().unwrap();
gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage)
} {
error!("ClusterInfo.insert_info: {err:?}");
}
}

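With the `CrdsData: From<T>` bound, `insert_info` accepts anything convertible into a single CRDS entry, instead of always inserting a `ContactInfo` plus its legacy duplicate. A hedged sketch of a call, reusing constructors that appear in the test changes below and assuming a `CrdsData: From<ContactInfo>` impl (the `CrdsData::from(node)` calls in the tests suggest it exists):

    // Illustrative only: insert a ContactInfo directly; the generic bound
    // converts it into one CrdsData entry (no LegacyContactInfo duplicate).
    let node = ContactInfo::new_localhost(&Pubkey::new_unique(), timestamp());
    cluster_info.insert_info(node);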
@@ -464,12 +460,13 @@ impl ClusterInfo {
Ok(())
}

pub fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y,
{
pub fn lookup_contact_info<R>(
&self,
id: &Pubkey,
query: impl ContactInfoQuery<R>,
) -> Option<R> {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds.get(*id).map(map)
gossip_crds.get(*id).map(query)
}

pub fn lookup_contact_info_by_gossip_addr(
@@ -480,7 +477,7 @@
let mut nodes = gossip_crds.get_nodes_contact_info();
nodes
.find(|node| node.gossip() == Some(*gossip_addr))
.cloned()
.map(ContactInfo::from)
}

pub fn my_contact_info(&self) -> ContactInfo {
@@ -1054,7 +1051,7 @@ impl ClusterInfo {
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get::<&ContactInfo>(*pubkey)
.get::<&ContactInfo<_>>(*pubkey)
.map(ContactInfo::version)
.cloned()
}
@@ -1074,7 +1071,7 @@
.filter(|node| {
node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.rpc())
})
.cloned()
.map(ContactInfo::from)
.collect()
}

@@ -1083,7 +1080,7 @@
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_nodes()
.map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp))
.map(|x| (x.value.contact_info().unwrap().into(), x.local_timestamp))
.collect()
}

@@ -1094,7 +1091,7 @@
.get_nodes_contact_info()
// shred_version not considered for gossip peers (ie, spy nodes do not set shred_version)
.filter(|node| node.pubkey() != &me && self.check_socket_addr_space(&node.gossip()))
.cloned()
.map(ContactInfo::from)
.collect()
}

@@ -1107,12 +1104,12 @@
node.pubkey() != &self_pubkey
&& self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP))
})
.cloned()
.map(ContactInfo::from)
.collect()
}

/// all validators that have a valid tvu port and are on the same `shred_version`.
pub fn tvu_peers<R>(&self, query: impl Fn(&ContactInfo) -> R) -> Vec<R> {
pub fn tvu_peers<R>(&self, query: impl ContactInfoQuery<R>) -> Vec<R> {
let self_pubkey = self.id();
let self_shred_version = self.my_shred_version();
self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers)
@@ -1144,7 +1141,7 @@
Some(lowest_slot) => lowest_slot.lowest <= slot,
}
})
.cloned()
.map(ContactInfo::from)
.collect()
}

@@ -1171,7 +1168,7 @@
node.pubkey() != &self_pubkey
&& self.check_socket_addr_space(&node.tpu(contact_info::Protocol::UDP))
})
.cloned()
.map(ContactInfo::from)
.collect()
}

@@ -1181,12 +1178,9 @@
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(timestamp());
node.clone()
ContactInfo::from(&*node)
};
let entries: Vec<_> = [
LegacyContactInfo::try_from(&node)
.map(CrdsData::LegacyContactInfo)
.expect("Operator must spin up node with valid contact-info"),
CrdsData::ContactInfo(node),
CrdsData::NodeInstance(instance),
]
@@ -1261,10 +1255,7 @@
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
let now = timestamp();
let self_info = CrdsValue::new(
CrdsData::ContactInfo(self.my_contact_info()),
&self.keypair(),
);
let self_info = CrdsValue::new(CrdsData::from(self.my_contact_info()), &self.keypair());
let max_bloom_filter_bytes = get_max_bloom_filter_bytes(&self_info);
let mut pings = Vec::new();
let mut pulls = {
@@ -1340,7 +1331,7 @@
push_messages
.into_iter()
.filter_map(|(pubkey, messages)| {
let peer: &ContactInfo = gossip_crds.get(pubkey)?;
let peer: &ContactInfo<_> = gossip_crds.get(pubkey)?;
Some((peer.gossip()?, messages))
})
.collect()
@@ -1829,7 +1820,7 @@
score
};
let score = match response.data() {
CrdsData::LegacyContactInfo(_) | CrdsData::ContactInfo(_) => 2 * score,
CrdsData::ContactInfo(_) => 2 * score,
_ => score,
};
((addr, response), score)
@@ -2081,7 +2072,7 @@
prunes
.into_par_iter()
.filter_map(|(pubkey, prunes)| {
let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?;
let addr = gossip_crds.get::<&ContactInfo<_>>(pubkey)?.gossip()?;
Some((pubkey, addr, prunes))
})
.collect()
@@ -3337,7 +3328,7 @@

fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> {
let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp());
let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint));
let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::from(entrypoint));
vec![entrypoint_crdsvalue]
}

@@ -3806,7 +3797,7 @@
node.set_shred_version(42);
let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey));
let entries = vec![
CrdsValue::new_unsigned(CrdsData::ContactInfo(node)),
CrdsValue::new_unsigned(CrdsData::from(node)),
CrdsValue::new_unsigned(CrdsData::EpochSlots(0, epoch_slots)),
];
{
@@ -3862,8 +3853,7 @@
}
}
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::from(&entrypoint));
let cluster_info = Arc::new(cluster_info);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = cluster_info.gossip.make_timeouts(
@@ -4178,7 +4168,7 @@
let mut rand_ci = ContactInfo::new_rand(&mut rng, Some(keypair.pubkey()));
rand_ci.set_shred_version(shred_version);
rand_ci.set_wallclock(timestamp());
CrdsValue::new(CrdsData::ContactInfo(rand_ci), &keypair)
CrdsValue::new(CrdsData::from(rand_ci), &keypair)
})
.take(NO_ENTRIES)
.collect();
@@ -4291,7 +4281,7 @@
let mut slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey));
slots.shred_version = 42;
let entries = vec![
CrdsValue::new_unsigned(CrdsData::ContactInfo(node)),
CrdsValue::new_unsigned(CrdsData::from(node)),
CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(slots)),
];
{
@@ -4361,7 +4351,7 @@
let hash2 = Hash::new_unique();
let stake2 = 23_000_000;
let entries = vec![
CrdsValue::new_unsigned(CrdsData::ContactInfo(new_node)),
CrdsValue::new_unsigned(CrdsData::from(new_node)),
CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork(RestartHeaviestFork {
from: pubkey2,
wallclock: timestamp(),