Skip to content

Commit

Permalink
Oracle: support scan ranges and request/consumer ingores
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 10, 2024
1 parent 69cddf5 commit 88bb9a0
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 8 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased
### Changed
- Oracle provider updated to use V2 `/query` REST API
- Oracle Provider: Updated to use V2 `/query` REST API
- Oracle Provider: Added ability to scan back only a certain interval of past blocks
- Oracle Provider: Added ability to ignore requests by ID and from certain consumers

## [0.35.2] - 2024-09-09
### Fixed
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/app/oracle-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ axum = { version = "0.6", default-features = false, features = [
"http1",
"tokio",
] }
chrono = { version = "0.4", default-features = false }
clap = { version = "4", default-features = false, features = [
"std",
"color",
Expand All @@ -49,6 +50,9 @@ clap = { version = "4", default-features = false, features = [
ciborium = { version = "0.2", default-features = false }
confique = { version = "0.2", default-features = false, features = ["yaml"] }
dill = { version = "0.9", default-features = false }
duration-string = { version = "0.4", default-features = false, features = [
"serde",
] }
hex = { version = "0.4" }
http = { version = "0.2", default-features = false }
hyper = { version = "0.14", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion src/app/oracle-provider/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ chain_id: 31337
rpc_url: "http://localhost:8545"
api_url: "https://api.demo.kamu.dev"
oracle_contract_address: "0x5FbDB2315678afecb367f032d93F642f64180aa3"
oracle_contract_first_block: 0
scan_from_block: 0
provider_address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
provider_private_key: "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
transaction_confirmations: 1
Expand Down
26 changes: 23 additions & 3 deletions src/app/oracle-provider/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use alloy::primitives::Address;
use duration_string::DurationString;
use url::Url;

#[derive(confique::Config, Debug)]
Expand All @@ -31,15 +32,24 @@ pub struct Config {
/// Address of the oracle contract to read logs from
pub oracle_contract_address: Address,

#[config(default = 0)]
pub oracle_contract_first_block: u64,

/// Address of this provider's account to use when submitting transactions
pub provider_address: Address,

/// Private key of the provider to use when signing transactions.
pub provider_private_key: String,

/// Block number to start scanning from on startup (precedence:
/// scan_from_block, scan_last_blocks, scan_last_blocks_period)
pub scan_from_block: Option<u64>,

/// Number of last blocks to scan on startup (precedence: scan_from_block,
/// scan_last_blocks, scan_last_blocks_period)
pub scan_last_blocks: Option<u64>,

/// Time period in which blocks will be scanned on startup (precedence:
/// scan_from_block, scan_last_blocks, scan_last_blocks_period)
pub scan_last_blocks_period: Option<DurationString>,

/// Number of blocks to examine per one getLogs RPC request when catching up
#[config(default = 100_000)]
pub blocks_stride: u64,
Expand All @@ -61,4 +71,14 @@ pub struct Config {

/// API token to use for authentication with the server
pub api_access_token: Option<String>,

/// Request IDs that provider should skip over (use as a disaster recovery
/// mechanism only)
#[config(default = [])]
pub ignore_requests: Vec<u64>,

/// Consumer addresses to ignore requests from (use as a disaster recovery
/// mechanism only)
#[config(default = [])]
pub ignore_consumers: Vec<Address>,
}
106 changes: 104 additions & 2 deletions src/app/oracle-provider/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
use std::sync::Arc;
use std::time::Duration;

use alloy::eips::BlockNumberOrTag;
use alloy::primitives::U256;
use alloy::providers::Provider;
use alloy::rpc::types::eth::{Filter, Log};
use alloy::sol_types::{SolEvent, SolEventInterface};
use alloy::transports::BoxTransport;
use chrono::{DateTime, Utc};
use internal_error::*;
use opendatafabric::{DatasetID, Multihash};

Expand Down Expand Up @@ -236,8 +238,98 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
Ok(balance)
}

/// Determines the block to start scanning from based on multiple config
/// options
pub async fn get_starting_block(&self) -> Result<u64, InternalError> {
if let Some(scan_from_block) = self.config.scan_from_block {
Ok(scan_from_block)
} else if let Some(scan_last_blocks) = self.config.scan_last_blocks {
let latest_block = self.rpc_client.get_block_number().await.int_err()?;
Ok(latest_block.saturating_sub(scan_last_blocks))
} else if let Some(scan_last_blocks_period) = self.config.scan_last_blocks_period.clone() {

Check failure on line 249 in src/app/oracle-provider/src/provider.rs

View workflow job for this annotation

GitHub Actions / Lint / Code

using `clone` on type `Option<DurationString>` which implements the `Copy` trait
let lookback: Duration = scan_last_blocks_period.into();
self.get_approx_block_number_by_time(Utc::now() - lookback)
.await
} else {
Err("Config does not specify the scanning interval".int_err())
}
}

pub async fn get_approx_block_number_by_time(
&self,
time: DateTime<Utc>,
) -> Result<u64, InternalError> {
let latest_block = self
.rpc_client
.get_block_by_number(BlockNumberOrTag::Latest, false)
.await
.int_err()?
.ok_or("Could not read latest block".int_err())?;

let latest_block_number = latest_block.header.number.unwrap();

// Jump back N blocks to calculate average hash rate
let jump_back = 1_000;
if latest_block_number < jump_back {
Err(format!(
"With {latest_block_number} blocks there is not enough history to estimate the \
hash rate"
)
.int_err())?;
}
let jump_block = self
.rpc_client
.get_block_by_number(
BlockNumberOrTag::Number(latest_block_number - jump_back),
false,
)
.await
.int_err()?
.ok_or("Could not read block".int_err())?;

let seconds_per_block = ((latest_block.header.timestamp - jump_block.header.timestamp)
as f64)
/ (jump_back as f64);

let target_timestamp = u64::try_from(time.timestamp()).unwrap();
if target_timestamp > latest_block.header.timestamp {
Err(format!(
"Target time is in the future: {target_timestamp} > {}",
latest_block.header.timestamp
)
.int_err())?;
}

let approx_block_number = latest_block_number
- (f64::floor(
((latest_block.header.timestamp - target_timestamp) as f64) / seconds_per_block,
) as u64);

let target_block = self
.rpc_client
.get_block_by_number(BlockNumberOrTag::Number(approx_block_number), false)
.await
.int_err()?
.ok_or("Could not read block".int_err())?;

tracing::info!(
target_timestamp,
latest_block_number,
latest_block_timestamp = latest_block.header.timestamp,
jump_block_number = latest_block_number - jump_back,
jump_block_timestamp = jump_block.header.timestamp,
seconds_per_block,
approx_block_number,
actual_timestamp = target_block.header.timestamp,
off_by_seconds = u64::abs_diff(target_block.header.timestamp, target_timestamp),
"Calculated approximate block number by time",
);

Ok(approx_block_number)
}

pub async fn run(self) -> Result<(), InternalError> {
let mut from_block = self.config.oracle_contract_first_block;
let mut from_block = self.get_starting_block().await?;
let mut idle_start = None;

// Pre-flight loop: Wait until we have basic pre-requisites to function
Expand Down Expand Up @@ -284,7 +376,11 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<(), InternalError> {
let from_block = from_block.unwrap_or(self.config.oracle_contract_first_block);
let from_block = if let Some(from_block) = from_block {
from_block
} else {
self.get_starting_block().await?
};

let to_block = if let Some(to_block) = to_block {
to_block
Expand Down Expand Up @@ -382,6 +478,12 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
tracing::trace!(?log, "Observed log");

match log_decoded.data {
IOdfProvider::IOdfProviderEvents::SendRequest(event)
if self.config.ignore_requests.contains(&event.requestId)
|| self.config.ignore_consumers.contains(&event.consumerAddr) =>
{
tracing::debug!(request_id = ?event.requestId, "Ignoring request as per configuration");
}
IOdfProvider::IOdfProviderEvents::SendRequest(event) => {
tracing::debug!(request_id = ?event.requestId, "Adding pending request");
pending_requests.insert(
Expand Down
6 changes: 5 additions & 1 deletion src/app/oracle-provider/tests/tests/test_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ async fn test_oracle_e2e() {
rpc_url: url::Url::parse(&anvil.endpoint()).unwrap(),
chain_id: anvil.chain_id(),
oracle_contract_address,
oracle_contract_first_block: 0,
scan_from_block: Some(0),
scan_last_blocks: None,
scan_last_blocks_period: None,
provider_address,
provider_private_key,
blocks_stride: 100_000,
Expand All @@ -114,6 +116,8 @@ async fn test_oracle_e2e() {
transaction_timeout_s: 5,
api_url: url::Url::parse("http://dontcare.com").unwrap(),
api_access_token: None,
ignore_requests: Vec::new(),
ignore_consumers: Vec::new(),
};

// Authorize provider and generate a request
Expand Down

0 comments on commit 88bb9a0

Please sign in to comment.