From ad88c705e0667be650a2a9f73dbc6b1db330e5f3 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sat, 7 Nov 2020 00:22:31 +0100 Subject: [PATCH] Implemented 'fold' and 'fold_with' operation --- asparit/src/core/iterator.rs | 170 +++++++++++++++++ asparit/src/inner/filter.rs | 4 - asparit/src/inner/filter_map.rs | 4 - asparit/src/inner/fold.rs | 329 ++++++++++++++++++++++++++++++++ asparit/src/inner/mod.rs | 14 +- 5 files changed, 505 insertions(+), 16 deletions(-) create mode 100644 asparit/src/inner/fold.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 8a6e5a8..d134b94 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -9,6 +9,7 @@ use crate::{ copied::Copied, filter::Filter, filter_map::FilterMap, + fold::{Fold, FoldWith}, for_each::ForEach, inspect::Inspect, map::Map, @@ -619,6 +620,175 @@ pub trait ParallelIterator<'a>: Sized + Send { TryReduce::new(self, identity, operation) } + /// Parallel fold is similar to sequential fold except that the + /// sequence of items may be subdivided before it is + /// folded. Consider a list of numbers like `22 3 77 89 46`. If + /// you used sequential fold to add them (`fold(0, |a,b| a+b)`, + /// you would wind up first adding 0 + 22, then 22 + 3, then 25 + + /// 77, and so forth. The **parallel fold** works similarly except + /// that it first breaks up your list into sublists, and hence + /// instead of yielding up a single sum at the end, it yields up + /// multiple sums. The number of results is nondeterministic, as + /// is the point where the breaks occur. 
+    ///
+    /// So if we did the same parallel fold (`fold(0, |a,b| a+b)`) on
+    /// our example list, we might wind up with a sequence of two numbers,
+    /// like so:
+    ///
+    /// ```notrust
+    /// 22 3 77 89 46
+    ///       |     |
+    ///     102   135
+    /// ```
+    ///
+    /// Or perhaps these three numbers:
+    ///
+    /// ```notrust
+    /// 22 3 77 89 46
+    ///       |  |  |
+    ///     102 89 46
+    /// ```
+    ///
+    /// In general, Rayon will attempt to find good breaking points
+    /// that keep all of your cores busy.
+    ///
+    /// ### Fold versus reduce
+    ///
+    /// The `fold()` and `reduce()` methods each take an identity element
+    /// and a combining function, but they operate rather differently.
+    ///
+    /// `reduce()` requires that the identity function has the same
+    /// type as the things you are iterating over, and it fully
+    /// reduces the list of items into a single item. So, for example,
+    /// imagine we are iterating over a list of bytes `bytes: [128_u8,
+    /// 64_u8, 64_u8]`. If we used `bytes.reduce(|| 0_u8, |a: u8, b:
+    /// u8| a + b)`, we would get an overflow. This is because `0`,
+    /// `a`, and `b` here are all bytes, just like the numbers in the
+    /// list (I wrote the types explicitly above, but those are the
+    /// only types you can use). To avoid the overflow, we would need
+    /// to do something like `bytes.map(|b| b as u32).reduce(|| 0, |a,
+    /// b| a + b)`, in which case our result would be `256`.
+    ///
+    /// In contrast, with `fold()`, the identity function does not
+    /// have to have the same type as the things you are iterating
+    /// over, and you potentially get back many results. So, if we
+    /// continue with the `bytes` example from the previous paragraph,
+    /// we could do `bytes.fold(|| 0_u32, |a, b| a + (b as u32))` to
+    /// convert our bytes into `u32`. And of course we might not get
+    /// back a single sum.
+    ///
+    /// There is a more subtle distinction as well, though it's
+    /// actually implied by the above points. 
When you use `reduce()`, + /// your reduction function is sometimes called with values that + /// were never part of your original parallel iterator (for + /// example, both the left and right might be a partial sum). With + /// `fold()`, in contrast, the left value in the fold function is + /// always the accumulator, and the right value is always from + /// your original sequence. + /// + /// ### Fold vs Map/Reduce + /// + /// Fold makes sense if you have some operation where it is + /// cheaper to create groups of elements at a time. For example, + /// imagine collecting characters into a string. If you were going + /// to use map/reduce, you might try this: + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let s = + /// ['a', 'b', 'c', 'd', 'e'] + /// .par_iter() + /// .map(|c: &char| format!("{}", c)) + /// .reduce(|| String::new(), + /// |mut a: String, b: String| { a.push_str(&b); a }); + /// + /// assert_eq!(s, "abcde"); + /// ``` + /// + /// Because reduce produces the same type of element as its input, + /// you have to first map each character into a string, and then + /// you can reduce them. This means we create one string per + /// element in our iterator -- not so great. Using `fold`, we can + /// do this instead: + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let s = + /// ['a', 'b', 'c', 'd', 'e'] + /// .par_iter() + /// .fold(|| String::new(), + /// |mut s: String, c: &char| { s.push(*c); s }) + /// .reduce(|| String::new(), + /// |mut a: String, b: String| { a.push_str(&b); a }); + /// + /// assert_eq!(s, "abcde"); + /// ``` + /// + /// Now `fold` will process groups of our characters at a time, + /// and we only make one string per group. We should wind up with + /// some small-ish number of strings roughly proportional to the + /// number of CPUs you have (it will ultimately depend on how busy + /// your processors are). 
Note that we still need to do a reduce
+    /// afterwards to combine those groups of strings into a single
+    /// string.
+    ///
+    /// You could use a similar trick to save partial results (e.g., a
+    /// cache) or something similar.
+    ///
+    /// ### Combining fold with other operations
+    ///
+    /// You can combine `fold` with `reduce` if you want to produce a
+    /// single value. This is then roughly equivalent to a map/reduce
+    /// combination in effect:
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    ///
+    /// let bytes = 0..22_u8;
+    /// let sum = bytes.into_par_iter()
+    ///                .fold(|| 0_u32, |a: u32, b: u8| a + (b as u32))
+    ///                .sum::<u32>();
+    ///
+    /// assert_eq!(sum, (0..22).sum()); // compare to sequential
+    /// ```
+    fn fold<S, O, U>(self, init: S, operation: O) -> Fold<Self, S, O>
+    where
+        S: Fn() -> U + Clone + Send + 'a,
+        O: Fn(U, Self::Item) -> U + Clone + Send + 'a,
+        U: Send,
+    {
+        Fold::new(self, init, operation)
+    }
+
+    /// Applies `operation` to the given `init` value with each item of this
+    /// iterator, finally producing the value for further use.
+    ///
+    /// This works essentially like `fold(|| init.clone(), operation)`, except
+    /// it doesn't require the `init` type to be `Sync`, nor any other form
+    /// of added synchronization.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    ///
+    /// let bytes = 0..22_u8;
+    /// let sum = bytes.into_par_iter()
+    ///                .fold_with(0_u32, |a: u32, b: u8| a + (b as u32))
+    ///                .sum::<u32>();
+    ///
+    /// assert_eq!(sum, (0..22).sum()); // compare to sequential
+    /// ```
+    fn fold_with<U, O>(self, init: U, operation: O) -> FoldWith<Self, U, O>
+    where
+        U: Clone + Send + 'a,
+        O: Fn(U, Self::Item) -> U + Clone + Send + 'a,
+    {
+        FoldWith::new(self, init, operation)
+    }
+
     /// Creates a fresh collection containing all the elements produced
     /// by this parallel iterator.
///
diff --git a/asparit/src/inner/filter.rs b/asparit/src/inner/filter.rs
index 2bdfc2d..021977c 100644
--- a/asparit/src/inner/filter.rs
+++ b/asparit/src/inner/filter.rs
@@ -45,10 +45,6 @@ where
             operation: self.operation,
         })
     }
-
-    fn len_hint_opt(&self) -> Option<usize> {
-        self.base.len_hint_opt()
-    }
 }
 
 /* FilterConsumer */
diff --git a/asparit/src/inner/filter_map.rs b/asparit/src/inner/filter_map.rs
index a7b7e36..7f65648 100644
--- a/asparit/src/inner/filter_map.rs
+++ b/asparit/src/inner/filter_map.rs
@@ -46,10 +46,6 @@ where
             operation: self.operation,
         })
     }
-
-    fn len_hint_opt(&self) -> Option<usize> {
-        self.base.len_hint_opt()
-    }
 }
 
 /* FilterMapConsumer */
diff --git a/asparit/src/inner/fold.rs b/asparit/src/inner/fold.rs
new file mode 100644
index 0000000..2ecd921
--- /dev/null
+++ b/asparit/src/inner/fold.rs
@@ -0,0 +1,329 @@
+use crate::{Consumer, Executor, Folder, ParallelIterator, Producer, ProducerCallback, Reducer};
+
+/* Fold: iterator built by `Fold::new`; folds `base` items into accumulators from `init()` */
+
+pub struct Fold<X, S, O> {
+    base: X,
+    init: S,
+    operation: O,
+}
+
+impl<X, S, O> Fold<X, S, O> {
+    pub fn new(base: X, init: S, operation: O) -> Self {
+        Self {
+            base,
+            init,
+            operation,
+        }
+    }
+}
+
+impl<'a, X, S, O, U> ParallelIterator<'a> for Fold<X, S, O>
+where
+    X: ParallelIterator<'a>,
+    S: Fn() -> U + Clone + Send + 'a,
+    O: Fn(U, X::Item) -> U + Clone + Send + 'a,
+    U: Send,
+{
+    type Item = U;
+
+    fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
+    where
+        E: Executor<'a, D>,
+        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
+        D: Send,
+        R: Reducer<D> + Send,
+    {
+        self.base.drive(
+            executor,
+            FoldConsumer {
+                base: consumer,
+                init: self.init,
+                operation: self.operation,
+            },
+        )
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<'a, Self::Item>,
+    {
+        self.base.with_producer(FoldCallback {
+            base: callback,
+            init: self.init,
+            operation: self.operation,
+        })
+    }
+}
+
+/* FoldWith: like Fold, but every accumulator starts as a clone of the given `init` value */
+
+pub struct FoldWith<X, U, O> {
+    base: X,
+    init: U,
+    operation: O,
+}
+
+impl<X, U, O> FoldWith<X, U, O> {
+    pub fn new(base: X, init: U, operation: O) -> Self {
+        Self {
+            base,
+            init,
+            operation,
+        }
+    }
+}
+
+impl<'a, X, U, O> ParallelIterator<'a> for FoldWith<X, U, O>
+where
+    X: ParallelIterator<'a>,
+    U: Clone + Send + 'a,
+    O: Fn(U, X::Item) -> U + Clone + Send + 'a,
+{
+    type Item = U;
+
+    fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
+    where
+        E: Executor<'a, D>,
+        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
+        D: Send,
+        R: Reducer<D> + Send,
+    {
+        let FoldWith {
+            base,
+            init,
+            operation,
+        } = self;
+
+        base.drive(
+            executor,
+            FoldConsumer {
+                base: consumer,
+                init: move || init.clone(),
+                operation,
+            },
+        )
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<'a, Self::Item>,
+    {
+        let FoldWith {
+            base,
+            init,
+            operation,
+        } = self;
+
+        base.with_producer(FoldCallback {
+            base: callback,
+            init: move || init.clone(),
+            operation,
+        })
+    }
+}
+
+/* FoldConsumer: consumer adapter; its folder folds incoming items before feeding `base` */
+
+struct FoldConsumer<C, S, O> {
+    base: C,
+    init: S,
+    operation: O,
+}
+
+impl<'a, C, S, O, T, U> Consumer<T> for FoldConsumer<C, S, O>
+where
+    C: Consumer<U>,
+    S: Fn() -> U + Clone + Send,
+    O: Fn(U, T) -> U + Clone + Send,
+{
+    type Folder = FoldFolder<C::Folder, O, U>;
+    type Reducer = C::Reducer;
+    type Result = C::Result;
+
+    fn split(self) -> (Self, Self, Self::Reducer) {
+        let (left, right, reducer) = self.base.split();
+
+        let left = FoldConsumer {
+            base: left,
+            init: self.init.clone(),
+            operation: self.operation.clone(),
+        };
+        let right = FoldConsumer {
+            base: right,
+            init: self.init,
+            operation: self.operation,
+        };
+
+        (left, right, reducer)
+    }
+
+    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
+        let (left, right, reducer) = self.base.split_at(index);
+
+        let left = FoldConsumer {
+            base: left,
+            init: self.init.clone(),
+            operation: self.operation.clone(),
+        };
+        let right = FoldConsumer {
+            base: right,
+            init: self.init,
+            operation: self.operation,
+        };
+
+        (left, right, reducer)
+    }
+
+    fn into_folder(self) -> Self::Folder {
+        FoldFolder {
+            base: self.base.into_folder(),
+            item: (self.init)(),
+            operation: self.operation,
+        }
+    }
+
+    fn is_full(&self) -> bool {
+        self.base.is_full()
+    }
+}
+
+/* FoldCallback: producer callback that wraps the received producer in a FoldProducer */
+
+struct FoldCallback<CB, S, O> {
+    base: CB,
+    init: S,
+    operation: O,
+}
+
+impl<'a, CB, S, O, T, U> ProducerCallback<'a, T> for FoldCallback<CB, S, O>
+where
+    CB: ProducerCallback<'a, U>,
+    S: Fn() -> U + Clone + Send + 'a,
+    O: Fn(U, T) -> U + Clone + Send + 'a,
+{
+    type Output = CB::Output;
+
+    fn callback<P>(self, producer: P) -> Self::Output
+    where
+        P: Producer<Item = T> + 'a,
+    {
+        self.base.callback(FoldProducer {
+            base: producer,
+            init: self.init,
+            operation: self.operation,
+        })
+    }
+}
+
+/* FoldProducer: producer adapter that yields exactly one folded item for its `base` */
+
+struct FoldProducer<P, S, O> {
+    base: P,
+    init: S,
+    operation: O,
+}
+
+impl<'a, P, S, O, U> Producer for FoldProducer<P, S, O>
+where
+    P: Producer,
+    S: Fn() -> U + Clone + Send,
+    O: Fn(U, P::Item) -> U + Clone + Send,
+{
+    type Item = U;
+    type IntoIter = std::iter::Once<U>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let item = (self.init)();
+        let item = self.base.into_iter().fold(item, self.operation);
+
+        std::iter::once(item)
+    }
+
+    fn split(self) -> (Self, Option<Self>) {
+        let init = self.init;
+        let operation = self.operation;
+        let (left, right) = self.base.split();
+
+        let left = FoldProducer {
+            base: left,
+            init: init.clone(),
+            operation: operation.clone(),
+        };
+        let right = right.map(move |right| FoldProducer {
+            base: right,
+            init,
+            operation,
+        });
+
+        (left, right)
+    }
+
+    fn splits(&self) -> Option<usize> {
+        self.base.splits()
+    }
+
+    fn fold_with<F>(self, folder: F) -> F
+    where
+        F: Folder<Self::Item>,
+    {
+        let FoldFolder { base, item, .. } = self.base.fold_with(FoldFolder {
+            base: folder,
+            item: (self.init)(),
+            operation: self.operation,
+        });
+        // Returning only `base` would drop the folded accumulator; pass it on instead.
+        base.consume(item)
+    }
+}
+
+/* FoldFolder: accumulates consumed items in `item` and hands it to `base` on `complete` */
+
+struct FoldFolder<F, O, U> {
+    base: F,
+    operation: O,
+    item: U,
+}
+
+impl<F, O, U, T> Folder<T> for FoldFolder<F, O, U>
+where
+    F: Folder<U>,
+    O: Fn(U, T) -> U + Clone,
+{
+    type Result = F::Result;
+
+    fn consume(mut self, item: T) -> Self {
+        self.item = (self.operation)(self.item, item);
+
+        self
+    }
+
+    fn consume_iter<X>(self, iter: X) -> Self
+    where
+        X: IntoIterator<Item = T>,
+    {
+        let FoldFolder {
+            base,
+            operation,
+            item,
+        } = self;
+        let item = iter
+            .into_iter()
+            .take_while(|_| !base.is_full())
+            .fold(item, operation.clone());
+
+        FoldFolder {
+            base,
+            operation,
+            item,
+        }
+    }
+
+    fn complete(self) -> Self::Result {
+        self.base.consume(self.item).complete()
+    }
+
+    fn is_full(&self) -> bool {
+        self.base.is_full()
+    }
+}
diff --git 
a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index aa15b1b..be19fa7 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -3,6 +3,7 @@ pub mod collect; pub mod copied; pub mod filter; pub mod filter_map; +pub mod fold; pub mod for_each; pub mod inspect; pub mod map; @@ -27,23 +28,20 @@ mod tests { let j = Arc::new(AtomicUsize::new(0)); let x = vec![ - vec![0usize], - vec![1usize], - vec![2usize], - vec![3usize], - vec![4usize], - vec![5usize], + vec![1usize, 2usize], + vec![3usize, 4usize], + vec![5usize, 6usize], ]; let x = x .par_iter() .cloned() - .update(|x| x.push(5)) + .update(|x| x.push(0)) .map_init( move || i.fetch_add(1, Ordering::Relaxed), |init, item| (item, *init), ) - .filter_map(|(x, i)| if i % 2 == 0 { Some((i, x)) } else { None }) + .fold_with(String::new(), |s, item| format!("{} + {:?}", s, item)) .try_for_each_init( move || j.fetch_add(1, Ordering::Relaxed), |init, item| -> Result<(), ()> {