Browse Source

Implemented 'reduce_with' and 'try_reduce_with' operation

master
Bergmann89 5 years ago
parent
commit
7230f8b65e
3 changed files with 180 additions and 2 deletions
  1. +78
    -2
      asparit/src/core/iterator.rs
  2. +45
    -0
      asparit/src/inner/reduce.rs
  3. +57
    -0
      asparit/src/inner/try_reduce.rs

+ 78
- 2
asparit/src/core/iterator.rs View File

@@ -20,11 +20,11 @@ use crate::{
map_init::MapInit,
map_with::MapWith,
product::Product,
reduce::Reduce,
reduce::{Reduce, ReduceWith},
sum::Sum,
try_fold::{TryFold, TryFoldWith},
try_for_each::{TryForEach, TryForEachInit, TryForEachWith},
try_reduce::TryReduce,
try_reduce::{TryReduce, TryReduceWith},
update::Update,
},
misc::Try,
@@ -673,6 +673,39 @@ pub trait ParallelIterator<'a>: Sized + Send {
Reduce::new(self, identity, operation)
}

/// Reduces the items in the iterator into one item using `operation`.
/// If the iterator is empty, `None` is returned; otherwise,
/// `Some` is returned.
///
/// This version of `reduce` is simple but somewhat less
/// efficient. If possible, it is better to call `reduce()`, which
/// requires an identity element.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// let sums = [(0, 1), (5, 6), (16, 2), (8, 9)]
/// .par_iter() // iterating over &(i32, i32)
/// .cloned() // iterating over (i32, i32)
/// .reduce_with(|a, b| (a.0 + b.0, a.1 + b.1))
/// .unwrap();
/// assert_eq!(sums, (0 + 5 + 16 + 8, 1 + 6 + 2 + 9));
/// ```
///
/// **Note:** unlike a sequential `fold` operation, the order in
/// which `operation` will be applied to reduce the result is not fully
/// specified. So `operation` should be [associative] or else the results
/// will be non-deterministic.
///
/// [associative]: https://en.wikipedia.org/wiki/Associative_property
fn reduce_with<O>(self, operation: O) -> ReduceWith<Self, O>
where
O: Fn(Self::Item, Self::Item) -> Self::Item + Sync + Send + 'a,
{
ReduceWith::new(self, operation)
}

/// Reduces the items in the iterator into one item using a fallible `operation`.
/// The `identity` argument is used the same way as in [`reduce()`].
///
@@ -713,6 +746,49 @@ pub trait ParallelIterator<'a>: Sized + Send {
TryReduce::new(self, identity, operation)
}

/// Reduces the items in the iterator into one item using a fallible `operation`.
///
/// Like [`reduce_with()`], if the iterator is empty, `None` is returned;
/// otherwise, `Some` is returned. Beyond that, it behaves like
/// [`try_reduce()`] for handling `Err`/`None`.
///
/// [`reduce_with()`]: #method.reduce_with
/// [`try_reduce()`]: #method.try_reduce
///
/// For instance, with `Option` items, the return value may be:
/// - `None`, the iterator was empty
/// - `Some(None)`, we stopped after encountering `None`.
/// - `Some(Some(x))`, the entire iterator reduced to `x`.
///
/// With `Result` items, the nesting is more obvious:
/// - `None`, the iterator was empty
/// - `Some(Err(e))`, we stopped after encountering an error `e`.
/// - `Some(Ok(x))`, the entire iterator reduced to `x`.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let files = ["/dev/null", "/does/not/exist"];
///
/// // Find the biggest file
/// files.into_par_iter()
/// .map(|path| std::fs::metadata(path).map(|m| (path, m.len())))
/// .try_reduce_with(|a, b| {
/// Ok(if a.1 >= b.1 { a } else { b })
/// })
/// .expect("Some value, since the iterator is not empty")
/// .expect_err("not found");
/// ```
fn try_reduce_with<O, T>(self, operation: O) -> TryReduceWith<Self, O>
where
Self::Item: Try<Ok = T>,
O: Fn(T, T) -> Self::Item + Sync + Send + 'a,
{
TryReduceWith::new(self, operation)
}

/// Parallel fold is similar to sequential fold except that the
/// sequence of items may be subdivided before it is
/// folded. Consider a list of numbers like `22 3 77 89 46`. If


+ 45
- 0
asparit/src/inner/reduce.rs View File

@@ -1,5 +1,7 @@
use crate::{core::Driver, Consumer, Executor, Folder, ParallelIterator, Reducer};

/* Reduce */

pub struct Reduce<X, S, O> {
iterator: X,
identity: S,
@@ -39,6 +41,49 @@ where
}
}

/* ReduceWith */

pub struct ReduceWith<X, O> {
iterator: X,
operation: O,
}

impl<X, O> ReduceWith<X, O> {
pub fn new(iterator: X, operation: O) -> Self {
Self {
iterator,
operation,
}
}
}

impl<'a, X, O, T> Driver<'a, Option<T>> for ReduceWith<X, O>
where
X: ParallelIterator<'a, Item = T>,
O: Fn(T, T) -> T + Clone + Send + 'a,
T: Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, Option<X::Item>>,
{
let fold_op = self.operation.clone();
let reduce_op = self.operation;

self.iterator
.fold(<_>::default, move |a, b| match a {
Some(a) => Some(fold_op(a, b)),
None => Some(b),
})
.reduce(<_>::default, move |a, b| match (a, b) {
(Some(a), Some(b)) => Some(reduce_op(a, b)),
(Some(v), None) | (None, Some(v)) => Some(v),
(None, None) => None,
})
.exec_with(executor)
}
}

/* ReduceConsumer */

struct ReduceConsumer<S, O> {


+ 57
- 0
asparit/src/inner/try_reduce.rs View File

@@ -5,6 +5,8 @@ use std::sync::{

use crate::{core::Driver, misc::Try, Consumer, Executor, Folder, ParallelIterator, Reducer};

/* TryReduce */

pub struct TryReduce<X, S, O> {
iterator: X,
identity: S,
@@ -46,6 +48,61 @@ where
}
}

/* TryReduceWith */

pub struct TryReduceWith<X, O> {
iterator: X,
operation: O,
}

impl<X, O> TryReduceWith<X, O> {
pub fn new(iterator: X, operation: O) -> Self {
Self {
iterator,
operation,
}
}
}

impl<'a, X, O, T> Driver<'a, Option<T>> for TryReduceWith<X, O>
where
X: ParallelIterator<'a, Item = T>,
O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a,
T: Try + Send,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, Option<T>>,
{
let fold_op = self.operation.clone();
let reduce_op = self.operation;

self.iterator
.fold(
|| None,
move |a: Option<T>, b: T| match a {
Some(a) => match (a.into_result(), b.into_result()) {
(Ok(a), Ok(b)) => Some(fold_op(a, b)),
(Err(e), _) | (_, Err(e)) => Some(T::from_error(e)),
},
None => Some(b),
},
)
.reduce(
|| None,
move |a: Option<T>, b: Option<T>| match (a, b) {
(Some(a), Some(b)) => match (a.into_result(), b.into_result()) {
(Ok(a), Ok(b)) => Some(reduce_op(a, b)),
(Err(e), _) | (_, Err(e)) => Some(T::from_error(e)),
},
(Some(v), None) | (None, Some(v)) => Some(v),
(None, None) => None,
},
)
.exec_with(executor)
}
}

/* TryReduceConsumer */

struct TryReduceConsumer<S, O> {


Loading…
Cancel
Save