Parcourir la source

Implemented 'fold' and 'fold_with' operation

master
Bergmann89 il y a 5 ans
Parent
révision
ad88c705e0
5 fichiers modifiés avec 505 ajouts et 16 suppressions
  1. +170
    -0
      asparit/src/core/iterator.rs
  2. +0
    -4
      asparit/src/inner/filter.rs
  3. +0
    -4
      asparit/src/inner/filter_map.rs
  4. +329
    -0
      asparit/src/inner/fold.rs
  5. +6
    -8
      asparit/src/inner/mod.rs

+ 170
- 0
asparit/src/core/iterator.rs Voir le fichier

@@ -9,6 +9,7 @@ use crate::{
copied::Copied,
filter::Filter,
filter_map::FilterMap,
fold::{Fold, FoldWith},
for_each::ForEach,
inspect::Inspect,
map::Map,
@@ -619,6 +620,175 @@ pub trait ParallelIterator<'a>: Sized + Send {
TryReduce::new(self, identity, operation)
}

/// Parallel fold is similar to sequential fold except that the
/// sequence of items may be subdivided before it is
/// folded. Consider a list of numbers like `22 3 77 89 46`. If
/// you used sequential fold to add them (`fold(0, |a,b| a+b)`,
/// you would wind up first adding 0 + 22, then 22 + 3, then 25 +
/// 77, and so forth. The **parallel fold** works similarly except
/// that it first breaks up your list into sublists, and hence
/// instead of yielding up a single sum at the end, it yields up
/// multiple sums. The number of results is nondeterministic, as
/// is the point where the breaks occur.
///
/// So if did the same parallel fold (`fold(0, |a,b| a+b)`) on
/// our example list, we might wind up with a sequence of two numbers,
/// like so:
///
/// ```notrust
/// 22 3 77 89 46
/// | |
/// 102 135
/// ```
///
/// Or perhaps these three numbers:
///
/// ```notrust
/// 22 3 77 89 46
/// | | |
/// 102 89 46
/// ```
///
/// In general, Rayon will attempt to find good breaking points
/// that keep all of your cores busy.
///
/// ### Fold versus reduce
///
/// The `fold()` and `reduce()` methods each take an identity element
/// and a combining function, but they operate rather differently.
///
/// `reduce()` requires that the identity function has the same
/// type as the things you are iterating over, and it fully
/// reduces the list of items into a single item. So, for example,
/// imagine we are iterating over a list of bytes `bytes: [128_u8,
/// 64_u8, 64_u8]`. If we used `bytes.reduce(|| 0_u8, |a: u8, b:
/// u8| a + b)`, we would get an overflow. This is because `0`,
/// `a`, and `b` here are all bytes, just like the numbers in the
/// list (I wrote the types explicitly above, but those are the
/// only types you can use). To avoid the overflow, we would need
/// to do something like `bytes.map(|b| b as u32).reduce(|| 0, |a,
/// b| a + b)`, in which case our result would be `256`.
///
/// In contrast, with `fold()`, the identity function does not
/// have to have the same type as the things you are iterating
/// over, and you potentially get back many results. So, if we
/// continue with the `bytes` example from the previous paragraph,
/// we could do `bytes.fold(|| 0_u32, |a, b| a + (b as u32))` to
/// convert our bytes into `u32`. And of course we might not get
/// back a single sum.
///
/// There is a more subtle distinction as well, though it's
/// actually implied by the above points. When you use `reduce()`,
/// your reduction function is sometimes called with values that
/// were never part of your original parallel iterator (for
/// example, both the left and right might be a partial sum). With
/// `fold()`, in contrast, the left value in the fold function is
/// always the accumulator, and the right value is always from
/// your original sequence.
///
/// ### Fold vs Map/Reduce
///
/// Fold makes sense if you have some operation where it is
/// cheaper to create groups of elements at a time. For example,
/// imagine collecting characters into a string. If you were going
/// to use map/reduce, you might try this:
///
/// ```
/// use rayon::prelude::*;
///
/// let s =
/// ['a', 'b', 'c', 'd', 'e']
/// .par_iter()
/// .map(|c: &char| format!("{}", c))
/// .reduce(|| String::new(),
/// |mut a: String, b: String| { a.push_str(&b); a });
///
/// assert_eq!(s, "abcde");
/// ```
///
/// Because reduce produces the same type of element as its input,
/// you have to first map each character into a string, and then
/// you can reduce them. This means we create one string per
/// element in our iterator -- not so great. Using `fold`, we can
/// do this instead:
///
/// ```
/// use rayon::prelude::*;
///
/// let s =
/// ['a', 'b', 'c', 'd', 'e']
/// .par_iter()
/// .fold(|| String::new(),
/// |mut s: String, c: &char| { s.push(*c); s })
/// .reduce(|| String::new(),
/// |mut a: String, b: String| { a.push_str(&b); a });
///
/// assert_eq!(s, "abcde");
/// ```
///
/// Now `fold` will process groups of our characters at a time,
/// and we only make one string per group. We should wind up with
/// some small-ish number of strings roughly proportional to the
/// number of CPUs you have (it will ultimately depend on how busy
/// your processors are). Note that we still need to do a reduce
/// afterwards to combine those groups of strings into a single
/// string.
///
/// You could use a similar trick to save partial results (e.g., a
/// cache) or something similar.
///
/// ### Combining fold with other operations
///
/// You can combine `fold` with `reduce` if you want to produce a
/// single value. This is then roughly equivalent to a map/reduce
/// combination in effect:
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .fold(|| 0_u32, |a: u32, b: u8| a + (b as u32))
/// .sum::<u32>();
///
/// assert_eq!(sum, (0..22).sum()); // compare to sequential
/// ```
fn fold<S, O, U>(self, init: S, operation: O) -> Fold<Self, S, O>
where
S: Fn() -> U + Clone + Send + 'a,
O: Fn(U, Self::Item) -> U + Clone + Send + 'a,
U: Send,
{
Fold::new(self, init, operation)
}

/// Applies `operation` to the given `init` value with each item of this
/// iterator, finally producing the value for further use.
///
/// This works essentially like `fold(|| init.clone(), operation)`, except
/// it doesn't require the `init` type to be `Sync`, nor any other form
/// of added synchronization.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .fold_with(0_u32, |a: u32, b: u8| a + (b as u32))
/// .sum::<u32>();
///
/// assert_eq!(sum, (0..22).sum()); // compare to sequential
/// ```
fn fold_with<U, O>(self, init: U, operation: O) -> FoldWith<Self, U, O>
where
U: Clone + Send + 'a,
O: Fn(U, Self::Item) -> U + Clone + Send + 'a,
{
FoldWith::new(self, init, operation)
}

/// Creates a fresh collection containing all the elements produced
/// by this parallel iterator.
///


+ 0
- 4
asparit/src/inner/filter.rs Voir le fichier

@@ -45,10 +45,6 @@ where
operation: self.operation,
})
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

/* FilterConsumer */


+ 0
- 4
asparit/src/inner/filter_map.rs Voir le fichier

@@ -46,10 +46,6 @@ where
operation: self.operation,
})
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

/* FilterMapConsumer */


+ 329
- 0
asparit/src/inner/fold.rs Voir le fichier

@@ -0,0 +1,329 @@
use crate::{Consumer, Executor, Folder, ParallelIterator, Producer, ProducerCallback, Reducer};

/* Fold */

pub struct Fold<X, S, O> {
base: X,
init: S,
operation: O,
}

impl<X, S, O> Fold<X, S, O> {
pub fn new(base: X, init: S, operation: O) -> Self {
Self {
base,
init,
operation,
}
}
}

impl<'a, X, S, O, U> ParallelIterator<'a> for Fold<X, S, O>
where
X: ParallelIterator<'a>,
S: Fn() -> U + Clone + Send + 'a,
O: Fn(U, X::Item) -> U + Clone + Send + 'a,
U: Send,
{
type Item = U;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.base.drive(
executor,
FoldConsumer {
base: consumer,
init: self.init,
operation: self.operation,
},
)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(FoldCallback {
base: callback,
init: self.init,
operation: self.operation,
})
}
}

/* FoldWith */

pub struct FoldWith<X, U, O> {
base: X,
init: U,
operation: O,
}

impl<X, U, O> FoldWith<X, U, O> {
pub fn new(base: X, init: U, operation: O) -> Self {
Self {
base,
init,
operation,
}
}
}

impl<'a, X, U, O> ParallelIterator<'a> for FoldWith<X, U, O>
where
X: ParallelIterator<'a>,
U: Clone + Send + 'a,
O: Fn(U, X::Item) -> U + Clone + Send + 'a,
{
type Item = U;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
let FoldWith {
base,
init,
operation,
} = self;

base.drive(
executor,
FoldConsumer {
base: consumer,
init: move || init.clone(),
operation,
},
)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
let FoldWith {
base,
init,
operation,
} = self;

base.with_producer(FoldCallback {
base: callback,
init: move || init.clone(),
operation,
})
}
}

/* FoldConsumer */

struct FoldConsumer<C, S, O> {
base: C,
init: S,
operation: O,
}

impl<'a, C, S, O, T, U> Consumer<T> for FoldConsumer<C, S, O>
where
C: Consumer<U>,
S: Fn() -> U + Clone + Send,
O: Fn(U, T) -> U + Clone + Send,
{
type Folder = FoldFolder<C::Folder, O, U>;
type Reducer = C::Reducer;
type Result = C::Result;

fn split(self) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split();

let left = FoldConsumer {
base: left,
init: self.init.clone(),
operation: self.operation.clone(),
};
let right = FoldConsumer {
base: right,
init: self.init,
operation: self.operation,
};

(left, right, reducer)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

let left = FoldConsumer {
base: left,
init: self.init.clone(),
operation: self.operation.clone(),
};
let right = FoldConsumer {
base: right,
init: self.init,
operation: self.operation,
};

(left, right, reducer)
}

fn into_folder(self) -> Self::Folder {
FoldFolder {
base: self.base.into_folder(),
item: (self.init)(),
operation: self.operation,
}
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

/* FoldCallback */

struct FoldCallback<CB, S, O> {
base: CB,
init: S,
operation: O,
}

impl<'a, CB, S, O, T, U> ProducerCallback<'a, T> for FoldCallback<CB, S, O>
where
CB: ProducerCallback<'a, U>,
S: Fn() -> U + Clone + Send + 'a,
O: Fn(U, T) -> U + Clone + Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = T> + 'a,
{
self.base.callback(FoldProducer {
base: producer,
init: self.init,
operation: self.operation,
})
}
}

/* FoldProducer */

struct FoldProducer<P, S, O> {
base: P,
init: S,
operation: O,
}

impl<'a, P, S, O, U> Producer for FoldProducer<P, S, O>
where
P: Producer,
S: Fn() -> U + Clone + Send,
O: Fn(U, P::Item) -> U + Clone + Send,
{
type Item = U;
type IntoIter = std::iter::Once<U>;

fn into_iter(self) -> Self::IntoIter {
let item = (self.init)();
let item = self.base.into_iter().fold(item, self.operation);

std::iter::once(item)
}

fn split(self) -> (Self, Option<Self>) {
let init = self.init;
let operation = self.operation;
let (left, right) = self.base.split();

let left = FoldProducer {
base: left,
init: init.clone(),
operation: operation.clone(),
};
let right = right.map(move |right| FoldProducer {
base: right,
init,
operation,
});

(left, right)
}

fn splits(&self) -> Option<usize> {
self.base.splits()
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
self.base
.fold_with(FoldFolder {
base: folder,
item: (self.init)(),
operation: self.operation,
})
.base
}
}

/* FoldFolder */

struct FoldFolder<F, O, U> {
base: F,
operation: O,
item: U,
}

impl<F, O, U, T> Folder<T> for FoldFolder<F, O, U>
where
F: Folder<U>,
O: Fn(U, T) -> U + Clone,
{
type Result = F::Result;

fn consume(mut self, item: T) -> Self {
self.item = (self.operation)(self.item, item);

self
}

fn consume_iter<X>(self, iter: X) -> Self
where
X: IntoIterator<Item = T>,
{
let FoldFolder {
base,
operation,
item,
} = self;
let item = iter
.into_iter()
.take_while(|_| !base.is_full())
.fold(item, operation.clone());

FoldFolder {
base,
operation,
item,
}
}

fn complete(self) -> Self::Result {
self.base.consume(self.item).complete()
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

+ 6
- 8
asparit/src/inner/mod.rs Voir le fichier

@@ -3,6 +3,7 @@ pub mod collect;
pub mod copied;
pub mod filter;
pub mod filter_map;
pub mod fold;
pub mod for_each;
pub mod inspect;
pub mod map;
@@ -27,23 +28,20 @@ mod tests {
let j = Arc::new(AtomicUsize::new(0));

let x = vec![
vec![0usize],
vec![1usize],
vec![2usize],
vec![3usize],
vec![4usize],
vec![5usize],
vec![1usize, 2usize],
vec![3usize, 4usize],
vec![5usize, 6usize],
];

let x = x
.par_iter()
.cloned()
.update(|x| x.push(5))
.update(|x| x.push(0))
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| (item, *init),
)
.filter_map(|(x, i)| if i % 2 == 0 { Some((i, x)) } else { None })
.fold_with(String::new(), |s, item| format!("{} + {:?}", s, item))
.try_for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| -> Result<(), ()> {


Chargement…
Annuler
Enregistrer