소스 검색

Implemented 'map_with' operation

master
Bergmann89 5 년 전
부모
커밋
ac8b4498b3
5개의 변경된 파일485개의 추가작업 그리고 26개의 파일을 삭제
  1. +40
    -2
      asparit/src/core/iterator.rs
  2. +1
    -1
      asparit/src/inner/for_each.rs
  3. +21
    -23
      asparit/src/inner/map.rs
  4. +422
    -0
      asparit/src/inner/map_with.rs
  5. +1
    -0
      asparit/src/inner/mod.rs

+ 40
- 2
asparit/src/core/iterator.rs 파일 보기

@@ -1,6 +1,6 @@
use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer};

use crate::inner::{for_each::ForEach, map::Map};
use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith};

/// Parallel version of the standard iterator trait.
///
@@ -96,7 +96,7 @@ pub trait ParallelIterator<'a>: Sized + Send {
ForEach::new(self, operation)
}

/// Applies `map_op` to each item of this iterator, producing a new
/// Applies `operation` to each item of this iterator, producing a new
/// iterator with the results.
///
/// # Examples
@@ -117,6 +117,44 @@ pub trait ParallelIterator<'a>: Sized + Send {
{
Map::new(self, operation)
}

/// Applies `operation` to the given `init` value with each item of this
/// iterator, producing a new iterator with the results.
///
/// The `init` value will be cloned only as needed to be paired with
/// the group of items in each rayon job. It does not require the type
/// to be `Sync`.
///
/// # Examples
///
/// ```
/// use std::sync::mpsc::channel;
/// use rayon::prelude::*;
///
/// let (sender, receiver) = channel();
///
/// let a: Vec<_> = (0..5)
/// .into_par_iter() // iterating over i32
/// .map_with(sender, |s, x| {
/// s.send(x).unwrap(); // sending i32 values through the channel
/// x // returning i32
/// })
/// .collect(); // collecting the returned values into a vector
///
/// let mut b: Vec<_> = receiver.iter() // iterating over the values in the channel
/// .collect(); // and collecting them
/// b.sort();
///
/// assert_eq!(a, b);
/// ```
fn map_with<O, T, S>(self, init: S, operation: O) -> MapWith<Self, S, O>
where
O: Fn(&mut S, Self::Item) -> T + Sync + Send,
S: Send + Clone,
T: Send,
{
MapWith::new(self, init, operation)
}
}

/// An iterator that supports "random access" to its data, meaning


+ 1
- 1
asparit/src/inner/for_each.rs 파일 보기

@@ -93,7 +93,7 @@ mod tests {
async fn test_for_each() {
let x = (0..10usize)
.into_par_iter()
.map(Some)
.map_with(5, |init, item| Some((*init, item)))
.for_each(|j| {
println!("{:?}", j);
})


+ 21
- 23
asparit/src/inner/map.rs 파일 보기

@@ -19,7 +19,7 @@ impl<X, O> Map<X, O> {
impl<'a, X, O, T> ParallelIterator<'a> for Map<X, O>
where
X: ParallelIterator<'a>,
O: Fn(X::Item) -> T + Sync + Send + Copy + 'a,
O: Fn(X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
{
type Item = O::Output;
@@ -54,7 +54,7 @@ where
impl<'a, X, O, T> IndexedParallelIterator<'a> for Map<X, O>
where
X: IndexedParallelIterator<'a>,
O: Fn(X::Item) -> T + Sync + Send + Copy + 'a,
O: Fn(X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
@@ -94,7 +94,7 @@ struct MapCallback<CB, O> {
impl<'a, I, O, T, CB> ProducerCallback<'a, I> for MapCallback<CB, O>
where
CB: ProducerCallback<'a, T>,
O: Fn(I) -> T + Sync + Send + Copy + 'a,
O: Fn(I) -> T + Clone + Sync + Send + 'a,
T: Send,
{
type Output = CB::Output;
@@ -115,7 +115,7 @@ where
impl<'a, I, O, T, CB> IndexedProducerCallback<'a, I> for MapCallback<CB, O>
where
CB: IndexedProducerCallback<'a, T>,
O: Fn(I) -> T + Sync + Send + Copy + 'a,
O: Fn(I) -> T + Clone + Sync + Send + 'a,
T: Send,
{
type Output = CB::Output;
@@ -143,15 +143,14 @@ struct MapProducer<P, O> {
impl<P, O, T> Producer for MapProducer<P, O>
where
P: Producer,
O: Fn(P::Item) -> T + Sync + Send + Copy,
O: Fn(P::Item) -> T + Clone + Sync + Send,
T: Send,
{
type Item = O::Output;
type IntoIter = std::iter::Map<P::IntoIter, O>;

fn into_iter(self) -> Self::IntoIter {
// self.base.into_iter().map(self.operation)
unimplemented!()
self.base.into_iter().map(self.operation)
}

fn split(self) -> (Self, Option<Self>) {
@@ -161,7 +160,7 @@ where
(
MapProducer {
base: left,
operation,
operation: operation.clone(),
},
right.map(|right| MapProducer {
base: right,
@@ -186,7 +185,7 @@ where
impl<P, O, T> IndexedProducer for MapProducer<P, O>
where
P: IndexedProducer,
O: Fn(P::Item) -> T + Sync + Send + Copy,
O: Fn(P::Item) -> T + Clone + Sync + Send,
T: Send,
{
type Item = O::Output;
@@ -213,17 +212,16 @@ where
}

fn split_at(self, index: usize) -> (Self, Self) {
let operation = self.operation;
let (left, right) = self.base.split_at(index);

(
MapProducer {
base: left,
operation,
operation: self.operation.clone(),
},
MapProducer {
base: right,
operation,
operation: self.operation,
},
)
}
@@ -257,7 +255,7 @@ impl<C, O> MapConsumer<C, O> {
impl<I, T, C, O> Consumer<I> for MapConsumer<C, O>
where
C: Consumer<O::Output>,
O: Fn(I) -> T + Send + Sync + Copy,
O: Fn(I) -> T + Clone + Send + Sync,
T: Send,
{
type Folder = MapFolder<C::Folder, O>;
@@ -267,7 +265,7 @@ where
fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapConsumer::new(left, self.operation), reducer)
(MapConsumer::new(left, self.operation.clone()), reducer)
}

fn into_folder(self) -> Self::Folder {
@@ -285,14 +283,14 @@ where
impl<I, T, C, O> IndexedConsumer<I> for MapConsumer<C, O>
where
C: IndexedConsumer<O::Output>,
O: Fn(I) -> T + Send + Sync + Copy,
O: Fn(I) -> T + Clone + Send + Sync,
T: Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapConsumer::new(left, self.operation),
MapConsumer::new(left, self.operation.clone()),
MapConsumer::new(right, self.operation),
reducer,
)
@@ -309,23 +307,23 @@ struct MapFolder<F, O> {
impl<I, T, F, O> Folder<I> for MapFolder<F, O>
where
F: Folder<O::Output>,
O: Fn(I) -> T + Copy,
O: Fn(I) -> T + Clone,
{
type Result = F::Result;

fn consume(self, item: I) -> Self {
fn consume(mut self, item: I) -> Self {
let mapped_item = (self.operation)(item);
MapFolder {
base: self.base.consume(mapped_item),
operation: self.operation,
}
self.base = self.base.consume(mapped_item);
self
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = I>,
{
self.base = self.base.consume_iter(iter.into_iter().map(self.operation));
self.base = self.base.consume_iter(iter.into_iter().map(self.operation.clone()));

self
}


+ 422
- 0
asparit/src/inner/map_with.rs 파일 보기

@@ -0,0 +1,422 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
};

/* MapWith */

pub struct MapWith<X, S, O> {
base: X,
item: S,
operation: O,
}

impl<X, S, O> MapWith<X, S, O> {
pub fn new(base: X, item: S, operation: O) -> Self {
Self { base, item, operation }
}
}

impl<'a, X, O, T, S> ParallelIterator<'a> for MapWith<X, S, O>
where
X: ParallelIterator<'a>,
O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Clone + Send + 'a,
{
type Item = T;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
let consumer = MapWithConsumer::new(consumer, self.item, self.operation);

self.base.drive(executor, consumer)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(MapWithCallback {
callback,
item: self.item,
operation: self.operation,
})
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

impl<'a, X, O, T, S> IndexedParallelIterator<'a> for MapWith<X, S, O>
where
X: IndexedParallelIterator<'a>,
O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Clone + Send + 'a,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
{
let consumer = MapWithConsumer::new(consumer, self.item, self.operation);

self.base.drive_indexed(executor, consumer)
}

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
self.base.with_producer_indexed(MapWithCallback {
callback,
item: self.item,
operation: self.operation,
})
}

fn len_hint(&self) -> usize {
self.base.len_hint()
}
}

/* MapWithCallback */

struct MapWithCallback<CB, S, O> {
callback: CB,
item: S,
operation: O,
}

impl<'a, I, S, O, T, CB> ProducerCallback<'a, I> for MapWithCallback<CB, S, O>
where
CB: ProducerCallback<'a, T>,
O: Fn(&mut S, I) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Clone + Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: Producer<Item = I> + 'a,
{
let producer = MapWithProducer {
base,
item: self.item,
operation: self.operation,
};

self.callback.callback(producer)
}
}

impl<'a, I, S, O, T, CB> IndexedProducerCallback<'a, I> for MapWithCallback<CB, S, O>
where
CB: IndexedProducerCallback<'a, T>,
O: Fn(&mut S, I) -> T + Clone + Sync + Send + 'a,
T: Send,
S: Clone + Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let producer = MapWithProducer {
base,
item: self.item,
operation: self.operation,
};

self.callback.callback(producer)
}
}

/* MapWithProducer */

struct MapWithProducer<P, S, O> {
base: P,
item: S,
operation: O,
}

impl<P, S, O, T> Producer for MapWithProducer<P, S, O>
where
P: Producer,
O: Fn(&mut S, P::Item) -> T + Clone + Sync + Send,
T: Send,
S: Clone + Send,
{
type Item = T;
type IntoIter = MapWithIter<P::IntoIter, S, O>;

fn into_iter(self) -> Self::IntoIter {
MapWithIter {
base: self.base.into_iter(),
item: self.item,
operation: self.operation,
}
}

fn split(self) -> (Self, Option<Self>) {
let item = self.item;
let operation = self.operation;
let (left, right) = self.base.split();

(
MapWithProducer {
base: left,
item: item.clone(),
operation: operation.clone(),
},
right.map(|right| MapWithProducer {
base: right,
item,
operation,
}),
)
}

fn fold_with<G>(self, folder: G) -> G
where
G: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
item: self.item,
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

impl<P, S, O, T> IndexedProducer for MapWithProducer<P, S, O>
where
P: IndexedProducer,
O: Fn(&mut S, P::Item) -> T + Clone + Sync + Send,
T: Send,
S: Clone + Send,
{
type Item = T;
type IntoIter = MapWithIter<P::IntoIter, S, O>;

fn into_iter(self) -> Self::IntoIter {
MapWithIter {
base: self.base.into_iter(),
item: self.item,
operation: self.operation,
}
}

fn splits(&self) -> Option<usize> {
self.base.splits()
}

fn len(&self) -> usize {
self.base.len()
}

fn min_len(&self) -> Option<usize> {
self.base.min_len()
}

fn max_len(&self) -> Option<usize> {
self.base.max_len()
}

fn split_at(self, index: usize) -> (Self, Self) {
let (left, right) = self.base.split_at(index);

(
MapWithProducer {
base: left,
item: self.item.clone(),
operation: self.operation.clone(),
},
MapWithProducer {
base: right,
item: self.item,
operation: self.operation,
},
)
}

fn fold_with<G>(self, folder: G) -> G
where
G: Folder<Self::Item>,
{
let folder = MapWithFolder {
base: folder,
item: self.item,
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

/* MapWithIter */

struct MapWithIter<I, S, O> {
base: I,
item: S,
operation: O,
}

impl<I, S, O, T> Iterator for MapWithIter<I, S, O>
where
I: Iterator,
O: Fn(&mut S, I::Item) -> T,
{
type Item = T;

fn next(&mut self) -> Option<T> {
let item = self.base.next()?;

Some((self.operation)(&mut self.item, item))
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.base.size_hint()
}
}

impl<I, S, O, T> DoubleEndedIterator for MapWithIter<I, S, O>
where
I: DoubleEndedIterator,
O: Fn(&mut S, I::Item) -> T,
{
fn next_back(&mut self) -> Option<T> {
let item = self.base.next_back()?;

Some((self.operation)(&mut self.item, item))
}
}

impl<I, S, O, T> ExactSizeIterator for MapWithIter<I, S, O>
where
I: ExactSizeIterator,
O: Fn(&mut S, I::Item) -> T,
{ }

/* MapWithConsumer */

struct MapWithConsumer<C, S, O> {
base: C,
item: S,
operation: O,
}

impl<C, S, O> MapWithConsumer<C, S, O> {
fn new(base: C, item: S, operation: O) -> Self {
Self { base, item, operation }
}
}

impl<I, T, C, S, O> Consumer<I> for MapWithConsumer<C, S, O>
where
C: Consumer<T>,
O: Fn(&mut S, I) -> T + Clone + Send + Sync,
T: Send,
S: Clone + Send,
{
type Folder = MapWithFolder<C::Folder, S, O>;
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapWithConsumer::new(left, self.item.clone(), self.operation.clone()), reducer)
}

fn into_folder(self) -> Self::Folder {
MapWithFolder {
base: self.base.into_folder(),
item: self.item,
operation: self.operation,
}
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

impl<I, T, C, S, O> IndexedConsumer<I> for MapWithConsumer<C, S, O>
where
C: IndexedConsumer<T>,
O: Fn(&mut S, I) -> T + Clone + Send + Sync,
T: Send,
S: Clone + Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapWithConsumer::new(left, self.item.clone(), self.operation.clone()),
MapWithConsumer::new(right, self.item, self.operation),
reducer,
)
}
}

/* MapWithFolder */

struct MapWithFolder<F, S, O> {
base: F,
item: S,
operation: O,
}

impl<I, T, F, S, O> Folder<I> for MapWithFolder<F, S, O>
where
F: Folder<T>,
O: Fn(&mut S, I) -> T + Clone,
{
type Result = F::Result;

fn consume(mut self, item: I) -> Self {
let mapped_item = (self.operation)(&mut self.item, item);

self.base = self.base.consume(mapped_item);

self
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = I>,
{
fn with<'f, I, S, T>(item: &'f mut S, operation: impl Fn(&mut S, I) -> T + 'f,
) -> impl FnMut(I) -> T + 'f {
move |x| operation(item, x)
}

let mapped_iter = iter.into_iter().map(with(&mut self.item, self.operation.clone()));

self.base = self.base.consume_iter(mapped_iter);

self
}

fn complete(self) -> F::Result {
self.base.complete()
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

+ 1
- 0
asparit/src/inner/mod.rs 파일 보기

@@ -1,3 +1,4 @@
pub mod for_each;
pub mod map;
pub mod noop;
pub mod map_with;

불러오는 중...
취소
저장