diff --git a/CHANGELOG.md b/CHANGELOG.md index 42cf7e06..21073cab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Changed -- Oracle provider updated to use V2 `/query` REST API +- Oracle Provider: Updated to use V2 `/query` REST API +- Oracle Provider: Added ability to scan back only a certain interval of past blocks +- Oracle Provider: Added ability to ignore requests by ID and from certain consumers ## [0.35.2] - 2024-09-09 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 4b141b16..1a827a7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5244,10 +5244,12 @@ dependencies = [ "alloy", "async-trait", "axum", + "chrono", "ciborium", "clap", "confique", "dill", + "duration-string", "graceful-shutdown", "hex", "http 0.2.12", diff --git a/src/app/oracle-provider/Cargo.toml b/src/app/oracle-provider/Cargo.toml index 3e0a34ce..c5d4ecce 100644 --- a/src/app/oracle-provider/Cargo.toml +++ b/src/app/oracle-provider/Cargo.toml @@ -34,6 +34,7 @@ axum = { version = "0.6", default-features = false, features = [ "http1", "tokio", ] } +chrono = { version = "0.4", default-features = false } clap = { version = "4", default-features = false, features = [ "std", "color", @@ -49,6 +50,9 @@ clap = { version = "4", default-features = false, features = [ ciborium = { version = "0.2", default-features = false } confique = { version = "0.2", default-features = false, features = ["yaml"] } dill = { version = "0.9", default-features = false } +duration-string = { version = "0.4", default-features = false, features = [ + "serde", +] } hex = { version = "0.4" } http = { version = "0.2", default-features = false } hyper = { version = "0.14", default-features = false } diff --git a/src/app/oracle-provider/config.yaml b/src/app/oracle-provider/config.yaml index 30449c4b..d2f659b3 100644 --- a/src/app/oracle-provider/config.yaml +++ b/src/app/oracle-provider/config.yaml @@ -2,7 +2,7 @@ chain_id: 31337 rpc_url: "http://localhost:8545" api_url: "https://api.demo.kamu.dev" oracle_contract_address: "0x5FbDB2315678afecb367f032d93F642f64180aa3" -oracle_contract_first_block: 0 +scan_from_block: 0 provider_address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" provider_private_key: "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" transaction_confirmations: 1 diff --git a/src/app/oracle-provider/src/config.rs b/src/app/oracle-provider/src/config.rs index 3cef9395..7da99258 100644 --- a/src/app/oracle-provider/src/config.rs +++ b/src/app/oracle-provider/src/config.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use alloy::primitives::Address; +use duration_string::DurationString; use url::Url; #[derive(confique::Config, Debug)] @@ -31,15 +32,24 @@ pub struct Config { /// Address of the oracle contract to read logs from pub oracle_contract_address: Address, - #[config(default = 0)] - pub oracle_contract_first_block: u64, - /// Address of this provider's account to use when submitting transactions pub provider_address: Address, /// Private key of the provider to use when signing transactions. pub provider_private_key: String, + /// Block number to start scanning from on startup (precedence: + /// scan_from_block, scan_last_blocks, scan_last_blocks_period) + pub scan_from_block: Option, + + /// Number of last blocks to scan on startup (precedence: scan_from_block, + /// scan_last_blocks, scan_last_blocks_period) + pub scan_last_blocks: Option, + + /// Time period in which blocks will be scanned on startup (precedence: + /// scan_from_block, scan_last_blocks, scan_last_blocks_period) + pub scan_last_blocks_period: Option, + /// Number of blocks to examine per one getLogs RPC request when catching up #[config(default = 100_000)] pub blocks_stride: u64, @@ -61,4 +71,14 @@ pub struct Config { /// API token to use for authentication with the server pub api_access_token: Option, + + /// Request IDs that provider should skip over (use as a disaster recovery + /// mechanism only) + #[config(default = [])] + pub ignore_requests: Vec, + + /// Consumer addresses to ignore requests from (use as a disaster recovery + /// mechanism only) + #[config(default = [])] + pub ignore_consumers: Vec
, } diff --git a/src/app/oracle-provider/src/provider.rs b/src/app/oracle-provider/src/provider.rs index 4bc5a81e..a42b77e5 100644 --- a/src/app/oracle-provider/src/provider.rs +++ b/src/app/oracle-provider/src/provider.rs @@ -10,11 +10,13 @@ use std::sync::Arc; use std::time::Duration; +use alloy::eips::BlockNumberOrTag; use alloy::primitives::U256; use alloy::providers::Provider; use alloy::rpc::types::eth::{Filter, Log}; use alloy::sol_types::{SolEvent, SolEventInterface}; use alloy::transports::BoxTransport; +use chrono::{DateTime, Utc}; use internal_error::*; use opendatafabric::{DatasetID, Multihash}; @@ -236,8 +238,98 @@ impl OdfOracleProvider

{ Ok(balance) } + /// Determines the block to start scanning from based on multiple config + /// options + pub async fn get_starting_block(&self) -> Result { + if let Some(scan_from_block) = self.config.scan_from_block { + Ok(scan_from_block) + } else if let Some(scan_last_blocks) = self.config.scan_last_blocks { + let latest_block = self.rpc_client.get_block_number().await.int_err()?; + Ok(latest_block.saturating_sub(scan_last_blocks)) + } else if let Some(scan_last_blocks_period) = self.config.scan_last_blocks_period.clone() { + let lookback: Duration = scan_last_blocks_period.into(); + self.get_approx_block_number_by_time(Utc::now() - lookback) + .await + } else { + Err("Config does not specify the scanning interval".int_err()) + } + } + + pub async fn get_approx_block_number_by_time( + &self, + time: DateTime, + ) -> Result { + let latest_block = self + .rpc_client + .get_block_by_number(BlockNumberOrTag::Latest, false) + .await + .int_err()? + .ok_or("Could not read latest block".int_err())?; + + let latest_block_number = latest_block.header.number.unwrap(); + + // Jump back N blocks to calculate average hash rate + let jump_back = 1_000; + if latest_block_number < jump_back { + Err(format!( + "With {latest_block_number} blocks there is not enough history to estimate the \ + hash rate" + ) + .int_err())?; + } + let jump_block = self + .rpc_client + .get_block_by_number( + BlockNumberOrTag::Number(latest_block_number - jump_back), + false, + ) + .await + .int_err()? + .ok_or("Could not read block".int_err())?; + + let seconds_per_block = ((latest_block.header.timestamp - jump_block.header.timestamp) + as f64) + / (jump_back as f64); + + let target_timestamp = u64::try_from(time.timestamp()).unwrap(); + if target_timestamp > latest_block.header.timestamp { + Err(format!( + "Target time is in the future: {target_timestamp} > {}", + latest_block.header.timestamp + ) + .int_err())?; + } + + let approx_block_number = latest_block_number + - (f64::floor( + ((latest_block.header.timestamp - target_timestamp) as f64) / seconds_per_block, + ) as u64); + + let target_block = self + .rpc_client + .get_block_by_number(BlockNumberOrTag::Number(approx_block_number), false) + .await + .int_err()? + .ok_or("Could not read block".int_err())?; + + tracing::info!( + target_timestamp, + latest_block_number, + latest_block_timestamp = latest_block.header.timestamp, + jump_block_number = latest_block_number - jump_back, + jump_block_timestamp = jump_block.header.timestamp, + seconds_per_block, + approx_block_number, + actual_timestamp = target_block.header.timestamp, + off_by_seconds = u64::abs_diff(target_block.header.timestamp, target_timestamp), + "Calculated approximate block number by time", + ); + + Ok(approx_block_number) + } + pub async fn run(self) -> Result<(), InternalError> { - let mut from_block = self.config.oracle_contract_first_block; + let mut from_block = self.get_starting_block().await?; let mut idle_start = None; // Pre-flight loop: Wait until we have basic pre-requisites to function @@ -284,7 +376,11 @@ impl OdfOracleProvider

{ from_block: Option, to_block: Option, ) -> Result<(), InternalError> { - let from_block = from_block.unwrap_or(self.config.oracle_contract_first_block); + let from_block = if let Some(from_block) = from_block { + from_block + } else { + self.get_starting_block().await? + }; let to_block = if let Some(to_block) = to_block { to_block @@ -382,6 +478,12 @@ impl OdfOracleProvider

{ tracing::trace!(?log, "Observed log"); match log_decoded.data { + IOdfProvider::IOdfProviderEvents::SendRequest(event) + if self.config.ignore_requests.contains(&event.requestId) + || self.config.ignore_consumers.contains(&event.consumerAddr) => + { + tracing::debug!(request_id = ?event.requestId, "Ignoring request as per configuration"); + } IOdfProvider::IOdfProviderEvents::SendRequest(event) => { tracing::debug!(request_id = ?event.requestId, "Adding pending request"); pending_requests.insert( diff --git a/src/app/oracle-provider/tests/tests/test_e2e.rs b/src/app/oracle-provider/tests/tests/test_e2e.rs index 2664974c..7e378a57 100644 --- a/src/app/oracle-provider/tests/tests/test_e2e.rs +++ b/src/app/oracle-provider/tests/tests/test_e2e.rs @@ -105,7 +105,9 @@ async fn test_oracle_e2e() { rpc_url: url::Url::parse(&anvil.endpoint()).unwrap(), chain_id: anvil.chain_id(), oracle_contract_address, - oracle_contract_first_block: 0, + scan_from_block: Some(0), + scan_last_blocks: None, + scan_last_blocks_period: None, provider_address, provider_private_key, blocks_stride: 100_000, @@ -114,6 +116,8 @@ async fn test_oracle_e2e() { transaction_timeout_s: 5, api_url: url::Url::parse("http://dontcare.com").unwrap(), api_access_token: None, + ignore_requests: Vec::new(), + ignore_consumers: Vec::new(), }; // Authorize provider and generate a request