diff --git a/Cargo.lock b/Cargo.lock index 65407b583b..cd55e8dc94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,14 @@ name = "constant_time_eq" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "conv" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cookie" version = "0.12.0" @@ -660,6 +668,11 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "custom_derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "derive_more" version = "0.14.1" @@ -1592,6 +1605,17 @@ dependencies = [ "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ntp" +version = "0.5.0" +source = "git+https://github.com/JeffBelgum/ntp/#d9c50ad307a25151a9d3828c8f46387c4020de7b" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "num-derive" version = "0.3.0" @@ -3433,6 +3457,7 @@ version = "0.3.2" dependencies = [ "bech32 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "exonum-build 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3567,6 +3592,11 @@ version = "0.3.2" dependencies = [ "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "ntp 0.5.0 (git+https://github.com/JeffBelgum/ntp/)", + "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3694,6 +3724,7 @@ dependencies = [ "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum constant_time_eq 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "995a44c877f9212528ccc74b21a232f66ad69001e40ede5bcee2ac9ef2657120" +"checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" "checksum cookie_store 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46750b3f362965f197996c4448e4a0935e791bf7d6631bfce9ee0af3d24c919c" "checksum copyless 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "6ff9c56c9fb2a49c05ef0e431485a22400af20d33226dc0764d891d09e724127" @@ -3708,6 +3739,7 @@ dependencies = [ "checksum crunchy 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5" "checksum ctrlc 3.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c7dfd2d8b4c82121dfdff120f818e09fc4380b0b7e17a742081a89b94853e87f" +"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9" "checksum derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6d944ac6003ed268757ef1ee686753b57efc5fcf0ebe7b64c9fc81e7e32ff839" "checksum derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a141330240c921ec6d074a3e188a7c7ef95668bb95e7d44fa0e5778ec2a7afe" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" @@ -3812,6 +3844,7 @@ dependencies = [ "checksum nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce" "checksum nodrop 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" "checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" +"checksum ntp 0.5.0 (git+https://github.com/JeffBelgum/ntp/)" = "" "checksum num-derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0c8b15b261814f992e33760b1fca9fe8b693d8a65299f20c9901688636cfb746" "checksum num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b85e541ef8255f6cf42bbfe4ef361305c6c135d10919ecc26126c4e5ae94bc09" "checksum num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6ba9a427cfca2be13aa6f6403b0b7e7368fe982bfa16fccc450ce74c46cd9b32" diff --git a/config/src/config.rs b/config/src/config.rs index 6a33a375f9..99e472769d 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -103,6 +103,11 @@ pub struct Config { #[partial_struct(ty = "PartialLog")] #[partial_struct(serde(default))] pub log: Log, + + /// Ntp-related configuration + #[partial_struct(ty = "PartialNtp")] + #[partial_struct(serde(default))] + pub ntp: Ntp, } /// Log-specific configuration. @@ -276,6 +281,17 @@ pub struct Mining { pub enabled: bool, } +/// Ntp-related configuration +#[derive(PartialStruct, Debug, Clone, PartialEq)] +#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))] +pub struct Ntp { + /// Period that indicate the validity of a ntp timestamp + pub update_period: i64, + + /// Server to query ntp information + pub server: String, +} + impl Config { pub fn from_partial(config: &PartialConfig) -> Self { let defaults: &dyn Defaults = match config.environment { @@ -317,6 +333,7 @@ impl Config { mining: Mining::from_partial(&config.mining, defaults), wallet: Wallet::from_partial(&config.wallet, defaults), rocksdb: Rocksdb::from_partial(&config.rocksdb, defaults), + ntp: Ntp::from_partial(&config.ntp, defaults), } } } @@ -476,6 +493,21 @@ impl Mining { } } +impl Ntp { + pub fn from_partial(config: &PartialNtp, defaults: &dyn Defaults) -> Self { + Ntp { + update_period: config + .update_period + .to_owned() + .unwrap_or_else(|| defaults.ntp_update_period()), + server: config + .server + .clone() + .unwrap_or_else(|| defaults.ntp_server()), + } + } +} + /// Wallet-specific configuration. #[derive(PartialStruct, Serialize, Debug, Clone, PartialEq)] #[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))] diff --git a/config/src/defaults.rs b/config/src/defaults.rs index 60a98aee43..d8714762ab 100644 --- a/config/src/defaults.rs +++ b/config/src/defaults.rs @@ -204,6 +204,14 @@ pub trait Defaults { fn rocksdb_enable_statistics(&self) -> bool { false } + + fn ntp_update_period(&self) -> i64 { + 600 + } + + fn ntp_server(&self) -> String { + "0.pool.ntp.org:123".to_string() + } } /// Struct that will implement all the mainnet defaults diff --git a/data_structures/Cargo.toml b/data_structures/Cargo.toml index 5262f52645..29e019f7e9 100644 --- a/data_structures/Cargo.toml +++ b/data_structures/Cargo.toml @@ -20,6 +20,7 @@ secp256k1 = "0.15.5" vrf = "0.2.2" witnet_reputation = { path = "../reputation", features = ["serde"] } witnet_protected = { path = "../protected" } +chrono = "0.4.9" [dependencies.partial_struct] path = "../partial_struct" diff --git a/node/src/actors/epoch_manager/mod.rs b/node/src/actors/epoch_manager/mod.rs index 37ba9a8631..8d1b4e9e88 100644 --- a/node/src/actors/epoch_manager/mod.rs +++ b/node/src/actors/epoch_manager/mod.rs @@ -11,7 +11,10 @@ use witnet_data_structures::{ chain::{Epoch, EpochConstants}, error::EpochCalculationError, }; -use witnet_util::timestamp::{duration_between_timestamps, get_timestamp, get_timestamp_nanos}; +use witnet_util::timestamp::{ + duration_between_timestamps, get_global_timestamp, get_timestamp, get_timestamp_nanos, + set_global_timestamp, update_global_timestamp, +}; use crate::actors::messages::{EpochNotification, EpochResult}; use crate::config_mngr; @@ -126,6 +129,13 @@ impl EpochManager { // Start checkpoint monitoring process actor.checkpoint_monitor(ctx); + // Start ntp update process + let mut global_ts = get_global_timestamp(); + global_ts.server = config.ntp.server.clone(); + set_global_timestamp(global_ts); + update_global_timestamp(); + actor.update_ntp_timestamp(ctx, config.ntp.update_period); + fut::ok(()) }) .map_err(|err, _, _| { @@ -165,9 +175,14 @@ impl EpochManager { Err(_) => return, }; + let last_checked_epoch = act.last_checked_epoch.unwrap_or(0); + // Send message to actors which subscribed to all epochs for subscription in &mut act.subscriptions_all { - subscription.send_notification(current_epoch); + if current_epoch > last_checked_epoch { + // Only send new epoch notification + subscription.send_notification(current_epoch); + } } // Get all the checkpoints that had some subscription but were skipped for some @@ -175,7 +190,7 @@ impl EpochManager { // resources to execute in time...) let epoch_checkpoints: Vec<_> = act .subscriptions_epoch - .range(act.last_checked_epoch.unwrap_or(0)..=current_epoch) + .range(last_checked_epoch..=current_epoch) .map(|(k, _v)| *k) .collect(); @@ -209,6 +224,19 @@ impl EpochManager { }, ); } + + /// Method to monitor checkpoints and execute some actions on each + fn update_ntp_timestamp(&self, ctx: &mut Context, period: i64) { + // Wait until next checkpoint to execute the periodic function + ctx.run_later(Duration::from_secs(period as u64), move |act, ctx| { + // Get current epoch + + update_global_timestamp(); + + // Reschedule update ntp process + act.update_ntp_timestamp(ctx, period); + }); + } } /// Trait that must follow all notifications that will be sent back to subscriber actors diff --git a/util/Cargo.toml b/util/Cargo.toml index c31f27d67a..ed91fc3a3b 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -8,3 +8,8 @@ workspace = ".." [dependencies] chrono = "0.4.9" failure = "0.1.6" +lazy_static = "1.4.0" +log = "0.4.8" +ntp = {git="https://github.com/JeffBelgum/ntp/"} +time = "0.1.42" +serde = { version = "1.0.101", features = ["derive"] } diff --git a/util/src/timestamp.rs b/util/src/timestamp.rs index d811335559..b4a2ed543c 100644 --- a/util/src/timestamp.rs +++ b/util/src/timestamp.rs @@ -1,13 +1,109 @@ -use chrono::prelude::*; +use chrono::{prelude::*, TimeZone}; +use lazy_static::lazy_static; +use ntp; +use serde::{Deserialize, Serialize}; +use std::sync::RwLock; use std::time::Duration; -/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch -pub fn get_timestamp() -> i64 { +/// NTP Timestamp information +#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Clone)] +pub struct TimestampNTP { + /// Last UTC timestamp + pub last_utc: (i64, u32), + /// Difference between NTP and system timestamp + pub ntp_diff: Duration, + /// Flag to indicate if NTP is bigger or smaller + /// than system timestamp + pub bigger: bool, + /// Address to require NTP timestamp information + pub server: String, +} + +lazy_static! { + static ref NTP_TS: RwLock = RwLock::new(TimestampNTP { + server: String::from("0.pool.ntp.org:123"), + ..TimestampNTP::default() + }); +} + +/// Get NTP timestamp +pub fn get_global_timestamp() -> TimestampNTP { + NTP_TS.read().unwrap().clone() +} + +/// Set NTP timestamp +pub fn set_global_timestamp(ts: TimestampNTP) { + match NTP_TS.write() { + Ok(mut x) => { + if ts.last_utc.0 < x.last_utc.0 { + log::warn!( + "Set new timestamp from the past. New is {}, old is {}", + ts.last_utc.0, + x.last_utc.0 + ); + } else { + log::debug!("Set new timestamp"); + } + *x = ts; + } + Err(e) => { + log::error!("Failed to set timestamp: {}", e); + } + } +} + +/// Update NTP timestamp +pub fn update_global_timestamp() { // Get UTC current datetime let utc: DateTime = Utc::now(); - // Return number of non-leap seconds since Unix epoch - utc.timestamp() + let utc_secs = utc.timestamp(); + let utc_subsec_nanos = utc.timestamp_subsec_nanos(); + let utc = (utc_secs, utc_subsec_nanos); + + let mut old_ts = get_global_timestamp(); + let ntp = get_timestamp_ntp(old_ts.server.as_str()); + + match ntp { + Some(ntp) => { + if let Some(diff) = duration_between_timestamps(utc, ntp) { + old_ts.last_utc = utc; + old_ts.ntp_diff = diff; + old_ts.bigger = true; + } else { + let diff = duration_between_timestamps(ntp, utc).unwrap(); + old_ts.last_utc = utc; + old_ts.ntp_diff = diff; + old_ts.bigger = false; + } + } + + None => { + old_ts.last_utc = utc; + } + } + + set_global_timestamp(old_ts.clone()); +} + +fn local_time(timestamp: ntp::protocol::TimestampFormat) -> chrono::DateTime { + let unix_time = ntp::unix_time::Instant::from(timestamp); + chrono::Local.timestamp(unix_time.secs(), unix_time.subsec_nanos() as _) +} +/// Get NTP timestamp from an addr specified +pub fn get_timestamp_ntp(addr: &str) -> Option<(i64, u32)> { + ntp::request(addr) + .map(|p| { + let ts = local_time(p.receive_timestamp); + + (ts.timestamp(), ts.timestamp_subsec_nanos()) + }) + .ok() +} + +/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch +pub fn get_timestamp() -> (i64) { + get_timestamp_nanos().0 } /// Function to get timestamp from system as UTC Unix timestamp, seconds and nanoseconds since Unix epoch @@ -15,8 +111,22 @@ pub fn get_timestamp_nanos() -> (i64, u32) { // Get UTC current datetime let utc: DateTime = Utc::now(); - // Return number of non-leap seconds since Unix epoch and the number of nanoseconds since the last second boundary - (utc.timestamp(), utc.timestamp_subsec_nanos()) + let utc_secs = utc.timestamp(); + let utc_subsec_nanos = utc.timestamp_subsec_nanos(); + + let old_ts = get_global_timestamp(); + + let utc_dur = Duration::new(utc_secs as u64, utc_subsec_nanos); + let result = if old_ts.bigger { + utc_dur.checked_add(old_ts.ntp_diff) + } else { + utc_dur.checked_sub(old_ts.ntp_diff) + }; + + match result { + Some(x) => (x.as_secs() as i64, x.subsec_nanos()), + None => panic!("Error: Overflow in timestamp"), + } } /// Duration needed to wait from now until the target timestamp