Skip to content

Commit

Permalink
prover_server: separate dev logic
Browse files Browse the repository at this point in the history
  • Loading branch information
fborello-lambda committed Nov 25, 2024
1 parent 95dcec7 commit 45f8d3a
Showing 1 changed file with 151 additions and 160 deletions.
311 changes: 151 additions & 160 deletions crates/l2/proposer/prover_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,27 @@ impl ProverServer {

pub async fn run(&mut self, server_config: &ProverServerConfig) {
loop {
if let Err(err) = self.clone().main_logic(server_config).await {
error!("Prover Server Error: {}", err);
let result = if server_config.dev_mode {
self.main_logic_dev().await
} else {
info!("Prover Server Shut Down");
break;
self.clone().main_logic(server_config).await
};

match result {
Ok(_) => {
if !server_config.dev_mode {
warn!("Prover Server shutting down");
break;
}
}
Err(e) => {
let error_message = if !server_config.dev_mode {
format!("Prover Server, severe Error, trying to restart the main_logic function: {e}")
} else {
format!("Prover Server Dev Error: {e}")
};
error!(error_message);
}
}

sleep(Duration::from_millis(200)).await;
Expand All @@ -135,125 +151,29 @@ impl ProverServer {
mut self,
server_config: &ProverServerConfig,
) -> Result<(), ProverServerError> {
if server_config.dev_mode {
loop {
thread::sleep(Duration::from_millis(200));

let last_committed_block = EthClient::get_last_committed_block(
&self.eth_client,
self.on_chain_proposer_address,
)
.await?;

let last_verified_block = EthClient::get_last_verified_block(
&self.eth_client,
self.on_chain_proposer_address,
)
.await?;
let (tx, rx) = mpsc::channel();

if last_committed_block == u64::MAX {
debug!("No blocks commited yet");
continue;
}

if last_committed_block == last_verified_block {
debug!("No new blocks to prove");
continue;
}

info!(
"Last committed: {last_committed_block} - Last verified: {last_verified_block}"
);

// IOnChainProposer
// function verify(uint256,bytes,bytes32,bytes32)
// blockNumber, seal, imageId, journalDigest
// From crates/l2/contracts/l1/interfaces/IOnChainProposer.sol
let mut calldata = keccak(b"verify(uint256,bytes,bytes32,bytes32)")
.as_bytes()
.get(..4)
.ok_or(ProverServerError::Custom(
"Failed to get verify_proof_selector in send_proof()".to_owned(),
))?
.to_vec();
calldata.extend(H256::from_low_u64_be(last_verified_block + 1).as_bytes());
calldata.extend(H256::from_low_u64_be(128).as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
let verify_tx = self
.eth_client
.build_eip1559_transaction(
self.on_chain_proposer_address,
self.verifier_address,
calldata.into(),
Overrides {
..Default::default()
},
)
.await?;

let tx_hash = self
.eth_client
.send_eip1559_transaction(&verify_tx, &self.verifier_private_key)
.await
.unwrap();
// It should never exit the start() fn, handling errors inside the for loop of the function.
let server_handle = tokio::spawn(async move { self.start(rx).await });

info!("Sending verify transaction with tx hash: {tx_hash:#x}");
ProverServer::handle_sigint(tx, server_config).await?;

let mut retries = 1;
while self
.eth_client
.get_transaction_receipt(tx_hash)
.await?
.is_none()
{
thread::sleep(Duration::from_secs(1));
retries += 1;
if retries > 10 {
error!("Couldn't find receipt for transaction {tx_hash:#x}");
panic!("Couldn't find receipt for transaction {tx_hash:#x}");
}
}

info!(
"Mocked verify transaction sent for block {}",
last_verified_block + 1
);
}
} else {
let (tx, rx) = mpsc::channel();

let server_handle = tokio::spawn(async move { self.start(rx).await });

ProverServer::handle_sigint(tx, server_config).await?;

//
//tokio::select! {
// ret = server_handle => {
// ret??;
// info!("Program ended with Ctrl+C");
// }
//
//}
match server_handle.await {
Ok(result) => match result {
Ok(_) => (),
Err(e) => return Err(e),
},
Err(e) => return Err(e.into()),
};
match server_handle.await {
Ok(result) => match result {
Ok(_) => (),
Err(e) => return Err(e),
},
Err(e) => return Err(e.into()),
};

Ok(())
}
Ok(())
}

async fn handle_sigint(
tx: mpsc::Sender<()>,
config: &ProverServerConfig,
) -> Result<(), ProverServerError> {
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT stream");
let mut sigint = signal(SignalKind::interrupt())?;
sigint.recv().await.ok_or(SigIntError::Recv)?;
tx.send(()).map_err(SigIntError::Send)?;
TcpStream::connect(format!("{}:{}", config.listen_ip, config.listen_port))?
Expand All @@ -267,14 +187,25 @@ impl ProverServer {
let listener = TcpListener::bind(format!("{}:{}", self.ip, self.port))?;

info!("Starting TCP server at {}:{}", self.ip, self.port);

for stream in listener.incoming() {
if let Ok(()) = rx.try_recv() {
info!("Shutting down Prover Server");
break;
}
match stream {
Ok(stream) => {
debug!("Connection established!");

debug!("Connection established!");
self.handle_connection(stream?).await?;
if let Ok(()) = rx.try_recv() {
info!("Shutting down Prover Server");
break;
}

if let Err(e) = self.handle_connection(stream).await {
error!("Error handling connection: {}", e);
}
}
Err(e) => {
error!("Failed to accept connection: {}", e);
}
}
}
Ok(())
}
Expand All @@ -300,7 +231,9 @@ impl ProverServer {

self.handle_proof_submission(block_number, receipt).await?;

assert!(block_number == (self.last_verified_block + 1), "Prover Client submitted an invalid block_number: {block_number}. The last_proved_block is: {}", self.last_verified_block);
if block_number != (self.last_verified_block + 1) {
return Err(ProverServerError::Custom(format!("Prover Client submitted an invalid block_number: {block_number}. The last_proved_block is: {}", self.last_verified_block)));
}
self.last_verified_block = block_number;
}
Err(e) => {
Expand All @@ -322,7 +255,10 @@ impl ProverServer {
) -> Result<(), ProverServerError> {
debug!("Request received");

let latest_block_number = self.store.get_latest_block_number()?.unwrap();
let latest_block_number = self
.store
.get_latest_block_number()?
.ok_or(ProverServerError::StorageDataIsNone)?;

let response = if block_number > latest_block_number {
let response = ProofData::Response {
Expand Down Expand Up @@ -373,58 +309,33 @@ impl ProverServer {
Ok(inner) => {
// The SELECTOR is used to perform an extra check inside the groth16 verifier contract.
let mut selector =
hex::encode(inner.verifier_parameters.as_bytes().get(..4).unwrap());
hex::encode(inner.verifier_parameters.as_bytes().get(..4).ok_or(
ProverServerError::Custom(
"Failed to get verify_proof_selector in send_proof()".to_owned(),
),
)?);
let seal = hex::encode(inner.clone().seal);
selector.push_str(&seal);
hex::decode(selector).unwrap()
hex::decode(selector).map_err(|e| {
ProverServerError::Custom(format!("Failed to hex::decode(selector): {e}"))
})?
}
Err(_) => vec![32; 0],
};

let mut image_id: [u32; 8] = [0; 8];
for (i, b) in image_id.iter_mut().enumerate() {
*b = *receipt.1.get(i).unwrap();
*b = *receipt.1.get(i).ok_or(ProverServerError::Custom(
"Failed to get image_id in handle_proof_submission()".to_owned(),
))?;
}

let image_id: risc0_zkvm::sha::Digest = image_id.into();

let journal_digest = Digestible::digest(&receipt.0.journal);

// Retry proof verification, the transaction will fail if the block wasn't committed.
// It's being caused by the prover_server advancing faster than the block_generation_time + commitment_tx_approval_time
// The error message is `block not committed`. Retrying 100 times, if there is another error it panics.
let mut attempts = 0;
let max_retries = 100;
let retry_secs = std::time::Duration::from_secs(5);
while attempts < max_retries {
match self
.send_proof(block_number, &seal, image_id, journal_digest)
.await
{
Ok(tx_hash) => {
info!(
"Sent proof for block {block_number}, with transaction hash {tx_hash:#x}"
);
break; // Exit the while loop
}

Err(e) => {
warn!("Failed to send proof to block {block_number:#x}. Error: {e}");
let eth_client_error = format!("{e}");
if eth_client_error.contains("block not committed") {
attempts += 1;
if attempts < max_retries {
warn!("Retrying... Attempt {}/{}", attempts, max_retries);
sleep(retry_secs).await; // Wait before retrying
} else {
return Err(e);
}
} else {
return Err(e);
}
}
}
}
self.send_proof(block_number, &seal, image_id, journal_digest)
.await?;

Ok(())
}
Expand Down Expand Up @@ -464,7 +375,7 @@ impl ProverServer {
image_id: Digest,
journal_digest: Digest,
) -> Result<H256, ProverServerError> {
info!("Sending proof");
debug!("Sending proof for {block_number}");
let mut calldata = Vec::new();

// IOnChainProposer
Expand Down Expand Up @@ -531,6 +442,86 @@ impl ProverServer {
)
.await?;

info!("Sent proof for block {block_number}, with transaction hash {verify_tx_hash:#x}");

Ok(verify_tx_hash)
}

pub async fn main_logic_dev(&self) -> Result<(), ProverServerError> {
loop {
thread::sleep(Duration::from_millis(200));

let last_committed_block = EthClient::get_last_committed_block(
&self.eth_client,
self.on_chain_proposer_address,
)
.await?;

let last_verified_block = EthClient::get_last_verified_block(
&self.eth_client,
self.on_chain_proposer_address,
)
.await?;

if last_committed_block == u64::MAX {
debug!("No blocks commited yet");
continue;
}

if last_committed_block == last_verified_block {
debug!("No new blocks to prove");
continue;
}

info!("Last committed: {last_committed_block} - Last verified: {last_verified_block}");

// IOnChainProposer
// function verify(uint256,bytes,bytes32,bytes32)
// blockNumber, seal, imageId, journalDigest
// From crates/l2/contracts/l1/interfaces/IOnChainProposer.sol
let mut calldata = keccak(b"verify(uint256,bytes,bytes32,bytes32)")
.as_bytes()
.get(..4)
.ok_or(ProverServerError::Custom(
"Failed to get verify_proof_selector in send_proof()".to_owned(),
))?
.to_vec();
calldata.extend(H256::from_low_u64_be(last_verified_block + 1).as_bytes());
calldata.extend(H256::from_low_u64_be(128).as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
calldata.extend(H256::zero().as_bytes());
let verify_tx = self
.eth_client
.build_eip1559_transaction(
self.on_chain_proposer_address,
self.verifier_address,
calldata.into(),
Overrides {
..Default::default()
},
)
.await?;

info!("Sending verify transaction.");

let verify_tx_hash = self
.eth_client
.send_wrapped_transaction_with_retry(
&WrappedTransaction::EIP1559(verify_tx),
&self.verifier_private_key,
3 * 60,
10,
)
.await?;

info!("Sent proof for block {last_verified_block}, with transaction hash {verify_tx_hash:#x}");

info!(
"Mocked verify transaction sent for block {}",
last_verified_block + 1
);
}
}
}

0 comments on commit 45f8d3a

Please sign in to comment.