From 6653dd427e1a88393b4caf53f6fd136bd85dccf3 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sat, 7 Nov 2020 10:58:17 +0100 Subject: [PATCH] Implemented 'try_fold' and 'try_fold_with' operation --- asparit/src/core/iterator.rs | 62 ++++++ asparit/src/inner/mod.rs | 5 +- asparit/src/inner/try_fold.rs | 379 ++++++++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 asparit/src/inner/try_fold.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index d134b94..6546d6f 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -16,6 +16,7 @@ use crate::{ map_init::MapInit, map_with::MapWith, reduce::Reduce, + try_fold::{TryFold, TryFoldWith}, try_for_each::{TryForEach, TryForEachInit, TryForEachWith}, try_reduce::TryReduce, update::Update, @@ -789,6 +790,67 @@ pub trait ParallelIterator<'a>: Sized + Send { FoldWith::new(self, init, operation) } + /// Performs a fallible parallel fold. + /// + /// This is a variation of [`fold()`] for operations which can fail with + /// `Option::None` or `Result::Err`. The first such failure stops + /// processing the local set of items, without affecting other folds in the + /// iterator's subdivisions. + /// + /// Often, `try_fold()` will be followed by [`try_reduce()`] + /// for a final reduction and global short-circuiting effect. + /// + /// [`fold()`]: #method.fold + /// [`try_reduce()`]: #method.try_reduce + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` + fn try_fold(self, init: S, operation: O) -> TryFold + where + S: Fn() -> U + Clone + Send + 'a, + O: Fn(U, Self::Item) -> T + Clone + Send + 'a, + T: Try + Send, + { + TryFold::new(self, init, operation) + } + + /// Performs a fallible parallel fold with a cloneable `init` value. + /// + /// This combines the `init` semantics of [`fold_with()`] and the failure + /// semantics of [`try_fold()`]. + /// + /// [`fold_with()`]: #method.fold_with + /// [`try_fold()`]: #method.try_fold + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` + fn try_fold_with(self, init: U, operation: O) -> TryFoldWith + where + U: Clone + Send + 'a, + O: Fn(U, Self::Item) -> T + Clone + Send + 'a, + T: Try, + { + TryFoldWith::new(self, init, operation) + } + /// Creates a fresh collection containing all the elements produced /// by this parallel iterator. /// diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index be19fa7..86fcb21 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -11,6 +11,7 @@ pub mod map_init; pub mod map_with; pub mod noop; pub mod reduce; +pub mod try_fold; pub mod try_for_each; pub mod try_reduce; pub mod update; @@ -41,7 +42,9 @@ mod tests { move || i.fetch_add(1, Ordering::Relaxed), |init, item| (item, *init), ) - .fold_with(String::new(), |s, item| format!("{} + {:?}", s, item)) + .try_fold_with(String::new(), |s, item| -> Result { + Ok(format!("{} + {:?}", s, item)) + }) .try_for_each_init( move || j.fetch_add(1, Ordering::Relaxed), |init, item| -> Result<(), ()> { diff --git a/asparit/src/inner/try_fold.rs b/asparit/src/inner/try_fold.rs new file mode 100644 index 0000000..c211876 --- /dev/null +++ b/asparit/src/inner/try_fold.rs @@ -0,0 +1,379 @@ +use std::marker::PhantomData; + +use crate::{ + misc::Try, Consumer, Executor, Folder, ParallelIterator, Producer, ProducerCallback, Reducer, +}; + +/* TryFold */ + +pub struct TryFold { + base: X, + init: S, + operation: O, + marker: PhantomData, +} + +impl TryFold { + pub fn new(base: X, init: S, operation: O) -> Self { + Self { + base, + init, + operation, + marker: PhantomData, + } + } +} + +impl<'a, X, S, O, U, T> ParallelIterator<'a> for TryFold +where + X: ParallelIterator<'a>, + S: Fn() -> U + Clone + Send + 'a, + O: Fn(U, X::Item) -> T + Clone + Send + 'a, + U: Send, + T: Try + Send + 'a, +{ + type Item = T; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + self.base.drive( + executor, + TryFoldConsumer { + base: consumer, + init: self.init, + operation: self.operation, + marker: PhantomData, + }, + ) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + self.base.with_producer(TryFoldCallback { + base: callback, + init: self.init, + operation: self.operation, + marker: PhantomData, + }) + } +} + +/* TryFoldWith */ + +pub struct TryFoldWith { + base: X, + init: U, + operation: O, + marker: PhantomData, +} + +impl TryFoldWith { + pub fn new(base: X, init: U, operation: O) -> Self { + Self { + base, + init, + operation, + marker: PhantomData, + } + } +} + +impl<'a, X, U, O, T> ParallelIterator<'a> for TryFoldWith +where + X: ParallelIterator<'a>, + U: Clone + Send + 'a, + O: Fn(U, X::Item) -> T + Clone + Send + 'a, + T: Try + Send + 'a, +{ + type Item = T; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send, + R: Reducer + Send, + { + let TryFoldWith { + base, + init, + operation, + .. + } = self; + + base.drive( + executor, + TryFoldConsumer { + base: consumer, + init: move || init.clone(), + operation, + marker: PhantomData, + }, + ) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + let TryFoldWith { + base, + init, + operation, + .. + } = self; + + base.with_producer(TryFoldCallback { + base: callback, + init: move || init.clone(), + operation, + marker: PhantomData, + }) + } +} + +/* TryFoldConsumer */ + +struct TryFoldConsumer { + base: C, + init: S, + operation: O, + marker: PhantomData, +} + +impl<'a, C, S, O, T, I> Consumer for TryFoldConsumer +where + C: Consumer, + S: Fn() -> T::Ok + Clone + Send, + O: Fn(T::Ok, I) -> T + Clone + Send, + T: Try + Send, +{ + type Folder = TryFoldFolder; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split(self) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split(); + + let left = TryFoldConsumer { + base: left, + init: self.init.clone(), + operation: self.operation.clone(), + marker: PhantomData, + }; + let right = TryFoldConsumer { + base: right, + init: self.init, + operation: self.operation, + marker: PhantomData, + }; + + (left, right, reducer) + } + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + + let left = TryFoldConsumer { + base: left, + init: self.init.clone(), + operation: self.operation.clone(), + marker: PhantomData, + }; + let right = TryFoldConsumer { + base: right, + init: self.init, + operation: self.operation, + marker: PhantomData, + }; + + (left, right, reducer) + } + + fn into_folder(self) -> Self::Folder { + TryFoldFolder { + base: self.base.into_folder(), + item: T::from_ok((self.init)()), + operation: self.operation, + } + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +/* TryFoldCallback */ + +struct TryFoldCallback { + base: CB, + init: S, + operation: O, + marker: PhantomData, +} + +impl<'a, CB, S, O, T, I> ProducerCallback<'a, I> for TryFoldCallback +where + CB: ProducerCallback<'a, T>, + S: Fn() -> T::Ok + Clone + Send + 'a, + O: Fn(T::Ok, I) -> T + Clone + Send + 'a, + T: Try + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer: P) -> Self::Output + where + P: Producer + 'a, + { + self.base.callback(TryFoldProducer { + base: producer, + init: self.init, + operation: self.operation, + marker: PhantomData, + }) + } +} + +/* TryFoldProducer */ + +struct TryFoldProducer { + base: P, + init: S, + operation: O, + marker: PhantomData, +} + +impl<'a, P, S, O, T> Producer for TryFoldProducer +where + P: Producer, + S: Fn() -> T::Ok + Clone + Send, + O: Fn(T::Ok, P::Item) -> T + Clone + Send, + T: Try + Send, +{ + type Item = T; + type IntoIter = std::iter::Once; + + fn into_iter(self) -> Self::IntoIter { + let mut ret = T::from_ok((self.init)()); + + for item in self.base.into_iter() { + match ret.into_result() { + Ok(value) => ret = (self.operation)(value, item), + Err(err) => return std::iter::once(T::from_error(err)), + } + } + + std::iter::once(ret) + } + + fn split(self) -> (Self, Option) { + let init = self.init; + let operation = self.operation; + let (left, right) = self.base.split(); + + let left = TryFoldProducer { + base: left, + init: init.clone(), + operation: operation.clone(), + marker: PhantomData, + }; + let right = right.map(move |right| TryFoldProducer { + base: right, + init, + operation, + marker: PhantomData, + }); + + (left, right) + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + self.base + .fold_with(TryFoldFolder { + base: folder, + item: T::from_ok((self.init)()), + operation: self.operation, + }) + .base + } +} + +/* TryFoldFolder */ + +struct TryFoldFolder { + base: F, + operation: O, + item: T, +} + +impl Folder for TryFoldFolder +where + F: Folder, + O: Fn(T::Ok, I) -> T + Clone, + T: Try + Send, +{ + type Result = F::Result; + + fn consume(mut self, item: I) -> Self { + self.item = match self.item.into_result() { + Ok(value) => (self.operation)(value, item), + Err(err) => T::from_error(err), + }; + + self + } + + fn consume_iter(self, iter: X) -> Self + where + X: IntoIterator, + { + let TryFoldFolder { + base, + operation, + mut item, + } = self; + + for next in iter.into_iter() { + if base.is_full() { + break; + } + + match item.into_result() { + Ok(value) => item = operation(value, next), + Err(err) => { + item = T::from_error(err); + + break; + } + } + } + + TryFoldFolder { + base, + operation, + item, + } + } + + fn complete(self) -> Self::Result { + self.base.consume(self.item).complete() + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +}