Skip to content

Commit

Permalink
Move static OnceCell ChainIdentifier into AuthorityState (#20680)
Browse files Browse the repository at this point in the history
## Description 

As part of fixing flaky GraphQL tests, I observed an inconsistency in
how the fullnode RPC in `TestCluster` was reporting `chain_identifier`. The
nodes wrote `0.chk` to file with a particular checkpoint
digest->chain_id, but the RPC would report a different chain_id.
Conspicuously, the chain_id was the same across tests.

The issue stemmed from using a global static `OnceCell<ChainIdentifier>`.
This global state meant that if the
`test_simple_client_validator_cluster` test did not set the `chain_id`,
then it would fail. I suppose there might be some production issues as
well, for example if someone were to run multiple nodes in the same
process connected to different networks.

To address this, I moved `chain_identifier` into `AuthorityState`, and modified
`SuiNode` to explicitly pass the `chain_identifier` to components that need
it, which is basically just `StateSnapshotUploader` today.

By eagerly setting `chain_identifier`, we can also simplify some logic
around existing call sites.

## Test plan 

Existing tests pass

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Jan 3, 2025
1 parent 50c085a commit 0ae930c
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 58 deletions.
25 changes: 8 additions & 17 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
use sui_types::object::bounded_visitor::BoundedVisitor;
use sui_types::transaction_executor::SimulateTransactionResult;
use tap::{TapFallible, TapOptional};
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
Expand All @@ -70,7 +70,6 @@ use mysten_metrics::{monitored_scope, spawn_monitored_task};
use crate::jsonrpc_index::IndexStore;
use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
use mysten_common::debug_fatal;
use once_cell::sync::OnceCell;
use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
use sui_archival::reader::ArchiveReaderBalancer;
use sui_config::genesis::Genesis;
Expand Down Expand Up @@ -216,8 +215,6 @@ pub mod transaction_deferral;
pub(crate) mod authority_store;
pub mod backpressure;

pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
tx_orders: IntCounter,
Expand Down Expand Up @@ -820,6 +817,9 @@ pub struct AuthorityState {
pub overload_info: AuthorityOverloadInfo,

pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -2854,6 +2854,7 @@ impl AuthorityState {
indirect_objects_threshold: usize,
archive_readers: ArchiveReaderBalancer,
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

Expand Down Expand Up @@ -2910,6 +2911,7 @@ impl AuthorityState {
config,
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
});

// Start a task to execute ready certificates.
Expand Down Expand Up @@ -3470,19 +3472,8 @@ impl AuthorityState {
}

/// Chain Identifier is the digest of the genesis checkpoint.
pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
if let Some(digest) = CHAIN_IDENTIFIER.get() {
return Some(*digest);
}

let checkpoint = self
.get_checkpoint_by_sequence_number(0)
.tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
.ok()?
.tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
// It's ok if the value is already set due to data races.
let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
Some(ChainIdentifier::from(*checkpoint.digest()))
pub fn get_chain_identifier(&self) -> ChainIdentifier {
self.chain_identifier
}

#[instrument(level = "trace", skip_all)]
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ impl<'a> TestAuthorityBuilder<'a> {
config.authority_overload_config = authority_overload_config;
config.authority_store_pruning_config = pruning_config;

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());

let state = AuthorityState::new(
name,
secret,
Expand All @@ -329,6 +331,7 @@ impl<'a> TestAuthorityBuilder<'a> {
usize::MAX,
ArchiveReaderBalancer::default(),
None,
chain_identifier,
)
.await;

Expand Down
5 changes: 1 addition & 4 deletions crates/sui-core/src/execution_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ pub async fn execution_process(
return;
};

state
.get_chain_identifier()
.map(|chain_id| chain_id.chain())
== Some(Chain::Mainnet)
state.get_chain_identifier().chain() == Chain::Mainnet
};

// Loop whenever there is a signal that a new transaction is ready to process.
Expand Down
8 changes: 2 additions & 6 deletions crates/sui-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,8 @@ impl RpcStateReader for RestReadStore {
}
}

fn get_chain_identifier(
&self,
) -> sui_types::storage::error::Result<sui_types::digests::ChainIdentifier> {
self.state
.get_chain_identifier()
.ok_or_else(|| StorageError::missing("unable to query chain identifier"))
fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
Ok(self.state.get_chain_identifier())
}

fn indexes(&self) -> Option<&dyn RpcIndexes> {
Expand Down
5 changes: 1 addition & 4 deletions crates/sui-json-rpc/src/authority_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use arc_swap::Guard;
use async_trait::async_trait;
use move_core_types::language_storage::TypeTag;
Expand Down Expand Up @@ -531,9 +530,7 @@ impl StateRead for AuthorityState {
}

fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier> {
Ok(self
.get_chain_identifier()
.ok_or(anyhow!("Chain identifier not found"))?)
Ok(self.get_chain_identifier())
}
}

Expand Down
24 changes: 12 additions & 12 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::CHAIN_IDENTIFIER;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
Expand Down Expand Up @@ -608,8 +607,6 @@ impl SuiNode {
};

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
// It's ok if the value is already set due to data races.
let _ = CHAIN_IDENTIFIER.set(chain_identifier);

info!("creating archive reader");
// Create network
Expand Down Expand Up @@ -659,8 +656,12 @@ impl SuiNode {

info!("start snapshot upload");
// Start uploading state snapshot to remote store
let state_snapshot_handle =
Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
let state_snapshot_handle = Self::start_state_snapshot(
&config,
&prometheus_registry,
checkpoint_store.clone(),
chain_identifier,
)?;

// Start uploading db checkpoints to remote store
info!("start db checkpoint");
Expand Down Expand Up @@ -709,6 +710,7 @@ impl SuiNode {
config.indirect_objects_threshold,
archive_readers,
validator_tx_finalizer,
chain_identifier,
)
.await;
// ensure genesis txn was executed
Expand Down Expand Up @@ -961,6 +963,7 @@ impl SuiNode {
config: &NodeConfig,
prometheus_registry: &Registry,
checkpoint_store: Arc<CheckpointStore>,
chain_identifier: ChainIdentifier,
) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
let snapshot_uploader = StateSnapshotUploader::new(
Expand All @@ -970,6 +973,7 @@ impl SuiNode {
60,
prometheus_registry,
checkpoint_store,
chain_identifier,
)?;
Ok(Some(snapshot_uploader.start()))
} else {
Expand Down Expand Up @@ -1989,8 +1993,8 @@ fn build_kv_store(
)
})?;

let network_str = match state.get_chain_identifier().map(|c| c.chain()) {
Some(Chain::Mainnet) => "/mainnet",
let network_str = match state.get_chain_identifier().chain() {
Chain::Mainnet => "/mainnet",
_ => {
info!("using local db only for kv store");
return Ok(Arc::new(db_store));
Expand Down Expand Up @@ -2078,11 +2082,7 @@ pub async fn build_http_server(
reverse_registry_id,
)
} else {
match CHAIN_IDENTIFIER
.get()
.expect("chain_id should be initialized")
.chain()
{
match state.get_chain_identifier().chain() {
Chain::Mainnet => sui_json_rpc::name_service::NameServiceConfig::mainnet(),
Chain::Testnet => sui_json_rpc::name_service::NameServiceConfig::testnet(),
Chain::Unknown => sui_json_rpc::name_service::NameServiceConfig::default(),
Expand Down
6 changes: 1 addition & 5 deletions crates/sui-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,7 @@ fn main() {
let node_once_cell_clone = node_once_cell.clone();
runtimes.metrics.spawn(async move {
let node = node_once_cell_clone.get().await;
let chain_identifier = match node.state().get_chain_identifier() {
Some(chain_identifier) => chain_identifier.to_string(),
None => "unknown".to_string(),
};

let chain_identifier = node.state().get_chain_identifier().to_string();
info!("Sui chain identifier: {chain_identifier}");
prometheus_registry
.register(mysten_metrics::uptime_metric(
Expand Down
8 changes: 7 additions & 1 deletion crates/sui-snapshot/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use sui_storage::object_store::util::{
run_manifest_update_loop,
};
use sui_storage::FileCompression;
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -66,6 +67,9 @@ pub struct StateSnapshotUploader {
/// Time interval to check for presence of new db checkpoint
interval: Duration,
metrics: Arc<StateSnapshotUploaderMetrics>,
/// The chain identifier is derived from the genesis checkpoint and used to identify the
/// network.
chain_identifier: ChainIdentifier,
}

impl StateSnapshotUploader {
Expand All @@ -76,6 +80,7 @@ impl StateSnapshotUploader {
interval_s: u64,
registry: &Registry,
checkpoint_store: Arc<CheckpointStore>,
chain_identifier: ChainIdentifier,
) -> Result<Arc<Self>> {
let db_checkpoint_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::File),
Expand All @@ -96,6 +101,7 @@ impl StateSnapshotUploader {
snapshot_store: snapshot_store_config.make()?,
interval: Duration::from_secs(interval_s),
metrics: StateSnapshotUploaderMetrics::new(registry),
chain_identifier,
}))
}

Expand Down Expand Up @@ -140,7 +146,7 @@ impl StateSnapshotUploader {
.expect("Expected at least one commitment")
.clone();
state_snapshot_writer
.write(*epoch, db, state_hash_commitment)
.write(*epoch, db, state_hash_commitment, self.chain_identifier)
.await?;
info!("State snapshot creation successful for epoch: {}", *epoch);
// Drop marker in the output directory that upload completed successfully
Expand Down
8 changes: 3 additions & 5 deletions crates/sui-snapshot/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
ManifestV1, FILE_MAX_BYTES, MAGIC_BYTES, MANIFEST_FILE_MAGIC, OBJECT_FILE_MAGIC,
OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, SEQUENCE_NUM_BYTES,
};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use byteorder::{BigEndian, ByteOrder};
use fastcrypto::hash::MultisetHash;
use futures::StreamExt;
Expand All @@ -24,13 +24,13 @@ use std::path::PathBuf;
use std::sync::Arc;
use sui_config::object_storage_config::ObjectStoreConfig;
use sui_core::authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject};
use sui_core::authority::CHAIN_IDENTIFIER;
use sui_core::state_accumulator::StateAccumulator;
use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
use sui_storage::blob::{Blob, BlobEncoding, BLOB_ENCODING_BYTES};
use sui_storage::object_store::util::{copy_file, delete_recursively, path_to_filesystem};
use sui_types::accumulator::Accumulator;
use sui_types::base_types::{ObjectID, ObjectRef};
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::ECMHLiveObjectSetDigest;
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::sui_system_state::SuiSystemStateTrait;
Expand Down Expand Up @@ -260,13 +260,11 @@ impl StateSnapshotWriterV1 {
epoch: u64,
perpetual_db: Arc<AuthorityPerpetualTables>,
root_state_hash: ECMHLiveObjectSetDigest,
chain_identifier: ChainIdentifier,
) -> Result<()> {
let system_state_object = get_sui_system_state(&perpetual_db)?;

let protocol_version = system_state_object.protocol_version();
let chain_identifier = CHAIN_IDENTIFIER
.get()
.ok_or(anyhow!("No chain identifier found"))?;
let protocol_config = ProtocolConfig::get_for_version(
ProtocolVersion::new(protocol_version),
chain_identifier.chain(),
Expand Down
5 changes: 1 addition & 4 deletions crates/sui-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ struct IpResponse {
pub async fn send_telemetry_event(state: Arc<AuthorityState>, is_validator: bool) {
let git_rev = env!("CARGO_PKG_VERSION").to_string();
let ip_address = get_ip().await;
let chain_identifier = match state.get_chain_identifier() {
Some(chain_identifier) => chain_identifier.to_string(),
None => "Unknown".to_string(),
};
let chain_identifier = state.get_chain_identifier().to_string();
let since_the_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Now should be later than epoch!");
Expand Down

0 comments on commit 0ae930c

Please sign in to comment.