diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index c3d4592..88c5917 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -5,6 +5,7 @@ use super::{ use crate::inner::{ collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith, + reduce::Reduce, }; /// Parallel version of the standard iterator trait. @@ -264,6 +265,44 @@ pub trait ParallelIterator<'a>: Sized + Send { MapInit::new(self, init, operation) } + /// Reduces the items in the iterator into one item using `operation`. + /// The argument `identity` should be a closure that can produce + /// "identity" value which may be inserted into the sequence as + /// needed to create opportunities for parallel execution. So, for + /// example, if you are doing a summation, then `identity()` ought + /// to produce something that represents the zero for your type + /// (but consider just calling `sum()` in that case). + /// + /// # Examples + /// + /// ``` + /// // Iterate over a sequence of pairs `(x0, y0), ..., (xN, yN)` + /// // and use reduce to compute one pair `(x0 + ... + xN, y0 + ... + yN)` + /// // where the first/second elements are summed separately. + /// use rayon::prelude::*; + /// let sums = [(0, 1), (5, 6), (16, 2), (8, 9)] + /// .par_iter() // iterating over &(i32, i32) + /// .cloned() // iterating over (i32, i32) + /// .reduce(|| (0, 0), // the "identity" is 0 in both columns + /// |a, b| (a.0 + b.0, a.1 + b.1)); + /// assert_eq!(sums, (0 + 5 + 16 + 8, 1 + 6 + 2 + 9)); + /// ``` + /// + /// **Note:** unlike a sequential `fold` operation, the order in + /// which `operation` will be applied to reduce the result is not fully + /// specified. So `operation` should be [associative] or else the results + /// will be non-deterministic. And of course `identity()` should + /// produce a true identity. + /// + /// [associative]: https://en.wikipedia.org/wiki/Associative_property + fn reduce(self, identity: S, operation: O) -> Reduce + where + S: Fn() -> Self::Item + Clone + Send + 'a, + O: Fn(Self::Item, Self::Item) -> Self::Item + Clone + Send + 'a, + { + Reduce::new(self, identity, operation) + } + /// Creates a fresh collection containing all the elements produced /// by this parallel iterator. /// diff --git a/asparit/src/inner/for_each.rs b/asparit/src/inner/for_each.rs index f882773..08b73fa 100644 --- a/asparit/src/inner/for_each.rs +++ b/asparit/src/inner/for_each.rs @@ -83,35 +83,3 @@ where fn complete(self) {} } - -#[cfg(test)] -mod tests { - use super::*; - use crate::*; - - #[tokio::test] - async fn test_for_each() { - use ::std::sync::atomic::{AtomicUsize, Ordering}; - use ::std::sync::Arc; - - let i = Arc::new(AtomicUsize::new(0)); - let j = Arc::new(AtomicUsize::new(0)); - - let x = (0..10usize) - .into_par_iter() - .map_init( - move || i.fetch_add(1, Ordering::Relaxed), - |init, item| Some((*init, item)), - ) - .for_each_init( - move || j.fetch_add(1, Ordering::Relaxed), - |init, item| { - println!("{:?} {:?}", init, item); - }, - ) - .exec() - .await; - - dbg!(x); - } -} diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index 6e3604c..5e6e094 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -4,3 +4,48 @@ pub mod map; pub mod map_init; pub mod map_with; pub mod noop; +pub mod reduce; + +#[cfg(test)] +mod tests { + use crate::*; + + #[tokio::test] + async fn test_for_each() { + use ::std::sync::atomic::{AtomicUsize, Ordering}; + use ::std::sync::Arc; + + let i = Arc::new(AtomicUsize::new(0)); + let j = Arc::new(AtomicUsize::new(0)); + + let x = (0..10usize) + .into_par_iter() + .map_init( + move || i.fetch_add(1, Ordering::Relaxed), + |init, item| Some((*init, item)), + ) + .for_each_init( + move || j.fetch_add(1, Ordering::Relaxed), + |init, item| { + println!("{:?} {:?}", init, item); + }, + ) + .exec() + .await; + + dbg!(x); + } + + #[tokio::test] + async fn test_reduce() { + let x = (0..10usize) + .into_par_iter() + .reduce(|| 0, |a, b| a + b) + .exec() + .await; + + dbg!(x); + + assert_eq!(45, x); + } +} diff --git a/asparit/src/inner/reduce.rs b/asparit/src/inner/reduce.rs new file mode 100644 index 0000000..8634c63 --- /dev/null +++ b/asparit/src/inner/reduce.rs @@ -0,0 +1,137 @@ +use crate::{core::Driver, Consumer, Executor, Folder, IndexedConsumer, ParallelIterator, Reducer}; + +pub struct Reduce { + iterator: X, + identity: S, + operation: O, +} + +impl Reduce { + pub fn new(iterator: X, identity: S, operation: O) -> Self { + Self { + iterator, + identity, + operation, + } + } +} + +impl<'a, X, S, O> Driver<'a, X::Item> for Reduce +where + X: ParallelIterator<'a>, + S: Fn() -> X::Item + Clone + Send + 'a, + O: Fn(X::Item, X::Item) -> X::Item + Clone + Send + 'a, +{ + fn exec_with(self, executor: E) -> E::Result + where + E: Executor<'a, X::Item>, + { + let iterator = self.iterator; + let identity = self.identity; + let operation = self.operation; + + let consumer = ReduceConsumer { + identity, + operation, + }; + + iterator.drive(executor, consumer) + } +} + +/* ReduceConsumer */ + +struct ReduceConsumer { + identity: S, + operation: O, +} + +impl Clone for ReduceConsumer +where + S: Clone, + O: Clone, +{ + fn clone(&self) -> Self { + Self { + identity: self.identity.clone(), + operation: self.operation.clone(), + } + } +} + +impl Consumer for ReduceConsumer +where + S: Fn() -> T + Clone + Send, + O: Fn(T, T) -> T + Clone + Send, + T: Send, +{ + type Folder = ReduceFolder; + type Reducer = Self; + type Result = T; + + fn split_off_left(&self) -> (Self, Self::Reducer) { + (self.clone(), self.clone()) + } + + fn into_folder(self) -> Self::Folder { + ReduceFolder { + operation: self.operation, + item: (self.identity)(), + } + } +} + +impl IndexedConsumer for ReduceConsumer +where + S: Fn() -> T + Clone + Send, + O: Fn(T, T) -> T + Clone + Send, + T: Send, +{ + fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { + (self.clone(), self.clone(), self) + } +} + +impl Reducer for ReduceConsumer +where + O: Fn(T, T) -> T, + S: Fn() -> T, + T: Send, +{ + fn reduce(self, left: T, right: T) -> T { + (self.operation)(left, right) + } +} + +/* ReduceFolder */ + +struct ReduceFolder { + operation: O, + item: T, +} + +impl Folder for ReduceFolder +where + O: Fn(T, T) -> T + Clone, +{ + type Result = T; + + fn consume(mut self, item: T) -> Self { + self.item = (self.operation)(self.item, item); + + self + } + + fn consume_iter(mut self, iter: X) -> Self + where + X: IntoIterator, + { + self.item = iter.into_iter().fold(self.item, self.operation.clone()); + + self + } + + fn complete(self) -> Self::Result { + self.item + } +} diff --git a/asparit/src/lib.rs b/asparit/src/lib.rs index ff5ea6e..0fa9642 100644 --- a/asparit/src/lib.rs +++ b/asparit/src/lib.rs @@ -4,7 +4,7 @@ mod inner; mod std; pub use self::core::{ - Consumer, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator, + Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer, };