Skip to content

Commit

Permalink
Merge branch 'eigen-client-memstore' into eigen-client-soft-conf
Browse files Browse the repository at this point in the history
  • Loading branch information
gianbelinche committed Nov 4, 2024
2 parents 70b10a9 + edafcff commit 645f15e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 68 deletions.
124 changes: 77 additions & 47 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{str::FromStr, sync::Arc};

use anyhow::anyhow;
use async_trait::async_trait;
use secp256k1::SecretKey;
use subxt_signer::ExposeSecret;
Expand All @@ -9,7 +10,7 @@ use zksync_da_client::{
DataAvailabilityClient,
};

use super::{memstore::MemStore, sdk::RawEigenClient, Disperser};
use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser};
use crate::utils::to_non_retriable_da_error;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -40,20 +41,34 @@ impl DataAvailabilityClient for EigenClient {
_: u32, // batch number
data: Vec<u8>,
) -> Result<DispatchResponse, DAError> {
match &self.client {
Disperser::Remote(remote_disperser) => {
let blob_id = remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?;
Ok(DispatchResponse::from(blob_id))
}
Disperser::Memory(memstore) => memstore.clone().store_blob(data).await,
}
let blob_id = match &self.client {
Disperser::Remote(remote_disperser) => remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
Disperser::Memory(memstore) => memstore
.clone()
.put_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
};

Ok(DispatchResponse::from(blob_id))
}

async fn get_inclusion_data(&self, _: &str) -> Result<Option<InclusionData>, DAError> {
Ok(Some(InclusionData { data: vec![] }))
async fn get_inclusion_data(&self, blob_id: &str) -> Result<Option<InclusionData>, DAError> {
let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
let blob_info: BlobInfo = rlp::decode(&rlp_encoded_bytes).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_info"),
is_retriable: false,
})?;
let inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
Ok(Some(InclusionData {
data: inclusion_data,
}))
}

fn clone_boxed(&self) -> Box<dyn DataAvailabilityClient> {
Expand All @@ -74,13 +89,6 @@ impl EigenClient {
}
}
}

pub fn to_retriable_error(error: anyhow::Error) -> DAError {
DAError {
error,
is_retriable: true,
}
}
#[cfg(test)]
mod tests {
use zksync_config::configs::da_client::eigen::{DisperserConfig, MemStoreConfig};
Expand All @@ -90,12 +98,18 @@ mod tests {
use crate::eigen::blob_info::BlobInfo;

#[tokio::test]
async fn test_eigenda_memory_disperser() {
let config = EigenConfig::MemStore(MemStoreConfig {
max_blob_size_bytes: 2 * 1024 * 1024, // 2MB,
blob_expiration: 60 * 2,
get_latency: 0,
put_latency: 0,
async fn test_non_auth_dispersal() {
let config = EigenConfig::Disperser(DisperserConfig {
custom_quorum_numbers: None,
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
eth_confirmation_depth: -1,
eigenda_eth_rpc: String::default(),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
blob_size_limit: 2 * 1024 * 1024, // 2MB
status_query_timeout: 1800, // 30 minutes
status_query_interval: 5, // 5 ms
wait_for_finalization: false,
authenticaded: false,
});
let secrets = EigenSecrets {
private_key: PrivateKey::from_str(
Expand All @@ -104,19 +118,24 @@ mod tests {
.unwrap(),
};
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1u8; 100];
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
// TODO: once get inclusion data is added, check it

let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
.await
.unwrap()
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

#[tokio::test]
async fn test_non_auth_dispersal() {
async fn test_auth_dispersal() {
let config = EigenConfig::Disperser(DisperserConfig {
custom_quorum_numbers: None,
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
Expand All @@ -127,7 +146,7 @@ mod tests {
status_query_timeout: 1800, // 30 minutes
status_query_interval: 5, // 5 ms
wait_for_finalization: false,
authenticaded: false,
authenticaded: true,
});
let secrets = EigenSecrets {
private_key: PrivateKey::from_str(
Expand All @@ -140,23 +159,25 @@ mod tests {
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
// TODO: once get inclusion data is added, check it
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
.await
.unwrap()
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

#[tokio::test]
async fn test_auth_dispersal() {
let config = EigenConfig::Disperser(DisperserConfig {
custom_quorum_numbers: None,
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
eth_confirmation_depth: -1,
eigenda_eth_rpc: String::default(),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
blob_size_limit: 2 * 1024 * 1024, // 2MB
status_query_timeout: 1800, // 30 minutes
status_query_interval: 5, // 5 ms
wait_for_finalization: false,
authenticaded: true,
async fn test_eigenda_memory_disperser() {
let config = EigenConfig::MemStore(MemStoreConfig {
max_blob_size_bytes: 2 * 1024 * 1024, // 2MB,
blob_expiration: 60 * 2,
get_latency: 0,
put_latency: 0,
});
let secrets = EigenSecrets {
private_key: PrivateKey::from_str(
Expand All @@ -165,11 +186,20 @@ mod tests {
.unwrap(),
};
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let data = vec![1u8; 100];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
// TODO: once get inclusion data is added, check it
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
.await
.unwrap()
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);

let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}
Expand Down
24 changes: 3 additions & 21 deletions core/node/da_clients/src/eigen/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use tokio::time::interval;
use zksync_config::configs::da_client::eigen::MemStoreConfig;
use zksync_da_client::types::{DAError, DispatchResponse, InclusionData};

use super::{
blob_info::{self, BlobInfo},
client::to_retriable_error,
};
use super::blob_info::{self, BlobInfo};

#[derive(Debug, PartialEq)]
pub enum MemStoreError {
Expand Down Expand Up @@ -65,14 +62,12 @@ impl MemStore {
memstore
}

async fn put_blob(self: Arc<Self>, value: Vec<u8>) -> Result<Vec<u8>, MemStoreError> {
pub async fn put_blob(self: Arc<Self>, value: Vec<u8>) -> Result<String, MemStoreError> {
tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await;
if value.len() as u64 > self.config.max_blob_size_bytes {
return Err(MemStoreError::BlobToLarge.into());
}

// todo: Encode blob?

let mut entropy = [0u8; 10];
OsRng.fill_bytes(&mut entropy);

Expand Down Expand Up @@ -136,20 +131,7 @@ impl MemStore {

data.key_starts.insert(key.clone(), Instant::now());
data.store.insert(key, value);
Ok(cert_bytes)
}

pub async fn store_blob(
self: Arc<Self>,
blob_data: Vec<u8>,
) -> Result<DispatchResponse, DAError> {
let request_id = self
.put_blob(blob_data)
.await
.map_err(|err| to_retriable_error(err.into()))?;
Ok(DispatchResponse {
blob_id: hex::encode(request_id),
})
Ok(hex::encode(cert_bytes))
}

pub async fn get_inclusion_data(
Expand Down

0 comments on commit 645f15e

Please sign in to comment.