Skip to content

Commit

Permalink
Implement UTXO Return Address RPC command (#436)
Browse files Browse the repository at this point in the history
* Remove duplicate GetInfo on wrpc route

* Implement UTXO Return Address RPC

* Missed rename

* Fix dropping guard and document assumption of acquisition

find_accepting_chain_block_hash_at_daa_score no longer requires guard to be passed
and assumes the caller to acquire the lock

* Move utxo return address logic to its own file

* Create explicit ReturnAddressError for all NotFound cases

* fix clippy warns, comments, minor renames, move lock acquire to outer call

* avoid cloning the full MergesetBlockAcceptanceData which isn't used by caller (contains a vec)

* use AcceptanceData type and pass by ref

* extract `find_tx_from_acceptance_data` and reuse

* last rename

* Change consensus api to get_populated_transaction

* Rename ReturnAddressError to UtxoInquirerError

* Rename and move return_address file to utxo_inquirer

* better input iteration

* minor doc comments fixes

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
coderofstuff and michaelsutton authored Jan 7, 2025
1 parent 178c060 commit 47c1059
Show file tree
Hide file tree
Showing 34 changed files with 482 additions and 12 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,21 @@ impl Rpc {
let result = rpc.get_current_block_color_call(None, GetCurrentBlockColorRequest { hash }).await?;
self.println(&ctx, result);
}
RpcApiOps::GetUtxoReturnAddress => {
if argv.is_empty() || argv.len() != 2 {
return Err(Error::custom("Please specify a txid and a accepting_block_daa_score"));
}

let txid = argv.remove(0);
let txid = RpcHash::from_hex(txid.as_str())?;

let accepting_block_daa_score = argv.remove(0).parse::<u64>()?;

let result =
rpc.get_utxo_return_address_call(None, GetUtxoReturnAddressRequest { txid, accepting_block_daa_score }).await?;

self.println(&ctx, result);
}
_ => {
tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n");
return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ mod address_store_with_cache {
let target_uniform_dist = Uniform::new(1.0, num_of_buckets as f64).unwrap();
let uniform_cdf = |x: f64| target_uniform_dist.cdf(&x);
for _ in 0..num_of_trials {
// The weight sampled expected uniform distibution
// The weight sampled expected uniform distribution
let prioritized_address_distribution = am
.lock()
.iterate_prioritized_random_addresses(HashSet::new())
Expand Down
1 change: 1 addition & 0 deletions components/consensusmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ duration-string.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
kaspa-addresses.workspace=true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-core.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion components/consensusmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl StagingConsensus {
// Drop `prev` so that deletion below succeeds
drop(prev);
// Staging was committed and is now the active consensus so we can delete
// any pervious, now inactive, consensus entries
// any previous, now inactive, consensus entries
self.manager.delete_inactive_consensus_entries();
}

Expand Down
11 changes: 10 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use kaspa_consensus_core::{
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, Hash,
};
use kaspa_utils::sync::rwlock::*;
Expand Down Expand Up @@ -313,6 +314,14 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.get_chain_block_samples()).await
}

pub async fn async_get_populated_transaction(
&self,
txid: Hash,
accepting_block_daa_score: u64,
) -> Result<SignableTransaction, UtxoInquirerError> {
self.clone().spawn_blocking(move |c| c.get_populated_transaction(txid, accepting_block_daa_score)).await
}

/// Returns the antipast of block `hash` from the POV of `context`, i.e. `antipast(hash) ∩ past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ faster-hex.workspace = true
futures-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
kaspa-addresses.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-consensusmanager.workspace = true
Expand Down
9 changes: 8 additions & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::{
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList, PruningProofMetadata},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath,
};
use kaspa_hashes::Hash;
Expand Down Expand Up @@ -170,6 +171,12 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// Returns the fully populated transaction with the given txid which was accepted at the provided accepting_block_daa_score.
/// The argument `accepting_block_daa_score` is expected to be the DAA score of the accepting chain block of `txid`.
fn get_populated_transaction(&self, txid: Hash, accepting_block_daa_score: u64) -> Result<SignableTransaction, UtxoInquirerError> {
unimplemented!()
}

fn get_virtual_parents(&self) -> BlockHashSet {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/utxo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod utxo_collection;
pub mod utxo_diff;
pub mod utxo_error;
pub mod utxo_inquirer;
pub mod utxo_view;
38 changes: 38 additions & 0 deletions consensus/core/src/utxo/utxo_inquirer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use kaspa_hashes::Hash;
use thiserror::Error;

#[derive(Error, Debug, Clone)]
pub enum UtxoInquirerError {
#[error("Transaction is already pruned")]
AlreadyPruned,
#[error("Transaction return address is coinbase")]
TxFromCoinbase,
#[error("Transaction not found at given accepting daa score")]
NoTxAtScore,
#[error("Transaction was found but not standard")]
NonStandard,
#[error("Did not find compact header for block hash {0} ")]
MissingCompactHeaderForBlockHash(Hash),
#[error("Did not find containing_acceptance for tx {0} ")]
MissingContainingAcceptanceForTx(Hash),
#[error("Did not find block {0} at block tx store")]
MissingBlockFromBlockTxStore(Hash),
#[error("Did not find index {0} in transactions of block {1}")]
MissingTransactionIndexOfBlock(usize, Hash),
#[error("Expected {0} to match {1} when checking block_transaction_store using array index of transaction")]
UnexpectedTransactionMismatch(Hash, Hash),
#[error("Did not find a utxo diff for chain block {0} ")]
MissingUtxoDiffForChainBlock(Hash),
#[error("Transaction {0} acceptance data must also be in the same block in this case")]
MissingOtherTransactionAcceptanceData(Hash),
#[error("Did not find index for hash {0}")]
MissingIndexForHash(Hash),
#[error("Did not find tip data")]
MissingTipData,
#[error("Did not find a hash at index {0} ")]
MissingHashAtIndex(u64),
#[error("Did not find acceptance data for chain block {0}")]
MissingAcceptanceDataForChainBlock(Hash),
#[error("Utxo entry is not filled")]
UnfilledUtxoEntry,
}
9 changes: 8 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ use kaspa_consensus_core::{
network::NetworkType,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList, PruningProofMetadata},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
Expand Down Expand Up @@ -687,6 +688,12 @@ impl ConsensusApi for Consensus {
sample_headers
}

fn get_populated_transaction(&self, txid: Hash, accepting_block_daa_score: u64) -> Result<SignableTransaction, UtxoInquirerError> {
// We need consistency between the pruning_point_store, utxo_diffs_store, block_transactions_store, selected chain and headers store reads
let _guard = self.pruning_lock.blocking_read();
self.virtual_processor.get_populated_transaction(txid, accepting_block_daa_score, self.get_source())
}

fn get_virtual_parents(&self) -> BlockHashSet {
self.lkg_virtual_state.load().parents.iter().copied().collect()
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/virtual_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod errors;
mod processor;
mod utxo_inquirer;
mod utxo_validation;
pub use processor::*;
pub mod test_block_builder;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub struct VirtualStateProcessor {
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,
pub(super) pruning_lock: SessionLock,

// Notifier
notification_root: Arc<ConsensusNotificationRoot>,
Expand Down
189 changes: 189 additions & 0 deletions consensus/src/pipeline/virtual_processor/utxo_inquirer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::{cmp, sync::Arc};

use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
tx::{SignableTransaction, Transaction, UtxoEntry},
utxo::{utxo_diff::ImmutableUtxoDiff, utxo_inquirer::UtxoInquirerError},
};
use kaspa_core::{trace, warn};
use kaspa_hashes::Hash;

use crate::model::stores::{
acceptance_data::AcceptanceDataStoreReader, block_transactions::BlockTransactionsStoreReader, headers::HeaderStoreReader,
selected_chain::SelectedChainStoreReader, utxo_diffs::UtxoDiffsStoreReader,
};

use super::VirtualStateProcessor;

impl VirtualStateProcessor {
/// Returns the fully populated transaction with the given txid which was accepted at the provided accepting_block_daa_score.
/// The argument `accepting_block_daa_score` is expected to be the DAA score of the accepting chain block of `txid`.
///
/// *Assumed to be called under the pruning read lock.*
pub fn get_populated_transaction(
&self,
txid: Hash,
accepting_block_daa_score: u64,
source_hash: Hash,
) -> Result<SignableTransaction, UtxoInquirerError> {
let source_daa_score = self
.headers_store
.get_compact_header_data(source_hash)
.map(|compact_header| compact_header.daa_score)
.map_err(|_| UtxoInquirerError::MissingCompactHeaderForBlockHash(source_hash))?;

if accepting_block_daa_score < source_daa_score {
// Early exit if target daa score is lower than that of pruning point's daa score:
return Err(UtxoInquirerError::AlreadyPruned);
}

let (matching_chain_block_hash, acceptance_data) =
self.find_accepting_chain_block_hash_at_daa_score(accepting_block_daa_score, source_hash)?;

// Expected to never fail, since we found the acceptance data and therefore there must be matching diff
let utxo_diff = self
.utxo_diffs_store
.get(matching_chain_block_hash)
.map_err(|_| UtxoInquirerError::MissingUtxoDiffForChainBlock(matching_chain_block_hash))?;

let tx = self.find_tx_from_acceptance_data(txid, &acceptance_data)?;

let mut populated_tx = SignableTransaction::new(tx);

let removed_diffs = utxo_diff.removed();

populated_tx.tx.inputs.iter().enumerate().for_each(|(index, input)| {
let filled_utxo = if let Some(utxo_entry) = removed_diffs.get(&input.previous_outpoint) {
Some(utxo_entry.clone().to_owned())
} else {
// This handles this rare scenario:
// - UTXO0 is spent by TX1 and creates UTXO1
// - UTXO1 is spent by TX2 and creates UTXO2
// - A chain block happens to accept both of these
// In this case, removed_diff wouldn't contain the outpoint of the created-and-immediately-spent UTXO
// so we use the transaction (which also has acceptance data in this block) and look at its outputs
let other_txid = input.previous_outpoint.transaction_id;
let other_tx = self.find_tx_from_acceptance_data(other_txid, &acceptance_data).unwrap();
let output = &other_tx.outputs[input.previous_outpoint.index as usize];
let utxo_entry =
UtxoEntry::new(output.value, output.script_public_key.clone(), accepting_block_daa_score, other_tx.is_coinbase());
Some(utxo_entry)
};

populated_tx.entries[index] = filled_utxo;
});

Ok(populated_tx)
}

/// Find the accepting chain block hash at the given DAA score by binary searching
/// through selected chain store using indexes.
/// This method assumes that local caller have acquired the pruning read lock to guarantee
/// consistency between reads on the selected_chain_store and headers_store (as well as
/// other stores outside). If no such lock is acquired, this method tries to find
/// the accepting chain block hash on a best effort basis (may fail if parts of the data
/// are pruned between two sequential calls)
fn find_accepting_chain_block_hash_at_daa_score(
&self,
target_daa_score: u64,
source_hash: Hash,
) -> Result<(Hash, Arc<AcceptanceData>), UtxoInquirerError> {
let sc_read = self.selected_chain_store.read();

let source_index = sc_read.get_by_hash(source_hash).map_err(|_| UtxoInquirerError::MissingIndexForHash(source_hash))?;
let (tip_index, tip_hash) = sc_read.get_tip().map_err(|_| UtxoInquirerError::MissingTipData)?;
let tip_daa_score = self
.headers_store
.get_compact_header_data(tip_hash)
.map(|tip| tip.daa_score)
.map_err(|_| UtxoInquirerError::MissingCompactHeaderForBlockHash(tip_hash))?;

// For a chain segment it holds that len(segment) <= daa_score(segment end) - daa_score(segment start). This is true
// because each chain block increases the daa score by at least one. Hence we can lower bound our search by high index
// minus the daa score gap as done below
let mut low_index = tip_index.saturating_sub(tip_daa_score.saturating_sub(target_daa_score)).max(source_index);
let mut high_index = tip_index;

let matching_chain_block_hash = loop {
// Binary search for the chain block that matches the target_daa_score
// 0. Get the mid point index
let mid = low_index + (high_index - low_index) / 2;

// 1. Get the chain block hash at that index. Error if we cannot find a hash at that index
let hash = sc_read.get_by_index(mid).map_err(|_| {
trace!("Did not find a hash at index {}", mid);
UtxoInquirerError::MissingHashAtIndex(mid)
})?;

// 2. Get the compact header so we have access to the daa_score. Error if we cannot find the header
let compact_header = self.headers_store.get_compact_header_data(hash).map_err(|_| {
trace!("Did not find a compact header with hash {}", hash);
UtxoInquirerError::MissingCompactHeaderForBlockHash(hash)
})?;

// 3. Compare block daa score to our target
match compact_header.daa_score.cmp(&target_daa_score) {
cmp::Ordering::Equal => {
// We found the chain block we need
break hash;
}
cmp::Ordering::Greater => {
high_index = mid - 1;
}
cmp::Ordering::Less => {
low_index = mid + 1;
}
}

if low_index > high_index {
return Err(UtxoInquirerError::NoTxAtScore);
}
};

let acceptance_data = self
.acceptance_data_store
.get(matching_chain_block_hash)
.map_err(|_| UtxoInquirerError::MissingAcceptanceDataForChainBlock(matching_chain_block_hash))?;

Ok((matching_chain_block_hash, acceptance_data))
}

/// Finds a transaction's containing block hash and index within block through
/// the accepting block acceptance data
fn find_containing_block_and_index_from_acceptance_data(
&self,
txid: Hash,
acceptance_data: &AcceptanceData,
) -> Option<(Hash, usize)> {
acceptance_data.iter().find_map(|mbad| {
let tx_arr_index =
mbad.accepted_transactions.iter().find_map(|tx| (tx.transaction_id == txid).then_some(tx.index_within_block as usize));
tx_arr_index.map(|index| (mbad.block_hash, index))
})
}

/// Finds a transaction through the accepting block acceptance data (and using indexed info therein for
/// finding the tx in the block transactions store)
fn find_tx_from_acceptance_data(&self, txid: Hash, acceptance_data: &AcceptanceData) -> Result<Transaction, UtxoInquirerError> {
let (containing_block, index) = self
.find_containing_block_and_index_from_acceptance_data(txid, acceptance_data)
.ok_or(UtxoInquirerError::MissingContainingAcceptanceForTx(txid))?;

let tx = self
.block_transactions_store
.get(containing_block)
.map_err(|_| UtxoInquirerError::MissingBlockFromBlockTxStore(containing_block))
.and_then(|block_txs| {
block_txs.get(index).cloned().ok_or(UtxoInquirerError::MissingTransactionIndexOfBlock(index, containing_block))
})?;

if tx.id() != txid {
// Should never happen, but do a sanity check. This would mean something went wrong with storing block transactions.
// Sanity check is necessary to guarantee that this function will never give back a wrong address (err on the side of not found)
warn!("Expected {} to match {} when checking block_transaction_store using array index of transaction", tx.id(), txid);
return Err(UtxoInquirerError::UnexpectedTransactionMismatch(tx.id(), txid));
}

Ok(tx)
}
}
Loading

0 comments on commit 47c1059

Please sign in to comment.