Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha: use parking_lot mutex instead of std::sync #1902

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::cmp::max;
use std::mem;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::anyhow;
use cfg_if::cfg_if;
use parking_lot::Mutex;
use tracing::info_span;
use tracing::Span;

Expand Down Expand Up @@ -35,7 +35,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::spawn_thread;
use crate::ext::to_json_string;
use crate::ext::MutexExt;
#[cfg(feature = "metrics")]
use crate::ext::OptionExt;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -399,7 +398,7 @@ impl Executor {
// * Conflict detection runs, but it should never trigger because of the Mutex.
ExecutorStrategy::Serial => {
// acquire serial execution lock
let _serial_lock = self.locks.serial.lock_or_clear("executor serial lock was poisoned");
let _serial_lock = self.locks.serial.lock();

// execute transaction
self.execute_local_transaction_attempts(tx, EvmRoute::Serial, INFINITE_ATTEMPTS)
Expand Down
47 changes: 12 additions & 35 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::time::Duration;

use anyhow::anyhow;
use itertools::Itertools;
use keccak_hasher::KeccakHasher;
use parking_lot::Mutex;
use parking_lot::RwLock;
use tokio::sync::broadcast;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
Expand All @@ -34,8 +34,6 @@ use crate::eth::storage::Storage;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::DisplayExt;
use crate::ext::MutexExt;
use crate::ext::MutexResultExt;
use crate::globals::STRATUS_SHUTDOWN_SIGNAL;
use crate::infra::tracing::SpanExt;
use crate::log_and_err;
Expand Down Expand Up @@ -128,7 +126,7 @@ impl Miner {

joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone()));

*self.shutdown_signal.lock_or_clear("setting up shutdown signal for interval miner") = new_shutdown_signal;
*self.shutdown_signal.lock() = new_shutdown_signal;
*self.interval_joinset.lock().await = Some(joinset);
}

Expand Down Expand Up @@ -159,19 +157,11 @@ impl Miner {
}

pub fn mode(&self) -> MinerMode {
*self.mode.read().unwrap_or_else(|poison_error| {
tracing::error!("miner mode read lock was poisoned");
self.mode.clear_poison();
poison_error.into_inner()
})
*self.mode.read()
}

fn set_mode(&self, new_mode: MinerMode) {
*self.mode.write().unwrap_or_else(|poison_error| {
tracing::error!("miner mode write lock was poisoned");
self.mode.clear_poison();
poison_error.into_inner()
}) = new_mode;
*self.mode.write() = new_mode;
}

pub fn is_interval_miner_running(&self) -> bool {
Expand All @@ -195,7 +185,7 @@ impl Miner {

tracing::warn!("Shutting down interval miner to switch to external mode");

self.shutdown_signal.lock_or_clear("sending shutdown signal to interval miner").cancel();
self.shutdown_signal.lock().cancel();

// wait for all tasks to end
while let Some(result) = joinset.join_next().await {
Expand All @@ -217,11 +207,7 @@ impl Miner {
let is_automine = self.mode().is_automine();

// if automine is enabled, only one transaction can enter the block at a time.
let _save_execution_lock = if is_automine {
Some(self.locks.save_execution.lock().map_lock_error("save_execution")?)
} else {
None
};
let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None };

// save execution to temporary storage
self.storage.save_execution(tx_execution, check_conflicts)?;
Expand All @@ -237,14 +223,6 @@ impl Miner {
Ok(())
}

/// Same as [`Self::mine_external`], but automatically commits the block instead of returning it.
pub fn mine_external_and_commit(&self, external_block: ExternalBlock) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_external_and_commit")?;

let block = self.mine_external(external_block)?;
self.commit(block)
}

/// Mines external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
Expand All @@ -254,7 +232,7 @@ impl Miner {
let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_external")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand All @@ -277,7 +255,7 @@ impl Miner {
/// Same as [`Self::mine_local`], but automatically commits the block instead of returning it.
/// mainly used when is_automine is enabled.
pub fn mine_local_and_commit(&self) -> anyhow::Result<()> {
let _mine_and_commit_lock = self.locks.mine_and_commit.lock().map_lock_error("mine_local_and_commit")?;
let _mine_and_commit_lock = self.locks.mine_and_commit.lock();

let block = self.mine_local()?;
self.commit(block)
Expand All @@ -291,7 +269,7 @@ impl Miner {
let _span = info_span!("miner::mine_local", block_number = field::Empty).entered();

// lock
let _mine_lock = self.locks.mine.lock().map_lock_error("mine_local")?;
let _mine_lock = self.locks.mine.lock();

// mine block
let block = self.storage.finish_pending_block()?;
Expand Down Expand Up @@ -320,7 +298,7 @@ impl Miner {
tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block");

// lock
let _commit_lock = self.locks.commit.lock().map_lock_error("commit")?;
let _commit_lock = self.locks.commit.lock();

tracing::info!(%block_number, "miner acquired commit lock");

Expand Down Expand Up @@ -453,7 +431,6 @@ mod interval_miner {
use tokio_util::sync::CancellationToken;

use crate::eth::miner::Miner;
use crate::ext::MutexExt;
use crate::infra::tracing::warn_task_cancellation;
use crate::infra::tracing::warn_task_rx_closed;

Expand Down Expand Up @@ -489,7 +466,7 @@ mod interval_miner {

#[inline(always)]
fn mine_and_commit(miner: &Miner) {
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock_or_clear("mutex in mine_and_commit is poisoned");
let _mine_and_commit_lock = miner.locks.mine_and_commit.lock();

// mine
let block = loop {
Expand Down
18 changes: 4 additions & 14 deletions src/eth/rpc/rpc_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;

use parking_lot::RwLock;

use crate::alias::JsonValue;
use crate::eth::executor::Executor;
Expand Down Expand Up @@ -33,22 +34,11 @@ pub struct RpcContext {

impl RpcContext {
pub fn consensus(&self) -> Option<Arc<dyn Consensus>> {
self.consensus
.read()
.unwrap_or_else(|poison_error| {
tracing::error!("consensus read lock was poisoned");
self.consensus.clear_poison();
poison_error.into_inner()
})
.clone()
self.consensus.read().clone()
}

pub fn set_consensus(&self, new_consensus: Option<Arc<dyn Consensus>>) {
*self.consensus.write().unwrap_or_else(|poison_error| {
tracing::error!("consensus write lock was poisoned");
self.consensus.clear_poison();
poison_error.into_inner()
}) = new_consensus;
*self.consensus.write() = new_consensus;
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/eth/storage/permanent/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::fmt::Debug;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard;

use indexmap::IndexMap;
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;

use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
Expand Down Expand Up @@ -52,12 +52,12 @@ impl InMemoryPermanentStorage {

/// Locks inner state for reading.
fn lock_read(&self) -> RwLockReadGuard<'_, InMemoryPermanentStorageState> {
self.state.read().unwrap()
self.state.read()
}

/// Locks inner state for writing.
fn lock_write(&self) -> RwLockWriteGuard<'_, InMemoryPermanentStorageState> {
self.state.write().unwrap()
self.state.write()
}

// -------------------------------------------------------------------------
Expand Down
6 changes: 2 additions & 4 deletions src/eth/storage/permanent/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,13 @@ use crate::eth::primitives::PointInTime;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionMined;
#[cfg(feature = "metrics")]
use crate::ext::MutexExt;
use crate::ext::OptionExt;
use crate::log_and_err;
use crate::utils::GIGABYTE;

cfg_if::cfg_if! {
if #[cfg(feature = "metrics")] {
use std::sync::Mutex;
use parking_lot::Mutex;

use rocksdb::statistics::Histogram;
use rocksdb::statistics::Ticker;
Expand Down Expand Up @@ -597,7 +595,7 @@ impl RocksStorageState {
// The stats are cumulative since opening the db
// we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count)

let mut prev_values = self.prev_stats.lock_or_clear("mutex in get_histogram_average_in_interval is poisoned");
let mut prev_values = self.prev_stats.lock();

let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
let data = self.db_options.get_histogram_data(hist);
Expand Down
32 changes: 12 additions & 20 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! In-memory storage implementations.

use std::collections::HashMap;
use std::sync::RwLock;

use parking_lot::RwLock;

use crate::eth::executor::EvmInput;
use crate::eth::primitives::Account;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
self.pending_block.read().unwrap().block.header.clone()
self.pending_block.read().block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -120,7 +121,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut pending_block = self.pending_block.write().unwrap();
let mut pending_block = self.pending_block.write();
if let TransactionExecution::Local(tx) = &tx {
let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header);

Expand Down Expand Up @@ -174,11 +175,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.pending_block.read().unwrap().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
self.pending_block.read().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut pending_block = self.pending_block.write().unwrap();
let mut pending_block = self.pending_block.write();

#[cfg(feature = "dev")]
let mut finished_block = pending_block.block.clone();
Expand All @@ -194,7 +195,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

let mut latest = self.latest_block.write().unwrap();
let mut latest = self.latest_block.write();
*latest = Some(std::mem::replace(
&mut *pending_block,
InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()),
Expand All @@ -204,7 +205,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let pending_block = self.pending_block.read().unwrap();
let pending_block = self.pending_block.read();
match pending_block.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
Expand All @@ -216,12 +217,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// -------------------------------------------------------------------------

fn read_account(&self, address: Address) -> anyhow::Result<Option<Account>> {
Ok(match self.pending_block.read().unwrap().accounts.get(&address) {
Ok(match self.pending_block.read().accounts.get(&address) {
Some(pending_account) => Some(pending_account.info.clone()),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address))
.map(|account| account.info.clone()),
Expand All @@ -230,19 +230,11 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result<Option<Slot>> {
Ok(
match self
.pending_block
.read()
.unwrap()
.accounts
.get(&address)
.and_then(|account| account.slots.get(&index))
{
match self.pending_block.read().accounts.get(&address).and_then(|account| account.slots.get(&index)) {
Some(pending_slot) => Some(*pending_slot),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index)))
.copied(),
Expand All @@ -254,8 +246,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// Global state
// -------------------------------------------------------------------------
fn reset(&self) -> anyhow::Result<()> {
self.pending_block.write().unwrap().reset();
*self.latest_block.write().unwrap() = None;
self.pending_block.write().reset();
*self.latest_block.write() = None;
Ok(())
}
}
Loading
Loading