diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 0494aa4..edc7537 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -1,6 +1,6 @@ use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer}; -use crate::inner::{for_each::ForEach, map::Map}; +use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith}; /// Parallel version of the standard iterator trait. /// @@ -96,7 +96,7 @@ pub trait ParallelIterator<'a>: Sized + Send { ForEach::new(self, operation) } - /// Applies `map_op` to each item of this iterator, producing a new + /// Applies `operation` to each item of this iterator, producing a new /// iterator with the results. /// /// # Examples @@ -117,6 +117,44 @@ pub trait ParallelIterator<'a>: Sized + Send { { Map::new(self, operation) } + + /// Applies `operation` to the given `init` value with each item of this + /// iterator, producing a new iterator with the results. + /// + /// The `init` value will be cloned only as needed to be paired with + /// the group of items in each rayon job. It does not require the type + /// to be `Sync`. + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// use rayon::prelude::*; + /// + /// let (sender, receiver) = channel(); + /// + /// let a: Vec<_> = (0..5) + /// .into_par_iter() // iterating over i32 + /// .map_with(sender, |s, x| { + /// s.send(x).unwrap(); // sending i32 values through the channel + /// x // returning i32 + /// }) + /// .collect(); // collecting the returned values into a vector + /// + /// let mut b: Vec<_> = receiver.iter() // iterating over the values in the channel + /// .collect(); // and collecting them + /// b.sort(); + /// + /// assert_eq!(a, b); + /// ``` + fn map_with(self, init: S, operation: O) -> MapWith + where + O: Fn(&mut S, Self::Item) -> T + Sync + Send, + S: Send + Clone, + T: Send, + { + MapWith::new(self, init, operation) + } } /// An iterator that supports "random access" to its data, meaning diff --git a/asparit/src/inner/for_each.rs b/asparit/src/inner/for_each.rs index 0de3069..b22e6ee 100644 --- a/asparit/src/inner/for_each.rs +++ b/asparit/src/inner/for_each.rs @@ -93,7 +93,7 @@ mod tests { async fn test_for_each() { let x = (0..10usize) .into_par_iter() - .map(Some) + .map_with(5, |init, item| Some((*init, item))) .for_each(|j| { println!("{:?}", j); }) diff --git a/asparit/src/inner/map.rs b/asparit/src/inner/map.rs index 4d9887a..630a144 100644 --- a/asparit/src/inner/map.rs +++ b/asparit/src/inner/map.rs @@ -19,7 +19,7 @@ impl Map { impl<'a, X, O, T> ParallelIterator<'a> for Map where X: ParallelIterator<'a>, - O: Fn(X::Item) -> T + Sync + Send + Copy + 'a, + O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, T: Send, { type Item = O::Output; @@ -54,7 +54,7 @@ where impl<'a, X, O, T> IndexedParallelIterator<'a> for Map where X: IndexedParallelIterator<'a>, - O: Fn(X::Item) -> T + Sync + Send + Copy + 'a, + O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, T: Send, { fn drive_indexed(self, executor: E, consumer: C) -> E::Result @@ -94,7 +94,7 @@ struct MapCallback { impl<'a, I, O, T, CB> ProducerCallback<'a, I> for MapCallback where CB: ProducerCallback<'a, T>, - O: Fn(I) -> T + Sync + Send + Copy + 'a, + O: Fn(I) -> T + Clone + Sync + Send + 'a, T: Send, { type Output = CB::Output; @@ -115,7 +115,7 @@ where impl<'a, I, O, T, CB> IndexedProducerCallback<'a, I> for MapCallback where CB: IndexedProducerCallback<'a, T>, - O: Fn(I) -> T + Sync + Send + Copy + 'a, + O: Fn(I) -> T + Clone + Sync + Send + 'a, T: Send, { type Output = CB::Output; @@ -143,15 +143,14 @@ struct MapProducer { impl Producer for MapProducer where P: Producer, - O: Fn(P::Item) -> T + Sync + Send + Copy, + O: Fn(P::Item) -> T + Clone + Sync + Send, T: Send, { type Item = O::Output; type IntoIter = std::iter::Map; fn into_iter(self) -> Self::IntoIter { - // self.base.into_iter().map(self.operation) - unimplemented!() + self.base.into_iter().map(self.operation) } fn split(self) -> (Self, Option) { @@ -161,7 +160,7 @@ where ( MapProducer { base: left, - operation, + operation: operation.clone(), }, right.map(|right| MapProducer { base: right, @@ -186,7 +185,7 @@ where impl IndexedProducer for MapProducer where P: IndexedProducer, - O: Fn(P::Item) -> T + Sync + Send + Copy, + O: Fn(P::Item) -> T + Clone + Sync + Send, T: Send, { type Item = O::Output; @@ -213,17 +212,16 @@ where } fn split_at(self, index: usize) -> (Self, Self) { - let operation = self.operation; let (left, right) = self.base.split_at(index); ( MapProducer { base: left, - operation, + operation: self.operation.clone(), }, MapProducer { base: right, - operation, + operation: self.operation, }, ) } @@ -257,7 +255,7 @@ impl MapConsumer { impl Consumer for MapConsumer where C: Consumer, - O: Fn(I) -> T + Send + Sync + Copy, + O: Fn(I) -> T + Clone + Send + Sync, T: Send, { type Folder = MapFolder; @@ -267,7 +265,7 @@ where fn split_off_left(&self) -> (Self, Self::Reducer) { let (left, reducer) = self.base.split_off_left(); - (MapConsumer::new(left, self.operation), reducer) + (MapConsumer::new(left, self.operation.clone()), reducer) } fn into_folder(self) -> Self::Folder { @@ -285,14 +283,14 @@ where impl IndexedConsumer for MapConsumer where C: IndexedConsumer, - O: Fn(I) -> T + Send + Sync + Copy, + O: Fn(I) -> T + Clone + Send + Sync, T: Send, { fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { let (left, right, reducer) = self.base.split_at(index); ( - MapConsumer::new(left, self.operation), + MapConsumer::new(left, self.operation.clone()), MapConsumer::new(right, self.operation), reducer, ) @@ -309,23 +307,23 @@ struct MapFolder { impl Folder for MapFolder where F: Folder, - O: Fn(I) -> T + Copy, + O: Fn(I) -> T + Clone, { type Result = F::Result; - fn consume(self, item: I) -> Self { + fn consume(mut self, item: I) -> Self { let mapped_item = (self.operation)(item); - MapFolder { - base: self.base.consume(mapped_item), - operation: self.operation, - } + + self.base = self.base.consume(mapped_item); + + self } fn consume_iter(mut self, iter: X) -> Self where X: IntoIterator, { - self.base = self.base.consume_iter(iter.into_iter().map(self.operation)); + self.base = self.base.consume_iter(iter.into_iter().map(self.operation.clone())); self } diff --git a/asparit/src/inner/map_with.rs b/asparit/src/inner/map_with.rs new file mode 100644 index 0000000..db3df76 --- /dev/null +++ b/asparit/src/inner/map_with.rs @@ -0,0 +1,422 @@ +use crate::{ + Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer, + IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor, +}; + +/* MapWith */ + +pub struct MapWith { + base: X, + item: S, + operation: O, +} + +impl MapWith { + pub fn new(base: X, item: S, operation: O) -> Self { + Self { base, item, operation } + } +} + +impl<'a, X, O, T, S> ParallelIterator<'a> for MapWith +where + X: ParallelIterator<'a>, + O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Clone + Send + 'a, +{ + type Item = T; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + let consumer = MapWithConsumer::new(consumer, self.item, self.operation); + + self.base.drive(executor, consumer) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + self.base.with_producer(MapWithCallback { + callback, + item: self.item, + operation: self.operation, + }) + } + + fn len_hint_opt(&self) -> Option { + self.base.len_hint_opt() + } +} + +impl<'a, X, O, T, S> IndexedParallelIterator<'a> for MapWith +where + X: IndexedParallelIterator<'a>, + O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Clone + Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: IndexedConsumer, + D: Send, + R: Reducer + { + let consumer = MapWithConsumer::new(consumer, self.item, self.operation); + + self.base.drive_indexed(executor, consumer) + } + + fn with_producer_indexed(self, callback: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + self.base.with_producer_indexed(MapWithCallback { + callback, + item: self.item, + operation: self.operation, + }) + } + + fn len_hint(&self) -> usize { + self.base.len_hint() + } +} + +/* MapWithCallback */ + +struct MapWithCallback { + callback: CB, + item: S, + operation: O, +} + +impl<'a, I, S, O, T, CB> ProducerCallback<'a, I> for MapWithCallback +where + CB: ProducerCallback<'a, T>, + O: Fn(&mut S, I) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> CB::Output + where + P: Producer + 'a, + { + let producer = MapWithProducer { + base, + item: self.item, + operation: self.operation, + }; + + self.callback.callback(producer) + } +} + +impl<'a, I, S, O, T, CB> IndexedProducerCallback<'a, I> for MapWithCallback +where + CB: IndexedProducerCallback<'a, T>, + O: Fn(&mut S, I) -> T + Clone + Sync + Send + 'a, + T: Send, + S: Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> CB::Output + where + P: IndexedProducer + 'a, + { + let producer = MapWithProducer { + base, + item: self.item, + operation: self.operation, + }; + + self.callback.callback(producer) + } +} + +/* MapWithProducer */ + +struct MapWithProducer { + base: P, + item: S, + operation: O, +} + +impl Producer for MapWithProducer +where + P: Producer, + O: Fn(&mut S, P::Item) -> T + Clone + Sync + Send, + T: Send, + S: Clone + Send, +{ + type Item = T; + type IntoIter = MapWithIter; + + fn into_iter(self) -> Self::IntoIter { + MapWithIter { + base: self.base.into_iter(), + item: self.item, + operation: self.operation, + } + } + + fn split(self) -> (Self, Option) { + let item = self.item; + let operation = self.operation; + let (left, right) = self.base.split(); + + ( + MapWithProducer { + base: left, + item: item.clone(), + operation: operation.clone(), + }, + right.map(|right| MapWithProducer { + base: right, + item, + operation, + }), + ) + } + + fn fold_with(self, folder: G) -> G + where + G: Folder, + { + let folder = MapWithFolder { + base: folder, + item: self.item, + operation: self.operation, + }; + + self.base.fold_with(folder).base + } +} + +impl IndexedProducer for MapWithProducer +where + P: IndexedProducer, + O: Fn(&mut S, P::Item) -> T + Clone + Sync + Send, + T: Send, + S: Clone + Send, +{ + type Item = T; + type IntoIter = MapWithIter; + + fn into_iter(self) -> Self::IntoIter { + MapWithIter { + base: self.base.into_iter(), + item: self.item, + operation: self.operation, + } + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn len(&self) -> usize { + self.base.len() + } + + fn min_len(&self) -> Option { + self.base.min_len() + } + + fn max_len(&self) -> Option { + self.base.max_len() + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.base.split_at(index); + + ( + MapWithProducer { + base: left, + item: self.item.clone(), + operation: self.operation.clone(), + }, + MapWithProducer { + base: right, + item: self.item, + operation: self.operation, + }, + ) + } + + fn fold_with(self, folder: G) -> G + where + G: Folder, + { + let folder = MapWithFolder { + base: folder, + item: self.item, + operation: self.operation, + }; + + self.base.fold_with(folder).base + } +} + +/* MapWithIter */ + +struct MapWithIter { + base: I, + item: S, + operation: O, +} + +impl Iterator for MapWithIter +where + I: Iterator, + O: Fn(&mut S, I::Item) -> T, +{ + type Item = T; + + fn next(&mut self) -> Option { + let item = self.base.next()?; + + Some((self.operation)(&mut self.item, item)) + } + + fn size_hint(&self) -> (usize, Option) { + self.base.size_hint() + } +} + +impl DoubleEndedIterator for MapWithIter +where + I: DoubleEndedIterator, + O: Fn(&mut S, I::Item) -> T, +{ + fn next_back(&mut self) -> Option { + let item = self.base.next_back()?; + + Some((self.operation)(&mut self.item, item)) + } +} + +impl ExactSizeIterator for MapWithIter +where + I: ExactSizeIterator, + O: Fn(&mut S, I::Item) -> T, +{ } + +/* MapWithConsumer */ + +struct MapWithConsumer { + base: C, + item: S, + operation: O, +} + +impl MapWithConsumer { + fn new(base: C, item: S, operation: O) -> Self { + Self { base, item, operation } + } +} + +impl Consumer for MapWithConsumer +where + C: Consumer, + O: Fn(&mut S, I) -> T + Clone + Send + Sync, + T: Send, + S: Clone + Send, +{ + type Folder = MapWithFolder; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_off_left(&self) -> (Self, Self::Reducer) { + let (left, reducer) = self.base.split_off_left(); + + (MapWithConsumer::new(left, self.item.clone(), self.operation.clone()), reducer) + } + + fn into_folder(self) -> Self::Folder { + MapWithFolder { + base: self.base.into_folder(), + item: self.item, + operation: self.operation, + } + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +impl IndexedConsumer for MapWithConsumer +where + C: IndexedConsumer, + O: Fn(&mut S, I) -> T + Clone + Send + Sync, + T: Send, + S: Clone + Send, +{ + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + + ( + MapWithConsumer::new(left, self.item.clone(), self.operation.clone()), + MapWithConsumer::new(right, self.item, self.operation), + reducer, + ) + } +} + +/* MapWithFolder */ + +struct MapWithFolder { + base: F, + item: S, + operation: O, +} + +impl Folder for MapWithFolder +where + F: Folder, + O: Fn(&mut S, I) -> T + Clone, +{ + type Result = F::Result; + + fn consume(mut self, item: I) -> Self { + let mapped_item = (self.operation)(&mut self.item, item); + + self.base = self.base.consume(mapped_item); + + self + } + + fn consume_iter(mut self, iter: X) -> Self + where + X: IntoIterator, + { + fn with<'f, I, S, T>(item: &'f mut S, operation: impl Fn(&mut S, I) -> T + 'f, + ) -> impl FnMut(I) -> T + 'f { + move |x| operation(item, x) + } + + let mapped_iter = iter.into_iter().map(with(&mut self.item, self.operation.clone())); + + self.base = self.base.consume_iter(mapped_iter); + + self + } + + fn complete(self) -> F::Result { + self.base.complete() + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index c913c54..f2055fa 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -1,3 +1,4 @@ pub mod for_each; pub mod map; pub mod noop; +pub mod map_with;