Skip to content

Commit

Permalink
Reduce StatusCache contention.
Browse files Browse the repository at this point in the history
  • Loading branch information
alessandrod committed Dec 9, 2024
1 parent be115c7 commit fc67d2f
Show file tree
Hide file tree
Showing 15 changed files with 528 additions and 230 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ bitflags = { version = "2.6.0" }
blake3 = "1.5.5"
borsh = { version = "1.5.3", features = ["derive", "unstable__schema"] }
borsh0-10 = { package = "borsh", version = "0.10.3" }
boxcar = "0.2.7"
bs58 = { version = "0.5.1", default-features = false }
bv = "0.11.1"
byte-unit = "4.0.19"
Expand All @@ -295,7 +296,7 @@ crossbeam-channel = "0.5.13"
csv = "1.3.1"
ctrlc = "3.4.5"
curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] }
dashmap = "5.5.3"
dashmap = { version = "5.5.3", features = ["serde"] }
derivation-path = { version = "0.2.0", default-features = false }
derive-where = "1.2.7"
dialoguer = "0.10.4"
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ solana-logger = { workspace = true }
solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }
solana-poh = { workspace = true, features = ["dev-context-only-utils"] }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
solana-unified-scheduler-pool = { workspace = true, features = [
Expand Down
4 changes: 0 additions & 4 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,7 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
.unwrap()
.root_bank()
.status_cache
.read()
.unwrap()
.roots()
.iter()
.cloned()
.sorted();
assert!(slots_to_snapshot.into_iter().eq(expected_slots_to_snapshot));
}
Expand Down
9 changes: 9 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ arrayref = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
boxcar = { workspace = true }
bv = { workspace = true, features = ["serde"] }
bytemuck = { workspace = true }
byteorder = { workspace = true }
Expand Down Expand Up @@ -48,6 +49,7 @@ serde = { workspace = true, features = ["rc"] }
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { workspace = true }
solana-accounts-db = { workspace = true }
solana-address-lookup-table-program = { workspace = true }
solana-bpf-loader-program = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions runtime/benches/status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {

#[bench]
fn bench_status_cache_serialize(bencher: &mut Bencher) {
let mut status_cache = BankStatusCache::default();
let status_cache = BankStatusCache::default();
status_cache.add_root(0);
status_cache.clear();
for hash_index in 0..100 {
Expand All @@ -30,15 +30,15 @@ fn bench_status_cache_serialize(bencher: &mut Bencher) {
status_cache.insert(&blockhash, sig, 0, Ok(()));
}
}
assert!(status_cache.roots().contains(&0));
assert!(status_cache.roots().collect::<Vec<_>>().contains(&0));
bencher.iter(|| {
let _ = serialize(&status_cache.root_slot_deltas()).unwrap();
});
}

#[bench]
fn bench_status_cache_root_slot_deltas(bencher: &mut Bencher) {
let mut status_cache = BankStatusCache::default();
let status_cache = BankStatusCache::default();

// fill the status cache
let slots: Vec<_> = (42..).take(MAX_CACHE_ENTRIES).collect();
Expand Down
29 changes: 14 additions & 15 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ struct RentMetrics {
pub type BankStatusCache = StatusCache<Result<()>>;
#[cfg_attr(
feature = "frozen-abi",
frozen_abi(digest = "BHg4qpwegtaJypLUqAdjQYzYeLfEGf6tA4U5cREbHMHi")
frozen_abi(digest = "CQE8Pab7YMwvUj6rjD95kqt9fgBE4mkG7GRUko2DyveD")
)]
pub type BankSlotDelta = SlotDelta<Result<()>>;

Expand Down Expand Up @@ -753,7 +753,7 @@ pub struct Bank {
pub rc: BankRc,

/// A cache of signature statuses
pub status_cache: Arc<RwLock<BankStatusCache>>,
pub status_cache: Arc<BankStatusCache>,

/// FIFO queue of `recent_blockhash` items
blockhash_queue: RwLock<BlockhashQueue>,
Expand Down Expand Up @@ -1102,7 +1102,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: BankRc::new(accounts),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<BankStatusCache>::default(),
blockhash_queue: RwLock::<BlockhashQueue>::default(),
ancestors: Ancestors::default(),
hash: RwLock::<Hash>::default(),
Expand Down Expand Up @@ -1769,7 +1769,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: bank_rc,
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<BankStatusCache>::default(),
blockhash_queue: RwLock::new(fields.blockhash_queue),
ancestors,
hash: RwLock::new(fields.hash),
Expand Down Expand Up @@ -2046,7 +2046,7 @@ impl Bank {
}

pub fn status_cache_ancestors(&self) -> Vec<u64> {
let mut roots = self.status_cache.read().unwrap().roots().clone();
let mut roots = self.status_cache.roots().collect::<HashSet<_>>();
let min = roots.iter().min().cloned().unwrap_or(0);
for ancestor in self.ancestors.keys() {
if ancestor >= min {
Expand Down Expand Up @@ -3193,7 +3193,7 @@ impl Bank {
let mut squash_cache_time = Measure::start("squash_cache_time");
roots
.iter()
.for_each(|slot| self.status_cache.write().unwrap().add_root(*slot));
.for_each(|slot| self.status_cache.add_root(*slot));
squash_cache_time.stop();

SquashTiming {
Expand Down Expand Up @@ -3459,26 +3459,26 @@ impl Bank {
}

/// Forget all signatures. Useful for benchmarking.
#[cfg(feature = "dev-context-only-utils")]
pub fn clear_signatures(&self) {
self.status_cache.write().unwrap().clear();
self.status_cache.clear();
}

pub fn clear_slot_signatures(&self, slot: Slot) {
self.status_cache.write().unwrap().clear_slot_entries(slot);
self.status_cache.clear_slot_entries(slot);
}

fn update_transaction_statuses(
&self,
sanitized_txs: &[impl TransactionWithMeta],
processing_results: &[TransactionProcessingResult],
) {
let mut status_cache = self.status_cache.write().unwrap();
assert_eq!(sanitized_txs.len(), processing_results.len());
for (tx, processing_result) in sanitized_txs.iter().zip(processing_results) {
if let Ok(processed_tx) = &processing_result {
// Add the message hash to the status cache to ensure that this message
// won't be processed again with a different signature.
status_cache.insert(
self.status_cache.insert(
tx.recent_blockhash(),
tx.message_hash(),
self.slot(),
Expand All @@ -3487,7 +3487,7 @@ impl Bank {
// Add the transaction signature to the status cache so that transaction status
// can be queried by transaction signature over RPC. In the future, this should
// only be added for API nodes because voting validators don't need to do this.
status_cache.insert(
self.status_cache.insert(
tx.recent_blockhash(),
tx.signature(),
self.slot(),
Expand Down Expand Up @@ -5635,15 +5635,14 @@ impl Bank {
signature: &Signature,
blockhash: &Hash,
) -> Option<Result<()>> {
let rcache = self.status_cache.read().unwrap();
rcache
self.status_cache
.get_status(signature, blockhash, &self.ancestors)
.map(|v| v.1)
}

pub fn get_signature_status_slot(&self, signature: &Signature) -> Option<(Slot, Result<()>)> {
let rcache = self.status_cache.read().unwrap();
rcache.get_status_any_blockhash(signature, &self.ancestors)
self.status_cache
.get_status_any_blockhash(signature, &self.ancestors)
}

pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/bank/check_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,12 @@ impl Bank {
) -> Vec<TransactionCheckResult> {
// Do allocation before acquiring the lock on the status cache.
let mut check_results = Vec::with_capacity(sanitized_txs.len());
let rcache = self.status_cache.read().unwrap();

check_results.extend(sanitized_txs.iter().zip(lock_results).map(
|(sanitized_tx, lock_result)| {
let sanitized_tx = sanitized_tx.borrow();
if lock_result.is_ok()
&& self.is_transaction_already_processed(sanitized_tx, &rcache)
&& self.is_transaction_already_processed(sanitized_tx, &self.status_cache)
{
error_counters.already_processed += 1;
return Err(TransactionError::AlreadyProcessed);
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,7 @@ impl BankForks {
if bank.is_startup_verification_complete() {
// Save off the status cache because these may get pruned if another
// `set_root()` is called before the snapshots package can be generated
let status_cache_slot_deltas =
bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
if let Err(e) =
accounts_background_request_sender.send_snapshot_request(SnapshotRequest {
snapshot_root_bank: Arc::clone(bank),
Expand Down
8 changes: 4 additions & 4 deletions runtime/src/snapshot_bank_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ fn rebuild_bank_from_unarchived_snapshots(

verify_slot_deltas(slot_deltas.as_slice(), &bank)?;

bank.status_cache.write().unwrap().append(&slot_deltas);
bank.status_cache.append(&slot_deltas);

info!("Rebuilt bank for slot: {}", bank.slot());
Ok((
Expand Down Expand Up @@ -686,7 +686,7 @@ fn rebuild_bank_from_snapshot(

verify_slot_deltas(slot_deltas.as_slice(), &bank)?;

bank.status_cache.write().unwrap().append(&slot_deltas);
bank.status_cache.append(&slot_deltas);

info!("Rebuilt bank for slot: {}", bank.slot());
Ok((
Expand Down Expand Up @@ -912,7 +912,7 @@ fn bank_to_full_snapshot_archive_with(
bank.update_accounts_hash(CalcAccountsHashDataSource::Storages, false, false);

let snapshot_storages = bank.get_snapshot_storages(None);
let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot),
bank,
Expand Down Expand Up @@ -975,7 +975,7 @@ pub fn bank_to_incremental_snapshot_archive(
bank.update_incremental_accounts_hash(full_snapshot_slot);

let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot));
let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(full_snapshot_slot)),
bank,
Expand Down
Loading

0 comments on commit fc67d2f

Please sign in to comment.