Skip to content

Commit

Permalink
feat(en): Integrate snapshots recovery into EN (#1032)
Browse files Browse the repository at this point in the history
## What ❔

- Integrates snapshot recovery logic into EN.
- Tests recovering an EN from a snapshot in integration tests.

## Why ❔

This is the final step to implement snapshot recovery (still needs to be
thoroughly tested, of course).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.

---------

Signed-off-by: tomg10 <[email protected]>
Co-authored-by: Alex Ostrovski <[email protected]>
  • Loading branch information
tomg10 and slowli authored Feb 16, 2024
1 parent c411924 commit c7cfaf9
Show file tree
Hide file tree
Showing 43 changed files with 864 additions and 397 deletions.
24 changes: 20 additions & 4 deletions .github/workflows/ci-core-reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ jobs:
ci_run sccache --show-stats
ci_run cat /tmp/sccache_log.txt
integration:
name: Integration (consensus=${{ matrix.consensus }})
strategy:
matrix:
consensus: [false,true]
consensus: [false, true]
env:
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,basic_witness_input_producer,commitment_generator${{ matrix.consensus && ',consensus' || '' }}"

Expand Down Expand Up @@ -195,6 +196,13 @@ jobs:
- name: Server integration tests
run: ci_run zk test i server

- name: Snapshot recovery test
if: ${{ ! matrix.consensus }}
# We use `yarn` directly because the test launches `zk` commands in both server and EN envs.
# An empty topmost environment helps avoid a mess when redefining env vars shared between both envs
# (e.g., DATABASE_URL).
run: ci_run yarn snapshot-recovery-test snapshot-recovery-test

- name: Fee projection tests
run: ci_run zk test i fees

Expand All @@ -220,6 +228,13 @@ jobs:
if: always()
run: ci_run cat contract_verifier.log || true

- name: Show snapshot-creator.log logs
if: always()
run: ci_run cat core/tests/snapshot-recovery-test/snapshot-creator.log || true
- name: Show snapshot-recovery.log logs
if: always()
run: ci_run cat core/tests/snapshot-recovery-test/snapshot-recovery.log || true

- name: Show revert.log logs
if: always()
run: ci_run cat core/tests/revert-test/revert.log || true
Expand All @@ -235,14 +250,15 @@ jobs:
ci_run cat /tmp/sccache_log.txt
external-node:
name: External node (consensus=${{ matrix.consensus }})
strategy:
matrix:
consensus: [false,true]
consensus: [false, true]
runs-on: [matterlabs-ci-runner]

env:
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,basic_witness_input_producer,commitment_generator${{ matrix.consensus && ',consensus' || '' }}"
EXT_NODE_FLAGS: "${{ matrix.consensus && '--enable-consensus' || '' }}"
EXT_NODE_FLAGS: "${{ matrix.consensus && '-- --enable-consensus' || '' }}"

steps:
- name: Checkout code # Checks out the repository under $GITHUB_WORKSPACE, so the job can access it.
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Cargo.lock

/etc/env/*
!/etc/env/base
!/etc/env/dev.toml
!/etc/env/docker.toml
!/etc/env/ext-node.toml
!/etc/env/ext-node-docker.toml
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ coinbase
FIXME
ASC
DESC
Versioning
versioning
initializer
refactoring
prefetch
Expand Down
8 changes: 4 additions & 4 deletions core/bin/external_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ zksync_utils = { path = "../../lib/utils" }
zksync_state = { path = "../../lib/state" }
zksync_basic_types = { path = "../../lib/basic_types" }
zksync_contracts = { path = "../../lib/contracts" }

zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5b3d383d7a65b0fbe2a771fecf4313f5083be9ae" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5b3d383d7a65b0fbe2a771fecf4313f5083be9ae" }

zksync_snapshots_applier = { path = "../../lib/snapshots_applier" }
zksync_object_store = { path="../../lib/object_store" }
prometheus_exporter = { path = "../../lib/prometheus_exporter" }
zksync_health_check = { path = "../../lib/health_check" }
zksync_web3_decl = { path = "../../lib/web3_decl" }
zksync_types = { path = "../../lib/types" }
vlog = { path = "../../lib/vlog" }

zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5b3d383d7a65b0fbe2a771fecf4313f5083be9ae" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "5b3d383d7a65b0fbe2a771fecf4313f5083be9ae" }
vise = { git = "https://github.com/matter-labs/vise.git", version = "0.1.0", rev = "1c9cc500e92cf9ea052b230e114a6f9cce4fb2c1" }

anyhow = "1.0"
Expand Down
17 changes: 17 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Context;
use serde::Deserialize;
use url::Url;
use zksync_basic_types::{Address, L1ChainId, L2ChainId};
use zksync_config::ObjectStoreConfig;
use zksync_consensus_roles::node;
use zksync_core::{
api_server::{
Expand Down Expand Up @@ -435,6 +436,22 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<consensus::FetcherConfig
})
}

/// Settings the external node uses when recovering its state from a snapshot.
///
/// This config is optional: it is loaded only if the corresponding command-line argument
/// is supplied to the EN binary.
#[derive(Debug, Clone)]
pub struct SnapshotsRecoveryConfig {
    /// Configuration of the object store that snapshots are read from.
    pub snapshots_object_store: ObjectStoreConfig,
}

/// Loads [`SnapshotsRecoveryConfig`] from the process environment.
///
/// The object store settings are deserialized from env variables carrying the
/// `EN_SNAPSHOTS_OBJECT_STORE_` prefix.
///
/// # Errors
///
/// Returns an error if the object store config cannot be loaded from the environment.
pub(crate) fn read_snapshots_recovery_config() -> anyhow::Result<SnapshotsRecoveryConfig> {
    let env_source = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_");
    let store_config: ObjectStoreConfig = env_source
        .from_env()
        .context("failed loading snapshot object store config from env variables")?;
    Ok(SnapshotsRecoveryConfig {
        snapshots_object_store: store_config,
    })
}

/// External Node Config contains all the configuration required for the EN operation.
/// It is split into three parts: required, optional and remote for easier navigation.
#[derive(Debug, Clone)]
Expand Down
107 changes: 107 additions & 0 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//! EN initialization logic.
use anyhow::Context as _;
use zksync_basic_types::{L1BatchNumber, L2ChainId};
use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::ConnectionPool;
use zksync_object_store::ObjectStoreFactory;
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierOutcome};
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

use crate::config::read_snapshots_recovery_config;

/// Strategy chosen for bringing the node storage into an initialized state.
#[derive(Debug)]
enum InitDecision {
    /// Genesis should be performed, or checked if it is already in place.
    Genesis,
    /// Snapshot recovery should be performed, or checked if it is already in place.
    SnapshotRecovery,
}

/// Makes sure that the node storage is initialized, either via genesis or via snapshot recovery.
///
/// The strategy is picked based on what the storage already contains:
///
/// - genesis L1 batch present, no snapshot recovery info: genesis is performed / checked;
/// - snapshot recovery info present, no genesis L1 batch: snapshot recovery is performed / checked;
/// - neither present: snapshot recovery if `consider_snapshot_recovery` is set, genesis otherwise;
/// - both present: unsupported; an error is returned.
///
/// # Errors
///
/// Returns an error if:
///
/// - a storage connection cannot be acquired, or the genesis batch / snapshot recovery status
///   cannot be read;
/// - the storage has both a genesis L1 batch and snapshot recovery information;
/// - snapshot recovery is the chosen strategy, but `consider_snapshot_recovery` is `false`;
/// - the snapshot recovery config cannot be loaded;
/// - genesis or snapshot recovery fails, or recovery finishes with any outcome other than
///   `SnapshotsApplierOutcome::Ok`.
pub(crate) async fn ensure_storage_initialized(
    pool: &ConnectionPool,
    main_node_client: &HttpClient,
    l2_chain_id: L2ChainId,
    consider_snapshot_recovery: bool,
) -> anyhow::Result<()> {
    let mut storage = pool
        .access_storage_tagged("en")
        .await
        .context("failed acquiring storage connection to inspect node state")?;
    let genesis_l1_batch = storage
        .blocks_dal()
        .get_l1_batch_header(L1BatchNumber(0))
        .await
        .context("failed getting genesis batch info")?;
    let snapshot_recovery = storage
        .snapshot_recovery_dal()
        .get_applied_snapshot_status()
        .await
        .context("failed getting snapshot recovery info")?;
    // Give the connection back; the branches below obtain their own storage access as needed.
    drop(storage);

    let decision = match (genesis_l1_batch, snapshot_recovery) {
        (Some(batch), Some(snapshot_recovery)) => {
            anyhow::bail!(
                "Node has both genesis L1 batch: {batch:?} and snapshot recovery information: {snapshot_recovery:?}. \
                 This is not supported and can be caused by broken snapshot recovery."
            );
        }
        (Some(batch), None) => {
            tracing::info!("Node has a genesis L1 batch: {batch:?} and no snapshot recovery info");
            InitDecision::Genesis
        }
        (None, Some(snapshot_recovery)) => {
            tracing::info!("Node has no genesis L1 batch and snapshot recovery information: {snapshot_recovery:?}");
            InitDecision::SnapshotRecovery
        }
        (None, None) => {
            tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
            // Empty storage: the caller's flag decides how the node is bootstrapped.
            if consider_snapshot_recovery {
                InitDecision::SnapshotRecovery
            } else {
                InitDecision::Genesis
            }
        }
    };

    tracing::info!("Chosen node initialization strategy: {decision:?}");
    match decision {
        InitDecision::Genesis => {
            let mut storage = pool
                .access_storage_tagged("en")
                .await
                .context("failed acquiring storage connection to perform genesis")?;
            perform_genesis_if_needed(&mut storage, l2_chain_id, main_node_client)
                .await
                .context("performing genesis failed")?;
        }
        InitDecision::SnapshotRecovery => {
            // This branch is also reached when the storage already holds snapshot recovery info,
            // regardless of the flag; in that case the flag must still be set explicitly.
            anyhow::ensure!(
                consider_snapshot_recovery,
                "Snapshot recovery is required to proceed, but it is not enabled. Enable by supplying \
                 `--enable-snapshots-recovery` command-line arg to the node binary, or reset the node storage \
                 to sync from genesis"
            );

            tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
            let recovery_config = read_snapshots_recovery_config()?;
            let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
                .create_store()
                .await;
            let outcome = SnapshotsApplierConfig::default()
                .run(pool, main_node_client, &blob_store)
                .await
                .context("snapshot recovery failed")?;
            match outcome {
                SnapshotsApplierOutcome::Ok => {
                    tracing::info!("Snapshot recovery is complete");
                }
                SnapshotsApplierOutcome::NoSnapshotsOnMainNode => {
                    anyhow::bail!("No snapshots on main node; snapshot recovery is impossible");
                }
                SnapshotsApplierOutcome::InitializedWithoutSnapshot => {
                    anyhow::bail!(
                        "Node contains a non-genesis L1 batch, but no genesis; snapshot recovery is unsafe. \
                         This should never occur unless the node DB was manually tampered with"
                    );
                }
            }
        }
    }
    Ok(())
}
48 changes: 33 additions & 15 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use zksync_core::{
},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
fetcher::MainNodeFetcher, genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient,
SyncState,
fetcher::MainNodeFetcher, ActionQueue, MainNodeClient, SyncState,
},
};
use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
Expand All @@ -40,13 +39,13 @@ use zksync_state::PostgresStorageCaches;
use zksync_storage::RocksDB;
use zksync_utils::wait_for_tasks::wait_for_tasks;

use crate::{config::ExternalNodeConfig, init::ensure_storage_initialized};

mod config;
mod init;
mod metrics;

const RELEASE_MANIFEST: &str =
std::include_str!("../../../../.github/release-please/manifest.json");

use crate::config::ExternalNodeConfig;
const RELEASE_MANIFEST: &str = include_str!("../../../../.github/release-please/manifest.json");

/// Creates the state keeper configured to work in the external node mode.
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -387,13 +386,24 @@ async fn shutdown_components(
healthcheck_handle.stop().await;
}

// NOTE: with `clap`'s derive API the `///` comments on this struct and its fields double as
// the `--help` text, so they are user-facing strings rather than plain documentation.
/// External node for zkSync Era.
#[derive(Debug, Parser)]
#[command(author = "Matter Labs", version)]
struct Cli {
    /// Revert the pending L1 batch and exit.
    #[arg(long)]
    revert_pending_l1_batch: bool,
    /// Enables consensus-based syncing instead of JSON-RPC based one. This is an experimental and incomplete feature;
    /// do not use unless you know what you're doing.
    #[arg(long)]
    enable_consensus: bool,
    /// Enables application-level snapshot recovery. Required to start a node that was recovered from a snapshot,
    /// or to initialize a node from a snapshot. Has no effect if the node was initialized from a Postgres dump
    /// or was synced from genesis.
    ///
    /// This is an experimental and incomplete feature; do not use unless you know what you're doing.
    #[arg(long, conflicts_with = "enable_consensus")]
    enable_snapshots_recovery: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -428,13 +438,22 @@ async fn main() -> anyhow::Result<()> {
.await
.context("Failed to load external node config")?;
if opt.enable_consensus {
// This is more of a sanity check; the mutual exclusion of `enable_consensus` and `enable_snapshots_recovery`
// should be ensured by `clap`.
anyhow::ensure!(
!opt.enable_snapshots_recovery,
"Consensus logic does not support snapshot recovery yet"
);
config.consensus =
Some(config::read_consensus_config().context("read_consensus_config()")?);
}

let main_node_url = config
.required
.main_node_url()
.context("Main node URL is incorrect")?;
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;

if let Some(threshold) = config.optional.slow_query_threshold() {
ConnectionPool::global_config().set_slow_query_threshold(threshold)?;
Expand All @@ -450,6 +469,7 @@ async fn main() -> anyhow::Result<()> {
.build()
.await
.context("failed to build a connection_pool")?;

if opt.revert_pending_l1_batch {
tracing::info!("Rolling pending L1 batch back..");
let reverter = BlockReverter::new(
Expand Down Expand Up @@ -486,16 +506,14 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Started the external node");
tracing::info!("Main node URL is: {}", main_node_url);

// Make sure that genesis is performed.
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;
perform_genesis_if_needed(
&mut connection_pool.access_storage().await.unwrap(),
config.remote.l2_chain_id,
// Make sure that the node storage is initialized either via genesis or snapshot recovery.
ensure_storage_initialized(
&connection_pool,
&main_node_client,
config.remote.l2_chain_id,
opt.enable_snapshots_recovery,
)
.await
.context("Performing genesis failed")?;
.await?;

let (task_handles, stop_sender, health_check_handle, stop_receiver) =
init_tasks(config.clone(), connection_pool.clone())
Expand Down
4 changes: 4 additions & 0 deletions core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ impl SnapshotCreator {
config: SnapshotsCreatorConfig,
min_chunk_count: u64,
) -> anyhow::Result<()> {
tracing::info!(
"Starting snapshot creator with object store {:?} and config {config:?}",
self.blob_store
);
let latency = METRICS.snapshot_generation_duration.start();

let Some(progress) = self
Expand Down
Loading

0 comments on commit c7cfaf9

Please sign in to comment.