Skip to content

Commit

Permalink
feat(utils): use NTP timestamps to avoid sync problems
Browse files Browse the repository at this point in the history
  • Loading branch information
lrubiorod committed Nov 14, 2019
1 parent 8cba73b commit 39f428d
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 10 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ pub struct Config {
#[partial_struct(ty = "PartialLog")]
#[partial_struct(serde(default))]
pub log: Log,

/// Ntp-related configuration
#[partial_struct(ty = "PartialNtp")]
#[partial_struct(serde(default))]
pub ntp: Ntp,
}

/// Log-specific configuration.
Expand Down Expand Up @@ -276,6 +281,17 @@ pub struct Mining {
pub enabled: bool,
}

/// Ntp-related configuration
#[derive(PartialStruct, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
pub struct Ntp {
/// Period that indicate the validity of a ntp timestamp
pub update_period: i64,

/// Server to query ntp information
pub server: String,
}

impl Config {
pub fn from_partial(config: &PartialConfig) -> Self {
let defaults: &dyn Defaults = match config.environment {
Expand Down Expand Up @@ -317,6 +333,7 @@ impl Config {
mining: Mining::from_partial(&config.mining, defaults),
wallet: Wallet::from_partial(&config.wallet, defaults),
rocksdb: Rocksdb::from_partial(&config.rocksdb, defaults),
ntp: Ntp::from_partial(&config.ntp, defaults),
}
}
}
Expand Down Expand Up @@ -476,6 +493,21 @@ impl Mining {
}
}

impl Ntp {
pub fn from_partial(config: &PartialNtp, defaults: &dyn Defaults) -> Self {
Ntp {
update_period: config
.update_period
.to_owned()
.unwrap_or_else(|| defaults.ntp_update_period()),
server: config
.server
.clone()
.unwrap_or_else(|| defaults.ntp_server()),
}
}
}

/// Wallet-specific configuration.
#[derive(PartialStruct, Serialize, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
Expand Down
8 changes: 8 additions & 0 deletions config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ pub trait Defaults {
fn rocksdb_enable_statistics(&self) -> bool {
false
}

fn ntp_update_period(&self) -> i64 {
600
}

fn ntp_server(&self) -> String {
"0.pool.ntp.org:123".to_string()
}
}

/// Struct that will implement all the mainnet defaults
Expand Down
1 change: 1 addition & 0 deletions data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ secp256k1 = "0.15.5"
vrf = "0.2.2"
witnet_reputation = { path = "../reputation", features = ["serde"] }
witnet_protected = { path = "../protected" }
chrono = "0.4.9"

[dependencies.partial_struct]
path = "../partial_struct"
Expand Down
34 changes: 31 additions & 3 deletions node/src/actors/epoch_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use witnet_data_structures::{
chain::{Epoch, EpochConstants},
error::EpochCalculationError,
};
use witnet_util::timestamp::{duration_between_timestamps, get_timestamp, get_timestamp_nanos};
use witnet_util::timestamp::{
duration_between_timestamps, get_global_timestamp, get_timestamp, get_timestamp_nanos,
set_global_timestamp, update_global_timestamp,
};

use crate::actors::messages::{EpochNotification, EpochResult};
use crate::config_mngr;
Expand Down Expand Up @@ -126,6 +129,13 @@ impl EpochManager {
// Start checkpoint monitoring process
actor.checkpoint_monitor(ctx);

// Start ntp update process
let mut global_ts = get_global_timestamp();
global_ts.server = config.ntp.server.clone();
set_global_timestamp(global_ts);
update_global_timestamp();
actor.update_ntp_timestamp(ctx, config.ntp.update_period);

fut::ok(())
})
.map_err(|err, _, _| {
Expand Down Expand Up @@ -165,17 +175,22 @@ impl EpochManager {
Err(_) => return,
};

let last_checked_epoch = act.last_checked_epoch.unwrap_or(0);

// Send message to actors which subscribed to all epochs
for subscription in &mut act.subscriptions_all {
subscription.send_notification(current_epoch);
if current_epoch > last_checked_epoch {
// Only send new epoch notification
subscription.send_notification(current_epoch);
}
}

// Get all the checkpoints that had some subscription but were skipped for some
// reason (process sent to background, checkpoint monitor process had no
// resources to execute in time...)
let epoch_checkpoints: Vec<_> = act
.subscriptions_epoch
.range(act.last_checked_epoch.unwrap_or(0)..=current_epoch)
.range(last_checked_epoch..=current_epoch)
.map(|(k, _v)| *k)
.collect();

Expand Down Expand Up @@ -209,6 +224,19 @@ impl EpochManager {
},
);
}

/// Method to monitor checkpoints and execute some actions on each
fn update_ntp_timestamp(&self, ctx: &mut Context<Self>, period: i64) {
// Wait until next checkpoint to execute the periodic function
ctx.run_later(Duration::from_secs(period as u64), move |act, ctx| {
// Get current epoch

update_global_timestamp();

// Reschedule update ntp process
act.update_ntp_timestamp(ctx, period);
});
}
}

/// Trait that must follow all notifications that will be sent back to subscriber actors
Expand Down
5 changes: 5 additions & 0 deletions util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ workspace = ".."
[dependencies]
chrono = "0.4.9"
failure = "0.1.6"
lazy_static = "1.4.0"
log = "0.4.8"
ntp = {git="https://github.com/JeffBelgum/ntp/"}
time = "0.1.42"
serde = { version = "1.0.101", features = ["derive"] }
124 changes: 117 additions & 7 deletions util/src/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,132 @@
use chrono::prelude::*;
use chrono::{prelude::*, TimeZone};
use lazy_static::lazy_static;
use ntp;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
use std::time::Duration;

/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> i64 {
/// NTP Timestamp information
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Clone)]
pub struct TimestampNTP {
/// Last UTC timestamp
pub last_utc: (i64, u32),
/// Difference between NTP and system timestamp
pub ntp_diff: Duration,
/// Flag to indicate if NTP is bigger or smaller
/// than system timestamp
pub bigger: bool,
/// Address to require NTP timestamp information
pub server: String,
}

lazy_static! {
static ref NTP_TS: RwLock<TimestampNTP> = RwLock::new(TimestampNTP {
server: String::from("0.pool.ntp.org:123"),
..TimestampNTP::default()
});
}

/// Get NTP timestamp
pub fn get_global_timestamp() -> TimestampNTP {
NTP_TS.read().unwrap().clone()
}

/// Set NTP timestamp
pub fn set_global_timestamp(ts: TimestampNTP) {
match NTP_TS.write() {
Ok(mut x) => {
if ts.last_utc.0 < x.last_utc.0 {
log::warn!(
"Set new timestamp from the past. New is {}, old is {}",
ts.last_utc.0,
x.last_utc.0
);
} else {
log::debug!("Set new timestamp");
}
*x = ts;
}
Err(e) => {
log::error!("Failed to set timestamp: {}", e);
}
}
}

/// Update NTP timestamp
pub fn update_global_timestamp() {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();

// Return number of non-leap seconds since Unix epoch
utc.timestamp()
let utc_secs = utc.timestamp();
let utc_subsec_nanos = utc.timestamp_subsec_nanos();
let utc = (utc_secs, utc_subsec_nanos);

let mut old_ts = get_global_timestamp();
let ntp = get_timestamp_ntp(old_ts.server.as_str());

match ntp {
Some(ntp) => {
if let Some(diff) = duration_between_timestamps(utc, ntp) {
old_ts.last_utc = utc;
old_ts.ntp_diff = diff;
old_ts.bigger = true;
} else {
let diff = duration_between_timestamps(ntp, utc).unwrap();
old_ts.last_utc = utc;
old_ts.ntp_diff = diff;
old_ts.bigger = false;
}
}

None => {
old_ts.last_utc = utc;
}
}

set_global_timestamp(old_ts.clone());
}

fn local_time(timestamp: ntp::protocol::TimestampFormat) -> chrono::DateTime<chrono::Local> {
let unix_time = ntp::unix_time::Instant::from(timestamp);
chrono::Local.timestamp(unix_time.secs(), unix_time.subsec_nanos() as _)
}
/// Get NTP timestamp from an addr specified
pub fn get_timestamp_ntp(addr: &str) -> Option<(i64, u32)> {
ntp::request(addr)
.map(|p| {
let ts = local_time(p.receive_timestamp);

(ts.timestamp(), ts.timestamp_subsec_nanos())
})
.ok()
}

/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> (i64) {
get_timestamp_nanos().0
}

/// Function to get timestamp from system as UTC Unix timestamp, seconds and nanoseconds since Unix epoch
pub fn get_timestamp_nanos() -> (i64, u32) {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();

// Return number of non-leap seconds since Unix epoch and the number of nanoseconds since the last second boundary
(utc.timestamp(), utc.timestamp_subsec_nanos())
let utc_secs = utc.timestamp();
let utc_subsec_nanos = utc.timestamp_subsec_nanos();

let old_ts = get_global_timestamp();

let utc_dur = Duration::new(utc_secs as u64, utc_subsec_nanos);
let result = if old_ts.bigger {
utc_dur.checked_add(old_ts.ntp_diff)
} else {
utc_dur.checked_sub(old_ts.ntp_diff)
};

match result {
Some(x) => (x.as_secs() as i64, x.subsec_nanos()),
None => panic!("Error: Overflow in timestamp"),
}
}

/// Duration needed to wait from now until the target timestamp
Expand Down

0 comments on commit 39f428d

Please sign in to comment.