Expect removed from various places (#197)
* refactor: error handling added for the storage functions

* chore: removed expect

* chore: changelog

* refactor: http-client refactored to handle panic better

---------

Co-authored-by: mohiiit <[email protected]>
Mohiiit and mohiiit authored Jan 7, 2025
1 parent 15b321b commit cb8d82f
Showing 14 changed files with 212 additions and 116 deletions.
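The recurring pattern in the diffs below: panicking `.expect(...)` calls are replaced with `color_eyre`'s `Context` combinators so failures propagate as `Result`s carrying context. A minimal sketch of the before/after shape (hypothetical helper and file name, not from this commit):

```rust
use color_eyre::eyre::{Context, Result};
use std::fs;

// Hypothetical helper illustrating the commit-wide refactor:
// propagate a contextualized error instead of panicking.
fn read_state_file(path: &str) -> Result<String> {
    // Before: fs::read_to_string(path).expect("Failed to read state file")
    fs::read_to_string(path).context(format!("Failed to read state file: {}", path))
}

fn main() -> Result<()> {
    color_eyre::install()?;
    let state = read_state_file("state.json")?;
    println!("read {} bytes", state.len());
    Ok(())
}
```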
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
- refactor: http_mock version updated
- refactor: prover-services renamed to prover-clients
34 changes: 29 additions & 5 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
@@ -6,6 +6,7 @@ use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration};
use aws_sdk_s3::Client;
use bytes::Bytes;
use color_eyre::eyre::Context;
use color_eyre::Result;

use crate::data_storage::DataStorage;
@@ -50,8 +51,20 @@ impl AWSS3 {
impl DataStorage for AWSS3 {
/// Function to get the data from S3 bucket by Key.
async fn get_data(&self, key: &str) -> Result<Bytes> {
let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
let response = self
.client
.get_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.context(format!("Failed to get object from bucket: {}, key: {}", self.bucket, key))?;

let data_stream = response.body.collect().await.context(format!(
"Failed to collect body into AggregatedBytes for bucket: {}, key: {}",
self.bucket, key
))?;

tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
let data_bytes = data_stream.into_bytes();
tracing::debug!(
@@ -74,7 +87,8 @@ impl DataStorage for AWSS3 {
.body(ByteStream::from(data))
.content_type("application/json")
.send()
.await?;
.await
.context(format!("Failed to put object in bucket: {}, key: {}", self.bucket, key))?;

tracing::debug!(
log_type = "DataStorage",
@@ -88,7 +102,12 @@

async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
if self.bucket_location_constraint.as_str() == "us-east-1" {
self.client.create_bucket().bucket(bucket_name).send().await?;
self.client
.create_bucket()
.bucket(bucket_name)
.send()
.await
.context(format!("Failed to create bucket: {} in us-east-1", bucket_name))?;
return Ok(());
}

@@ -101,7 +120,12 @@
.bucket(bucket_name)
.set_create_bucket_configuration(bucket_configuration)
.send()
.await?;
.await
.context(format!(
"Failed to create bucket: {} in region: {}",
bucket_name,
self.bucket_location_constraint.as_str()
))?;

Ok(())
}
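
A note on the pattern above (an observation, not part of this diff): `.context(format!(...))` builds its message `String` eagerly, even on the success path. eyre's lazy variant, which this commit's snos worker uses below, defers formatting to the error path. A sketch under that assumption, with a hypothetical stand-in for the S3 call:

```rust
use color_eyre::eyre::{eyre, Context, Result, WrapErr};

// Stand-in for an S3 call; hypothetical, for illustration only.
fn fetch(_bucket: &str, _key: &str) -> Result<Vec<u8>> {
    Err(eyre!("connection reset"))
}

// Eager: the message is allocated even when fetch succeeds.
fn get_eager(bucket: &str, key: &str) -> Result<Vec<u8>> {
    fetch(bucket, key).context(format!("Failed to get object from bucket: {}, key: {}", bucket, key))
}

// Lazy: format! runs only if fetch actually fails.
fn get_lazy(bucket: &str, key: &str) -> Result<Vec<u8>> {
    fetch(bucket, key).wrap_err_with(|| format!("Failed to get object from bucket: {}, key: {}", bucket, key))
}

fn main() {
    assert!(get_eager("bucket", "key").is_err());
    assert!(get_lazy("bucket", "key").is_err());
}
```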
46 changes: 25 additions & 21 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -125,7 +125,7 @@ impl Job for DaJob {
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

let transformed_data = fft_transformation(blob_data_biguint);
let transformed_data =
fft_transformation(blob_data_biguint).wrap_err("Failed to apply FFT transformation").map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

@@ -204,17 +208,18 @@
}

#[tracing::instrument(skip(elements))]
pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
pub fn fft_transformation(elements: Vec<BigUint>) -> Result<Vec<BigUint>, JobError> {
let xs: Vec<BigUint> = (0..*BLOB_LEN)
.map(|i| {
let bin = format!("{:012b}", i);
let bin_rev = bin.chars().rev().collect::<String>();
GENERATOR.modpow(
&BigUint::from_str_radix(&bin_rev, 2).expect("Not able to convert the parameters into exponent."),
&BLS_MODULUS,
)
let exponent = BigUint::from_str_radix(&bin_rev, 2)
.wrap_err("Failed to convert binary string to exponent")
.map_err(|e| JobError::Other(OtherError(e)))?;
Ok(GENERATOR.modpow(&exponent, &BLS_MODULUS))
})
.collect();
.collect::<Result<Vec<BigUint>, JobError>>()?;

let n = elements.len();
let mut transform: Vec<BigUint> = vec![BigUint::zero(); n];

@@ -223,7 +228,7 @@ pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
transform[i] = (transform[i].clone().mul(&xs[i]).add(&elements[j])).rem(&*BLS_MODULUS);
}
}
transform
Ok(transform)
}

pub fn convert_to_biguint(elements: Vec<Felt>) -> Vec<BigUint> {
@@ -310,7 +315,7 @@ pub async fn state_update_to_blob_data(

nonce = Some(get_current_nonce_result);
}
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64);
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64)?;
blob_data.push(address);
blob_data.push(da_word);

@@ -355,7 +360,7 @@ async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc
/// DA word encoding:
/// |---padding---|---class flag---|---new nonce---|---num changes---|
/// 127 bits 1 bit 64 bits 64 bits
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Felt {
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Result<Felt, JobError> {
// padding of 127 bits
let mut binary_string = "0".repeat(127);

@@ -367,13 +372,8 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
}

// checking for nonce here
if let Some(_new_nonce) = nonce_change {
let bytes: [u8; 32] = nonce_change
.expect(
"Not able to convert the nonce_change var into [u8; 32] type. Possible Error : Improper parameter \
length.",
)
.to_bytes_be();
if let Some(new_nonce) = nonce_change {
let bytes: [u8; 32] = new_nonce.to_bytes_be();
let biguint = BigUint::from_bytes_be(&bytes);
let binary_string_local = format!("{:b}", biguint);
let padded_binary_string = format!("{:0>64}", binary_string_local);
@@ -387,12 +387,16 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
let padded_binary_string = format!("{:0>64}", binary_representation);
binary_string += &padded_binary_string;

let biguint = BigUint::from_str_radix(binary_string.as_str(), 2).expect("Invalid binary string");
let biguint = BigUint::from_str_radix(binary_string.as_str(), 2)
.wrap_err("Failed to convert binary string to BigUint")
.map_err(|e| JobError::Other(OtherError(e)))?;

// Now convert the BigUint to a decimal string
let decimal_string = biguint.to_str_radix(10);

Felt::from_dec_str(&decimal_string).expect("issue while converting to fieldElement")
Felt::from_dec_str(&decimal_string)
.wrap_err("Failed to convert decimal string to FieldElement")
.map_err(|e| JobError::Other(OtherError(e)))
}
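
To make the doc comment's bit layout concrete: reading the 256-bit string most-significant-bit first, the word equals `class_flag * 2^128 + nonce * 2^64 + num_changes`. A standalone check with illustrative values (not taken from this diff):

```rust
use num_bigint::BigUint;
use num_traits::Num;

fn main() {
    // Layout: 127 padding bits | 1 class-flag bit | 64 nonce bits | 64 num-changes bits.
    // Illustrative values: class_flag = true, nonce = 1, num_changes = 1.
    let mut bits = "0".repeat(127);
    bits.push('1'); // class flag set
    bits += &format!("{:064b}", 1u64); // nonce
    bits += &format!("{:064b}", 1u64); // num changes

    let word = BigUint::from_str_radix(&bits, 2).unwrap();
    // Closed form of the same layout: 2^128 + 2^64 + 1.
    let expected =
        (BigUint::from(1u8) << 128u32) + (BigUint::from(1u8) << 64u32) + BigUint::from(1u8);
    assert_eq!(word, expected);
}
```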

fn refactor_state_update(state_update: &mut StateDiff) {
@@ -453,7 +457,7 @@ pub mod test {
#[case] expected: String,
) {
let new_nonce = if new_nonce > 0 { Some(Felt::from(new_nonce)) } else { None };
let da_word = da_word(class_flag, new_nonce, num_changes);
let da_word = da_word(class_flag, new_nonce, num_changes).expect("Failed to create DA word");
let expected = Felt::from_dec_str(expected.as_str()).unwrap();
assert_eq!(da_word, expected);
}
@@ -562,7 +566,7 @@ pub mod test {
// converting the data to its original format
let ifft_blob_data = blob::recover(original_blob_data.clone());
// applying the fft function again on the original format
let fft_blob_data = fft_transformation(ifft_blob_data);
let fft_blob_data = fft_transformation(ifft_blob_data).expect("FFT transformation failed during test");

// ideally the data after fft transformation and the data before ifft should be same.
assert_eq!(fft_blob_data, original_blob_data);
31 changes: 20 additions & 11 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
@@ -131,7 +131,7 @@ impl Job for StateUpdateJob {
for block_no in block_numbers.iter() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");

let snos = self.fetch_snos_for_block(*block_no, config.clone()).await;
let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
let txn_hash = self
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
@@ -320,7 +320,7 @@ impl StateUpdateJob {
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await;
let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;

// TODO :
// Fetching nonce before the transaction is run
@@ -336,21 +336,30 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> StarknetOsOutput {
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
let storage_client = config.storage();
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
let snos_output_bytes = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
serde_json::from_slice(snos_output_bytes.iter().as_slice())
.expect("Unable to convert the data into snos output")

let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
})
}

async fn fetch_program_output_for_block(&self, block_number: u64, config: Arc<Config>) -> Vec<[u8; 32]> {
async fn fetch_program_output_for_block(
&self,
block_number: u64,
config: Arc<Config>,
) -> Result<Vec<[u8; 32]>, JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
let program_output = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
let decode_data: Vec<[u8; 32]> =
bincode::deserialize(&program_output).expect("Unable to decode the fetched data from storage provider.");
decode_data

let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

bincode::deserialize(&program_output).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
})
}

/// Insert the tx hashes into the metadata for the attempt number - will be used later by
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/state_update_job/utils.rs
@@ -59,7 +59,7 @@ pub fn bytes_to_vec_u8(bytes: &[u8]) -> color_eyre::Result<Vec<[u8; 32]>> {
let trimmed = line.trim();
assert!(!trimmed.is_empty());

let result = U256::from_str(trimmed).expect("Unable to convert line");
let result = U256::from_str(trimmed)?;
let res_vec = result.to_be_bytes_vec();
let hex = to_padded_hex(res_vec.as_slice());
let vec_hex = hex_string_to_u8_vec(&hex)
14 changes: 11 additions & 3 deletions crates/orchestrator/src/queue/job_queue.rs
@@ -58,8 +58,14 @@ pub struct WorkerTriggerMessage {
pub worker: WorkerTriggerType,
}

#[derive(Error, Debug)]
pub enum WorkerTriggerTypeError {
#[error("Unknown WorkerTriggerType: {0}")]
UnknownType(String),
}

impl FromStr for WorkerTriggerType {
type Err = String;
type Err = WorkerTriggerTypeError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
@@ -68,7 +74,7 @@ impl FromStr for WorkerTriggerType {
"ProofRegistration" => Ok(WorkerTriggerType::ProofRegistration),
"DataSubmission" => Ok(WorkerTriggerType::DataSubmission),
"UpdateState" => Ok(WorkerTriggerType::UpdateState),
_ => Err(format!("Unknown WorkerTriggerType: {}", s)),
_ => Err(WorkerTriggerTypeError::UnknownType(s.to_string())),
}
}
}
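
With the typed error in place, parse failures surface through `thiserror`'s generated `Display` implementation instead of a bare `String`. A usage sketch, assuming `WorkerTriggerType` and its variants are in scope:

```rust
use std::str::FromStr;

fn demo() {
    // A known variant parses cleanly.
    let ok = WorkerTriggerType::from_str("UpdateState");
    assert!(matches!(ok, Ok(WorkerTriggerType::UpdateState)));

    // An unknown variant now yields a typed, printable error.
    let err = WorkerTriggerType::from_str("Bogus").unwrap_err();
    assert_eq!(err.to_string(), "Unknown WorkerTriggerType: Bogus");
}
```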
@@ -231,7 +237,9 @@ fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessag
.borrow_payload()
.ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?;
let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string();
let trigger_type = WorkerTriggerType::from_str(message_string.as_str()).expect("trigger type unwrapping failed");
let trigger_type = WorkerTriggerType::from_str(message_string.as_str())
.wrap_err("Failed to parse worker trigger type from message")
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
Ok(Some(WorkerTriggerMessage { worker: trigger_type }))
}

12 changes: 8 additions & 4 deletions crates/orchestrator/src/workers/snos.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use color_eyre::eyre::WrapErr;
use opentelemetry::KeyValue;
use starknet::providers::Provider;

@@ -35,10 +36,13 @@ impl Worker for SnosWorker {

let latest_job_in_db = config.database().get_latest_job_by_type(JobType::SnosRun).await?;

let latest_job_id = match latest_job_in_db {
Some(job) => job.internal_id.parse::<u64>().expect("Failed to parse job internal ID to u64"),
None => "0".to_string().parse::<u64>().expect("Failed to parse '0' to u64"),
};
let latest_job_id = latest_job_in_db
.map(|job| {
job.internal_id
.parse::<u64>()
.wrap_err_with(|| format!("Failed to parse job internal ID: {}", job.internal_id))
})
.unwrap_or(Ok(0))?;

// To be used when testing in specific block range
let block_start = if let Some(min_block_to_process) = config.service_config().min_block_to_process {
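The `map(...).unwrap_or(Ok(0))?` chain above flattens an `Option<Result<u64>>`: no job in the database defaults to 0, while a present-but-unparsable ID now propagates an error instead of panicking. A standalone sketch of the same pattern (hypothetical values):

```rust
use color_eyre::eyre::{Result, WrapErr};

fn latest_id(internal_id: Option<&str>) -> Result<u64> {
    internal_id
        .map(|id| id.parse::<u64>().wrap_err_with(|| format!("Failed to parse job internal ID: {}", id)))
        .unwrap_or(Ok(0))
}

fn main() -> Result<()> {
    assert_eq!(latest_id(None)?, 0); // no job yet -> default to 0
    assert_eq!(latest_id(Some("42"))?, 42); // parses normally
    assert!(latest_id(Some("abc")).is_err()); // error propagates, no panic
    Ok(())
}
```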
24 changes: 10 additions & 14 deletions crates/prover-clients/atlantic-service/src/client.rs
@@ -48,6 +48,7 @@ impl AtlanticClient {
let prover_type = atlantic_params.atlantic_prover_type.clone();

let client = HttpClient::builder(url.as_str())
.expect("Failed to create HTTP client builder")
.default_form_data("mockFactHash", &mock_fact_hash)
.default_form_data("proverType", &prover_type)
.build()
@@ -56,7 +57,7 @@
let proving_layer: Box<dyn ProvingLayer> = match atlantic_params.atlantic_settlement_layer.as_str() {
"ethereum" => Box::new(EthereumLayer),
"starknet" => Box::new(StarknetLayer),
_ => panic!("proving layer not correct"),
_ => panic!("Invalid settlement layer: {}", atlantic_params.atlantic_settlement_layer),
};

Self { client, proving_layer }
@@ -66,7 +67,7 @@
&self,
pie_file: &Path,
proof_layout: LayoutName,
atlantic_api_key: String,
atlantic_api_key: impl AsRef<str>,
) -> Result<AtlanticAddJobResponse, AtlanticError> {
let proof_layout = match proof_layout {
LayoutName::dynamic => "dynamic",
@@ -76,22 +77,17 @@
let response = self
.proving_layer
.customize_request(
self.client
.request()
.method(Method::POST)
.query_param("apiKey", &atlantic_api_key)
.form_file("pieFile", pie_file, "pie.zip")
.form_text("layout", proof_layout),
self.client.request().method(Method::POST).query_param("apiKey", atlantic_api_key.as_ref()),
)
.form_file("pieFile", pie_file, "pie.zip")?
.form_text("layout", proof_layout)
.send()
.await
.map_err(AtlanticError::AddJobFailure)
.expect("Failed to add job");
.map_err(AtlanticError::AddJobFailure)?;

if response.status().is_success() {
response.json().await.map_err(AtlanticError::AddJobFailure)
} else {
Err(AtlanticError::SharpService(response.status()))
match response.status().is_success() {
true => response.json().await.map_err(AtlanticError::AddJobFailure),
false => Err(AtlanticError::SharpService(response.status())),
}
}

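One smaller API change worth noting: `atlantic_api_key` went from `String` to `impl AsRef<str>`, so callers can pass `&str`, `String`, or `&String` without cloning or moving. A minimal illustration (hypothetical function mirroring only the parameter type):

```rust
// Hypothetical function mirroring the new parameter type.
fn query_param_len(api_key: impl AsRef<str>) -> usize {
    api_key.as_ref().len()
}

fn main() {
    let owned = String::from("secret");
    assert_eq!(query_param_len("secret"), 6); // &str
    assert_eq!(query_param_len(&owned), 6); // &String, via the blanket AsRef impl
    assert_eq!(query_param_len(owned), 6); // String by value
}
```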
(Diffs for the remaining changed files were not loaded.)
