Skip to content

Commit

Permalink
init argus
Browse files Browse the repository at this point in the history
  • Loading branch information
cctdaniel committed Nov 15, 2024
1 parent 6702d2f commit 5def4b4
Show file tree
Hide file tree
Showing 12 changed files with 923 additions and 0 deletions.
8 changes: 8 additions & 0 deletions apps/argus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "argus"
version = "0.1.0"
edition = "2021"

[dependencies]
alloy = { version = "0.3", features = ["full", "node-bindings"] }
tokio = { version = "1.28", features = ["full"] }
19 changes: 19 additions & 0 deletions apps/argus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Argus

Argus is a webservice that serves price updates according to the Pulse protocol.
The service also operates a keeper task that performs callback transactions for user requests.

A single instance of this service can simultaneously serve price updates for several different blockchains.
Each blockchain is configured in `config.yaml`.

## How It Works

1. Continuously polls the Pulse contract's storage to discover new price update requests
2. Fetches required price data from Pyth Network
3. Batches multiple requests when possible for gas efficiency
4. Executes callbacks with appropriate gas limits specified in the original requests
5. Monitors transaction success and handles retries when necessary

## Architecture

The service is built on Rust for performance and reliability, sharing architectural patterns with Fortuna (the Entropy protocol's keeper service). However, unlike Fortuna which relies on event subscriptions, Argus uses direct storage polling for more reliable request discovery.
18 changes: 18 additions & 0 deletions apps/argus/config.sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
chains:
ethereum:
geth_rpc_addr: "https://eth-mainnet.g.alchemy.com/v2/YOUR-API-KEY"
contract_addr: "0x1234..."
poll_interval: 5
min_batch_size: 1
max_batch_size: 10
batch_timeout: 30
min_keeper_balance: 1000000000000000000 # 1 ETH
gas_limit: 500000

provider:
uri: "http://localhost:8080"
address: "0x5678..."
private_key: "0xabcd..." # Provider private key

keeper:
private_key: "0xdef0..." # Keeper private key
1 change: 1 addition & 0 deletions apps/argus/rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nightly-2023-07-23
60 changes: 60 additions & 0 deletions apps/argus/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use {
alloy::{
primitives::Address,
providers::{Provider, ProviderBuilder},
signers::Signer,
},
anyhow::Result,
serde::Deserialize,
std::{fs, time::Duration},
};

#[derive(Debug, Deserialize)]
pub struct Config {
pub chains: HashMap<String, ChainConfig>,
pub provider: ProviderConfig,
pub keeper: KeeperConfig,
}

#[derive(Debug, Deserialize)]
pub struct ChainConfig {
pub geth_rpc_addr: String,
pub contract_addr: Address,
pub poll_interval: u64, // in seconds
pub min_batch_size: usize,
pub max_batch_size: usize,
pub batch_timeout: u64, // in seconds
pub min_keeper_balance: u64,
pub gas_limit: u64,
}

#[derive(Debug, Deserialize)]
pub struct ProviderConfig {
pub uri: String,
pub address: Address,
pub private_key: SecretString,
}

#[derive(Debug, Deserialize)]
pub struct KeeperConfig {
pub private_key: SecretString,
}

#[derive(Debug, Deserialize)]
pub struct SecretString(String);

impl Config {
pub fn load(path: &str) -> Result<Self> {
let contents = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&contents)?)
}

pub fn create_provider(&self, chain_id: &str) -> Result<Provider> {
let chain = self.chains.get(chain_id).ok_or_else(|| anyhow!("Chain not found"))?;
Ok(Provider::builder().rpc_url(&chain.geth_rpc_addr).build()?)
}

pub fn create_signer(&self, secret: &SecretString) -> Result<Signer> {
Ok(Signer::from_private_key(secret.0.parse()?)?)
}
}
41 changes: 41 additions & 0 deletions apps/argus/src/contract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use {
alloy::{
contract::{Contract, ContractInstance},
primitives::{Address, Bytes, U256},
providers::Provider,
signers::Signer,
},
anyhow::Result,
std::sync::Arc,
};

// Contract ABI definition
abigen!(Pulse, "target_chains/ethereum/contracts/contracts/pulse/IPulse.sol");

pub struct PulseContract<P: Provider> {
instance: ContractInstance<Arc<P>, Pulse>,
}

impl<P: Provider> PulseContract<P> {
pub fn new(address: Address, provider: Arc<P>) -> Self {
Self {
instance: ContractInstance::new(address, Arc::new(Pulse::new()), provider),
}
}

pub async fn execute_callback(
&self,
provider: Address,
sequence_number: U64,
price_ids: Vec<[u8; 32]>,
update_data: Vec<Bytes>,
callback_gas_limit: U256,
) -> Result<TxHash> {
let tx = self.instance
.execute_callback(provider, sequence_number, price_ids, update_data, callback_gas_limit)
.send()
.await?;

Ok(tx.tx_hash())
}
}
64 changes: 64 additions & 0 deletions apps/argus/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ArgusError {
#[error("Failed to fetch price updates from Hermes: {0}")]
HermesError(#[from] HermesError),

#[error("Contract error: {0}")]
ContractError(#[from] ContractError),

#[error("Storage error: {0}")]
StorageError(#[from] StorageError),

#[error("Configuration error: {0}")]
ConfigError(String),

#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[derive(Debug, Error)]
pub enum HermesError {
#[error("HTTP request failed: {0}")]
RequestFailed(#[from] reqwest::Error),

#[error("Invalid response encoding: {0}")]
InvalidEncoding(String),

#[error("No price updates found")]
NoPriceUpdates,

#[error("Failed to parse price data: {0}")]
ParseError(String),

#[error("Failed to decode hex data: {0}")]
HexDecodeError(#[from] hex::FromHexError),
}

#[derive(Debug, Error)]
pub enum ContractError {
#[error("Transaction failed: {0}")]
TransactionFailed(String),

#[error("Gas estimation failed: {0}")]
GasEstimationFailed(String),

#[error("Invalid contract address: {0}")]
InvalidAddress(String),

#[error("Contract call failed: {0}")]
CallFailed(String),
}

#[derive(Debug, Error)]
pub enum StorageError {
#[error("Failed to read storage slot: {0}")]
ReadError(String),

#[error("Failed to parse storage data: {0}")]
ParseError(String),

#[error("Invalid storage layout: {0}")]
InvalidLayout(String),
}
129 changes: 129 additions & 0 deletions apps/argus/src/hermes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use {
crate::{
error::{
ArgusError,
HermesError,
},
types::PriceData,
},
reqwest::Client,
serde::{
Deserialize,
Serialize,
},
std::time::{
SystemTime,
UNIX_EPOCH,
},
};

const HERMES_API_URL: &str = "https://hermes.pyth.network";

#[derive(Debug, Serialize, Deserialize)]
struct HermesResponse {
binary: BinaryUpdate,
parsed: Option<Vec<ParsedPriceUpdate>>,
}

#[derive(Debug, Serialize, Deserialize)]
struct BinaryUpdate {
data: Vec<String>,
encoding: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct ParsedPriceUpdate {
id: String,
price: RpcPrice,
ema_price: RpcPrice,
}

#[derive(Debug, Serialize, Deserialize)]
struct RpcPrice {
price: String,
conf: String,
expo: i32,
publish_time: u64,
}

pub struct HermesClient {
client: Client,
}

impl HermesClient {
pub fn new() -> Self {
Self {
client: Client::new(),
}
}

pub async fn get_price_updates(
&self,
price_ids: &[[u8; 32]],
) -> Result<Vec<(PriceData, Vec<u8>)>, HermesError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| HermesError::ParseError(format!("Failed to get timestamp: {}", e)))?
.as_secs();

let mut url = format!(
"{}/v2/updates/price/{}?parsed=true&encoding=hex",
HERMES_API_URL, now
);

for price_id in price_ids {
url.push_str(&format!("&ids[]={}", hex::encode(price_id)));
}

let response = self
.client
.get(&url)
.send()
.await
.map_err(|e| HermesError::RequestFailed(e))?
.error_for_status()
.map_err(|e| HermesError::RequestFailed(e))?
.json::<HermesResponse>()
.await
.map_err(|e| HermesError::RequestFailed(e))?;

let update_data = if response.binary.encoding == "hex" {
response
.binary
.data
.into_iter()
.map(|data| hex::decode(&data))
.collect::<Result<Vec<_>, _>>()
.map_err(HermesError::HexDecodeError)?
} else {
return Err(HermesError::InvalidEncoding(response.binary.encoding));
};

let price_updates = response.parsed.ok_or(HermesError::NoPriceUpdates)?;

if price_updates.is_empty() {
return Err(HermesError::NoPriceUpdates);
}

let mut results = Vec::with_capacity(price_updates.len());
for (update, data) in price_updates.into_iter().zip(update_data) {
let price_data = PriceData {
price: update
.price
.price
.parse()
.map_err(|e| HermesError::ParseError(format!("Invalid price: {}", e)))?,
conf: update
.price
.conf
.parse()
.map_err(|e| HermesError::ParseError(format!("Invalid conf: {}", e)))?,
expo: update.price.expo,
publish_time: update.price.publish_time,
};
results.push((price_data, data));
}

Ok(results)
}
}
Loading

0 comments on commit 5def4b4

Please sign in to comment.