Skip to content

Commit

Permalink
update: improve scheduler and scheduler runner logic
Browse files Browse the repository at this point in the history
  • Loading branch information
calebpitan committed Oct 21, 2024
1 parent 6df19a9 commit 3006b66
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 27 deletions.
26 changes: 21 additions & 5 deletions packages/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
mod utils;

mod queue;
mod scheduler;
mod utils;

use wasm_bindgen::prelude::*;

use scheduler::scheduler::StScheduler;

#[wasm_bindgen]
extern "C" {
fn alert(s: &str);
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);

#[wasm_bindgen(js_namespace = console)]
fn error(s: &str);
}

#[wasm_bindgen]
pub fn greet() {
alert("Hello, scheduler!");
#[macro_export]
macro_rules! console_log {
($($t:tt)*) => {
let time = crate::scheduler::time::utc_now().to_datetime().format("%Y-%m-%dT%H:%M:%S%.3f%z");
$crate::log(&format!("[Scheduler] - {} INFO {}", time, &format!($($t)*)));
};
}

#[macro_export]
macro_rules! console_error {
($($t:tt)*) => {
let time = crate::scheduler::time::utc_now().to_datetime().format("%Y-%m-%dT%H:%M:%S%.3f%z");
$crate::error(&format!("[Scheduler] - {} ERROR {}", time, &format!($($t)*)));
};
}

#[wasm_bindgen]
Expand Down
113 changes: 91 additions & 22 deletions packages/scheduler/src/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::marker::PhantomData;
use std::thread::sleep;
use std::ops::Sub;
use std::time::Duration;

use async_std::task;
use wasm_bindgen::prelude::*;

use super::schedule::StSchedule;
use super::time::utc_now;
use crate::console_log;
use crate::queue::priority_queue::{Comparator, PriorityQueue};
use crate::scheduler::schedule::StSchedule;
use crate::scheduler::time::utc_now;
use crate::scheduler::time::Timestamp;

pub struct ClosureComparator<F, T>
where
Expand All @@ -28,6 +31,7 @@ where
#[wasm_bindgen]
pub struct StScheduler {
pq: PriorityQueue<StSchedule>,
receiver: Option<js_sys::Function>,
_aborted: bool,
}

Expand All @@ -41,45 +45,110 @@ impl StScheduler {

StScheduler {
pq: PriorityQueue::new(Box::new(comparator)),
receiver: None,
_aborted: false,
}
}

pub fn schedule(&mut self, config: StSchedule) {
let next_schedule = config.get_next_schedule();
async fn idle(&self, till: Option<Timestamp>) {
let sleep_ts = match till {
Some(t) => t,
None => Timestamp::Millis(1000),
};

console_log!("scheduler idling for {} milliseconds", sleep_ts.to_ms());

self.pq.enqueue(next_schedule);
task::sleep(Duration::from_millis(sleep_ts.to_ms())).await
}

pub fn run(&mut self) {
let sleep_duration_ms = 1000;
pub fn add_schedule(&mut self, schedule: StSchedule) {
if schedule.is_passed() {
console_log!("Evaluating upcoming schedules");
let upcoming_schedules = schedule.get_upcoming_schedule();

match upcoming_schedules {
Some(s) => {
self.pq.enqueue(s);
console_log!(
"Added an upcoming schedule to queue since original schedule has passed"
);
}
None => (),
}
} else {
console_log!("Adding schedule to queue");
self.pq.enqueue(schedule)
}
}

self._aborted = false;
pub fn subscribe(&mut self, receiver: js_sys::Function) {
console_log!("subscription received");
self.receiver = Some(receiver)
}

pub fn abort(&mut self) {
if self._aborted == false {
console_log!("scheduler aborted!");
self._aborted = true;
}
}

fn unabort(&mut self) {
if self._aborted == true {
console_log!("scheduler resumed!");
self._aborted = false;
}
}

fn isaborted(&self) -> bool {
return self._aborted == true;
}

pub async fn run(&mut self) {
self.unabort();
let sleep_ts = Timestamp::Millis(1000);

console_log!("scheduler running");

loop {
if self._aborted {
if self.isaborted() {
break;
}

if self.pq.is_empty() {
sleep(Duration::from_millis(sleep_duration_ms));
self.idle(None).await;
continue;
}

let peaked = self.pq.peek().expect("Expected a non-empty queue");
let timestamp = peaked.get_timestamp();
let now = utc_now() as u64;
let peeked = self.pq.peek().expect("Expected a non-empty queue");
let peeked_id = peeked.get_id();
let timestamp = Timestamp::Millis(peeked.get_timestamp().sub(3_600_000));
let now = utc_now();
let difference = timestamp - now;

if timestamp > now && difference > sleep_duration_ms {
sleep(Duration::from_millis(sleep_duration_ms));
} else {
// trigger an event
if timestamp > now {
console_log!("there are no due schedules yet");
console_log!(
"timestamp: {}; current time: {}",
timestamp.to_ms().to_string().as_str(),
now.to_ms().to_string().as_str()
);

if difference > sleep_ts {
self.idle(Some(sleep_ts)).await;
continue;
}
continue;
} else if timestamp >= now {
console_log!("dispatching one schedule found to be due");

let item = self.pq.dequeue().unwrap();
let item_id = item.get_id();
assert_eq!(item_id, peeked_id);
self.receiver
.as_ref()
.map(|r| r.call1(&JsValue::NULL, &JsValue::from_str(item.get_id().as_str())));
}
}
}

pub fn abort(&mut self) {
self._aborted = true;
}
}

0 comments on commit 3006b66

Please sign in to comment.