Skip to content

Commit

Permalink
Merge pull request #5 from dragostis/extra
Browse files Browse the repository at this point in the history
Fine-tuned the heartbeat skipping.
  • Loading branch information
dragostis authored Sep 15, 2024
2 parents 0232f5f + 1bf62ff commit 1a4647c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
12 changes: 12 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ unsafe impl Send for Job {}
pub struct JobQueue {
sentinel: NonNull<Job>,
tail: NonNull<Job>,
len: u32,
}

impl Default for JobQueue {
Expand All @@ -248,6 +249,7 @@ impl Default for JobQueue {
Self {
sentinel: root,
tail: root,
len: 0,
}
}
}
Expand All @@ -263,6 +265,10 @@ impl Drop for JobQueue {
}

impl JobQueue {
pub fn len(&self) -> u32 {
self.len
}

/// Any `Job` pushed onto the queue should alive at least until it gets
/// popped.
pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
Expand All @@ -282,6 +288,8 @@ impl JobQueue {
.set(Some(NonNull::from(next_tail).cast()));
next_tail.prev.set(Some(current_tail.into()));

self.len += 1;

self.tail = next_tail.into();
}

Expand All @@ -301,6 +309,8 @@ impl JobQueue {
current_tail.prev.set(None);
prev_tail.fut_or_next.set(None);

self.len -= 1;

self.tail = prev_tail.into();
}
}
Expand Down Expand Up @@ -344,6 +354,8 @@ impl JobQueue {
head.fut_or_next
.set(Some(Box::leak(Box::new(Future::default())).into()));

self.len -= 1;

// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand Down
41 changes: 27 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,34 @@ impl<'s> Scope<'s> {
self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed);
}

fn join_inner<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
fn join_seq<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&mut Scope<'_>) -> RA + Send,
B: FnOnce(&mut Scope<'_>) -> RB + Send,
RA: Send,
RB: Send,
{
let rb = b(self);
let ra = a(self);

(ra, rb)
}

fn join_heartbeat<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&mut Scope<'_>) -> RA + Send,
B: FnOnce(&mut Scope<'_>) -> RB + Send,
RA: Send,
RB: Send,
{
let a = move |scope: &mut Scope<'_>| {
if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) {
scope.heartbeat();
}

a(scope)
};

let stack = JobStack::new(a);
let job = Job::new(&stack);

Expand Down Expand Up @@ -346,7 +367,7 @@ impl<'s> Scope<'s> {
RA: Send,
RB: Send,
{
self.join_with_heartbeat_every::<16, _, _, _, _>(a, b)
self.join_with_heartbeat_every::<64, _, _, _, _>(a, b)
}

/// Runs `a` and `b` potentially in parallel on separate threads and
Expand Down Expand Up @@ -383,18 +404,10 @@ impl<'s> Scope<'s> {
{
self.join_count = self.join_count.wrapping_add(1) % TIMES;

if self.join_count == 0 {
let a = move |scope: &mut Scope<'_>| {
if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) {
scope.heartbeat();
}

a(scope)
};

self.join_inner(a, b)
if self.join_count == 0 || self.job_queue.len() < 3 {
self.join_heartbeat(a, b)
} else {
self.join_inner(a, b)
self.join_seq(a, b)
}
}
}
Expand Down Expand Up @@ -613,7 +626,7 @@ mod tests {
_ => {
let (head, tail) = slice.split_at_mut(1);

s.join(
s.join_with_heartbeat_every::<1, _, _, _, _>(
|_| {
thread::sleep(Duration::from_micros(10));
head[0] += 1;
Expand Down

0 comments on commit 1a4647c

Please sign in to comment.