@@ -3,9 +3,12 @@ use super::{ | |||
ProducerCallback, Reducer, | |||
}; | |||
use crate::inner::{ | |||
collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith, | |||
reduce::Reduce, | |||
use crate::{ | |||
inner::{ | |||
collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith, | |||
reduce::Reduce, try_reduce::TryReduce, | |||
}, | |||
misc::Try, | |||
}; | |||
/// Parallel version of the standard iterator trait. | |||
@@ -303,6 +306,46 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||
Reduce::new(self, identity, operation) | |||
} | |||
/// Reduces the items in the iterator into one item using a fallible `operation`. | |||
/// The `identity` argument is used the same way as in [`reduce()`]. | |||
/// | |||
/// [`reduce()`]: #method.reduce | |||
/// | |||
/// If a `Result::Err` or `Option::None` item is found, or if `operation` reduces | |||
/// to one, we will attempt to stop processing the rest of the items in the | |||
/// iterator as soon as possible, and we will return that terminating value. | |||
/// Otherwise, we will return the final reduced `Result::Ok(T)` or | |||
/// `Option::Some(T)`. If there are multiple errors in parallel, it is not | |||
/// specified which will be returned. | |||
/// | |||
/// # Examples | |||
/// | |||
/// ``` | |||
/// use rayon::prelude::*; | |||
/// | |||
/// // Compute the sum of squares, being careful about overflow. | |||
/// fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> { | |||
/// iter.into_par_iter() | |||
/// .map(|i| i.checked_mul(i)) // square each item, | |||
/// .try_reduce(|| 0, i32::checked_add) // and add them up! | |||
/// } | |||
/// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16)); | |||
/// | |||
/// // The sum might overflow | |||
/// assert_eq!(sum_squares(0..10_000), None); | |||
/// | |||
/// // Or the squares might overflow before it even reaches `try_reduce` | |||
/// assert_eq!(sum_squares(1_000_000..1_000_001), None); | |||
/// ``` | |||
fn try_reduce<S, O, T>(self, identity: S, operation: O) -> TryReduce<Self, S, O> | |||
where | |||
S: Fn() -> T + Sync + Send, | |||
O: Fn(T, T) -> Self::Item + Sync + Send, | |||
Self::Item: Try<Ok = T>, | |||
{ | |||
TryReduce::new(self, identity, operation) | |||
} | |||
/// Creates a fresh collection containing all the elements produced | |||
/// by this parallel iterator. | |||
/// | |||
@@ -5,6 +5,7 @@ pub mod map_init; | |||
pub mod map_with; | |||
pub mod noop; | |||
pub mod reduce; | |||
pub mod try_reduce; | |||
#[cfg(test)] | |||
mod tests { | |||
@@ -40,12 +41,13 @@ mod tests { | |||
async fn test_reduce() { | |||
let x = (0..10usize) | |||
.into_par_iter() | |||
.reduce(|| 0, |a, b| a + b) | |||
.map::<_, Result<usize, ()>>(Ok) | |||
.try_reduce(|| 0, |a, b| Ok(a + b)) | |||
.exec() | |||
.await; | |||
dbg!(x); | |||
dbg!(&x); | |||
assert_eq!(45, x); | |||
assert_eq!(Ok(45), x); | |||
} | |||
} |
@@ -0,0 +1,164 @@ | |||
use std::sync::{ | |||
atomic::{AtomicBool, Ordering}, | |||
Arc, | |||
}; | |||
use crate::{ | |||
core::Driver, misc::Try, Consumer, Executor, Folder, IndexedConsumer, ParallelIterator, Reducer, | |||
}; | |||
pub struct TryReduce<X, S, O> { | |||
iterator: X, | |||
identity: S, | |||
operation: O, | |||
} | |||
impl<X, S, O> TryReduce<X, S, O> { | |||
pub fn new(iterator: X, identity: S, operation: O) -> Self { | |||
Self { | |||
iterator, | |||
identity, | |||
operation, | |||
} | |||
} | |||
} | |||
impl<'a, X, S, O, T> Driver<'a, T> for TryReduce<X, S, O> | |||
where | |||
X: ParallelIterator<'a, Item = T>, | |||
S: Fn() -> T::Ok + Clone + Send + 'a, | |||
O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, | |||
T: Try + Send, | |||
{ | |||
fn exec_with<E>(self, executor: E) -> E::Result | |||
where | |||
E: Executor<'a, X::Item>, | |||
{ | |||
let iterator = self.iterator; | |||
let identity = self.identity; | |||
let operation = self.operation; | |||
let consumer = TryReduceConsumer { | |||
identity, | |||
operation, | |||
is_full: Arc::new(AtomicBool::new(false)), | |||
}; | |||
iterator.drive(executor, consumer) | |||
} | |||
} | |||
/* TryReduceConsumer */ | |||
struct TryReduceConsumer<S, O> { | |||
identity: S, | |||
operation: O, | |||
is_full: Arc<AtomicBool>, | |||
} | |||
impl<S, O> Clone for TryReduceConsumer<S, O> | |||
where | |||
S: Clone, | |||
O: Clone, | |||
{ | |||
fn clone(&self) -> Self { | |||
Self { | |||
identity: self.identity.clone(), | |||
operation: self.operation.clone(), | |||
is_full: self.is_full.clone(), | |||
} | |||
} | |||
} | |||
impl<S, O, T> Consumer<T> for TryReduceConsumer<S, O> | |||
where | |||
S: Fn() -> T::Ok + Clone + Send, | |||
O: Fn(T::Ok, T::Ok) -> T + Clone + Send, | |||
T: Try + Send, | |||
{ | |||
type Folder = TryReduceFolder<O, T>; | |||
type Reducer = Self; | |||
type Result = T; | |||
fn split_off_left(&self) -> (Self, Self::Reducer) { | |||
(self.clone(), self.clone()) | |||
} | |||
fn into_folder(self) -> Self::Folder { | |||
TryReduceFolder { | |||
operation: self.operation, | |||
item: Ok((self.identity)()), | |||
is_full: self.is_full, | |||
} | |||
} | |||
} | |||
impl<S, O, T> IndexedConsumer<T> for TryReduceConsumer<S, O> | |||
where | |||
S: Fn() -> T::Ok + Clone + Send, | |||
O: Fn(T::Ok, T::Ok) -> T + Clone + Send, | |||
T: Try + Send, | |||
{ | |||
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { | |||
(self.clone(), self.clone(), self) | |||
} | |||
} | |||
impl<S, O, T> Reducer<T> for TryReduceConsumer<S, O> | |||
where | |||
O: Fn(T::Ok, T::Ok) -> T, | |||
S: Fn() -> T::Ok, | |||
T: Try + Send, | |||
{ | |||
fn reduce(self, left: T, right: T) -> T { | |||
match (left.into_result(), right.into_result()) { | |||
(Ok(left), Ok(right)) => (self.operation)(left, right), | |||
(Err(e), _) | (_, Err(e)) => T::from_error(e), | |||
} | |||
} | |||
} | |||
/* TryReduceFolder */ | |||
struct TryReduceFolder<O, T> | |||
where | |||
T: Try, | |||
{ | |||
operation: O, | |||
item: Result<T::Ok, T::Error>, | |||
is_full: Arc<AtomicBool>, | |||
} | |||
impl<O, T> Folder<T> for TryReduceFolder<O, T> | |||
where | |||
O: Fn(T::Ok, T::Ok) -> T + Clone, | |||
T: Try, | |||
{ | |||
type Result = T; | |||
fn consume(mut self, item: T) -> Self { | |||
if let Ok(left) = self.item { | |||
self.item = match item.into_result() { | |||
Ok(right) => (self.operation)(left, right).into_result(), | |||
Err(error) => Err(error), | |||
}; | |||
} | |||
if self.item.is_err() { | |||
self.is_full.store(true, Ordering::Relaxed) | |||
} | |||
self | |||
} | |||
fn complete(self) -> Self::Result { | |||
match self.item { | |||
Ok(v) => T::from_ok(v), | |||
Err(v) => T::from_error(v), | |||
} | |||
} | |||
fn is_full(&self) -> bool { | |||
self.is_full.load(Ordering::Relaxed) | |||
} | |||
} |
@@ -1,6 +1,7 @@ | |||
mod core; | |||
mod executor; | |||
mod inner; | |||
mod misc; | |||
mod std; | |||
pub use self::core::{ | |||
@@ -0,0 +1,3 @@ | |||
mod try_; | |||
pub use try_::Try; |
@@ -0,0 +1,42 @@ | |||
pub trait Try { | |||
type Ok; | |||
type Error; | |||
fn into_result(self) -> Result<Self::Ok, Self::Error>; | |||
fn from_ok(v: Self::Ok) -> Self; | |||
fn from_error(v: Self::Error) -> Self; | |||
} | |||
impl<T> Try for Option<T> { | |||
type Ok = T; | |||
type Error = (); | |||
fn into_result(self) -> Result<T, ()> { | |||
self.ok_or(()) | |||
} | |||
fn from_ok(v: T) -> Self { | |||
Some(v) | |||
} | |||
fn from_error(_: ()) -> Self { | |||
None | |||
} | |||
} | |||
impl<T, E> Try for Result<T, E> { | |||
type Ok = T; | |||
type Error = E; | |||
fn into_result(self) -> Result<T, E> { | |||
self | |||
} | |||
fn from_ok(v: T) -> Self { | |||
Ok(v) | |||
} | |||
fn from_error(v: E) -> Self { | |||
Err(v) | |||
} | |||
} |