Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(argus): add argus #2118

Draft
wants to merge 1 commit into
base: pulse
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/argus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "argus"
version = "0.1.0"
edition = "2021"

[dependencies]
alloy = { version = "0.3", features = ["full", "node-bindings"] }
tokio = { version = "1.28", features = ["full"] }
19 changes: 19 additions & 0 deletions apps/argus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Argus

Argus is a webservice that serves price updates according to the Pulse protocol.
The service also operates a keeper task that performs callback transactions for user requests.

A single instance of this service can simultaneously serve price updates for several different blockchains.
Each blockchain is configured in `config.yaml`.

## How It Works

1. Continuously polls the Pulse contract's storage to discover new price update requests
2. Fetches required price data from Pyth Network
3. Batches multiple requests when possible for gas efficiency
4. Executes callbacks with appropriate gas limits specified in the original requests
5. Monitors transaction success and handles retries when necessary

## Architecture

The service is built on Rust for performance and reliability, sharing architectural patterns with Fortuna (the Entropy protocol's keeper service). However, unlike Fortuna which relies on event subscriptions, Argus uses direct storage polling for more reliable request discovery.
18 changes: 18 additions & 0 deletions apps/argus/config.sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
chains:
ethereum:
geth_rpc_addr: "https://eth-mainnet.g.alchemy.com/v2/YOUR-API-KEY"
contract_addr: "0x1234..."
poll_interval: 5
min_batch_size: 1
max_batch_size: 10
batch_timeout: 30
min_keeper_balance: 1000000000000000000 # 1 ETH
gas_limit: 500000

provider:
uri: "http://localhost:8080"
address: "0x5678..."
private_key: "0xabcd..." # Provider private key

keeper:
private_key: "0xdef0..." # Keeper private key
1 change: 1 addition & 0 deletions apps/argus/rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nightly-2023-07-23
60 changes: 60 additions & 0 deletions apps/argus/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use {
alloy::{
primitives::Address,
providers::{Provider, ProviderBuilder},
signers::Signer,
},
anyhow::Result,
serde::Deserialize,
std::{fs, time::Duration},
};

#[derive(Debug, Deserialize)]
pub struct Config {
pub chains: HashMap<String, ChainConfig>,
pub provider: ProviderConfig,
pub keeper: KeeperConfig,
}

#[derive(Debug, Deserialize)]
pub struct ChainConfig {
pub geth_rpc_addr: String,
pub contract_addr: Address,
pub poll_interval: u64, // in seconds
pub min_batch_size: usize,
pub max_batch_size: usize,
pub batch_timeout: u64, // in seconds
pub min_keeper_balance: u64,
pub gas_limit: u64,
}

#[derive(Debug, Deserialize)]
pub struct ProviderConfig {
pub uri: String,
pub address: Address,
pub private_key: SecretString,
}

#[derive(Debug, Deserialize)]
pub struct KeeperConfig {
pub private_key: SecretString,
}

#[derive(Debug, Deserialize)]
pub struct SecretString(String);

impl Config {
pub fn load(path: &str) -> Result<Self> {
let contents = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&contents)?)
}

pub fn create_provider(&self, chain_id: &str) -> Result<Provider> {
let chain = self.chains.get(chain_id).ok_or_else(|| anyhow!("Chain not found"))?;
Ok(Provider::builder().rpc_url(&chain.geth_rpc_addr).build()?)
}

pub fn create_signer(&self, secret: &SecretString) -> Result<Signer> {
Ok(Signer::from_private_key(secret.0.parse()?)?)
}
}
41 changes: 41 additions & 0 deletions apps/argus/src/contract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use {
alloy::{
contract::{Contract, ContractInstance},
primitives::{Address, Bytes, U256},
providers::Provider,
signers::Signer,
},
anyhow::Result,
std::sync::Arc,
};

// Contract ABI definition
abigen!(Pulse, "target_chains/ethereum/contracts/contracts/pulse/IPulse.sol");

pub struct PulseContract<P: Provider> {
instance: ContractInstance<Arc<P>, Pulse>,
}

impl<P: Provider> PulseContract<P> {
pub fn new(address: Address, provider: Arc<P>) -> Self {
Self {
instance: ContractInstance::new(address, Arc::new(Pulse::new()), provider),
}
}

pub async fn execute_callback(
&self,
provider: Address,
sequence_number: U64,
price_ids: Vec<[u8; 32]>,
update_data: Vec<Bytes>,
callback_gas_limit: U256,
) -> Result<TxHash> {
let tx = self.instance
.execute_callback(provider, sequence_number, price_ids, update_data, callback_gas_limit)
.send()
.await?;

Ok(tx.tx_hash())
}
}
64 changes: 64 additions & 0 deletions apps/argus/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ArgusError {
#[error("Failed to fetch price updates from Hermes: {0}")]
HermesError(#[from] HermesError),

#[error("Contract error: {0}")]
ContractError(#[from] ContractError),

#[error("Storage error: {0}")]
StorageError(#[from] StorageError),

#[error("Configuration error: {0}")]
ConfigError(String),

#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[derive(Debug, Error)]
pub enum HermesError {
#[error("HTTP request failed: {0}")]
RequestFailed(#[from] reqwest::Error),

#[error("Invalid response encoding: {0}")]
InvalidEncoding(String),

#[error("No price updates found")]
NoPriceUpdates,

#[error("Failed to parse price data: {0}")]
ParseError(String),

#[error("Failed to decode hex data: {0}")]
HexDecodeError(#[from] hex::FromHexError),
}

#[derive(Debug, Error)]
pub enum ContractError {
#[error("Transaction failed: {0}")]
TransactionFailed(String),

#[error("Gas estimation failed: {0}")]
GasEstimationFailed(String),

#[error("Invalid contract address: {0}")]
InvalidAddress(String),

#[error("Contract call failed: {0}")]
CallFailed(String),
}

#[derive(Debug, Error)]
pub enum StorageError {
#[error("Failed to read storage slot: {0}")]
ReadError(String),

#[error("Failed to parse storage data: {0}")]
ParseError(String),

#[error("Invalid storage layout: {0}")]
InvalidLayout(String),
}
129 changes: 129 additions & 0 deletions apps/argus/src/hermes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use {
crate::{
error::{
ArgusError,
HermesError,
},
types::PriceData,
},
reqwest::Client,
serde::{
Deserialize,
Serialize,
},
std::time::{
SystemTime,
UNIX_EPOCH,
},
};

const HERMES_API_URL: &str = "https://hermes.pyth.network";

#[derive(Debug, Serialize, Deserialize)]
struct HermesResponse {
binary: BinaryUpdate,
parsed: Option<Vec<ParsedPriceUpdate>>,
}

#[derive(Debug, Serialize, Deserialize)]
struct BinaryUpdate {
data: Vec<String>,
encoding: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct ParsedPriceUpdate {
id: String,
price: RpcPrice,
ema_price: RpcPrice,
}

#[derive(Debug, Serialize, Deserialize)]
struct RpcPrice {
price: String,
conf: String,
expo: i32,
publish_time: u64,
}

pub struct HermesClient {
client: Client,
}

impl HermesClient {
pub fn new() -> Self {
Self {
client: Client::new(),
}
}

pub async fn get_price_updates(
&self,
price_ids: &[[u8; 32]],
) -> Result<Vec<(PriceData, Vec<u8>)>, HermesError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| HermesError::ParseError(format!("Failed to get timestamp: {}", e)))?
.as_secs();

let mut url = format!(
"{}/v2/updates/price/{}?parsed=true&encoding=hex",
HERMES_API_URL, now
);

for price_id in price_ids {
url.push_str(&format!("&ids[]={}", hex::encode(price_id)));
}

let response = self
.client
.get(&url)
.send()
.await
.map_err(|e| HermesError::RequestFailed(e))?
.error_for_status()
.map_err(|e| HermesError::RequestFailed(e))?
.json::<HermesResponse>()
.await
.map_err(|e| HermesError::RequestFailed(e))?;

let update_data = if response.binary.encoding == "hex" {
response
.binary
.data
.into_iter()
.map(|data| hex::decode(&data))
.collect::<Result<Vec<_>, _>>()
.map_err(HermesError::HexDecodeError)?
} else {
return Err(HermesError::InvalidEncoding(response.binary.encoding));
};

let price_updates = response.parsed.ok_or(HermesError::NoPriceUpdates)?;

if price_updates.is_empty() {
return Err(HermesError::NoPriceUpdates);
}

let mut results = Vec::with_capacity(price_updates.len());
for (update, data) in price_updates.into_iter().zip(update_data) {
let price_data = PriceData {
price: update
.price
.price
.parse()
.map_err(|e| HermesError::ParseError(format!("Invalid price: {}", e)))?,
conf: update
.price
.conf
.parse()
.map_err(|e| HermesError::ParseError(format!("Invalid conf: {}", e)))?,
expo: update.price.expo,
publish_time: update.price.publish_time,
};
results.push((price_data, data));
}

Ok(results)
}
}
Loading
Loading