diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index ef4baa1c2ad..5baee9475e9 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -13,21 +13,23 @@ use super::sdk::RawEigenClient; use crate::utils::to_retriable_da_error; #[async_trait] -pub trait GetBlobData: Clone + std::fmt::Debug + Send + Sync { +pub trait GetBlobData: std::fmt::Debug + Send + Sync { async fn get_blob_data(&self, input: &str) -> anyhow::Result>>; + + fn clone_boxed(&self) -> Box; } /// EigenClient is a client for the Eigen DA service. #[derive(Debug, Clone)] -pub struct EigenClient { - pub(crate) client: Arc>, +pub struct EigenClient { + pub(crate) client: Arc, } -impl EigenClient { +impl EigenClient { pub async fn new( config: EigenConfig, secrets: EigenSecrets, - get_blob_data: Box, + get_blob_data: Box, ) -> anyhow::Result { let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str()) .map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?; @@ -40,7 +42,7 @@ impl EigenClient { } #[async_trait] -impl DataAvailabilityClient for EigenClient { +impl DataAvailabilityClient for EigenClient { async fn dispatch_blob( &self, _: u32, // batch number @@ -75,6 +77,6 @@ impl DataAvailabilityClient for EigenClient { } fn blob_size_limit(&self) -> Option { - Some(RawEigenClient::::blob_size_limit()) + Some(RawEigenClient::blob_size_limit()) } } diff --git a/core/node/da_clients/src/eigen/client_tests.rs b/core/node/da_clients/src/eigen/client_tests.rs index 99cd81cf279..4fbbd5c3676 100644 --- a/core/node/da_clients/src/eigen/client_tests.rs +++ b/core/node/da_clients/src/eigen/client_tests.rs @@ -17,7 +17,7 @@ mod tests { use crate::eigen::{blob_info::BlobInfo, EigenClient, GetBlobData}; - impl EigenClient { + impl EigenClient { pub async fn get_blob_data( &self, blob_id: BlobInfo, @@ -32,8 +32,8 @@ mod tests { const STATUS_QUERY_TIMEOUT: u64 = 1800000; // 30 minutes const STATUS_QUERY_INTERVAL: u64 = 5; // 5 ms - async fn get_blob_info( - client: &EigenClient, + async fn get_blob_info( + client: &EigenClient, result: &DispatchResponse, ) -> anyhow::Result { let blob_info = (|| async { @@ -62,6 +62,10 @@ mod tests { async fn get_blob_data(&self, _input: &'_ str) -> anyhow::Result>> { Ok(None) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } } #[ignore = "depends on external RPC"] diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 0ce87a14210..3a3b1202690 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -31,24 +31,36 @@ use crate::eigen::{ verifier::VerificationError, }; -#[derive(Debug, Clone)] -pub(crate) struct RawEigenClient { +#[derive(Debug)] +pub(crate) struct RawEigenClient { client: Arc>>, private_key: SecretKey, pub config: EigenConfig, verifier: Verifier, - get_blob_data: Box, + get_blob_data: Box, +} + +impl Clone for RawEigenClient { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + private_key: self.private_key, + config: self.config.clone(), + verifier: self.verifier.clone(), + get_blob_data: self.get_blob_data.clone_boxed(), + } + } } pub(crate) const DATA_CHUNK_SIZE: usize = 32; -impl RawEigenClient { +impl RawEigenClient { const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB pub async fn new( private_key: SecretKey, config: EigenConfig, - get_blob_data: Box, + get_blob_data: Box, ) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; diff --git a/core/node/da_clients/src/eigen/verifier.rs b/core/node/da_clients/src/eigen/verifier.rs index b8d16f8ad51..2aef1982d37 100644 --- a/core/node/da_clients/src/eigen/verifier.rs +++ b/core/node/da_clients/src/eigen/verifier.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, fs::File, io::copy, path::Path}; +use std::{collections::HashMap, path::Path}; use ark_bn254::{Fq, G1Affine}; use ethabi::{encode, ParamType, Token}; use rust_kzg_bn254::{blob::Blob, kzg::Kzg, polynomial::PolynomialFormat}; +use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; use zksync_basic_types::web3::CallRequest; use zksync_eth_client::{clients::PKSigningClient, EnrichedClientResult}; @@ -112,12 +113,14 @@ impl Verifier { } let path = format!("./{}", point); let path = Path::new(&path); - let mut file = File::create(path).map_err(|_| VerificationError::LinkError)?; + let mut file = File::create(path).await.map_err(|_| VerificationError::LinkError)?; let content = response .bytes() .await .map_err(|_| VerificationError::LinkError)?; - copy(&mut content.as_ref(), &mut file).map_err(|_| VerificationError::LinkError)?; + file.write_all(&content) + .await + .map_err(|_| VerificationError::LinkError)?; Ok(()) } async fn save_points(url_g1: Url, url_g2: Url) -> Result { @@ -132,15 +135,20 @@ impl Verifier { ) -> Result { let srs_points_to_load = cfg.max_blob_size / Self::POINT_SIZE; let path = Self::save_points(cfg.clone().g1_url, cfg.clone().g2_url).await?; - let kzg = Kzg::setup( - &format!("{}/{}", path, Self::G1POINT), - "", - &format!("{}/{}", path, Self::G2POINT), - Self::SRSORDER, - srs_points_to_load, - "".to_string(), - ); - let kzg = kzg.map_err(|e| { + let kzg_handle = tokio::task::spawn_blocking(move || { + Kzg::setup( + &format!("{}/{}", path, Self::G1POINT), + "", + &format!("{}/{}", path, Self::G2POINT), + Self::SRSORDER, + srs_points_to_load, + "".to_string(), + ) + }); + let kzg = kzg_handle.await.map_err(|e| { + tracing::error!("Failed to setup KZG: {:?}", e); + VerificationError::KzgError + })?.map_err(|e| { tracing::error!("Failed to setup KZG: {:?}", e); VerificationError::KzgError })?; diff --git a/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs index ee5755d85c1..3eb9675c03b 100644 --- a/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs +++ b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs @@ -74,4 +74,8 @@ impl GetBlobData for GetBlobFromDB { .await?; Ok(batch.map(|b| b.pubdata)) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } }