Skip to content

Commit

Permalink
In-process rbuilder (#228)
Browse files Browse the repository at this point in the history
- Refactors `LiveBuilderConfig::new_builder` to take a generic provider
- Refactors `WalletBalanceWatcher` to take a generic provider
- Adds new crate `reth-rbuilder` that runs rbuilder in-process with reth
- Changes config default ports that overlap with reth
  • Loading branch information
liamaharon authored Oct 28, 2024
1 parent ccd9cb5 commit 7ff6fe1
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 62 deletions.
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"crates/rbuilder",
"crates/reth-rbuilder",
"crates/rbuilder/src/test_utils",
"crates/rbuilder/src/telemetry/metrics_macros"
]
Expand All @@ -23,12 +24,16 @@ edition = "2021"

[workspace.dependencies]
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-cli-util = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-db-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-libmdbx = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-trie = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-trie-parallel = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
reth-basic-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.0.6" }
Expand Down Expand Up @@ -75,3 +80,12 @@ alloy-rpc-types-engine = { version = "0.3.0", features = [
] }
alloy-rpc-types-eth = { version = "0.3.0" }
alloy-signer-local = { version = "0.3.0" }

clap = { version = "4.4.3" }
eyre = { version = "0.6.8" }
libc = { version = "0.2.161" }
tikv-jemallocator = { version = "0.5.4" }
tokio = "1.38.0"
tokio-util = "0.7.11"
tracing = "0.1.37"

2 changes: 1 addition & 1 deletion config-live-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ drop_failed_orders = true
name = "parallel"
algo = "parallel-builder"
discard_txs = true
num_threads = 25
num_threads = 25
10 changes: 5 additions & 5 deletions crates/rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = "1.38.0"
tokio.workspace = true
serde = "1.0.188"
serde_json = "1.0.105"
thiserror = "1.0.47"
eyre = "0.6.8"
eyre.workspace = true
reth.workspace = true
reth-db.workspace = true
reth-db-common.workspace = true
Expand Down Expand Up @@ -70,7 +70,7 @@ sqlx = { version = "0.7.1", features = [
"time",
"uuid",
] }
tracing = "0.1.37"
tracing.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
csv = "1.2.2"
zip = "0.6.6"
Expand All @@ -81,7 +81,7 @@ bigdecimal = "0.4.1"
mempool-dumpster = "0.1.1"
itertools = "0.11.0"
tokio-stream = "0.1.14"
clap = { version = "4.4.3", features = ["derive", "env"] }
clap = { workspace = true, features = ["derive", "env"] }
priority-queue = "2.0.3"
secp256k1 = { version = "0.29", features = [
"global-context",
Expand All @@ -97,7 +97,7 @@ ssz_rs = { git = "https://github.com/ralexstokes/ssz-rs.git", version = "0.9.0"
beacon-api-client = { git = "https://github.com/ralexstokes/ethereum-consensus/", rev = "cf3c404043230559660810bc0c9d6d5a8498d819" }
ethereum-consensus = { git = "https://github.com/ralexstokes/ethereum-consensus/", rev = "cf3c404043230559660810bc0c9d6d5a8498d819" }
ssz_rs_derive = { git = "https://github.com/ralexstokes/ssz-rs.git", version = "0.9.0" }
tokio-util = "0.7.11"
tokio-util = { workspace = true }
uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] }
prometheus = "0.13.4"
hyper = { version = "1.3.1", features = ["server", "full"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/backtest/backtest_build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
let cli = Cli::parse();

let config: ConfigType = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

let block_data = read_block_data(
&config.base_config().backtest_fetch_output_file,
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/backtest/backtest_build_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
));
}
let config: ConfigType = load_config_toml_and_env(cli.config.clone())?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

let builders_names = config.base_config().backtest_builders.clone();

Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/backtest/redistribute/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ where
let cli = Cli::parse();

let config: ConfigType = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

let mut historical_data_storage =
HistoricalDataStorage::new_from_path(&config.base_config().backtest_fetch_output_file)
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/backtest-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> eyre::Result<()> {
let cli = Cli::parse();

let config: Config = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

match cli.command {
Commands::Fetch(cli) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/debug-bench-machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> eyre::Result<()> {
let cli = Cli::parse();

let config: Config = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

let rpc = http_provider(cli.rpc_url.parse()?);

Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub fn load_config_toml_and_env<T: serde::de::DeserializeOwned>(
}

impl BaseConfig {
pub fn setup_tracing_subsriber(&self) -> eyre::Result<()> {
pub fn setup_tracing_subscriber(&self) -> eyre::Result<()> {
let log_level = self.log_level.value()?;
let config = LoggerConfig {
env_filter: log_level,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use alloy_primitives::{Address, BlockNumber, U256};
use reth::{
primitives::format_ether,
providers::{BlockNumReader, HeaderProvider, ProviderError, ProviderFactory},
providers::{HeaderProvider, ProviderError},
};
use reth_db::DatabaseEnv;
use reth_provider::StateProviderFactory;
use time::{error, OffsetDateTime};
use tracing::{error, info};

Expand All @@ -20,8 +20,8 @@ use super::interfaces::LandedBlockInfo;
/// 1 - Create one and you'll get also the latest history of balance changes.
/// 2 - After each new landed block is detected (or whenever you want) call update_to_block to get info up to that block.
#[derive(Debug)]
pub struct WalletBalanceWatcher {
provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
pub struct WalletBalanceWatcher<P> {
provider: P,
builder_addr: Address,
/// Last block analyzed. balance is updated up to block_number (included).
block_number: BlockNumber,
Expand Down Expand Up @@ -58,15 +58,18 @@ impl BlockInfo {
}
}

impl WalletBalanceWatcher {
impl<P> WalletBalanceWatcher<P>
where
P: StateProviderFactory + HeaderProvider,
{
/// Creates a WalletBalanceWatcher pre-analyzing a window of init_window_size size.
pub fn new(
provider_factory: ProviderFactory<Arc<DatabaseEnv>>,
provider: P,
builder_addr: Address,
init_window_size: Duration,
) -> Result<(Self, Vec<LandedBlockInfo>), WalletError> {
Self {
provider_factory,
provider,
builder_addr,
block_number: 0,
balance: U256::ZERO,
Expand All @@ -81,7 +84,7 @@ impl WalletBalanceWatcher {
init_window_size: Duration,
) -> Result<(Self, Vec<LandedBlockInfo>), WalletError> {
let analysis_window_limit = OffsetDateTime::now_utc() - init_window_size;
self.block_number = self.provider_factory.last_block_number()?;
self.block_number = self.provider.last_block_number()?;
let mut block_number = self.block_number;
let last_block_info = self
.get_block_info(block_number)
Expand Down Expand Up @@ -111,12 +114,12 @@ impl WalletBalanceWatcher {
/// returns the wallet balance for the block and the block's timestamp
fn get_block_info(&mut self, block: BlockNumber) -> Result<BlockInfo, WalletError> {
let builder_balance = self
.provider_factory
.provider
.history_by_block_number(block)?
.account_balance(self.builder_addr)?
.unwrap_or_default();
let header = self
.provider_factory
.provider
.header_by_number(block)?
.ok_or(WalletError::HeaderNotFound(block))?;
Ok(BlockInfo {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::sync::Arc;

use crate::{
building::builders::{UnfinishedBlockBuildingSink, UnfinishedBlockBuildingSinkFactory},
live_builder::payload_events::MevBoostSlotData,
};
use alloy_primitives::U256;
use reth_provider::{HeaderProvider, StateProviderFactory};
use std::fmt::Debug;
use std::sync::Arc;
use tracing::error;

use super::{
Expand All @@ -23,25 +24,38 @@ use super::{
/// SlotBidder bids using a SequentialSealerBidMaker (created per block).
/// SequentialSealerBidMaker sends the bids to a BlockBuildingSink (created per block).
/// SlotBidder is subscribed to the BidValueSource.
#[derive(Debug)]
pub struct BlockSealingBidderFactory {
pub struct BlockSealingBidderFactory<P> {
/// Factory for the SlotBidder for blocks.
bidding_service: Box<dyn BiddingService>,
/// Factory for the final destination for blocks.
block_sink_factory: Box<dyn BuilderSinkFactory>,
/// SlotBidder are subscribed to the proper block in the bid_value_source.
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
wallet_balance_watcher: WalletBalanceWatcher<P>,
/// See [ParallelSealerBidMaker]
max_concurrent_seals: usize,
}

impl BlockSealingBidderFactory {
impl<P> Debug for BlockSealingBidderFactory<P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockSealingBidderFactory")
.field("bidding_service", &"Box<dyn BiddingService>")
.field("block_sink_factory", &"Box<dyn BuilderSinkFactory>")
.field(
"competition_bid_value_source",
&self.competition_bid_value_source,
)
.field("max_concurrent_seals", &self.max_concurrent_seals)
.finish()
}
}

impl<P> BlockSealingBidderFactory<P> {
pub fn new(
bidding_service: Box<dyn BiddingService>,
block_sink_factory: Box<dyn BuilderSinkFactory>,
competition_bid_value_source: Arc<dyn BidValueSource + Send + Sync>,
wallet_balance_watcher: WalletBalanceWatcher,
wallet_balance_watcher: WalletBalanceWatcher<P>,
max_concurrent_seals: usize,
) -> Self {
Self {
Expand All @@ -66,7 +80,10 @@ impl BidValueObs for SlotBidderToBidValueObs {
}
}

impl UnfinishedBlockBuildingSinkFactory for BlockSealingBidderFactory {
impl<P> UnfinishedBlockBuildingSinkFactory for BlockSealingBidderFactory<P>
where
P: StateProviderFactory + HeaderProvider,
{
fn create_sink(
&mut self,
slot_data: MevBoostSlotData,
Expand Down
30 changes: 14 additions & 16 deletions crates/rbuilder/src/live_builder/cli.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{path::PathBuf, sync::Arc};
use std::path::PathBuf;

use clap::Parser;
use reth_db::{Database, DatabaseEnv};
use reth_db::Database;
use reth_payload_builder::database::CachedReads;
use reth_provider::{DatabaseProviderFactory, StateProviderFactory};
use reth_provider::{DatabaseProviderFactory, HeaderProvider, StateProviderFactory};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use tokio::signal::ctrl_c;
Expand All @@ -15,7 +15,7 @@ use crate::{
base_config::load_config_toml_and_env, payload_events::MevBoostSlotDataGenerator,
},
telemetry,
utils::{build_info::Version, ProviderFactoryReopener},
utils::build_info::Version,
};

use super::{base_config::BaseConfig, LiveBuilder};
Expand Down Expand Up @@ -45,18 +45,15 @@ pub trait LiveBuilderConfig: Debug + DeserializeOwned + Sync {
/// Create a concrete builder
///
/// Desugared from async to future to keep clippy happy
fn new_builder(
fn new_builder<P, DB>(
&self,
provider: P,
cancellation_token: CancellationToken,
) -> impl std::future::Future<
Output = eyre::Result<
LiveBuilder<
ProviderFactoryReopener<Arc<DatabaseEnv>>,
Arc<DatabaseEnv>,
MevBoostSlotDataGenerator,
>,
>,
> + Send;
) -> impl std::future::Future<Output = eyre::Result<LiveBuilder<P, DB, MevBoostSlotDataGenerator>>>
+ Send
where
DB: Database + Clone + 'static,
P: DatabaseProviderFactory<DB> + StateProviderFactory + HeaderProvider + Clone + 'static;

/// Patch until we have a unified way of backtesting using the exact algorithms we use on the LiveBuilder.
/// building_algorithm_name will come from the specific configuration.
Expand Down Expand Up @@ -91,7 +88,7 @@ where
};

let config: ConfigType = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subsriber()?;
config.base_config().setup_tracing_subscriber()?;

let cancel = CancellationToken::new();

Expand All @@ -106,7 +103,8 @@ where
config.base_config().log_enable_dynamic,
)
.await?;
let builder = config.new_builder(cancel.clone()).await?;
let provider = config.base_config().create_provider_factory()?;
let builder = config.new_builder(provider, cancel.clone()).await?;

let ctrlc = tokio::spawn(async move {
ctrl_c().await.unwrap_or_default();
Expand Down
Loading

0 comments on commit 7ff6fe1

Please sign in to comment.