From 45f8d3a72466586bc3fe8213d1d9a83f2c4d20e4 Mon Sep 17 00:00:00 2001 From: fborello-lambda Date: Mon, 25 Nov 2024 10:16:10 -0300 Subject: [PATCH] prover_server: separate dev logic --- crates/l2/proposer/prover_server.rs | 311 ++++++++++++++-------------- 1 file changed, 151 insertions(+), 160 deletions(-) diff --git a/crates/l2/proposer/prover_server.rs b/crates/l2/proposer/prover_server.rs index f34a9c57d..91c6c492c 100644 --- a/crates/l2/proposer/prover_server.rs +++ b/crates/l2/proposer/prover_server.rs @@ -120,11 +120,27 @@ impl ProverServer { pub async fn run(&mut self, server_config: &ProverServerConfig) { loop { - if let Err(err) = self.clone().main_logic(server_config).await { - error!("Prover Server Error: {}", err); + let result = if server_config.dev_mode { + self.main_logic_dev().await } else { - info!("Prover Server Shut Down"); - break; + self.clone().main_logic(server_config).await + }; + + match result { + Ok(_) => { + if !server_config.dev_mode { + warn!("Prover Server shutting down"); + break; + } + } + Err(e) => { + let error_message = if !server_config.dev_mode { + format!("Prover Server, severe Error, trying to restart the main_logic function: {e}") + } else { + format!("Prover Server Dev Error: {e}") + }; + error!(error_message); + } } sleep(Duration::from_millis(200)).await; @@ -135,125 +151,29 @@ impl ProverServer { mut self, server_config: &ProverServerConfig, ) -> Result<(), ProverServerError> { - if server_config.dev_mode { - loop { - thread::sleep(Duration::from_millis(200)); - - let last_committed_block = EthClient::get_last_committed_block( - &self.eth_client, - self.on_chain_proposer_address, - ) - .await?; - - let last_verified_block = EthClient::get_last_verified_block( - &self.eth_client, - self.on_chain_proposer_address, - ) - .await?; + let (tx, rx) = mpsc::channel(); - if last_committed_block == u64::MAX { - debug!("No blocks commited yet"); - continue; - } - - if last_committed_block == last_verified_block { - debug!("No new blocks to prove"); - continue; - } - - info!( - "Last committed: {last_committed_block} - Last verified: {last_verified_block}" - ); - - // IOnChainProposer - // function verify(uint256,bytes,bytes32,bytes32) - // blockNumber, seal, imageId, journalDigest - // From crates/l2/contracts/l1/interfaces/IOnChainProposer.sol - let mut calldata = keccak(b"verify(uint256,bytes,bytes32,bytes32)") - .as_bytes() - .get(..4) - .ok_or(ProverServerError::Custom( - "Failed to get verify_proof_selector in send_proof()".to_owned(), - ))? - .to_vec(); - calldata.extend(H256::from_low_u64_be(last_verified_block + 1).as_bytes()); - calldata.extend(H256::from_low_u64_be(128).as_bytes()); - calldata.extend(H256::zero().as_bytes()); - calldata.extend(H256::zero().as_bytes()); - calldata.extend(H256::zero().as_bytes()); - calldata.extend(H256::zero().as_bytes()); - let verify_tx = self - .eth_client - .build_eip1559_transaction( - self.on_chain_proposer_address, - self.verifier_address, - calldata.into(), - Overrides { - ..Default::default() - }, - ) - .await?; - - let tx_hash = self - .eth_client - .send_eip1559_transaction(&verify_tx, &self.verifier_private_key) - .await - .unwrap(); + // It should never exit the start() fn, handling errors inside the for loop of the function. + let server_handle = tokio::spawn(async move { self.start(rx).await }); - info!("Sending verify transaction with tx hash: {tx_hash:#x}"); + ProverServer::handle_sigint(tx, server_config).await?; - let mut retries = 1; - while self - .eth_client - .get_transaction_receipt(tx_hash) - .await? - .is_none() - { - thread::sleep(Duration::from_secs(1)); - retries += 1; - if retries > 10 { - error!("Couldn't find receipt for transaction {tx_hash:#x}"); - panic!("Couldn't find receipt for transaction {tx_hash:#x}"); - } - } - - info!( - "Mocked verify transaction sent for block {}", - last_verified_block + 1 - ); - } - } else { - let (tx, rx) = mpsc::channel(); - - let server_handle = tokio::spawn(async move { self.start(rx).await }); - - ProverServer::handle_sigint(tx, server_config).await?; - - // - //tokio::select! { - // ret = server_handle => { - // ret??; - // info!("Program ended with Ctrl+C"); - // } - // - //} - match server_handle.await { - Ok(result) => match result { - Ok(_) => (), - Err(e) => return Err(e), - }, - Err(e) => return Err(e.into()), - }; + match server_handle.await { + Ok(result) => match result { + Ok(_) => (), + Err(e) => return Err(e), + }, + Err(e) => return Err(e.into()), + }; - Ok(()) - } + Ok(()) } async fn handle_sigint( tx: mpsc::Sender<()>, config: &ProverServerConfig, ) -> Result<(), ProverServerError> { - let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT stream"); + let mut sigint = signal(SignalKind::interrupt())?; sigint.recv().await.ok_or(SigIntError::Recv)?; tx.send(()).map_err(SigIntError::Send)?; TcpStream::connect(format!("{}:{}", config.listen_ip, config.listen_port))? @@ -267,14 +187,25 @@ impl ProverServer { let listener = TcpListener::bind(format!("{}:{}", self.ip, self.port))?; info!("Starting TCP server at {}:{}", self.ip, self.port); + for stream in listener.incoming() { - if let Ok(()) = rx.try_recv() { - info!("Shutting down Prover Server"); - break; - } + match stream { + Ok(stream) => { + debug!("Connection established!"); - debug!("Connection established!"); - self.handle_connection(stream?).await?; + if let Ok(()) = rx.try_recv() { + info!("Shutting down Prover Server"); + break; + } + + if let Err(e) = self.handle_connection(stream).await { + error!("Error handling connection: {}", e); + } + } + Err(e) => { + error!("Failed to accept connection: {}", e); + } + } } Ok(()) } @@ -300,7 +231,9 @@ impl ProverServer { self.handle_proof_submission(block_number, receipt).await?; - assert!(block_number == (self.last_verified_block + 1), "Prover Client submitted an invalid block_number: {block_number}. The last_proved_block is: {}", self.last_verified_block); + if block_number != (self.last_verified_block + 1) { + return Err(ProverServerError::Custom(format!("Prover Client submitted an invalid block_number: {block_number}. The last_proved_block is: {}", self.last_verified_block))); + } self.last_verified_block = block_number; } Err(e) => { @@ -322,7 +255,10 @@ impl ProverServer { ) -> Result<(), ProverServerError> { debug!("Request received"); - let latest_block_number = self.store.get_latest_block_number()?.unwrap(); + let latest_block_number = self + .store + .get_latest_block_number()? + .ok_or(ProverServerError::StorageDataIsNone)?; let response = if block_number > latest_block_number { let response = ProofData::Response { @@ -373,58 +309,33 @@ impl ProverServer { Ok(inner) => { // The SELECTOR is used to perform an extra check inside the groth16 verifier contract. let mut selector = - hex::encode(inner.verifier_parameters.as_bytes().get(..4).unwrap()); + hex::encode(inner.verifier_parameters.as_bytes().get(..4).ok_or( + ProverServerError::Custom( + "Failed to get verify_proof_selector in send_proof()".to_owned(), + ), + )?); let seal = hex::encode(inner.clone().seal); selector.push_str(&seal); - hex::decode(selector).unwrap() + hex::decode(selector).map_err(|e| { + ProverServerError::Custom(format!("Failed to hex::decode(selector): {e}")) + })? } Err(_) => vec![32; 0], }; let mut image_id: [u32; 8] = [0; 8]; for (i, b) in image_id.iter_mut().enumerate() { - *b = *receipt.1.get(i).unwrap(); + *b = *receipt.1.get(i).ok_or(ProverServerError::Custom( + "Failed to get image_id in handle_proof_submission()".to_owned(), + ))?; } let image_id: risc0_zkvm::sha::Digest = image_id.into(); let journal_digest = Digestible::digest(&receipt.0.journal); - // Retry proof verification, the transaction will fail if the block wasn't committed. - // It's being caused by the prover_server advancing faster than the block_generation_time + commitment_tx_approval_time - // The error message is `block not committed`. Retrying 100 times, if there is another error it panics. - let mut attempts = 0; - let max_retries = 100; - let retry_secs = std::time::Duration::from_secs(5); - while attempts < max_retries { - match self - .send_proof(block_number, &seal, image_id, journal_digest) - .await - { - Ok(tx_hash) => { - info!( - "Sent proof for block {block_number}, with transaction hash {tx_hash:#x}" - ); - break; // Exit the while loop - } - - Err(e) => { - warn!("Failed to send proof to block {block_number:#x}. Error: {e}"); - let eth_client_error = format!("{e}"); - if eth_client_error.contains("block not committed") { - attempts += 1; - if attempts < max_retries { - warn!("Retrying... Attempt {}/{}", attempts, max_retries); - sleep(retry_secs).await; // Wait before retrying - } else { - return Err(e); - } - } else { - return Err(e); - } - } - } - } + self.send_proof(block_number, &seal, image_id, journal_digest) + .await?; Ok(()) } @@ -464,7 +375,7 @@ impl ProverServer { image_id: Digest, journal_digest: Digest, ) -> Result { - info!("Sending proof"); + debug!("Sending proof for {block_number}"); let mut calldata = Vec::new(); // IOnChainProposer @@ -531,6 +442,86 @@ impl ProverServer { ) .await?; + info!("Sent proof for block {block_number}, with transaction hash {verify_tx_hash:#x}"); + Ok(verify_tx_hash) } + + pub async fn main_logic_dev(&self) -> Result<(), ProverServerError> { + loop { + thread::sleep(Duration::from_millis(200)); + + let last_committed_block = EthClient::get_last_committed_block( + &self.eth_client, + self.on_chain_proposer_address, + ) + .await?; + + let last_verified_block = EthClient::get_last_verified_block( + &self.eth_client, + self.on_chain_proposer_address, + ) + .await?; + + if last_committed_block == u64::MAX { + debug!("No blocks commited yet"); + continue; + } + + if last_committed_block == last_verified_block { + debug!("No new blocks to prove"); + continue; + } + + info!("Last committed: {last_committed_block} - Last verified: {last_verified_block}"); + + // IOnChainProposer + // function verify(uint256,bytes,bytes32,bytes32) + // blockNumber, seal, imageId, journalDigest + // From crates/l2/contracts/l1/interfaces/IOnChainProposer.sol + let mut calldata = keccak(b"verify(uint256,bytes,bytes32,bytes32)") + .as_bytes() + .get(..4) + .ok_or(ProverServerError::Custom( + "Failed to get verify_proof_selector in send_proof()".to_owned(), + ))? + .to_vec(); + calldata.extend(H256::from_low_u64_be(last_verified_block + 1).as_bytes()); + calldata.extend(H256::from_low_u64_be(128).as_bytes()); + calldata.extend(H256::zero().as_bytes()); + calldata.extend(H256::zero().as_bytes()); + calldata.extend(H256::zero().as_bytes()); + calldata.extend(H256::zero().as_bytes()); + let verify_tx = self + .eth_client + .build_eip1559_transaction( + self.on_chain_proposer_address, + self.verifier_address, + calldata.into(), + Overrides { + ..Default::default() + }, + ) + .await?; + + info!("Sending verify transaction."); + + let verify_tx_hash = self + .eth_client + .send_wrapped_transaction_with_retry( + &WrappedTransaction::EIP1559(verify_tx), + &self.verifier_private_key, + 3 * 60, + 10, + ) + .await?; + + info!("Sent proof for block {last_verified_block}, with transaction hash {verify_tx_hash:#x}"); + + info!( + "Mocked verify transaction sent for block {}", + last_verified_block + 1 + ); + } + } }