From 1d6536ece077539d8f50360687b6e7964c594a92 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Fri, 6 Nov 2020 22:11:15 +0100 Subject: [PATCH] Implemented 'update' operation --- asparit/src/core/iterator.rs | 21 ++ asparit/src/inner/mod.rs | 3 +- asparit/src/inner/update.rs | 421 +++++++++++++++++++++++++++++++++++ 3 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 asparit/src/inner/update.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index cde3437..6a26332 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -15,6 +15,7 @@ use crate::{ reduce::Reduce, try_for_each::{TryForEach, TryForEachInit, TryForEachWith}, try_reduce::TryReduce, + update::Update, }, misc::Try, }; @@ -472,6 +473,26 @@ pub trait ParallelIterator<'a>: Sized + Send { Inspect::new(self, operation) } + /// Mutates each item of this iterator before yielding it. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let par_iter = (0..5).into_par_iter().update(|x| {*x *= 2;}); + /// + /// let doubles: Vec<_> = par_iter.collect(); + /// + /// assert_eq!(&doubles[..], &[0, 2, 4, 6, 8]); + /// ``` + fn update(self, operation: O) -> Update + where + O: Fn(&mut Self::Item) + Clone + Send + 'a, + { + Update::new(self, operation) + } + /// Reduces the items in the iterator into one item using `operation`. /// The argument `identity` should be a closure that can produce /// "identity" value which may be inserted into the sequence as diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index ea8b00d..80219c5 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -10,6 +10,7 @@ pub mod noop; pub mod reduce; pub mod try_for_each; pub mod try_reduce; +pub mod update; #[cfg(test)] mod tests { @@ -35,7 +36,7 @@ mod tests { let x = x .par_iter() .cloned() - .inspect(|x| println!("inspect: {:?}", x)) + .update(|x| x.push(5)) .map_init( move || i.fetch_add(1, Ordering::Relaxed), |init, item| Some((*init, item)), diff --git a/asparit/src/inner/update.rs b/asparit/src/inner/update.rs new file mode 100644 index 0000000..315d02a --- /dev/null +++ b/asparit/src/inner/update.rs @@ -0,0 +1,421 @@ +use crate::{ + Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, + ParallelIterator, Producer, ProducerCallback, Reducer, +}; + +/* Update */ + +pub struct Update { + base: X, + operation: O, +} + +impl Update { + pub fn new(base: X, operation: O) -> Self { + Self { base, operation } + } +} + +impl<'a, X, O> ParallelIterator<'a> for Update +where + X: ParallelIterator<'a>, + O: Fn(&mut X::Item) + Clone + Send + 'a, +{ + type Item = X::Item; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + self.base.drive( + executor, + UpdateConsumer { + base: consumer, + operation: self.operation, + }, + ) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + self.base.with_producer(UpdateCallback { + base: callback, + operation: self.operation, + }) + } + + fn len_hint_opt(&self) -> Option { + self.base.len_hint_opt() + } +} + +impl<'a, X, O> IndexedParallelIterator<'a> for Update +where + X: IndexedParallelIterator<'a>, + O: Fn(&mut X::Item) + Clone + Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + self.base.drive_indexed( + executor, + UpdateConsumer { + base: consumer, + operation: self.operation, + }, + ) + } + + fn with_producer_indexed(self, callback: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + self.base.with_producer_indexed(UpdateCallback { + base: callback, + operation: self.operation, + }) + } + + fn len_hint(&self) -> usize { + self.base.len_hint() + } +} + +/* UpdateConsumer */ + +struct UpdateConsumer { + base: C, + operation: O, +} + +impl<'a, C, O, T> Consumer for UpdateConsumer +where + C: Consumer, + O: Fn(&mut T) + Clone + Send, +{ + type Folder = UpdateFolder; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split(self) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split(); + + let left = UpdateConsumer { + base: left, + operation: self.operation.clone(), + }; + let right = UpdateConsumer { + base: right, + operation: self.operation, + }; + + (left, right, reducer) + } + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + + let left = UpdateConsumer { + base: left, + operation: self.operation.clone(), + }; + let right = UpdateConsumer { + base: right, + operation: self.operation, + }; + + (left, right, reducer) + } + + fn into_folder(self) -> Self::Folder { + UpdateFolder { + base: self.base.into_folder(), + operation: self.operation, + } + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +/* UpdateFolder */ + +struct UpdateFolder { + base: F, + operation: O, +} + +impl Folder for UpdateFolder +where + F: Folder, + O: Fn(&mut T) + Clone, +{ + type Result = F::Result; + + fn consume(mut self, mut item: T) -> Self { + (self.operation)(&mut item); + + self.base = self.base.consume(item); + + self + } + + fn consume_iter(mut self, iter: X) -> Self + where + X: IntoIterator, + { + self.base = self + .base + .consume_iter(iter.into_iter().map(apply(self.operation.clone()))); + + self + } + + fn complete(self) -> Self::Result { + self.base.complete() + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +/* UpdateCallback */ + +struct UpdateCallback { + base: CB, + operation: O, +} + +impl<'a, CB, O, T> ProducerCallback<'a, T> for UpdateCallback +where + CB: ProducerCallback<'a, T>, + O: Fn(&mut T) + Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer: P) -> Self::Output + where + P: Producer + 'a, + { + self.base.callback(UpdateProducer { + base: producer, + operation: self.operation, + }) + } +} + +impl<'a, CB, O, T> IndexedProducerCallback<'a, T> for UpdateCallback +where + CB: IndexedProducerCallback<'a, T>, + O: Fn(&mut T) + Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + self.base.callback(UpdateProducer { + base: producer, + operation: self.operation, + }) + } +} + +/* UpdateProducer */ + +struct UpdateProducer { + base: P, + operation: O, +} + +impl<'a, P, O, T> Producer for UpdateProducer +where + P: Producer, + O: Fn(&mut T) + Clone + Send, +{ + type Item = T; + type IntoIter = UpdateIter; + + fn into_iter(self) -> Self::IntoIter { + UpdateIter { + base: self.base.into_iter(), + operation: self.operation, + } + } + + fn split(self) -> (Self, Option) { + let operation = self.operation; + let (left, right) = self.base.split(); + + let left = UpdateProducer { + base: left, + operation: operation.clone(), + }; + let right = right.map(move |right| UpdateProducer { + base: right, + operation, + }); + + (left, right) + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + self.base + .fold_with(UpdateFolder { + base: folder, + operation: self.operation, + }) + .base + } +} + +impl<'a, P, O, T> IndexedProducer for UpdateProducer +where + P: IndexedProducer, + O: Fn(&mut T) + Clone + Send, +{ + type Item = T; + type IntoIter = UpdateIter; + + fn into_iter(self) -> Self::IntoIter { + UpdateIter { + base: self.base.into_iter(), + operation: self.operation, + } + } + + fn len(&self) -> usize { + self.base.len() + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.base.split_at(index); + + let left = UpdateProducer { + base: left, + operation: self.operation.clone(), + }; + let right = UpdateProducer { + base: right, + operation: self.operation, + }; + + (left, right) + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn min_len(&self) -> Option { + self.base.min_len() + } + + fn max_len(&self) -> Option { + self.base.max_len() + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + self.base + .fold_with(UpdateFolder { + base: folder, + operation: self.operation, + }) + .base + } +} + +/* UpdateIter */ + +struct UpdateIter { + base: X, + operation: O, +} + +impl Iterator for UpdateIter +where + X: Iterator, + O: Fn(&mut X::Item), +{ + type Item = X::Item; + + fn next(&mut self) -> Option { + let mut v = self.base.next()?; + + (self.operation)(&mut v); + + Some(v) + } + + fn size_hint(&self) -> (usize, Option) { + self.base.size_hint() + } + + fn fold(self, init: S, op: G) -> S + where + G: FnMut(S, Self::Item) -> S, + { + self.base.map(apply(self.operation)).fold(init, op) + } + + fn collect(self) -> C + where + C: std::iter::FromIterator, + { + self.base.map(apply(self.operation)).collect() + } +} + +impl ExactSizeIterator for UpdateIter +where + X: ExactSizeIterator, + O: Fn(&mut X::Item), +{ +} + +impl DoubleEndedIterator for UpdateIter +where + X: DoubleEndedIterator, + O: Fn(&mut X::Item), +{ + fn next_back(&mut self) -> Option { + let mut v = self.base.next_back()?; + + (self.operation)(&mut v); + + Some(v) + } +} + +fn apply(operation: O) -> impl Fn(T) -> T +where + O: Fn(&mut T), +{ + move |mut item| { + operation(&mut item); + + item + } +}