浏览代码

Implemented 'map_init' operation

master
Bergmann89 5 年前
父节点
当前提交
cec8cd0f62
共有 6 个文件被更改,包括 403 次插入27 次删除
  1. +48
    -10
      asparit/src/core/iterator.rs
  2. +6
    -1
      asparit/src/inner/for_each.rs
  3. +4
    -4
      asparit/src/inner/map.rs
  4. +332
    -0
      asparit/src/inner/map_init.rs
  5. +12
    -12
      asparit/src/inner/map_with.rs
  6. +1
    -0
      asparit/src/inner/mod.rs

+ 48
- 10
asparit/src/core/iterator.rs 查看文件

@@ -1,6 +1,6 @@
use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer};

use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith};
use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit};

/// Parallel version of the standard iterator trait.
///
@@ -134,15 +134,16 @@ pub trait ParallelIterator<'a>: Sized + Send {
/// let (sender, receiver) = channel();
///
/// let a: Vec<_> = (0..5)
/// .into_par_iter() // iterating over i32
/// .map_with(sender, |s, x| {
/// s.send(x).unwrap(); // sending i32 values through the channel
/// x // returning i32
/// })
/// .collect(); // collecting the returned values into a vector
///
/// let mut b: Vec<_> = receiver.iter() // iterating over the values in the channel
/// .collect(); // and collecting them
/// .into_par_iter() // iterating over i32
/// .map_with(sender, |s, x| {
/// s.send(x).unwrap(); // sending i32 values through the channel
/// x // returning i32
/// })
/// .collect(); // collecting the returned values into a vector
///
/// let mut b: Vec<_> = receiver
/// .iter() // iterating over the values in the channel
/// .collect(); // and collecting them
/// b.sort();
///
/// assert_eq!(a, b);
@@ -155,6 +156,43 @@ pub trait ParallelIterator<'a>: Sized + Send {
{
MapWith::new(self, init, operation)
}

/// Applies `operation` to a value returned by `init` with each item of this
/// iterator, producing a new iterator with the results.
///
/// The `init` function will be called only as needed for a value to be
/// paired with the group of items in each rayon job. There is no
/// constraint on that returned type at all!
///
/// # Examples
///
/// ```
/// use rand::Rng;
/// use rayon::prelude::*;
///
/// let a: Vec<_> = (1i32..1_000_000)
/// .into_par_iter()
/// .map_init(
/// || rand::thread_rng(), // get the thread-local RNG
/// |rng, x| if rng.gen() { // randomly negate items
/// -x
/// } else {
/// x
/// },
/// ).collect();
///
/// // There's a remote chance that this will fail...
/// assert!(a.iter().any(|&x| x < 0));
/// assert!(a.iter().any(|&x| x > 0));
/// ```
fn map_init<O, T, S, U>(self, init: S, operation: O) -> MapInit<Self, S, O>
where
O: Fn(&mut U, Self::Item) -> T + Sync + Send,
S: Fn() -> U + Sync + Send,
T: Send,
{
MapInit::new(self, init, operation)
}
}

/// An iterator that supports "random access" to its data, meaning


+ 6
- 1
asparit/src/inner/for_each.rs 查看文件

@@ -91,9 +91,14 @@ mod tests {

#[tokio::test]
async fn test_for_each() {
use ::std::sync::Arc;
use ::std::sync::atomic::{AtomicUsize, Ordering};

let i = Arc::new(AtomicUsize::new(0));

let x = (0..10usize)
.into_par_iter()
.map_with(5, |init, item| Some((*init, item)))
.map_init(move || { i.fetch_add(1, Ordering::Relaxed) }, |init, item| Some((*init, item)))
.for_each(|j| {
println!("{:?}", j);
})


+ 4
- 4
asparit/src/inner/map.rs 查看文件

@@ -169,9 +169,9 @@ where
)
}

fn fold_with<G>(self, folder: G) -> G
fn fold_with<F>(self, folder: F) -> F
where
G: Folder<Self::Item>,
F: Folder<Self::Item>,
{
let folder = MapFolder {
base: folder,
@@ -226,9 +226,9 @@ where
)
}

fn fold_with<G>(self, folder: G) -> G
fn fold_with<F>(self, folder: F) -> F
where
G: Folder<Self::Item>,
F: Folder<Self::Item>,
{
let folder = MapFolder {
base: folder,


+ 332
- 0
asparit/src/inner/map_init.rs 查看文件

@@ -0,0 +1,332 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
};

use super::map_with::{MapWithIter, MapWithFolder};

/* MapInit */

pub struct MapInit<X, S, O> {
base: X,
init: S,
operation: O,
}

impl<X, S, O> MapInit<X, S, O> {
pub fn new(base: X, init: S, operation: O) -> Self {
Self { base, init, operation }
}
}

impl<'a, X, O, T, S, U> ParallelIterator<'a> for MapInit<X, S, O>
where
X: ParallelIterator<'a>,
O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Fn() -> U + Clone + Send + 'a,
{
type Item = T;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
let consumer = MapInitConsumer::new(consumer, self.init, self.operation);

self.base.drive(executor, consumer)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(MapInitCallback {
callback,
init: self.init,
operation: self.operation,
})
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

impl<'a, X, O, T, S, U> IndexedParallelIterator<'a> for MapInit<X, S, O>
where
X: IndexedParallelIterator<'a>,
O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Fn() -> U + Clone + Send + 'a,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
{
let consumer = MapInitConsumer::new(consumer, self.init, self.operation);

self.base.drive_indexed(executor, consumer)
}

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
self.base.with_producer_indexed(MapInitCallback {
callback,
init: self.init,
operation: self.operation,
})
}

fn len_hint(&self) -> usize {
self.base.len_hint()
}
}

/* MapInitCallback */

struct MapInitCallback<CB, S, O> {
callback: CB,
init: S,
operation: O,
}

impl<'a, I, S, O, T, U, CB> ProducerCallback<'a, I> for MapInitCallback<CB, S, O>
where
CB: ProducerCallback<'a, T>,
O: Fn(&mut U, I) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Fn() -> U + Clone + Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: Producer<Item = I> + 'a,
{
let producer = MapInitProducer {
base,
init: self.init,
operation: self.operation,
};

self.callback.callback(producer)
}
}

impl<'a, I, S, O, T, U, CB> IndexedProducerCallback<'a, I> for MapInitCallback<CB, S, O>
where
CB: IndexedProducerCallback<'a, T>,
O: Fn(&mut U, I) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Fn() -> U + Clone + Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let producer = MapInitProducer {
base,
init: self.init,
operation: self.operation,
};

self.callback.callback(producer)
}
}

/* MapInitProducer */

struct MapInitProducer<P, S, O> {
base: P,
init: S,
operation: O,
}

impl<P, S, O, T, U> Producer for MapInitProducer<P, S, O>
where
P: Producer,
O: Fn(&mut U, P::Item) -> T + Clone + Sync + Send,
T: Send,
S: Fn() -> U + Clone + Send,
{
type Item = T;
type IntoIter = MapWithIter<P::IntoIter, U, O>;

fn into_iter(self) -> Self::IntoIter {
MapWithIter {
base: self.base.into_iter(),
item: (self.init)(),
operation: self.operation,
}
}

fn split(self) -> (Self, Option<Self>) {
let init = self.init;
let operation = self.operation;
let (left, right) = self.base.split();

(
MapInitProducer {
base: left,
init: init.clone(),
operation: operation.clone(),
},
right.map(|right| MapInitProducer {
base: right,
init,
operation,
}),
)
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
item: (self.init)(),
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

impl<P, S, O, T, U> IndexedProducer for MapInitProducer<P, S, O>
where
P: IndexedProducer,
O: Fn(&mut U, P::Item) -> T + Clone + Sync + Send,
T: Send,
S: Fn() -> U + Clone + Send,
{
type Item = T;
type IntoIter = MapWithIter<P::IntoIter, U, O>;

fn into_iter(self) -> Self::IntoIter {
MapWithIter {
base: self.base.into_iter(),
item: (self.init)(),
operation: self.operation,
}
}

fn splits(&self) -> Option<usize> {
self.base.splits()
}

fn len(&self) -> usize {
self.base.len()
}

fn min_len(&self) -> Option<usize> {
self.base.min_len()
}

fn max_len(&self) -> Option<usize> {
self.base.max_len()
}

fn split_at(self, index: usize) -> (Self, Self) {
let (left, right) = self.base.split_at(index);

(
MapInitProducer {
base: left,
init: self.init.clone(),
operation: self.operation.clone(),
},
MapInitProducer {
base: right,
init: self.init,
operation: self.operation,
},
)
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
item: (self.init)(),
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

/* MapInitConsumer */

struct MapInitConsumer<C, S, O> {
base: C,
init: S,
operation: O,
}

impl<C, S, O> MapInitConsumer<C, S, O> {
fn new(base: C, init: S, operation: O) -> Self {
Self { base, init, operation }
}
}

impl<I, T, C, S, U, O> Consumer<I> for MapInitConsumer<C, S, O>
where
C: Consumer<T>,
O: Fn(&mut U, I) -> T + Clone + Send + Sync,
T: Send,
S: Fn() -> U + Clone + Send,
{
type Folder = MapWithFolder<C::Folder, U, O>;
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapInitConsumer::new(left, self.init.clone(), self.operation.clone()), reducer)
}

fn into_folder(self) -> Self::Folder {
MapWithFolder {
base: self.base.into_folder(),
item: (self.init)(),
operation: self.operation,
}
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

impl<I, T, C, S, U, O> IndexedConsumer<I> for MapInitConsumer<C, S, O>
where
C: IndexedConsumer<T>,
O: Fn(&mut U, I) -> T + Clone + Send + Sync,
T: Send,
S: Fn() -> U + Clone + Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapInitConsumer::new(left, self.init.clone(), self.operation.clone()),
MapInitConsumer::new(right, self.init, self.operation),
reducer,
)
}
}

+ 12
- 12
asparit/src/inner/map_with.rs 查看文件

@@ -188,9 +188,9 @@ where
)
}

fn fold_with<G>(self, folder: G) -> G
fn fold_with<F>(self, folder: F) -> F
where
G: Folder<Self::Item>,
F: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
@@ -253,9 +253,9 @@ where
)
}

fn fold_with<G>(self, folder: G) -> G
fn fold_with<F>(self, folder: F) -> F
where
G: Folder<Self::Item>,
F: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
@@ -269,10 +269,10 @@ where

/* MapWithIter */

struct MapWithIter<I, S, O> {
base: I,
item: S,
operation: O,
pub struct MapWithIter<I, S, O> {
pub base: I,
pub item: S,
pub operation: O,
}

impl<I, S, O, T> Iterator for MapWithIter<I, S, O>
@@ -375,10 +375,10 @@ where

/* MapWithFolder */

struct MapWithFolder<F, S, O> {
base: F,
item: S,
operation: O,
pub struct MapWithFolder<F, S, O> {
pub base: F,
pub item: S,
pub operation: O,
}

impl<I, T, F, S, O> Folder<I> for MapWithFolder<F, S, O>


+ 1
- 0
asparit/src/inner/mod.rs 查看文件

@@ -2,3 +2,4 @@ pub mod for_each;
pub mod map;
pub mod noop;
pub mod map_with;
pub mod map_init;

正在加载...
取消
保存