Testnet11 crucial bug fixes and patches (#215)

* make sure all crates use workspace tokio

* use checked subtraction, just in case (minor)

* fix race condition deadlock with header processor commit_header

* DAA is not monotonic, hence this can overflow

* optimize is_nearly_synced

* limit max db open files to 500

* add overflow-checks to release build

* minor

* fix yet another overflow

* increase orphan_resolution_range

* edit link to point at the latest release

* rothschild: do not burst after a pause

* reduce log verbosity for chain negotiation restart

* improve header strings

* bump version to 0.1.2

* one more potential overflow based on DAA

---------

Co-authored-by: Tiram <[email protected]>
michaelsutton and tiram88 authored Jun 28, 2023
1 parent f41b5ee commit 17c55bb
Showing 22 changed files with 147 additions and 117 deletions.
115 changes: 67 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

53 changes: 27 additions & 26 deletions Cargo.toml
@@ -47,7 +47,7 @@ members = [
]

[workspace.package]
version = "0.1.1"
version = "0.1.2"
authors = ["Kaspa developers"]
license = "MIT/Apache-2.0"
edition = "2021"
@@ -65,33 +65,33 @@ kaspa-p2p-flows = { path = "protocol/flows" }
kaspa-p2p-lib = { path = "protocol/p2p" }
kaspa-testing-integration = { path = "testing/integration" }
kaspa-utxoindex = { path = "indexes/utxoindex" }
-kaspa-rpc-service = { version = "0.1.1", path = "rpc/service" }
+kaspa-rpc-service = { version = "0.1.2", path = "rpc/service" }

# published
kaspa-addresses = { version = "0.1.1", path = "crypto/addresses" }
kaspa-bip32 = { version = "0.1.1", path = "wallet/bip32" }
kaspa-consensus = { version = "0.1.1", path = "consensus" }
kaspa-consensus-core = { version = "0.1.1", path = "consensus/core" }
kaspa-consensus-notify = { version = "0.1.1", path = "consensus/notify" }
kaspa-core = { version = "0.1.1", path = "core" }
kaspa-database = { version = "0.1.1", path = "database" }
kaspa-grpc-client = { version = "0.1.1", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.1.1", path = "rpc/grpc/core" }
kaspa-hashes = { version = "0.1.1", path = "crypto/hashes" }
kaspa-math = { version = "0.1.1", path = "math" }
kaspa-merkle = { version = "0.1.1", path = "crypto/merkle" }
kaspa-muhash = { version = "0.1.1", path = "crypto/muhash" }
kaspa-notify = { version = "0.1.1", path = "notify" }
kaspa-pow = { version = "0.1.1", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.1.1", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.1.1", path = "rpc/macros" }
kaspa-txscript = { version = "0.1.1", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.1.1", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.1.1", path = "utils" }
kaspa-wallet-core = { version = "0.1.1", path = "wallet/core" }
kaspa-wasm = { version = "0.1.1", path = "wasm" }
kaspa-wrpc-client = { version = "0.1.1", path = "rpc/wrpc/client" }
kaspa-wrpc-wasm = { version = "0.1.1", path = "rpc/wrpc/wasm" }
kaspa-addresses = { version = "0.1.2", path = "crypto/addresses" }
kaspa-bip32 = { version = "0.1.2", path = "wallet/bip32" }
kaspa-consensus = { version = "0.1.2", path = "consensus" }
kaspa-consensus-core = { version = "0.1.2", path = "consensus/core" }
kaspa-consensus-notify = { version = "0.1.2", path = "consensus/notify" }
kaspa-core = { version = "0.1.2", path = "core" }
kaspa-database = { version = "0.1.2", path = "database" }
kaspa-grpc-client = { version = "0.1.2", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.1.2", path = "rpc/grpc/core" }
kaspa-hashes = { version = "0.1.2", path = "crypto/hashes" }
kaspa-math = { version = "0.1.2", path = "math" }
kaspa-merkle = { version = "0.1.2", path = "crypto/merkle" }
kaspa-muhash = { version = "0.1.2", path = "crypto/muhash" }
kaspa-notify = { version = "0.1.2", path = "notify" }
kaspa-pow = { version = "0.1.2", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.1.2", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.1.2", path = "rpc/macros" }
kaspa-txscript = { version = "0.1.2", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.1.2", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.1.2", path = "utils" }
kaspa-wallet-core = { version = "0.1.2", path = "wallet/core" }
kaspa-wasm = { version = "0.1.2", path = "wasm" }
kaspa-wrpc-client = { version = "0.1.2", path = "rpc/wrpc/client" }
kaspa-wrpc-wasm = { version = "0.1.2", path = "rpc/wrpc/wasm" }

# not published
kaspa-grpc-server = { path = "rpc/grpc/server" }
@@ -177,3 +177,4 @@ workflow-terminal = { version = "0.3.10" }
[profile.release]
lto = "thin"
strip = true
+overflow-checks = true
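
Enabling `overflow-checks` in the release profile makes integer overflow panic in release builds instead of silently wrapping, matching the behavior debug builds already have. A minimal sketch of what the flag changes (hypothetical values):

```rust
fn main() {
    let daa_score: u64 = 5;
    let interval: u64 = 10;
    // With `overflow-checks = true` this panics ("attempt to subtract with
    // overflow") even under --release; without it, the subtraction silently
    // wraps to a value near u64::MAX, corrupting any comparison built on it.
    let elapsed = daa_score - interval;
    println!("{elapsed}");
}
```

Silent wrapping is exactly the failure mode behind several fixes in this commit, where a DAA-score subtraction could underflow.
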
6 changes: 3 additions & 3 deletions consensus/core/src/errors/block.rs
@@ -115,16 +115,16 @@ pub enum RuleError {
#[error("wrong coinbase subsidy: expected {0} but got {1}")]
WrongSubsidy(u64, u64),

#[error("Transaction {0} is found more than once in the block")]
#[error("transaction {0} is found more than once in the block")]
DuplicateTransactions(TransactionId),

#[error("block has invalid proof-of-work")]
InvalidPoW,

#[error("Expected header pruning point is {0} but got {1}")]
#[error("expected header pruning point is {0} but got {1}")]
WrongHeaderPruningPoint(Hash, Hash),

#[error("Expected indirect parents {0} but got {1}")]
#[error("expected indirect parents {0} but got {1}")]
UnexpectedIndirectParents(TwoDimVecDisplay<Hash>, TwoDimVecDisplay<Hash>),

#[error("block {0} UTXO commitment is invalid - block header indicates {1}, but calculated value is {2}")]
9 changes: 6 additions & 3 deletions consensus/core/src/errors/consensus.rs
@@ -5,9 +5,12 @@ use super::{difficulty::DifficultyError, sync::SyncManagerError, traversal::Trav

#[derive(Error, Debug, Clone)]
pub enum ConsensusError {
#[error("couldn't find block {0}")]
#[error("cannot find full block {0}")]
BlockNotFound(Hash),

#[error("cannot find header {0}")]
HeaderNotFound(Hash),

#[error("block {0} is invalid")]
InvalidBlock(Hash),

@@ -20,10 +23,10 @@ pub enum ConsensusError {
#[error("pruning point is not at sufficient depth from virtual, cannot obtain its final anticone at this stage")]
PruningPointInsufficientDepth,

#[error("sync manager error")]
#[error("sync manager error: {0}")]
SyncManagerError(#[from] SyncManagerError),

#[error("traversal error")]
#[error("traversal error: {0}")]
TraversalError(#[from] TraversalError),

#[error("difficulty error: {0}")]
11 changes: 7 additions & 4 deletions consensus/src/consensus/mod.rs
@@ -321,14 +321,15 @@ impl Consensus {
}
}

+/// Validates that a valid block *header* exists for `hash`
fn validate_block_exists(&self, hash: Hash) -> Result<(), ConsensusError> {
if match self.statuses_store.read().get(hash).unwrap_option() {
Some(status) => status.is_valid(),
None => false,
} {
Ok(())
} else {
-Err(ConsensusError::BlockNotFound(hash))
+Err(ConsensusError::HeaderNotFound(hash))
}
}

@@ -419,7 +420,9 @@ impl ConsensusApi for Consensus {

fn is_nearly_synced(&self) -> bool {
// See comment within `config.is_nearly_synced`
-self.config.is_nearly_synced(self.get_sink_timestamp(), self.headers_store.get_daa_score(self.get_sink()).unwrap())
+let sink = self.get_sink();
+let compact = self.headers_store.get_compact_header_data(sink).unwrap();
+self.config.is_nearly_synced(compact.timestamp, compact.daa_score)
}

fn get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
@@ -595,7 +598,7 @@ impl ConsensusApi for Consensus {

fn get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
let Some(status) = self.statuses_store.read().get(hash).unwrap_option().filter(|&status| status.has_block_header()) else {
-return Err(ConsensusError::BlockNotFound(hash));
+return Err(ConsensusError::HeaderNotFound(hash));
};
Ok(Block {
header: self.headers_store.get_header(hash).unwrap(),
@@ -605,7 +608,7 @@ impl ConsensusApi for Consensus {

fn get_ghostdag_data(&self, hash: Hash) -> ConsensusResult<ExternalGhostdagData> {
match self.get_block_status(hash) {
-None => return Err(ConsensusError::BlockNotFound(hash)),
+None => return Err(ConsensusError::HeaderNotFound(hash)),
Some(BlockStatus::StatusInvalid) => return Err(ConsensusError::InvalidBlock(hash)),
_ => {}
};
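
The `is_nearly_synced` change above replaces two separate store lookups (the sink timestamp, then the DAA score of the sink header) with a single `get_compact_header_data` read that returns both fields at once. A rough sketch of the pattern, with hypothetical types standing in for the real header store:

```rust
// Hypothetical stand-ins for the consensus header store types; the real
// code reads compact header data from the headers store in one call.
struct CompactHeaderData {
    timestamp: u64, // header timestamp in milliseconds
    daa_score: u64,
}

// Illustrative criterion only: treat the node as nearly synced when the
// sink header is recent. The real predicate is `config.is_nearly_synced`,
// which receives both fields.
fn is_nearly_synced(compact: &CompactHeaderData, now_ms: u64, max_lag_ms: u64) -> bool {
    let _ = compact.daa_score; // also passed to the real check
    now_ms.saturating_sub(compact.timestamp) <= max_lag_ms
}

fn main() {
    let compact = CompactHeaderData { timestamp: 1_000_000, daa_score: 42 };
    assert!(is_nearly_synced(&compact, 1_000_500, 1_000));
    assert!(!is_nearly_synced(&compact, 2_000_000, 1_000));
}
```
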
2 changes: 1 addition & 1 deletion consensus/src/pipeline/deps_manager.rs
@@ -164,7 +164,7 @@ impl BlockTaskDependencyManager {
Self { pending: Mutex::new(HashMap::new()), idle_signal: Condvar::new() }
}

-/// Registers the `(task, result_transmitters)` pair as a pending task. If a task with the same
+/// Registers the `(task, result_transmitter)` pair as a pending task. If a task with the same
/// hash is already pending and has a corresponding internal task group, the task group is updated
/// with the additional task and the function returns `None` indicating that the task shall
/// not be queued for processing yet. The function is expected to be called by a single worker
14 changes: 7 additions & 7 deletions consensus/src/pipeline/mod.rs
@@ -48,13 +48,13 @@ impl core::ops::Sub for &ProcessingCountersSnapshot {

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
-blocks_submitted: self.blocks_submitted - rhs.blocks_submitted,
-header_counts: self.header_counts - rhs.header_counts,
-dep_counts: self.dep_counts - rhs.dep_counts,
-body_counts: self.body_counts - rhs.body_counts,
-txs_counts: self.txs_counts - rhs.txs_counts,
-chain_block_counts: self.chain_block_counts - rhs.chain_block_counts,
-mass_counts: self.mass_counts - rhs.mass_counts,
+blocks_submitted: self.blocks_submitted.checked_sub(rhs.blocks_submitted).unwrap_or_default(),
+header_counts: self.header_counts.checked_sub(rhs.header_counts).unwrap_or_default(),
+dep_counts: self.dep_counts.checked_sub(rhs.dep_counts).unwrap_or_default(),
+body_counts: self.body_counts.checked_sub(rhs.body_counts).unwrap_or_default(),
+txs_counts: self.txs_counts.checked_sub(rhs.txs_counts).unwrap_or_default(),
+chain_block_counts: self.chain_block_counts.checked_sub(rhs.chain_block_counts).unwrap_or_default(),
+mass_counts: self.mass_counts.checked_sub(rhs.mass_counts).unwrap_or_default(),
}
}
}
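
`checked_sub` returns `None` instead of wrapping when the subtrahend is larger, and `unwrap_or_default()` turns that into zero. This keeps the snapshot delta well defined if a counter is ever observed out of order between snapshots, and avoids a panic now that release builds run with `overflow-checks = true`. A minimal illustration:

```rust
fn main() {
    let current: u64 = 100;
    let previous: u64 = 250; // e.g. a counter observed out of order

    // `current - previous` would panic under overflow-checks; the checked
    // form yields None, which unwrap_or_default() maps to 0.
    assert_eq!(current.checked_sub(previous).unwrap_or_default(), 0);

    // The normal case is unaffected.
    assert_eq!(250u64.checked_sub(100).unwrap_or_default(), 150);
}
```
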
14 changes: 6 additions & 8 deletions consensus/src/processes/sync/mod.rs
@@ -119,23 +119,21 @@ impl<
}

pub fn create_headers_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> SyncManagerResult<Vec<Hash>> {
-let sc_read_guard = self.selected_chain_store.read();
-let hst_read_guard = self.header_selected_tip_store.read();
-let pp_read_guard = self.pruning_point_store.read();
+let low = low.unwrap_or_else(|| self.pruning_point_store.read().get().unwrap().pruning_point);
+let high = high.unwrap_or_else(|| self.header_selected_tip_store.read().get().unwrap().hash);

-let low = low.unwrap_or_else(|| pp_read_guard.get().unwrap().pruning_point);
-let high = high.unwrap_or_else(|| hst_read_guard.get().unwrap().hash);
+let sc_read = self.selected_chain_store.read();

if low == high {
return Ok(vec![low]);
}

-let low_index = match sc_read_guard.get_by_hash(low).unwrap_option() {
+let low_index = match sc_read.get_by_hash(low).unwrap_option() {
Some(index) => index,
None => return Err(SyncManagerError::BlockNotInSelectedParentChain(low)),
};

-let high_index = match sc_read_guard.get_by_hash(high).unwrap_option() {
+let high_index = match sc_read.get_by_hash(high).unwrap_option() {
Some(index) => index,
None => return Err(SyncManagerError::BlockNotInSelectedParentChain(high)),
};
@@ -148,7 +146,7 @@ impl<
let mut step = 1;
let mut current_index = high_index;
while current_index > low_index {
-locator.push(sc_read_guard.get_by_index(current_index).unwrap());
+locator.push(sc_read.get_by_index(current_index).unwrap());
if current_index < step {
break;
}
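
This change narrows lock scope: the pruning-point and header-selected-tip stores are now read through temporary guards that drop immediately, and only the selected-chain store guard is held for the duration of the function, instead of acquiring all three up front. Holding fewer guards at once shrinks the window for lock-order inversions with writers. A sketch of the pattern, with hypothetical stores:

```rust
use std::sync::RwLock;

struct Stores {
    pruning_point: RwLock<u64>,
    selected_tip: RwLock<u64>,
    selected_chain: RwLock<Vec<u64>>,
}

fn locator(stores: &Stores, low: Option<u64>, high: Option<u64>) -> Vec<u64> {
    // Temporary guards: each `.read()` result is dropped at the end of the
    // enclosing expression, so these two locks are never held together.
    let low = low.unwrap_or_else(|| *stores.pruning_point.read().unwrap());
    let high = high.unwrap_or_else(|| *stores.selected_tip.read().unwrap());

    // Only the guard needed throughout the traversal stays alive.
    let chain = stores.selected_chain.read().unwrap();
    chain.iter().copied().filter(|&h| h >= low && h <= high).collect()
}

fn main() {
    let stores = Stores {
        pruning_point: RwLock::new(10),
        selected_tip: RwLock::new(40),
        selected_chain: RwLock::new((0..50).collect()),
    };
    assert_eq!(locator(&stores, None, None).len(), 31); // 10..=40
}
```
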
3 changes: 3 additions & 0 deletions database/src/db.rs
@@ -10,6 +10,9 @@ pub fn open_db(db_path: PathBuf, create_if_missing: bool, parallelism: usize) ->
if parallelism > 1 {
opts.increase_parallelism(parallelism as i32);
}
+// In most linux environments the limit is set to 1024, so we use 500 to give sufficient slack.
+// TODO: fine-tune this parameter and additional parameters related to max file size
+opts.set_max_open_files(500);
opts.create_if_missing(create_if_missing);
let db = Arc::new(DB::open(&opts, db_path.to_str().unwrap()).unwrap());
db
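
For context, 500 stays well below the common Linux per-process descriptor limit of 1024 (`ulimit -n`), leaving slack for the sockets and other files the node holds open alongside the database.
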
2 changes: 1 addition & 1 deletion docs/testnet11.md
@@ -36,7 +36,7 @@ Testnet11 uses a dedicated P2P port (16311) so that nodes from the usual tesnet
We reiterate that only the included miner should be used to maintain a level playing field.

First, we set-up a node:
-1. Download and extract the [rusty-kaspa binaries](https://github.com/kaspanet/rusty-kaspa/releases/tag/v0.1.1). Alternatively, you can compile it from source yourself by following the instructions [here](https://github.com/kaspanet/rusty-kaspa/blob/master/README.md). The rest of the instructions are written assuming the former option. If you choose to locally compile the code, replace any command of the form ``<program> <arguments>`` with ``cargo run --bin <program> --release -- <arguments>`` (see example in the next item). All actions described below should be performed on a command line window where you navigated to the directory into which the binaries were extracted.
+1. Download and extract the [rusty-kaspa binaries](https://github.com/kaspanet/rusty-kaspa/releases). Alternatively, you can compile it from source yourself by following the instructions [here](https://github.com/kaspanet/rusty-kaspa/blob/master/README.md). The rest of the instructions are written assuming the former option. If you choose to locally compile the code, replace any command of the form ``<program> <arguments>`` with ``cargo run --bin <program> --release -- <arguments>`` (see example in the next item). All actions described below should be performed on a command line window where you navigated to the directory into which the binaries were extracted.
2. Start the ``kaspad`` client with ``utxoindex`` enabled:

```
1 change: 0 additions & 1 deletion indexes/core/Cargo.toml
@@ -20,7 +20,6 @@ futures = "0.3.25"
triggered = "0.1"
derive_more.workspace = true
paste = "1.0.11"
-# tokio = { version = "1.21.2", features = [ "rt-multi-thread", "macros", "signal" ]}

[dev-dependencies]
# parking_lot.workspace = true
2 changes: 1 addition & 1 deletion indexes/processor/Cargo.toml
@@ -26,7 +26,7 @@ futures = "0.3.25"
triggered = "0.1"
derive_more.workspace = true
paste = "1.0.11"
tokio = { version = "1.21.2", features = [ "rt-multi-thread", "macros", "signal" ]}
tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ]}

[dev-dependencies]
kaspa-consensus.workspace = true
4 changes: 2 additions & 2 deletions mining/src/mempool/model/orphan_pool.rs
@@ -216,7 +216,7 @@ impl OrphanPool {

pub(crate) fn expire_low_priority_transactions(&mut self, consensus: &dyn ConsensusApi) -> RuleResult<()> {
let virtual_daa_score = consensus.get_virtual_daa_score();
-if virtual_daa_score - self.last_expire_scan < self.config.orphan_expire_scan_interval_daa_score {
+if virtual_daa_score < self.last_expire_scan + self.config.orphan_expire_scan_interval_daa_score {
return Ok(());
}

@@ -227,7 +227,7 @@
.values()
.filter_map(|x| {
if (x.priority == Priority::Low)
&& virtual_daa_score - x.added_at_daa_score > self.config.orphan_expire_interval_daa_score
&& virtual_daa_score > x.added_at_daa_score + self.config.orphan_expire_interval_daa_score
{
Some(x.id())
} else {
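
Both conditions were rearranged from `a - b < c` into the equivalent `a < b + c`. Since the recorded DAA scores are not guaranteed monotonic relative to the current virtual DAA score, `a - b` can underflow `u64`; the additive form cannot (the addends are far below `u64::MAX` in practice). A minimal illustration of the hazard, with hypothetical values:

```rust
fn main() {
    let virtual_daa_score: u64 = 90;
    let last_expire_scan: u64 = 100; // may exceed the current virtual score
    let scan_interval: u64 = 50;

    // Old form: `virtual_daa_score - last_expire_scan` underflows here,
    // panicking under overflow-checks (or wrapping to ~u64::MAX without
    // them, which makes the `< scan_interval` test silently false).
    // let too_early = virtual_daa_score - last_expire_scan < scan_interval;

    // New form: the same predicate, rearranged so it cannot underflow.
    let too_early = virtual_daa_score < last_expire_scan + scan_interval;
    assert!(too_early);
}
```
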
6 changes: 3 additions & 3 deletions mining/src/mempool/model/transactions_pool.rs
@@ -167,10 +167,10 @@ impl TransactionsPool {
&& virtual_daa_score > x.added_at_daa_score + self.config.transaction_expire_interval_daa_score
{
debug!(
"Removing transaction {}, because it expired, DAAScore moved by {}, expire interval: {}",
"Removing transaction {}, because it expired, virtual DAA score is {} and expire limit is {}",
x.id(),
-virtual_daa_score - x.added_at_daa_score,
-self.config.transaction_expire_interval_daa_score
+virtual_daa_score,
+x.added_at_daa_score + self.config.transaction_expire_interval_daa_score
);
Some(x.id())
} else {
2 changes: 1 addition & 1 deletion protocol/flows/Cargo.toml
@@ -31,7 +31,7 @@ indexmap.workspace = true
async-trait.workspace = true

futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "1.21.2", features = [
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"signal",
3 changes: 1 addition & 2 deletions protocol/flows/src/flow_context.rs
@@ -183,8 +183,7 @@ impl FlowContext {
) -> Self {
let hub = Hub::new();

-// TODO: initial experiments show that this value is good for high bps as well so for now we avoid the log
-let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE; // + f64::log2(config.bps() as f64).round() as u32
+let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log(3.0).min(2.0) as u32;

// The maximum amount of orphans allowed in the orphans pool. This number is an
// approximation of how many orphans there can possibly be on average.
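
The orphan resolution range now scales with the network's blocks per second instead of staying fixed at the baseline: the added term is log base 3 of bps, capped at 2 and truncated to an integer, so 1 bps adds 0, 3 bps adds 1, and roughly 9 bps or more adds 2. A quick check of the expression (the baseline constant below is a hypothetical stand-in):

```rust
const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5; // hypothetical baseline value

fn orphan_resolution_range(bps: u64) -> u32 {
    // Mirrors the expression introduced above: log base 3 of bps,
    // capped at 2.0, truncated toward zero by the cast.
    BASELINE_ORPHAN_RESOLUTION_RANGE + (bps as f64).log(3.0).min(2.0) as u32
}

fn main() {
    assert_eq!(orphan_resolution_range(1), BASELINE_ORPHAN_RESOLUTION_RANGE); // log3(1) = 0
    assert_eq!(orphan_resolution_range(3), BASELINE_ORPHAN_RESOLUTION_RANGE + 1); // log3(3) = 1
    assert_eq!(orphan_resolution_range(10), BASELINE_ORPHAN_RESOLUTION_RANGE + 2); // log3(10) ~ 2.1, capped
    assert_eq!(orphan_resolution_range(100), BASELINE_ORPHAN_RESOLUTION_RANGE + 2); // capped at 2
}
```
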
7 changes: 6 additions & 1 deletion protocol/flows/src/v5/ibd/negotiate.rs
@@ -128,7 +128,12 @@ impl IbdFlow {
self.router, negotiation_restart_counter
)));
}
warn!("IBD chain negotiation with syncer {} restarted {} times", self.router, negotiation_restart_counter);
if negotiation_restart_counter > self.ctx.config.bps() {
// bps is just an intuitive threshold here
warn!("IBD chain negotiation with syncer {} restarted {} times", self.router, negotiation_restart_counter);
} else {
debug!("IBD chain negotiation with syncer {} restarted {} times", self.router, negotiation_restart_counter);
}

// An empty locator signals that the syncer chain was modified and no longer contains one of
// the queried hashes, so we restart the search. We use a shorter timeout here to avoid a timeout attack
2 changes: 1 addition & 1 deletion protocol/p2p/Cargo.toml
@@ -36,7 +36,7 @@ borsh.workspace = true
futures = { version = "0.3", default-features = false, features = ["alloc"] }
prost = "0.11"
ctrlc = "3.2"
tokio = { version = "1.21.2", features = [
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"signal",
2 changes: 1 addition & 1 deletion rothschild/src/main.rs
@@ -129,7 +129,7 @@ async fn main() {

let mut utxos = refresh_utxos(&rpc_client, kaspa_addr.clone(), &mut pending, coinbase_maturity).await;
let mut ticker = interval(Duration::from_secs_f64(1.0 / (args.tps.min(100) as f64)));
-ticker.set_missed_tick_behavior(MissedTickBehavior::Burst);
+ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut maximize_inputs = false;
let mut last_refresh = unix_now();
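
Tokio's default `MissedTickBehavior::Burst` fires all missed ticks back-to-back after a stall, which for this transaction generator meant a burst of sends well above the target TPS after any pause. `Delay` instead schedules subsequent ticks relative to when the late tick actually ran, smoothly returning to the configured rate. A small sketch of the setting (hypothetical TPS value):

```rust
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let tps = 100.0; // hypothetical target transactions per second
    let mut ticker = interval(Duration::from_secs_f64(1.0 / tps));
    // Delay: after a missed tick, space future ticks out from "now" rather
    // than firing every missed tick immediately (Burst, the default).
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    for _ in 0..3 {
        ticker.tick().await;
        // build and submit one transaction per tick ...
    }
}
```
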
2 changes: 1 addition & 1 deletion rpc/grpc/client/Cargo.toml
@@ -23,7 +23,7 @@ futures = { version = "0.3" }
tonic = { version = "0.9", features = ["gzip"] }
prost = { version = "0.11" }
h2 = "0.3"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = "0.1"
async-stream = "0.3"
triggered = "0.1"
2 changes: 1 addition & 1 deletion rpc/grpc/core/Cargo.toml
@@ -21,7 +21,7 @@ futures = { version = "0.3" }
tonic = { version = "0.9.1", features = ["tls", "gzip"] }
prost = { version = "0.11" }
h2 = "0.3"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = "0.1"
async-stream = "0.3"
triggered = "0.1"
2 changes: 1 addition & 1 deletion rpc/grpc/server/Cargo.toml
@@ -27,7 +27,7 @@ futures = { version = "0.3" }
tonic = { version = "0.9", features = ["gzip"] }
prost = { version = "0.11" }
h2 = "0.3"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = "0.1"
async-stream = "0.3"
triggered = "0.1"
