Kaynağa Gözat

Implemented 'try_for_each', 'try_for_each_with' and 'try_for_each_init' operation

master
Bergmann89 5 yıl önce
ebeveyn
işleme
6190e93e8c
3 değiştirilmiş dosya ile 251 ekleme ve 5 silme
  1. +110
    -2
      asparit/src/core/iterator.rs
  2. +6
    -3
      asparit/src/inner/mod.rs
  3. +135
    -0
      asparit/src/inner/try_for_each.rs

+ 110
- 2
asparit/src/core/iterator.rs Dosyayı Görüntüle

@@ -5,8 +5,14 @@ use super::{

use crate::{
inner::{
collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith,
reduce::Reduce, try_reduce::TryReduce,
collect::Collect,
for_each::ForEach,
map::Map,
map_init::MapInit,
map_with::MapWith,
reduce::Reduce,
try_for_each::{TryForEach, TryForEachInit, TryForEachWith},
try_reduce::TryReduce,
},
misc::Try,
};
@@ -170,6 +176,108 @@ pub trait ParallelIterator<'a>: Sized + Send {
self.map_init(init, operation).collect()
}

/// Executes a fallible `operation` on each item produced by the iterator, in parallel.
///
/// If the `operation` returns `Result::Err` or `Option::None`, we will attempt to
/// stop processing the rest of the items in the iterator as soon as
/// possible, and we will return that terminating value. Otherwise, we will
/// return an empty `Result::Ok(())` or `Option::Some(())`. If there are
/// multiple errors in parallel, it is not specified which will be returned.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// use std::io::{self, Write};
///
/// // This will stop iteration early if there's any write error, like
/// // having piped output get closed on the other end.
/// (0..100).into_par_iter()
/// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x))
/// .expect("expected no write errors");
/// ```
fn try_for_each<O, T>(self, operation: O) -> TryForEach<Self, O>
where
O: Fn(Self::Item) -> T + Clone + Sync + Send,
T: Try<Ok = ()> + Send,
{
TryForEach::new(self, operation)
}

/// Executes a fallible `operation` on the given `init` value with each item
/// produced by the iterator, in parallel.
///
/// This combines the `init` semantics of [`for_each_with()`] and the
/// failure semantics of [`try_for_each()`].
///
/// [`for_each_with()`]: #method.for_each_with
/// [`try_for_each()`]: #method.try_for_each
///
/// # Examples
///
/// ```
/// use std::sync::mpsc::channel;
/// use rayon::prelude::*;
///
/// let (sender, receiver) = channel();
///
/// (0..5).into_par_iter()
/// .try_for_each_with(sender, |s, x| s.send(x))
/// .expect("expected no send errors");
///
/// let mut res: Vec<_> = receiver.iter().collect();
///
/// res.sort();
///
/// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
/// ```
fn try_for_each_with<O, S, T>(self, init: S, operation: O) -> TryForEachWith<Self, S, O>
where
S: Clone + Send + 'a,
O: Fn(&mut S, Self::Item) -> T + Clone + Sync + Send + 'a,
T: Try<Ok = ()> + Send + 'a,
{
TryForEachWith::new(self, init, operation)
}

/// Executes a fallible `operation` on a value returned by `init` with each item
/// produced by the iterator, in parallel.
///
/// This combines the `init` semantics of [`for_each_init()`] and the
/// failure semantics of [`try_for_each()`].
///
/// [`for_each_init()`]: #method.for_each_init
/// [`try_for_each()`]: #method.try_for_each
///
/// # Examples
///
/// ```
/// use rand::Rng;
/// use rayon::prelude::*;
///
/// let mut v = vec![0u8; 1_000_000];
///
/// v.par_chunks_mut(1000)
/// .try_for_each_init(
/// || rand::thread_rng(),
/// |rng, chunk| rng.try_fill(chunk),
/// )
/// .expect("expected no rand errors");
///
/// // There's a remote chance that this will fail...
/// for i in 0u8..=255 {
/// assert!(v.contains(&i));
/// }
/// ```
fn try_for_each_init<O, S, T, U>(self, init: S, operation: O) -> TryForEachInit<Self, S, O>
where
O: Fn(&mut U, Self::Item) -> T + Clone + Sync + Send + 'a,
S: Fn() -> U + Clone + Send + Sync + 'a,
T: Try<Ok = ()> + Send + 'a,
{
TryForEachInit::new(self, init, operation)
}

/// Applies `operation` to each item of this iterator, producing a new
/// iterator with the results.
///


+ 6
- 3
asparit/src/inner/mod.rs Dosyayı Görüntüle

@@ -5,6 +5,7 @@ pub mod map_init;
pub mod map_with;
pub mod noop;
pub mod reduce;
pub mod try_for_each;
pub mod try_reduce;

#[cfg(test)]
@@ -25,16 +26,18 @@ mod tests {
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| Some((*init, item)),
)
.for_each_init(
.try_for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| {
|init, item| -> Result<(), ()> {
println!("{:?} {:?}", init, item);

Ok(())
},
)
.exec()
.await;

dbg!(x);
dbg!(&x);
}

#[tokio::test]


+ 135
- 0
asparit/src/inner/try_for_each.rs Dosyayı Görüntüle

@@ -0,0 +1,135 @@
use crate::{misc::Try, Driver, Executor, ParallelIterator};

/* TryForEach */

pub struct TryForEach<X, O> {
iterator: X,
operation: O,
}

impl<X, O> TryForEach<X, O> {
pub fn new(iterator: X, operation: O) -> Self {
Self {
iterator,
operation,
}
}
}

impl<'a, X, O, T> Driver<'a, T> for TryForEach<X, O>
where
X: ParallelIterator<'a>,
O: Fn(X::Item) -> T + Clone + Sync + Send + 'a,
T: Try<Ok = ()> + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, T>,
{
let TryForEach {
iterator,
operation,
} = self;

fn ok<R: Try<Ok = ()>>(_: (), _: ()) -> R {
R::from_ok(())
}

iterator
.map(operation)
.try_reduce(<()>::default, ok)
.exec_with(executor)
}
}

/* TryForEachWith */

pub struct TryForEachWith<X, S, O> {
iterator: X,
init: S,
operation: O,
}

impl<X, S, O> TryForEachWith<X, S, O> {
pub fn new(iterator: X, init: S, operation: O) -> Self {
Self {
iterator,
init,
operation,
}
}
}

impl<'a, X, S, O, T> Driver<'a, T> for TryForEachWith<X, S, O>
where
X: ParallelIterator<'a>,
S: Clone + Send + 'a,
O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a,
T: Try<Ok = ()> + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, T>,
{
let TryForEachWith {
iterator,
init,
operation,
} = self;

fn ok<R: Try<Ok = ()>>(_: (), _: ()) -> R {
R::from_ok(())
}

iterator
.map_with(init, operation)
.try_reduce(<()>::default, ok)
.exec_with(executor)
}
}

/* TryForEachInit */

pub struct TryForEachInit<X, S, O> {
iterator: X,
init: S,
operation: O,
}

impl<X, S, O> TryForEachInit<X, S, O> {
pub fn new(iterator: X, init: S, operation: O) -> Self {
Self {
iterator,
init,
operation,
}
}
}

impl<'a, X, S, O, T, U> Driver<'a, T> for TryForEachInit<X, S, O>
where
X: ParallelIterator<'a>,
S: Fn() -> U + Clone + Send + Sync + 'a,
O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a,
T: Try<Ok = ()> + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, T>,
{
let TryForEachInit {
iterator,
init,
operation,
} = self;

fn ok<R: Try<Ok = ()>>(_: (), _: ()) -> R {
R::from_ok(())
}

iterator
.map_init(init, operation)
.try_reduce(<()>::default, ok)
.exec_with(executor)
}
}

Yükleniyor…
İptal
Kaydet