Skip to content

Commit

Permalink
feat(utils): use NTP timestamps to avoid sync problems
Browse files Browse the repository at this point in the history
  • Loading branch information
lrubiorod committed Nov 15, 2019
1 parent 8cba73b commit 0fcea21
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 14 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ pub struct Config {
#[partial_struct(ty = "PartialLog")]
#[partial_struct(serde(default))]
pub log: Log,

/// Ntp-related configuration
#[partial_struct(ty = "PartialNtp")]
#[partial_struct(serde(default))]
pub ntp: Ntp,
}

/// Log-specific configuration.
Expand Down Expand Up @@ -276,6 +281,17 @@ pub struct Mining {
pub enabled: bool,
}

/// Ntp-related configuration
#[derive(PartialStruct, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
pub struct Ntp {
/// Period that indicate the validity of a ntp timestamp
pub update_period: i64,

/// Server to query ntp information
pub server: Vec<String>,
}

impl Config {
pub fn from_partial(config: &PartialConfig) -> Self {
let defaults: &dyn Defaults = match config.environment {
Expand Down Expand Up @@ -317,6 +333,7 @@ impl Config {
mining: Mining::from_partial(&config.mining, defaults),
wallet: Wallet::from_partial(&config.wallet, defaults),
rocksdb: Rocksdb::from_partial(&config.rocksdb, defaults),
ntp: Ntp::from_partial(&config.ntp, defaults),
}
}
}
Expand Down Expand Up @@ -476,6 +493,21 @@ impl Mining {
}
}

impl Ntp {
pub fn from_partial(config: &PartialNtp, defaults: &dyn Defaults) -> Self {
Ntp {
update_period: config
.update_period
.to_owned()
.unwrap_or_else(|| defaults.ntp_update_period()),
server: config
.server
.clone()
.unwrap_or_else(|| defaults.ntp_server()),
}
}
}

/// Wallet-specific configuration.
#[derive(PartialStruct, Serialize, Debug, Clone, PartialEq)]
#[partial_struct(derive(Deserialize, Default, Debug, Clone, PartialEq))]
Expand Down
8 changes: 8 additions & 0 deletions config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ pub trait Defaults {
fn rocksdb_enable_statistics(&self) -> bool {
false
}

fn ntp_update_period(&self) -> i64 {
600
}

fn ntp_server(&self) -> Vec<String> {
vec!["0.pool.ntp.org:123".to_string()]
}
}

/// Struct that will implement all the mainnet defaults
Expand Down
1 change: 1 addition & 0 deletions data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ secp256k1 = "0.15.5"
vrf = "0.2.2"
witnet_reputation = { path = "../reputation", features = ["serde"] }
witnet_protected = { path = "../protected" }
chrono = "0.4.9"

[dependencies.partial_struct]
path = "../partial_struct"
Expand Down
37 changes: 33 additions & 4 deletions node/src/actors/epoch_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use witnet_data_structures::{
chain::{Epoch, EpochConstants},
error::EpochCalculationError,
};
use witnet_util::timestamp::{duration_between_timestamps, get_timestamp, get_timestamp_nanos};
use witnet_util::timestamp::{
duration_between_timestamps, get_mut_global_timestamp, get_timestamp, get_timestamp_nanos,
update_global_timestamp,
};

use crate::actors::messages::{EpochNotification, EpochResult};
use crate::config_mngr;
Expand Down Expand Up @@ -126,6 +129,14 @@ impl EpochManager {
// Start checkpoint monitoring process
actor.checkpoint_monitor(ctx);

// Start ntp update process
{
let mut global_ts = get_mut_global_timestamp();
global_ts.server = config.ntp.server[0].clone();
}
update_global_timestamp();
actor.update_ntp_timestamp(ctx, config.ntp.update_period);

fut::ok(())
})
.map_err(|err, _, _| {
Expand Down Expand Up @@ -165,17 +176,22 @@ impl EpochManager {
Err(_) => return,
};

let last_checked_epoch = act.last_checked_epoch.unwrap_or(0);

// Send message to actors which subscribed to all epochs
for subscription in &mut act.subscriptions_all {
subscription.send_notification(current_epoch);
if current_epoch > last_checked_epoch {
for subscription in &mut act.subscriptions_all {
// Only send new epoch notification
subscription.send_notification(current_epoch);
}
}

// Get all the checkpoints that had some subscription but were skipped for some
// reason (process sent to background, checkpoint monitor process had no
// resources to execute in time...)
let epoch_checkpoints: Vec<_> = act
.subscriptions_epoch
.range(act.last_checked_epoch.unwrap_or(0)..=current_epoch)
.range(last_checked_epoch..=current_epoch)
.map(|(k, _v)| *k)
.collect();

Expand Down Expand Up @@ -209,6 +225,19 @@ impl EpochManager {
},
);
}

/// Method to monitor checkpoints and execute some actions on each
fn update_ntp_timestamp(&self, ctx: &mut Context<Self>, period: i64) {
// Wait until next checkpoint to execute the periodic function
ctx.run_later(Duration::from_secs(period as u64), move |act, ctx| {
// Get current epoch

update_global_timestamp();

// Reschedule update ntp process
act.update_ntp_timestamp(ctx, period);
});
}
}

/// Trait that must follow all notifications that will be sent back to subscriber actors
Expand Down
5 changes: 5 additions & 0 deletions util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ workspace = ".."
[dependencies]
chrono = "0.4.9"
failure = "0.1.6"
lazy_static = "1.4.0"
log = "0.4.8"
ntp = {git="https://github.com/JeffBelgum/ntp/"}
time = "0.1.42"
serde = { version = "1.0.101", features = ["derive"] }
104 changes: 94 additions & 10 deletions util/src/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,106 @@
use chrono::prelude::*;
use chrono::{prelude::*, TimeZone};
use lazy_static::lazy_static;
use ntp;
use serde::{Deserialize, Serialize};
use std::sync::RwLock;
use std::time::Duration;

/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> i64 {
/// NTP Timestamp information
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Clone)]
pub struct TimestampNTP {
/// Difference between NTP and system timestamp
pub ntp_diff: Duration,
/// Flag to indicate if NTP is bigger or smaller
/// than system timestamp
pub bigger: bool,
/// Address to require NTP timestamp information
pub server: String,
}

lazy_static! {
static ref NTP_TS: RwLock<TimestampNTP> = RwLock::new(TimestampNTP {
server: String::from("0.pool.ntp.org:123"),
..TimestampNTP::default()
});
}

/// Get NTP timestamp
pub fn get_global_timestamp() -> TimestampNTP {
NTP_TS.read().expect("Timestamp with poisoned lock").clone()
}

/// Set NTP timestamp
pub fn get_mut_global_timestamp() -> std::sync::RwLockWriteGuard<'static, TimestampNTP> {
NTP_TS.write().expect("Timestamp with poisoned lock")
}

/// Get Local timestamp
pub fn get_local_timestamp() -> (i64, u32) {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();

// Return number of non-leap seconds since Unix epoch
utc.timestamp()
let utc_secs = utc.timestamp();
let utc_subsec_nanos = utc.timestamp_subsec_nanos();

(utc_secs, utc_subsec_nanos)
}

/// Update NTP timestamp
pub fn update_global_timestamp() {
let utc = get_local_timestamp();
let mut global_ts = get_mut_global_timestamp();
let ntp = get_timestamp_ntp(global_ts.server.as_str());

if let Some(ntp) = ntp {
if let Some(diff) = duration_between_timestamps(utc, ntp) {
global_ts.ntp_diff = diff;
global_ts.bigger = true;
} else {
let diff = duration_between_timestamps(ntp, utc).unwrap();
global_ts.ntp_diff = diff;
global_ts.bigger = false;
}
} else {
log::warn!("Update global timestamp failed. Ntp request not achieve");
}
}

fn local_time(timestamp: ntp::protocol::TimestampFormat) -> chrono::DateTime<chrono::Local> {
let unix_time = ntp::unix_time::Instant::from(timestamp);
chrono::Local.timestamp(unix_time.secs(), unix_time.subsec_nanos() as u32)
}
/// Get NTP timestamp from an addr specified
pub fn get_timestamp_ntp(addr: &str) -> Option<(i64, u32)> {
ntp::request(addr)
.map(|p| {
let ts = local_time(p.receive_timestamp);

(ts.timestamp(), ts.timestamp_subsec_nanos())
})
.ok()
}

/// Function to get timestamp from system as UTC Unix timestamp, seconds since Unix epoch
pub fn get_timestamp() -> i64 {
get_timestamp_nanos().0
}

/// Function to get timestamp from system as UTC Unix timestamp, seconds and nanoseconds since Unix epoch
pub fn get_timestamp_nanos() -> (i64, u32) {
// Get UTC current datetime
let utc: DateTime<Utc> = Utc::now();

// Return number of non-leap seconds since Unix epoch and the number of nanoseconds since the last second boundary
(utc.timestamp(), utc.timestamp_subsec_nanos())
let utc_ts = get_local_timestamp();
let global_ts = get_global_timestamp();

let utc_dur = Duration::new(utc_ts.0 as u64, utc_ts.1);
let result = if global_ts.bigger {
utc_dur.checked_add(global_ts.ntp_diff)
} else {
utc_dur.checked_sub(global_ts.ntp_diff)
};

match result {
Some(x) => (x.as_secs() as i64, x.subsec_nanos()),
None => panic!("Error: Overflow in timestamp"),
}
}

/// Duration needed to wait from now until the target timestamp
Expand Down

0 comments on commit 0fcea21

Please sign in to comment.