diff --git a/src/lib.rs b/src/lib.rs index 3afd89e..570f323 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -182,6 +182,7 @@ pub struct Scope<'s> { context: Arc, worker_index: usize, job_queue: ThreadJobQueue<'s>, + join_count: u8, } impl<'s> Scope<'s> { @@ -194,6 +195,7 @@ impl<'s> Scope<'s> { context: thread_pool.context.clone(), worker_index, job_queue: ThreadJobQueue::Current(JobQueue::default()), + join_count: 0, } } @@ -206,6 +208,7 @@ impl<'s> Scope<'s> { context, worker_index, job_queue: ThreadJobQueue::Worker(job_queue), + join_count: 0, } } @@ -273,38 +276,13 @@ impl<'s> Scope<'s> { self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed); } - /// Runs `a` and `b` potentially in parallel on separate threads and - /// returns the results. - /// - /// # Examples - /// - /// ``` - /// # use spice::ThreadPool; - /// let mut tp = ThreadPool::new().unwrap(); - /// let mut s = tp.scope(); - /// - /// let mut vals = [0; 2]; - /// let (left, right) = vals.split_at_mut(1); - /// - /// s.join(|_|left[0] = 1, |_| right[0] = 1); - /// - /// assert_eq!(vals, [1; 2]); - /// ``` - pub fn join<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB) + fn join_inner<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB) where A: FnOnce(&mut Scope<'_>) -> RA + Send, B: FnOnce(&mut Scope<'_>) -> RB + Send, RA: Send, RB: Send, { - let a = move |scope: &mut Scope<'_>| { - if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { - scope.heartbeat(); - } - - a(scope) - }; - let stack = JobStack::new(a); let job = Job::new(&stack); @@ -340,6 +318,85 @@ impl<'s> Scope<'s> { (ra, rb) } } + + /// Runs `a` and `b` potentially in parallel on separate threads and + /// returns the results. + /// + /// This variant only checks for a heartbeat on every 16th call, for improved + /// performance. 
+ /// + /// # Examples + /// + /// ``` + /// # use spice::ThreadPool; + /// let mut tp = ThreadPool::new().unwrap(); + /// let mut s = tp.scope(); + /// + /// let mut vals = [0; 2]; + /// let (left, right) = vals.split_at_mut(1); + /// + /// s.join(|_|left[0] = 1, |_| right[0] = 1); + /// + /// assert_eq!(vals, [1; 2]); + /// ``` + pub fn join<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB) + where + A: FnOnce(&mut Scope<'_>) -> RA + Send, + B: FnOnce(&mut Scope<'_>) -> RB + Send, + RA: Send, + RB: Send, + { + self.join_with_heartbeat_every::<16, _, _, _, _>(a, b) + } + + /// Runs `a` and `b` potentially in parallel on separate threads and + /// returns the results. + /// + /// This variant only checks for a heartbeat on every `TIMES`-th call, for + /// improved performance. A `TIMES` of 0 behaves like 1: every call checks. + /// + /// # Examples + /// + /// ``` + /// # use spice::ThreadPool; + /// let mut tp = ThreadPool::new().unwrap(); + /// let mut s = tp.scope(); + /// + /// let mut vals = [0; 2]; + /// let (left, right) = vals.split_at_mut(1); + /// + /// // Skip checking 7/8 calls to join_with_heartbeat_every. + /// s.join_with_heartbeat_every::<8, _, _, _, _>(|_|left[0] = 1, |_| right[0] = 1); + /// + /// assert_eq!(vals, [1; 2]); + /// ``` + pub fn join_with_heartbeat_every<const TIMES: u8, A, B, RA, RB>( + &mut self, + a: A, + b: B, + ) -> (RA, RB) + where + A: FnOnce(&mut Scope<'_>) -> RA + Send, + B: FnOnce(&mut Scope<'_>) -> RB + Send, + RA: Send, + RB: Send, + { + self.join_count = self.join_count.wrapping_add(1).checked_rem(TIMES).unwrap_or(0); + + if self.join_count == 0 { + let a = move |scope: &mut Scope<'_>| { + if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { + scope.heartbeat(); + } + + a(scope) + }; + + self.join_inner(a, b) + } else { + self.join_inner(a, b) + } + } } /// `ThreadPool` configuration. 
@@ -597,9 +654,9 @@ mod tests { 0 => (), 1 => slice[0] += 1, _ => { - let (_, tail) = slice.split_at_mut(1); + let (head, tail) = slice.split_at_mut(1); - s.join( + s.join_with_heartbeat_every::<1, _, _, _, _>( |_| { thread::sleep(Duration::from_micros(100)); @@ -607,6 +664,8 @@ mod tests { threads_crossed.store(true, Ordering::Relaxed); panic!("panicked across threads"); } + + head[0] += 1; }, |s| increment(s, tail, id), );