diff --git a/node/src/components/block_executor.rs b/node/src/components/block_executor.rs index 5bc6f4e7a8..44c1ea01bc 100644 --- a/node/src/components/block_executor.rs +++ b/node/src/components/block_executor.rs @@ -34,7 +34,8 @@ use crate::{ effect::{ announcements::BlockExecutorAnnouncement, requests::{ - BlockExecutorRequest, ContractRuntimeRequest, LinearChainRequest, StorageRequest, + BlockExecutorRequest, ConsensusRequest, ContractRuntimeRequest, LinearChainRequest, + StorageRequest, }, EffectBuilder, EffectExt, Effects, }, @@ -53,6 +54,7 @@ pub trait ReactorEventT: + From> + From + From + + From + Send { } @@ -63,6 +65,7 @@ impl ReactorEventT for REv where + From> + From + From + + From + Send { } @@ -439,16 +442,18 @@ impl Component for BlockExecutor { effect_builder .get_block_at_height_local(finalized_block.height()) .event(move |maybe_block| { - Event::BlockAlreadyExists(maybe_block.is_some(), finalized_block) + Event::BlockAlreadyExists(maybe_block.map(Box::new), finalized_block) }) } - Event::BlockAlreadyExists(exists, finalized_block) => { - if !exists { + Event::BlockAlreadyExists(maybe_block, finalized_block) => { + if let Some(block) = maybe_block { + effect_builder + .handle_linear_chain_block(block.take_header()) + .ignore() + } else { // If we haven't executed the block before in the past (for example during // joining), do it now. self.get_deploys(effect_builder, finalized_block) - } else { - Effects::new() } } Event::GetDeploysResult { diff --git a/node/src/components/block_executor/event.rs b/node/src/components/block_executor/event.rs index cdf5485b86..6e7438d8e3 100644 --- a/node/src/components/block_executor/event.rs +++ b/node/src/components/block_executor/event.rs @@ -17,14 +17,14 @@ use casper_types::ExecutionResult; use crate::{ crypto::hash::Digest, effect::requests::BlockExecutorRequest, - types::{BlockHash, Deploy, DeployHash, DeployHeader, FinalizedBlock}, + types::{Block, BlockHash, Deploy, DeployHash, DeployHeader, FinalizedBlock}, }; /// Block executor component event. #[derive(Debug, From)] pub enum Event { /// Indicates whether block has already been finalized and executed in the past. - BlockAlreadyExists(bool, FinalizedBlock), + BlockAlreadyExists(Option>, FinalizedBlock), /// A request made of the Block executor component. #[from] Request(BlockExecutorRequest), @@ -150,12 +150,12 @@ impl Display for Event { state.state_root_hash, result ), - Event::BlockAlreadyExists(flag, fb) => { + Event::BlockAlreadyExists(maybe_block, fb) => { write!( f, "Block at height: {} was executed before: {}", fb.height(), - flag + maybe_block.is_some() ) } } diff --git a/node/src/components/consensus.rs b/node/src/components/consensus.rs index e14e28f503..2cad2129bb 100644 --- a/node/src/components/consensus.rs +++ b/node/src/components/consensus.rs @@ -99,6 +99,8 @@ pub enum Event { }, /// An event instructing us to shutdown if the latest era received no votes Shutdown, + /// An event fired when the joiner reactor transitions into validator. + FinishedJoining(Timestamp), } impl Debug for ConsensusMessage { @@ -186,6 +188,9 @@ impl Display for Event { booking_block_hash, key_block_seed, get_validators_result ), Event::Shutdown => write!(f, "Shutdown if current era is inactive"), + Event::FinishedJoining(timestamp) => { + write!(f, "The node finished joining the network at {}", timestamp) + } } } } @@ -307,6 +312,7 @@ where ) } Event::Shutdown => handling_es.shutdown_if_necessary(), + Event::FinishedJoining(timestamp) => handling_es.finished_joining(timestamp), } } } diff --git a/node/src/components/consensus/era_supervisor.rs b/node/src/components/consensus/era_supervisor.rs index df8561e88a..fc27899d49 100644 --- a/node/src/components/consensus/era_supervisor.rs +++ b/node/src/components/consensus/era_supervisor.rs @@ -98,6 +98,8 @@ pub struct EraSupervisor { next_block_height: u64, #[data_size(skip)] metrics: ConsensusMetrics, + // TODO: discuss this quick fix + finished_joining: bool, } impl Debug for EraSupervisor { @@ -143,6 +145,7 @@ where bonded_eras, next_block_height: 0, metrics, + finished_joining: false, }; let results = era_supervisor.new_era( @@ -257,6 +260,9 @@ where } else if !validators.contains_key(&our_id) { info!(era = era_id.0, %our_id, "not voting; not a validator"); false + } else if !self.finished_joining { + info!(era = era_id.0, "not voting; still joining"); + false } else { info!(era = era_id.0, "start voting"); true @@ -283,7 +289,14 @@ where Vec::new() }; - let era = Era::new(consensus, start_height, newly_slashed, slashed, validators); + let era = Era::new( + consensus, + start_time, + start_height, + newly_slashed, + slashed, + validators, + ); let _ = self.active_eras.insert(era_id, era); // Remove the era that has become obsolete now. We keep 2 * bonded_eras past eras because @@ -312,6 +325,26 @@ where pub(crate) fn active_eras(&self) -> &HashMap> { &self.active_eras } + + /// To be called when we transition from the joiner to the validator reactor. + pub(crate) fn finished_joining( + &mut self, + now: Timestamp, + ) -> Vec> { + self.finished_joining = true; + let secret = Keypair::new(Rc::clone(&self.secret_signing_key), self.public_signing_key); + let public_key = self.public_signing_key; + self.active_eras + .get_mut(&self.current_era) + .map(|era| { + if era.start_time > now && era.validators().contains_key(&public_key) { + era.consensus.activate_validator(public_key, secret, now) + } else { + Vec::new() + } + }) + .unwrap_or_default() + } } /// A mutable `EraSupervisor` reference, together with an `EffectBuilder`. @@ -783,6 +816,11 @@ where Default::default() } } + + pub(crate) fn finished_joining(&mut self, now: Timestamp) -> Effects> { + let results = self.era_supervisor.finished_joining(now); + self.handle_consensus_results(self.era_supervisor.current_era, results) + } } /// Computes the instance ID for an era, given the state root hash, block height and chainspec. diff --git a/node/src/components/consensus/era_supervisor/era.rs b/node/src/components/consensus/era_supervisor/era.rs index 9921369753..0478432c37 100644 --- a/node/src/components/consensus/era_supervisor/era.rs +++ b/node/src/components/consensus/era_supervisor/era.rs @@ -20,7 +20,7 @@ use crate::{ ConsensusMessage, }, crypto::asymmetric_key::PublicKey, - types::ProtoBlock, + types::{ProtoBlock, Timestamp}, }; #[derive( @@ -126,6 +126,8 @@ impl PendingCandidate { pub struct Era { /// The consensus protocol instance. pub(crate) consensus: Box>, + /// The scheduled starting time of this era. + pub(crate) start_time: Timestamp, /// The height of this era's first block. pub(crate) start_height: u64, /// Pending candidate blocks, waiting for validation. The boolean is `true` if the proto block @@ -146,6 +148,7 @@ pub struct Era { impl Era { pub(crate) fn new( consensus: Box>, + start_time: Timestamp, start_height: u64, newly_slashed: Vec, slashed: HashSet, @@ -153,6 +156,7 @@ impl Era { ) -> Self { Era { consensus, + start_time, start_height, candidates: Vec::new(), newly_slashed, @@ -262,6 +266,7 @@ where // Destructure self, so we can't miss any fields. let Era { consensus, + start_time, start_height, candidates, newly_slashed, @@ -288,6 +293,7 @@ where }; consensus_heap_size + + start_time.estimate_heap_size() + start_height.estimate_heap_size() + candidates.estimate_heap_size() + newly_slashed.estimate_heap_size() diff --git a/node/src/reactor/validator.rs b/node/src/reactor/validator.rs index 1147afa8ff..998e6b6fa4 100644 --- a/node/src/reactor/validator.rs +++ b/node/src/reactor/validator.rs @@ -433,6 +433,13 @@ impl reactor::Reactor for Reactor { .event(|_| consensus::Event::Shutdown), )); + effects.extend(reactor::wrap_effects( + Event::Consensus, + effect_builder + .immediately() + .event(move |_| consensus::Event::FinishedJoining(now)), + )); + Ok(( Reactor { metrics,