From cec8cd0f6287cf351b34b5fd38876a72ae870d04 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Wed, 4 Nov 2020 23:20:20 +0100 Subject: [PATCH] Implemented 'map_init' operation --- asparit/src/core/iterator.rs | 58 +++++- asparit/src/inner/for_each.rs | 7 +- asparit/src/inner/map.rs | 8 +- asparit/src/inner/map_init.rs | 332 ++++++++++++++++++++++++++++++++++ asparit/src/inner/map_with.rs | 24 +-- asparit/src/inner/mod.rs | 1 + 6 files changed, 403 insertions(+), 27 deletions(-) create mode 100644 asparit/src/inner/map_init.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index edc7537..0d5e8ca 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -1,6 +1,6 @@ use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer}; -use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith}; +use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit}; /// Parallel version of the standard iterator trait. /// @@ -134,15 +134,16 @@ pub trait ParallelIterator<'a>: Sized + Send { /// let (sender, receiver) = channel(); /// /// let a: Vec<_> = (0..5) - /// .into_par_iter() // iterating over i32 - /// .map_with(sender, |s, x| { - /// s.send(x).unwrap(); // sending i32 values through the channel - /// x // returning i32 - /// }) - /// .collect(); // collecting the returned values into a vector - /// - /// let mut b: Vec<_> = receiver.iter() // iterating over the values in the channel - /// .collect(); // and collecting them + /// .into_par_iter() // iterating over i32 + /// .map_with(sender, |s, x| { + /// s.send(x).unwrap(); // sending i32 values through the channel + /// x // returning i32 + /// }) + /// .collect(); // collecting the returned values into a vector + /// + /// let mut b: Vec<_> = receiver + /// .iter() // iterating over the values in the channel + /// .collect(); // and collecting them /// b.sort(); /// /// assert_eq!(a, b); @@ -155,6 +156,43 @@ pub trait ParallelIterator<'a>: Sized + Send { { MapWith::new(self, init, operation) } + + /// Applies `operation` to a value returned by `init` with each item of this + /// iterator, producing a new iterator with the results. + /// + /// The `init` function will be called only as needed for a value to be + /// paired with the group of items in each rayon job. There is no + /// constraint on that returned type at all! + /// + /// # Examples + /// + /// ``` + /// use rand::Rng; + /// use rayon::prelude::*; + /// + /// let a: Vec<_> = (1i32..1_000_000) + /// .into_par_iter() + /// .map_init( + /// || rand::thread_rng(), // get the thread-local RNG + /// |rng, x| if rng.gen() { // randomly negate items + /// -x + /// } else { + /// x + /// }, + /// ).collect(); + /// + /// // There's a remote chance that this will fail... + /// assert!(a.iter().any(|&x| x < 0)); + /// assert!(a.iter().any(|&x| x > 0)); + /// ``` + fn map_init(self, init: S, operation: O) -> MapInit + where + O: Fn(&mut U, Self::Item) -> T + Sync + Send, + S: Fn() -> U + Sync + Send, + T: Send, + { + MapInit::new(self, init, operation) + } } /// An iterator that supports "random access" to its data, meaning diff --git a/asparit/src/inner/for_each.rs b/asparit/src/inner/for_each.rs index b22e6ee..9799423 100644 --- a/asparit/src/inner/for_each.rs +++ b/asparit/src/inner/for_each.rs @@ -91,9 +91,14 @@ mod tests { #[tokio::test] async fn test_for_each() { + use ::std::sync::Arc; + use ::std::sync::atomic::{AtomicUsize, Ordering}; + + let i = Arc::new(AtomicUsize::new(0)); + let x = (0..10usize) .into_par_iter() - .map_with(5, |init, item| Some((*init, item))) + .map_init(move || { i.fetch_add(1, Ordering::Relaxed) }, |init, item| Some((*init, item))) .for_each(|j| { println!("{:?}", j); }) diff --git a/asparit/src/inner/map.rs b/asparit/src/inner/map.rs index 630a144..e818bde 100644 --- a/asparit/src/inner/map.rs +++ b/asparit/src/inner/map.rs @@ -169,9 +169,9 @@ where ) } - fn fold_with(self, folder: G) -> G + fn fold_with(self, folder: F) -> F where - G: Folder, + F: Folder, { let folder = MapFolder { base: folder, @@ -226,9 +226,9 @@ where ) } - fn fold_with(self, folder: G) -> G + fn fold_with(self, folder: F) -> F where - G: Folder, + F: Folder, { let folder = MapFolder { base: folder, diff --git a/asparit/src/inner/map_init.rs b/asparit/src/inner/map_init.rs new file mode 100644 index 0000000..b21b1ef --- /dev/null +++ b/asparit/src/inner/map_init.rs @@ -0,0 +1,332 @@ +use crate::{ + Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer, + IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor, +}; + +use super::map_with::{MapWithIter, MapWithFolder}; + +/* MapInit */ + +pub struct MapInit { + base: X, + init: S, + operation: O, +} + +impl MapInit { + pub fn new(base: X, init: S, operation: O) -> Self { + Self { base, init, operation } + } +} + +impl<'a, X, O, T, S, U> ParallelIterator<'a> for MapInit +where + X: ParallelIterator<'a>, + O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Fn() -> U + Clone + Send + 'a, +{ + type Item = T; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + let consumer = MapInitConsumer::new(consumer, self.init, self.operation); + + self.base.drive(executor, consumer) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + self.base.with_producer(MapInitCallback { + callback, + init: self.init, + operation: self.operation, + }) + } + + fn len_hint_opt(&self) -> Option { + self.base.len_hint_opt() + } +} + +impl<'a, X, O, T, S, U> IndexedParallelIterator<'a> for MapInit +where + X: IndexedParallelIterator<'a>, + O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Fn() -> U + Clone + Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: IndexedConsumer, + D: Send, + R: Reducer + { + let consumer = MapInitConsumer::new(consumer, self.init, self.operation); + + self.base.drive_indexed(executor, consumer) + } + + fn with_producer_indexed(self, callback: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + self.base.with_producer_indexed(MapInitCallback { + callback, + init: self.init, + operation: self.operation, + }) + } + + fn len_hint(&self) -> usize { + self.base.len_hint() + } +} + +/* MapInitCallback */ + +struct MapInitCallback { + callback: CB, + init: S, + operation: O, +} + +impl<'a, I, S, O, T, U, CB> ProducerCallback<'a, I> for MapInitCallback +where + CB: ProducerCallback<'a, T>, + O: Fn(&mut U, I) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Fn() -> U + Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> CB::Output + where + P: Producer + 'a, + { + let producer = MapInitProducer { + base, + init: self.init, + operation: self.operation, + }; + + self.callback.callback(producer) + } +} + +impl<'a, I, S, O, T, U, CB> IndexedProducerCallback<'a, I> for MapInitCallback +where + CB: IndexedProducerCallback<'a, T>, + O: Fn(&mut U, I) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Fn() -> U + Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> CB::Output + where + P: IndexedProducer + 'a, + { + let producer = MapInitProducer { + base, + init: self.init, + operation: self.operation, + }; + + self.callback.callback(producer) + } +} + +/* MapInitProducer */ + +struct MapInitProducer { + base: P, + init: S, + operation: O, +} + +impl Producer for MapInitProducer +where + P: Producer, + O: Fn(&mut U, P::Item) -> T + Clone + Sync + Send, + T: Send, + S: Fn() -> U + Clone + Send, +{ + type Item = T; + type IntoIter = MapWithIter; + + fn into_iter(self) -> Self::IntoIter { + MapWithIter { + base: self.base.into_iter(), + item: (self.init)(), + operation: self.operation, + } + } + + fn split(self) -> (Self, Option) { + let init = self.init; + let operation = self.operation; + let (left, right) = self.base.split(); + + ( + MapInitProducer { + base: left, + init: init.clone(), + operation: operation.clone(), + }, + right.map(|right| MapInitProducer { + base: right, + init, + operation, + }), + ) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let folder = MapWithFolder { + base: folder, + item: (self.init)(), + operation: self.operation, + }; + + self.base.fold_with(folder).base + } +} + +impl IndexedProducer for MapInitProducer +where + P: IndexedProducer, + O: Fn(&mut U, P::Item) -> T + Clone + Sync + Send, + T: Send, + S: Fn() -> U + Clone + Send, +{ + type Item = T; + type IntoIter = MapWithIter; + + fn into_iter(self) -> Self::IntoIter { + MapWithIter { + base: self.base.into_iter(), + item: (self.init)(), + operation: self.operation, + } + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn len(&self) -> usize { + self.base.len() + } + + fn min_len(&self) -> Option { + self.base.min_len() + } + + fn max_len(&self) -> Option { + self.base.max_len() + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.base.split_at(index); + + ( + MapInitProducer { + base: left, + init: self.init.clone(), + operation: self.operation.clone(), + }, + MapInitProducer { + base: right, + init: self.init, + operation: self.operation, + }, + ) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let folder = MapWithFolder { + base: folder, + item: (self.init)(), + operation: self.operation, + }; + + self.base.fold_with(folder).base + } +} + +/* MapInitConsumer */ + +struct MapInitConsumer { + base: C, + init: S, + operation: O, +} + +impl MapInitConsumer { + fn new(base: C, init: S, operation: O) -> Self { + Self { base, init, operation } + } +} + +impl Consumer for MapInitConsumer +where + C: Consumer, + O: Fn(&mut U, I) -> T + Clone + Send + Sync, + T: Send, + S: Fn() -> U + Clone + Send, +{ + type Folder = MapWithFolder; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_off_left(&self) -> (Self, Self::Reducer) { + let (left, reducer) = self.base.split_off_left(); + + (MapInitConsumer::new(left, self.init.clone(), self.operation.clone()), reducer) + } + + fn into_folder(self) -> Self::Folder { + MapWithFolder { + base: self.base.into_folder(), + item: (self.init)(), + operation: self.operation, + } + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +impl IndexedConsumer for MapInitConsumer +where + C: IndexedConsumer, + O: Fn(&mut U, I) -> T + Clone + Send + Sync, + T: Send, + S: Fn() -> U + Clone + Send, +{ + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + + ( + MapInitConsumer::new(left, self.init.clone(), self.operation.clone()), + MapInitConsumer::new(right, self.init, self.operation), + reducer, + ) + } +} diff --git a/asparit/src/inner/map_with.rs b/asparit/src/inner/map_with.rs index db3df76..fbf0424 100644 --- a/asparit/src/inner/map_with.rs +++ b/asparit/src/inner/map_with.rs @@ -188,9 +188,9 @@ where ) } - fn fold_with(self, folder: G) -> G + fn fold_with(self, folder: F) -> F where - G: Folder, + F: Folder, { let folder = MapWithFolder { base: folder, @@ -253,9 +253,9 @@ where ) } - fn fold_with(self, folder: G) -> G + fn fold_with(self, folder: F) -> F where - G: Folder, + F: Folder, { let folder = MapWithFolder { base: folder, @@ -269,10 +269,10 @@ where /* MapWithIter */ -struct MapWithIter { - base: I, - item: S, - operation: O, +pub struct MapWithIter { + pub base: I, + pub item: S, + pub operation: O, } impl Iterator for MapWithIter @@ -375,10 +375,10 @@ where /* MapWithFolder */ -struct MapWithFolder { - base: F, - item: S, - operation: O, +pub struct MapWithFolder { + pub base: F, + pub item: S, + pub operation: O, } impl Folder for MapWithFolder diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index f2055fa..0c00835 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -2,3 +2,4 @@ pub mod for_each; pub mod map; pub mod noop; pub mod map_with; +pub mod map_init;