Przeglądaj źródła

Implemented 'reduce' operation

master
Bergmann89 5 lat temu
rodzic
commit
51a8b1cde5
5 zmienionych plików z 222 dodań i 33 usunięć
  1. +39
    -0
      asparit/src/core/iterator.rs
  2. +0
    -32
      asparit/src/inner/for_each.rs
  3. +45
    -0
      asparit/src/inner/mod.rs
  4. +137
    -0
      asparit/src/inner/reduce.rs
  5. +1
    -1
      asparit/src/lib.rs

+ 39
- 0
asparit/src/core/iterator.rs Wyświetl plik

@@ -5,6 +5,7 @@ use super::{

use crate::inner::{
collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith,
reduce::Reduce,
};

/// Parallel version of the standard iterator trait.
@@ -264,6 +265,44 @@ pub trait ParallelIterator<'a>: Sized + Send {
MapInit::new(self, init, operation)
}

/// Reduces the items in the iterator into one item using `operation`.
/// The argument `identity` should be a closure that can produce
/// "identity" value which may be inserted into the sequence as
/// needed to create opportunities for parallel execution. So, for
/// example, if you are doing a summation, then `identity()` ought
/// to produce something that represents the zero for your type
/// (but consider just calling `sum()` in that case).
///
/// # Examples
///
/// ```
/// // Iterate over a sequence of pairs `(x0, y0), ..., (xN, yN)`
/// // and use reduce to compute one pair `(x0 + ... + xN, y0 + ... + yN)`
/// // where the first/second elements are summed separately.
/// use rayon::prelude::*;
/// let sums = [(0, 1), (5, 6), (16, 2), (8, 9)]
/// .par_iter() // iterating over &(i32, i32)
/// .cloned() // iterating over (i32, i32)
/// .reduce(|| (0, 0), // the "identity" is 0 in both columns
/// |a, b| (a.0 + b.0, a.1 + b.1));
/// assert_eq!(sums, (0 + 5 + 16 + 8, 1 + 6 + 2 + 9));
/// ```
///
/// **Note:** unlike a sequential `fold` operation, the order in
/// which `operation` will be applied to reduce the result is not fully
/// specified. So `operation` should be [associative] or else the results
/// will be non-deterministic. And of course `identity()` should
/// produce a true identity.
///
/// [associative]: https://en.wikipedia.org/wiki/Associative_property
fn reduce<S, O>(self, identity: S, operation: O) -> Reduce<Self, S, O>
where
S: Fn() -> Self::Item + Clone + Send + 'a,
O: Fn(Self::Item, Self::Item) -> Self::Item + Clone + Send + 'a,
{
Reduce::new(self, identity, operation)
}

/// Creates a fresh collection containing all the elements produced
/// by this parallel iterator.
///


+ 0
- 32
asparit/src/inner/for_each.rs Wyświetl plik

@@ -83,35 +83,3 @@ where

fn complete(self) {}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::*;

#[tokio::test]
async fn test_for_each() {
use ::std::sync::atomic::{AtomicUsize, Ordering};
use ::std::sync::Arc;

let i = Arc::new(AtomicUsize::new(0));
let j = Arc::new(AtomicUsize::new(0));

let x = (0..10usize)
.into_par_iter()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| Some((*init, item)),
)
.for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| {
println!("{:?} {:?}", init, item);
},
)
.exec()
.await;

dbg!(x);
}
}

+ 45
- 0
asparit/src/inner/mod.rs Wyświetl plik

@@ -4,3 +4,48 @@ pub mod map;
pub mod map_init;
pub mod map_with;
pub mod noop;
pub mod reduce;

#[cfg(test)]
mod tests {
use crate::*;

#[tokio::test]
async fn test_for_each() {
use ::std::sync::atomic::{AtomicUsize, Ordering};
use ::std::sync::Arc;

let i = Arc::new(AtomicUsize::new(0));
let j = Arc::new(AtomicUsize::new(0));

let x = (0..10usize)
.into_par_iter()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| Some((*init, item)),
)
.for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| {
println!("{:?} {:?}", init, item);
},
)
.exec()
.await;

dbg!(x);
}

#[tokio::test]
async fn test_reduce() {
let x = (0..10usize)
.into_par_iter()
.reduce(|| 0, |a, b| a + b)
.exec()
.await;

dbg!(x);

assert_eq!(45, x);
}
}

+ 137
- 0
asparit/src/inner/reduce.rs Wyświetl plik

@@ -0,0 +1,137 @@
use crate::{core::Driver, Consumer, Executor, Folder, IndexedConsumer, ParallelIterator, Reducer};

pub struct Reduce<X, S, O> {
iterator: X,
identity: S,
operation: O,
}

impl<X, S, O> Reduce<X, S, O> {
pub fn new(iterator: X, identity: S, operation: O) -> Self {
Self {
iterator,
identity,
operation,
}
}
}

impl<'a, X, S, O> Driver<'a, X::Item> for Reduce<X, S, O>
where
X: ParallelIterator<'a>,
S: Fn() -> X::Item + Clone + Send + 'a,
O: Fn(X::Item, X::Item) -> X::Item + Clone + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, X::Item>,
{
let iterator = self.iterator;
let identity = self.identity;
let operation = self.operation;

let consumer = ReduceConsumer {
identity,
operation,
};

iterator.drive(executor, consumer)
}
}

/* ReduceConsumer */

struct ReduceConsumer<S, O> {
identity: S,
operation: O,
}

impl<S, O> Clone for ReduceConsumer<S, O>
where
S: Clone,
O: Clone,
{
fn clone(&self) -> Self {
Self {
identity: self.identity.clone(),
operation: self.operation.clone(),
}
}
}

impl<S, O, T> Consumer<T> for ReduceConsumer<S, O>
where
S: Fn() -> T + Clone + Send,
O: Fn(T, T) -> T + Clone + Send,
T: Send,
{
type Folder = ReduceFolder<O, T>;
type Reducer = Self;
type Result = T;

fn split_off_left(&self) -> (Self, Self::Reducer) {
(self.clone(), self.clone())
}

fn into_folder(self) -> Self::Folder {
ReduceFolder {
operation: self.operation,
item: (self.identity)(),
}
}
}

impl<S, O, T> IndexedConsumer<T> for ReduceConsumer<S, O>
where
S: Fn() -> T + Clone + Send,
O: Fn(T, T) -> T + Clone + Send,
T: Send,
{
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}
}

impl<S, O, T> Reducer<T> for ReduceConsumer<S, O>
where
O: Fn(T, T) -> T,
S: Fn() -> T,
T: Send,
{
fn reduce(self, left: T, right: T) -> T {
(self.operation)(left, right)
}
}

/* ReduceFolder */

struct ReduceFolder<O, T> {
operation: O,
item: T,
}

impl<O, T> Folder<T> for ReduceFolder<O, T>
where
O: Fn(T, T) -> T + Clone,
{
type Result = T;

fn consume(mut self, item: T) -> Self {
self.item = (self.operation)(self.item, item);

self
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = T>,
{
self.item = iter.into_iter().fold(self.item, self.operation.clone());

self
}

fn complete(self) -> Self::Result {
self.item
}
}

+ 1
- 1
asparit/src/lib.rs Wyświetl plik

@@ -4,7 +4,7 @@ mod inner;
mod std;

pub use self::core::{
Consumer, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator,
Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer,
};


Ładowanie…
Anuluj
Zapisz