Merge pull request #150 from hirosystems/chore/adjusting-0-2
chore: adjusting v0.2
lgalabru authored Aug 14, 2023
2 parents 42fd263 + bf7acb9 commit 150f7d5
Showing 13 changed files with 424 additions and 319 deletions.
27 changes: 13 additions & 14 deletions components/hord-cli/src/cli/mod.rs
@@ -1,9 +1,9 @@
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
use crate::core::protocol::inscription_parsing::parse_inscriptions_and_standardize_block;
use crate::download::download_ordinals_dataset_if_required;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;
@@ -12,8 +12,9 @@ use crate::db::{
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, initialize_hord_db, get_default_hord_db_file_path,
get_default_hord_db_file_path, initialize_hord_db, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
};
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
use chainhook_sdk::chainhooks::types::HttpHook;
@@ -263,7 +264,7 @@ struct StartCommand {
enum HordDbCommand {
/// Initialize a new hord db
#[clap(name = "new", bin_name = "new")]
New(SyncHordDbCommand),
New(SyncHordDbCommand),
/// Catch-up hord db
#[clap(name = "sync", bin_name = "sync")]
Sync(SyncHordDbCommand),
@@ -482,8 +483,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut total_inscriptions = 0;
let mut total_transfers = 0;

let inscriptions_db_conn =
initialize_hord_db(&config.expected_cache_path(), &ctx);
let inscriptions_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, &ctx);
@@ -529,7 +529,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
if total_transfers == 0 && total_inscriptions == 0 {
let db_file_path = get_default_hord_db_file_path(&config.expected_cache_path());
warn!(ctx.expect_logger(), "No data available. Check the validity of the range being scanned and the validity of your local database {}", db_file_path.display());
}
}
}
}
Command::Scan(ScanCommand::Inscription(cmd)) => {
@@ -637,28 +637,28 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(HordDbCommand::New(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
initialize_hord_db(&config.expected_cache_path(), &ctx);
},
}
Command::Db(HordDbCommand::Sync(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
initialize_hord_db(&config.expected_cache_path(), &ctx);
let service = Service::new(config, ctx.clone());
service.update_state(None).await?;
},
}
Command::Db(HordDbCommand::Repair(subcmd)) => match subcmd {
RepairCommand::Blocks(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;

let block_ingestion_processor = start_block_ingestion_processor(&config, ctx, None);
let block_ingestion_processor =
start_block_archiving_processor(&config, ctx, false, None);

download_and_pipeline_blocks(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
Some(&block_ingestion_processor),
Some(&block_ingestion_processor),
10_000,
&ctx,
)
@@ -677,7 +677,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
None,
Some(&inscription_indexing_processor),
10_000,
&ctx,
@@ -763,7 +762,7 @@ pub async fn fetch_and_standardize_block(
download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx)
.await?;

parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
parse_inscriptions_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
.map_err(|(e, _)| e)
}

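Note on the repair changes above: both repair paths now feed a single controller into the pipeline. For reference, a compilable stub of the consolidated entry point as it stands after this diff — the signature comes from core/pipeline/mod.rs below, the &Config type of the first parameter is inferred from the call sites, and the placeholder types are illustrative stand-ins:

// Placeholder types standing in for the crate's real definitions, so
// this stub compiles on its own.
pub struct Config;
pub struct Context;
pub struct PostProcessorController; // bundles commands_tx / events_rx

// Post-change shape: the former pre-sequence and post-sequence hooks
// are merged into a single `blocks_post_processor` parameter.
pub async fn download_and_pipeline_blocks(
    _config: &Config,
    _start_block: u64,
    _end_block: u64,
    _start_sequencing_blocks_at_height: u64,
    _blocks_post_processor: Option<&PostProcessorController>,
    _speed: usize,
    _ctx: &Context,
) -> Result<(), String> {
    Ok(()) // body elided; see core/pipeline/mod.rs below
}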
14 changes: 8 additions & 6 deletions components/hord-cli/src/config/mod.rs
@@ -155,9 +155,9 @@ impl Config {
let bootstrap = match config_file.bootstrap {
Some(bootstrap) => match bootstrap.download_url {
Some(ref url) => BootstrapConfig::Download(url.to_string()),
None => BootstrapConfig::Build
}
None => BootstrapConfig::Build
None => BootstrapConfig::Build,
},
None => BootstrapConfig::Build,
};

let config = Config {
@@ -243,7 +243,7 @@ impl Config {
pub fn should_bootstrap_through_download(&self) -> bool {
match &self.bootstrap {
BootstrapConfig::Build => false,
BootstrapConfig::Download(_) => true
BootstrapConfig::Download(_) => true,
}
}

@@ -267,7 +267,7 @@ impl Config {
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
match &self.bootstrap {
BootstrapConfig::Build => unreachable!(),
BootstrapConfig::Download(url) => &url
BootstrapConfig::Download(url) => &url,
}
}

@@ -367,7 +367,9 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
bootstrap: BootstrapConfig::Download(
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
),
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
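The bootstrap hunk above is a formatting cleanup (trailing commas added by rustfmt), but the resolution logic it touches is worth seeing whole. A self-contained sketch, where BootstrapConfigFile and resolve_bootstrap are illustrative names — only the enum variants and the match shape come from the diff:

// Illustrative stand-in for the config-file struct this match reads.
struct BootstrapConfigFile {
    download_url: Option<String>,
}

enum BootstrapConfig {
    Build,
    Download(String),
}

// Resolution rule: bootstrap via download only when a URL is configured;
// otherwise build locally.
fn resolve_bootstrap(bootstrap: Option<BootstrapConfigFile>) -> BootstrapConfig {
    match bootstrap {
        Some(bootstrap) => match bootstrap.download_url {
            Some(ref url) => BootstrapConfig::Download(url.to_string()),
            None => BootstrapConfig::Build,
        },
        None => BootstrapConfig::Build,
    }
}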
4 changes: 2 additions & 2 deletions components/hord-cli/src/core/mod.rs
@@ -188,9 +188,9 @@ pub fn should_sync_hord_db(
start_block += 1;

// TODO: Gracefully handle Regtest, Testnet and Signet
let (mut end_block, speed) = if start_block < 200_000 {
let (mut end_block, speed) = if start_block <= 200_000 {
(end_block.min(200_000), 10_000)
} else if start_block < 550_000 {
} else if start_block <= 550_000 {
(end_block.min(550_000), 1_000)
} else {
(end_block, 100)
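The hunk above changes the tier boundaries from < to <=, so start heights of exactly 200_000 and 550_000 now land in the cheaper tier. A runnable sketch of the same logic (batch_plan is an illustrative name; it returns the capped end block and the speed value, mirroring the diff):

fn batch_plan(start_block: u64, end_block: u64) -> (u64, usize) {
    if start_block <= 200_000 {
        (end_block.min(200_000), 10_000)
    } else if start_block <= 550_000 {
        (end_block.min(550_000), 1_000)
    } else {
        (end_block, 100)
    }
}

fn main() {
    // With the old `<` comparison, a start height of exactly 200_000
    // would have been scheduled at speed 1_000 rather than 10_000.
    assert_eq!(batch_plan(200_000, 800_000), (200_000, 10_000));
}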
132 changes: 76 additions & 56 deletions components/hord-cli/src/core/pipeline/mod.rs
@@ -16,7 +16,7 @@ use chainhook_sdk::indexer::bitcoin::{
build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry,
};

use super::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
use super::protocol::inscription_parsing::parse_inscriptions_and_standardize_block;

pub enum PostProcessorCommand {
Start,
@@ -25,7 +25,9 @@ pub enum PostProcessorCommand {
}

pub enum PostProcessorEvent {
EmptyQueue,
Started,
Terminated,
Expired,
}

pub struct PostProcessorController {
@@ -39,8 +41,7 @@ pub async fn download_and_pipeline_blocks(
start_block: u64,
end_block: u64,
start_sequencing_blocks_at_height: u64,
blocks_post_processor_pre_sequence: Option<&PostProcessorController>,
blocks_post_processor_post_sequence: Option<&PostProcessorController>,
blocks_post_processor: Option<&PostProcessorController>,
speed: usize,
ctx: &Context,
) -> Result<(), String> {
@@ -101,7 +102,7 @@ pub async fn download_and_pipeline_blocks(
rx_thread_pool.push(rx);
}

for rx in rx_thread_pool.into_iter() {
for (thread_index, rx) in rx_thread_pool.into_iter().enumerate() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let moved_ctx: Context = moved_ctx.clone();
let moved_bitcoin_network = moved_bitcoin_network.clone();
@@ -115,7 +116,7 @@ pub async fn download_and_pipeline_blocks(
.expect("unable to compress block");
let block_height = raw_block_data.height as u64;
let block_data = if block_height >= start_sequencing_blocks_at_height {
let block_data = parse_ordinals_and_standardize_block(
let block_data = parse_inscriptions_and_standardize_block(
raw_block_data,
&moved_bitcoin_network,
&moved_ctx,
@@ -131,18 +132,16 @@ pub async fn download_and_pipeline_blocks(
compressed_block,
)));
}
moved_ctx
.try_log(|logger| debug!(logger, "Exiting processing thread {thread_index}"));
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}

let cloned_ctx = ctx.clone();

let blocks_post_processor_post_sequence_commands_tx = blocks_post_processor_post_sequence
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));

let blocks_post_processor_pre_sequence_commands_tx = blocks_post_processor_pre_sequence
let blocks_post_processor_commands_tx = blocks_post_processor
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));

@@ -151,19 +150,52 @@ pub async fn download_and_pipeline_blocks(
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
let mut blocks_processed = 0;
let mut pre_seq_processor_started = false;
let mut post_seq_processor_started = false;
let mut processor_started = false;
let mut stop_runloop = false;

loop {
if stop_runloop {
cloned_ctx.try_log(|logger| {
info!(
logger,
"#{blocks_processed} blocks successfully sent to processor"
)
});
break;
}

// Dequeue all the blocks available
let mut new_blocks = vec![];
while let Ok(Some((block_height, block, compacted_block))) =
block_compressed_rx.try_recv()
{
blocks_processed += 1;
new_blocks.push((block_height, block, compacted_block));
if new_blocks.len() >= 10_000 {
break;
while let Ok(message) = block_compressed_rx.try_recv() {
match message {
Some((block_height, block, compacted_block)) => {
blocks_processed += 1;
new_blocks.push((block_height, block, compacted_block));
// Max batch size: 10_000 blocks
if new_blocks.len() >= 10_000 {
break;
}
}
None => {
stop_runloop = true;
}
}
}

if blocks_processed == number_of_blocks_to_process {
stop_runloop = true;
}

// Early "continue"
if new_blocks.is_empty() {
sleep(Duration::from_millis(500));
continue;
}

if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
if !processor_started {
processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
}
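The rewritten drain above replaces while let Ok(Some(...)) with an explicit match, so a None message can flip stop_runloop rather than silently ending the loop. A generic sketch of the pattern, using std::sync::mpsc for illustration (the component may use a different channel crate, and the real items are (height, block, compacted block) triples):

use std::sync::mpsc::Receiver;

// Drain whatever is available without blocking, cap a batch at 10_000
// items, and treat a `None` message as the producers' end-of-stream
// sentinel.
fn drain_batch<T>(rx: &Receiver<Option<T>>, stop_runloop: &mut bool) -> Vec<T> {
    let mut batch = Vec::new();
    while let Ok(message) = rx.try_recv() {
        match message {
            Some(item) => {
                batch.push(item);
                if batch.len() >= 10_000 {
                    break; // hand off a full batch before draining further
                }
            }
            None => *stop_runloop = true, // producers are done
        }
    }
    batch
}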

@@ -176,18 +208,15 @@ pub async fn download_and_pipeline_blocks(
}
}

if !ooo_compacted_blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
if !pre_seq_processor_started {
pre_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}

// Early "continue"
if inbox.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
ooo_compacted_blocks,
vec![],
));
}
continue;
}

// In order processing: construct the longest sequence of known blocks
@@ -199,24 +228,8 @@ pub async fn download_and_pipeline_blocks(
inbox_cursor += 1;
}

if blocks.is_empty() {
if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
info!(
logger,
"#{blocks_processed} blocks successfully sent to processor"
)
});
break;
} else {
sleep(Duration::from_secs(1));
}
} else {
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
if !post_seq_processor_started {
post_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
if !blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
compacted_blocks,
blocks,
@@ -248,7 +261,7 @@ pub async fn download_and_pipeline_blocks(
}

ctx.try_log(|logger| {
info!(
debug!(
logger,
"Pipeline successfully fed with sequence of blocks ({} to {})", start_block, end_block
)
@@ -258,26 +271,33 @@ pub async fn download_and_pipeline_blocks(
let _ = tx.send(None);
}

ctx.try_log(|logger| debug!(logger, "Enqueued pipeline termination commands"));

for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}

if let Some(post_processor) = blocks_post_processor_pre_sequence {
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
}
}
}
ctx.try_log(|logger| debug!(logger, "Pipeline successfully terminated"));

if let Some(post_processor) = blocks_post_processor_post_sequence {
if let Some(post_processor) = blocks_post_processor {
if let Ok(PostProcessorEvent::Started) = post_processor.events_rx.recv() {
ctx.try_log(|logger| debug!(logger, "Block post processing started"));
let _ = post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
}
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
if let Ok(signal) = post_processor.events_rx.recv() {
match signal {
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
PostProcessorEvent::Started => unreachable!(),
}
}
}
}

let _ = block_compressed_tx.send(None);

let _ = storage_thread.join();
let _ = set.shutdown();
