瀏覽代碼

Refactored executor and implemented map operation

master
Bergmann89 5 年之前
父節點
當前提交
261023601f
共有 14 個文件被更改,包括 548 次插入174 次删除
  1. +0
    -30
      asparit/src/core/collector.rs
  2. +3
    -3
      asparit/src/core/consumer.rs
  3. +11
    -0
      asparit/src/core/driver.rs
  4. +59
    -11
      asparit/src/core/executor.rs
  5. +6
    -4
      asparit/src/core/folder.rs
  6. +51
    -24
      asparit/src/core/iterator.rs
  7. +3
    -3
      asparit/src/core/mod.rs
  8. +17
    -5
      asparit/src/core/producer.rs
  9. +21
    -52
      asparit/src/executor/sequential.rs
  10. +29
    -32
      asparit/src/inner/for_each.rs
  11. +332
    -0
      asparit/src/inner/map.rs
  12. +1
    -0
      asparit/src/inner/mod.rs
  13. +3
    -3
      asparit/src/lib.rs
  14. +12
    -7
      asparit/src/std/range.rs

+ 0
- 30
asparit/src/core/collector.rs 查看文件

@@ -1,30 +0,0 @@
use crate::{
Consumer, DefaultExecutor, Executor, IndexedConsumer, IndexedExecutor, IndexedParallelIterator,
ParallelIterator,
};

pub trait Collector: Sized {
type Iterator: ParallelIterator;
type Consumer: Consumer<<Self::Iterator as ParallelIterator>::Item>;

fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<Self::Iterator, Self::Consumer>;

fn exec(self) -> <DefaultExecutor as Executor<Self::Iterator, Self::Consumer>>::Result {
self.exec_with(DefaultExecutor::default())
}
}

pub trait IndexedCollector: Sized {
type Iterator: IndexedParallelIterator;
type Consumer: IndexedConsumer<<Self::Iterator as ParallelIterator>::Item>;

fn exec_with<E>(self, executor: E) -> E::Result
where
E: IndexedExecutor<Self::Iterator, Self::Consumer>;

fn exec(self) -> <DefaultExecutor as Executor<Self::Iterator, Self::Consumer>>::Result {
self.exec_with(DefaultExecutor::default())
}
}

+ 3
- 3
asparit/src/core/consumer.rs 查看文件

@@ -14,9 +14,9 @@ use super::{Folder, Reducer};
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
/// [`Folder`]: trait.Folder.html
/// [`Producer`]: trait.Producer.html
pub trait Consumer<Item>: Send + Sized {
pub trait Consumer<I>: Send + Sized {
/// The type of folder that this consumer can be converted into.
type Folder: Folder<Item, Result = Self::Result>;
type Folder: Folder<I, Result = Self::Result>;

/// The type of reducer that is produced if this consumer is split.
type Reducer: Reducer<Self::Result>;
@@ -47,7 +47,7 @@ pub trait Consumer<Item>: Send + Sized {
/// A stateless consumer can be freely copied. These consumers can be
/// used like regular consumers, but they also support a
/// `split_at` method that does take an index to split.
pub trait IndexedConsumer<Item>: Consumer<Item> {
pub trait IndexedConsumer<I>: Consumer<I> {
/// Divide the consumer into two consumers, one processing items
/// `0..index` and one processing items from `index..`. Also
/// produces a reducer that can be used to reduce the results at


+ 11
- 0
asparit/src/core/driver.rs 查看文件

@@ -0,0 +1,11 @@
use crate::{Executor, DefaultExecutor};

pub trait Driver<D>: Sized
where D: Send,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<D>;

fn exec(self) -> <DefaultExecutor as Executor<D>>::Result
{ self.exec_with(DefaultExecutor::default()) }
}

+ 59
- 11
asparit/src/core/executor.rs 查看文件

@@ -1,21 +1,69 @@
use super::{Consumer, IndexedConsumer, IndexedParallelIterator, ParallelIterator};
use super::{
Consumer, IndexedConsumer, Producer, IndexedProducer, Reducer, ProducerCallback, IndexedProducerCallback,
};

pub trait Executor<I, C>
pub trait Executor<D>
where D: Send,
{
type Result: Send;

fn exec<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: Producer,
C: Consumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>;

fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer,
C: IndexedConsumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>;
}

pub struct ExecutorCallback<E, C> {
executor: E,
consumer: C,
}

impl<E, C> ExecutorCallback<E, C> {
pub fn new(executor: E, consumer: C) -> Self {
Self {
executor,
consumer,
}
}
}

impl<E, D, C, I, R> ProducerCallback<I> for ExecutorCallback<E, C>
where
I: ParallelIterator,
C: Consumer<I::Item>,
E: Executor<D>,
D: Send,
C: Consumer<I, Result = D, Reducer = R>,
R: Reducer<D>,
{
type Result;
type Output = E::Result;

fn exec(self, iterator: I, consumer: C) -> Self::Result;
fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = I>
{
self.executor.exec(producer, self.consumer)
}
}

pub trait IndexedExecutor<I, C>
impl<E, D, C, I, R> IndexedProducerCallback<I> for ExecutorCallback<E, C>
where
I: IndexedParallelIterator,
C: IndexedConsumer<I::Item>,
E: Executor<D>,
D: Send,
C: IndexedConsumer<I, Result = D, Reducer = R>,
R: Reducer<D>,
{
type Result;
type Output = E::Result;

fn exec_indexed(self, iterator: I, consumer: C) -> Self::Result;
fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = I>
{
self.executor.exec_indexed(producer, self.consumer)
}
}

+ 6
- 4
asparit/src/core/folder.rs 查看文件

@@ -4,12 +4,12 @@
/// be converted (using `complete`) into a final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
pub trait Folder<I>: Sized {
/// The type of result that will ultimately be produced by the folder.
type Result;

/// Consume next item and return new sequential state.
fn consume(self, item: Item) -> Self;
fn consume(self, item: I) -> Self;

/// Consume items from the iterator until full, and return new sequential state.
///
@@ -19,16 +19,18 @@ pub trait Folder<Item>: Sized {
///
/// The main reason to override it is if you can provide a more
/// specialized, efficient implementation.
fn consume_iter<I>(mut self, iter: I) -> Self
fn consume_iter<X>(mut self, iter: X) -> Self
where
I: IntoIterator<Item = Item>,
X: IntoIterator<Item = I>,
{
for item in iter {
self = self.consume(item);

if self.is_full() {
break;
}
}

self
}



+ 51
- 24
asparit/src/core/iterator.rs 查看文件

@@ -1,6 +1,6 @@
use super::{Consumer, Executor, IndexedConsumer, IndexedProducerCallback, ProducerCallback};
use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer};

use crate::inner::for_each::ForEach;
use crate::inner::{for_each::ForEach, map::Map};

/// Parallel version of the standard iterator trait.
///
@@ -36,10 +36,12 @@ pub trait ParallelIterator: Sized + Send {
/// iterators.
///
/// [README]: README.md
fn drive<E, C>(self, executor: E, consumer: C) -> E::Result
fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<Self, C>,
C: Consumer<Self::Item>;
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>;

/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
@@ -87,36 +89,42 @@ pub trait ParallelIterator: Sized + Send {
///
/// (0..100).into_par_iter().for_each(|x| println!("{:?}", x));
/// ```
fn for_each<F>(self, operation: F) -> ForEach<Self, F>
fn for_each<O>(self, operation: O) -> ForEach<Self, O>
where
F: Fn(Self::Item) + Sync + Send,
O: Fn(Self::Item),
{
ForEach::new(self, operation)
}
}

/// An iterator that supports "random access" to its data, meaning
/// that you can split it at arbitrary indices and draw data from
/// those points.
///
/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges
pub trait IndexedParallelIterator: ParallelIterator {
/// Produces an exact count of how many items this iterator will
/// produce, presuming no panic occurs.
/// Applies `map_op` to each item of this iterator, producing a new
/// iterator with the results.
///
/// # Examples
///
/// ```
/// use asparit::*;
/// use rayon::prelude::*;
///
/// let par_iter = (0..100).into_par_iter().zip(vec![0; 10]);
/// assert_eq!(par_iter.len(), 10);
/// let mut par_iter = (0..5).into_par_iter().map(|x| x * 2);
///
/// let vec: Vec<_> = par_iter.collect();
/// assert_eq!(vec.len(), 10);
/// let doubles: Vec<_> = par_iter.collect();
///
/// assert_eq!(&doubles[..], &[0, 2, 4, 6, 8]);
/// ```
fn len_hint(&self) -> usize;
fn map<O, T>(self, operation: O) -> Map<Self, O>
where
O: Fn(Self::Item) -> T + Sync + Send,
T: Send,
{
Map::new(self, operation)
}
}

/// An iterator that supports "random access" to its data, meaning
/// that you can split it at arbitrary indices and draw data from
/// those points.
///
/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges
pub trait IndexedParallelIterator: ParallelIterator {
/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
///
@@ -131,9 +139,12 @@ pub trait IndexedParallelIterator: ParallelIterator {
/// iterators.
///
/// [README]: README.md
fn drive_indexed<C>(self, consumer: C) -> C::Result
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
C: IndexedConsumer<Self::Item>;
E: Executor<D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>;

/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
@@ -153,4 +164,20 @@ pub trait IndexedParallelIterator: ParallelIterator {
fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<Self::Item>;

/// Produces an exact count of how many items this iterator will
/// produce, presuming no panic occurs.
///
/// # Examples
///
/// ```
/// use asparit::*;
///
/// let par_iter = (0..100).into_par_iter().zip(vec![0; 10]);
/// assert_eq!(par_iter.len(), 10);
///
/// let vec: Vec<_> = par_iter.collect();
/// assert_eq!(vec.len(), 10);
/// ```
fn len_hint(&self) -> usize;
}

+ 3
- 3
asparit/src/core/mod.rs 查看文件

@@ -1,5 +1,5 @@
mod collector;
mod consumer;
mod driver;
mod executor;
mod folder;
mod into_iter;
@@ -7,9 +7,9 @@ mod iterator;
mod producer;
mod reducer;

pub use collector::Collector;
pub use consumer::{Consumer, IndexedConsumer};
pub use executor::{Executor, IndexedExecutor};
pub use driver::Driver;
pub use executor::{Executor, ExecutorCallback};
pub use folder::Folder;
pub use into_iter::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator};
pub use iterator::{IndexedParallelIterator, ParallelIterator};


+ 17
- 5
asparit/src/core/producer.rs 查看文件

@@ -12,14 +12,26 @@ pub trait Producer: Send + Sized {
/// The type of item returned by this producer.
type Item;

/// The type of iterator we will become.
type IntoIter: Iterator<Item = Self::Item>;

/// Convert `self` into an iterator; at this point, no more parallel splits
/// are possible.
fn into_iter(self) -> Self::IntoIter;

/// Split midway into a new producer if possible, otherwise return `None`.
fn split(self) -> (Self, Option<Self>);

/// Iterate the producer, feeding each element to `folder`, and
/// stop when the folder is full (or all elements have been consumed).
///
/// The provided implementation is sufficient for most iterables.
fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>;
F: Folder<Self::Item>,
{
folder.consume_iter(self.into_iter())
}
}

/// A `Producer` is effectively a "splittable `IntoIterator`". That
@@ -107,7 +119,7 @@ pub trait IndexedProducer: Send + Sized {
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
pub trait ProducerCallback<T> {
pub trait ProducerCallback<I> {
/// The type of value returned by this callback. Analogous to
/// [`Output` from the `FnOnce` trait][Output].
///
@@ -119,7 +131,7 @@ pub trait ProducerCallback<T> {
/// `P`, and hence implementors must be defined for any producer.
fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = T>;
P: Producer<Item = I>;
}

/// The `IndexedProducerCallback` trait is a kind of generic closure,
@@ -128,7 +140,7 @@ pub trait ProducerCallback<T> {
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
pub trait IndexedProducerCallback<T> {
pub trait IndexedProducerCallback<I> {
/// The type of value returned by this callback. Analogous to
/// [`Output` from the `FnOnce` trait][Output].
///
@@ -140,5 +152,5 @@ pub trait IndexedProducerCallback<T> {
/// `P`, and hence implementors must be defined for any producer.
fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = T>;
P: IndexedProducer<Item = I>;
}

+ 21
- 52
asparit/src/executor/sequential.rs 查看文件

@@ -1,70 +1,39 @@
use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedExecutor, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback,
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer,
Producer, Reducer,
};

#[derive(Default)]
pub struct Sequential;

struct Callback<C> {
consumer: C,
}

struct IndexedCallback<C> {
consumer: C,
}

impl<I, C> Executor<I, C> for Sequential
where
I: ParallelIterator,
C: Consumer<I::Item>,
{
type Result = C::Result;

fn exec(self, iterator: I, consumer: C) -> Self::Result {
iterator.with_producer(Callback { consumer })
}
}

impl<I, C> IndexedExecutor<I, C> for Sequential
where
I: IndexedParallelIterator,
C: IndexedConsumer<I::Item>,
impl<D> Executor<D> for Sequential
where D: Send,
{
type Result = C::Result;
type Result = D;

fn exec_indexed(self, iterator: I, consumer: C) -> Self::Result {
iterator.with_producer_indexed(IndexedCallback { consumer })
}
}

impl<C, T> ProducerCallback<T> for Callback<C>
where
C: Consumer<T>,
{
type Output = C::Result;

fn callback<P>(self, producer: P) -> C::Result
fn exec<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: Producer<Item = T>,
P: Producer,
C: Consumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>,
{
if self.consumer.is_full() {
self.consumer.into_folder().complete()
if consumer.is_full() {
consumer.into_folder().complete()
} else {
producer.fold_with(self.consumer.into_folder()).complete()
producer.fold_with(consumer.into_folder()).complete()
}
}
}

impl<C, T> IndexedProducerCallback<T> for IndexedCallback<C>
where
C: IndexedConsumer<T>,
{
type Output = C::Result;
fn callback<P>(self, _producer: P) -> C::Result
fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer<Item = T>,
P: IndexedProducer,
C: IndexedConsumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>,
{
self.consumer.into_folder().complete()
if consumer.is_full() {
consumer.into_folder().complete()
} else {
producer.fold_with(consumer.into_folder()).complete()
}
}
}

+ 29
- 32
asparit/src/inner/for_each.rs 查看文件

@@ -1,17 +1,15 @@
use crate::{
core::{Collector, Executor},
Consumer, Folder, ParallelIterator,
};
use crate::{core::Driver, Consumer, Folder, ParallelIterator, Executor};

use super::noop::NoOpReducer;

pub struct ForEach<I, F> {
iterator: I,
operation: F,
pub struct ForEach<X, O> {
iterator: X,
operation: O,
}

impl<I, F> ForEach<I, F> {
pub fn new(iterator: I, operation: F) -> Self {
impl<X, O> ForEach<X, O>
{
pub fn new(iterator: X, operation: O) -> Self {
Self {
iterator,
operation,
@@ -19,17 +17,13 @@ impl<I, F> ForEach<I, F> {
}
}

impl<I, F> Collector for ForEach<I, F>
impl<X, O> Driver<()> for ForEach<X, O>
where
I: ParallelIterator,
F: Fn(I::Item) + Sync + Send + Copy,
X: ParallelIterator,
O: Fn(X::Item) + Clone + Send,
{
type Iterator = I;
type Consumer = ForEachConsumer<F>;

fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<Self::Iterator, Self::Consumer>,
where E: Executor<()>
{
let iterator = self.iterator;
let operation = self.operation;
@@ -40,22 +34,22 @@ where
}
}

pub struct ForEachConsumer<F> {
operation: F,
pub struct ForEachConsumer<O> {
operation: O,
}

impl<F, T> Consumer<T> for ForEachConsumer<F>
impl<O, I> Consumer<I> for ForEachConsumer<O>
where
F: Fn(T) + Sync + Send + Copy,
O: Fn(I) + Clone + Send,
{
type Folder = ForEachConsumer<F>;
type Folder = ForEachConsumer<O>;
type Reducer = NoOpReducer;
type Result = ();

fn split_off_left(&self) -> (Self, NoOpReducer) {
(
ForEachConsumer {
operation: self.operation,
operation: self.operation.clone(),
},
NoOpReducer,
)
@@ -66,23 +60,23 @@ where
}
}

impl<F, T> Folder<T> for ForEachConsumer<F>
impl<O, I> Folder<I> for ForEachConsumer<O>
where
F: Fn(T) + Sync + Send + Copy,
O: Fn(I),
{
type Result = ();

fn consume(self, item: T) -> Self {
fn consume(self, item: I) -> Self {
(self.operation)(item);

self
}

fn consume_iter<I>(self, iter: I) -> Self
fn consume_iter<X>(self, iter: X) -> Self
where
I: IntoIterator<Item = T>,
X: IntoIterator<Item = I>,
{
iter.into_iter().for_each(self.operation);
iter.into_iter().for_each(&self.operation);

self
}
@@ -97,11 +91,14 @@ mod tests {

#[test]
fn test_for_each() {
(0..10usize)
let x = (0..10usize)
.into_par_iter()
.for_each(&|j| {
println!("{}", j);
.map(Some)
.for_each(|j| {
println!("{:?}", j);
})
.exec();

dbg!(x);
}
}

+ 332
- 0
asparit/src/inner/map.rs 查看文件

@@ -0,0 +1,332 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
};

/* Map */

pub struct Map<X, O> {
base: X,
operation: O,
}

impl<X, O> Map<X, O> {
pub fn new(base: X, operation: O) -> Self {
Self { base, operation }
}
}

impl<X, O, T> ParallelIterator for Map<X, O>
where
X: ParallelIterator,
O: Fn(X::Item) -> T + Sync + Send + Copy,
T: Send,
{
type Item = O::Output;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
{
let consumer = MapConsumer::new(consumer, self.operation);

self.base.drive(executor, consumer)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
self.base.with_producer(MapCallback {
callback,
operation: self.operation,
})
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

impl<X, O, T> IndexedParallelIterator for Map<X, O>
where
X: IndexedParallelIterator,
O: Fn(X::Item) -> T + Sync + Send + Copy,
T: Send,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
{
let consumer = MapConsumer::new(consumer, self.operation);

self.base.drive_indexed(executor, consumer)
}

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<Self::Item>,
{
self.base.with_producer_indexed(MapCallback {
callback,
operation: self.operation,
})
}

fn len_hint(&self) -> usize {
self.base.len_hint()
}
}

/* MapCallback */

struct MapCallback<CB, O> {
callback: CB,
operation: O,
}

impl<I, O, T, CB> ProducerCallback<I> for MapCallback<CB, O>
where
CB: ProducerCallback<T>,
O: Fn(I) -> T + Sync + Send + Copy,
T: Send,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: Producer<Item = I>,
{
let producer = MapProducer {
base,
operation: self.operation,
};

self.callback.callback(producer)
}
}

impl<I, O, T, CB> IndexedProducerCallback<I> for MapCallback<CB, O>
where
CB: IndexedProducerCallback<T>,
O: Fn(I) -> T + Sync + Send + Copy,
T: Send,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: IndexedProducer<Item = I>,
{
let producer = MapProducer {
base,
operation: self.operation,
};

self.callback.callback(producer)
}
}

/* MapProducer */

struct MapProducer<P, O> {
base: P,
operation: O,
}

impl<P, O, T> Producer for MapProducer<P, O>
where
P: Producer,
O: Fn(P::Item) -> T + Sync + Send + Copy,
T: Send,
{
type Item = O::Output;
type IntoIter = std::iter::Map<P::IntoIter, O>;

fn into_iter(self) -> Self::IntoIter {
// self.base.into_iter().map(self.operation)
unimplemented!()
}

fn split(self) -> (Self, Option<Self>) {
let operation = self.operation;
let (left, right) = self.base.split();

(
MapProducer {
base: left,
operation,
},
right.map(|right| MapProducer {
base: right,
operation,
}),
)
}

fn fold_with<G>(self, folder: G) -> G
where
G: Folder<Self::Item>,
{
let folder = MapFolder {
base: folder,
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

impl<P, O, T> IndexedProducer for MapProducer<P, O>
where
P: IndexedProducer,
O: Fn(P::Item) -> T + Sync + Send + Copy,
T: Send,
{
type Item = O::Output;
type IntoIter = std::iter::Map<P::IntoIter, O>;

fn into_iter(self) -> Self::IntoIter {
self.base.into_iter().map(self.operation)
}

fn min_len(&self) -> usize {
self.base.min_len()
}

fn max_len(&self) -> usize {
self.base.max_len()
}

fn split_at(self, index: usize) -> (Self, Self) {
let operation = self.operation;
let (left, right) = self.base.split_at(index);

(
MapProducer {
base: left,
operation,
},
MapProducer {
base: right,
operation,
},
)
}

fn fold_with<G>(self, folder: G) -> G
where
G: Folder<Self::Item>,
{
let folder = MapFolder {
base: folder,
operation: self.operation,
};

self.base.fold_with(folder).base
}
}

/* MapConsumer */

struct MapConsumer<C, O> {
base: C,
operation: O,
}

impl<C, O> MapConsumer<C, O> {
fn new(base: C, operation: O) -> Self {
Self { base, operation }
}
}

impl<I, T, C, O> Consumer<I> for MapConsumer<C, O>
where
C: Consumer<O::Output>,
O: Fn(I) -> T + Send + Sync + Copy,
T: Send,
{
type Folder = MapFolder<C::Folder, O>;
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapConsumer::new(left, self.operation), reducer)
}

fn into_folder(self) -> Self::Folder {
MapFolder {
base: self.base.into_folder(),
operation: self.operation,
}
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

impl<I, T, C, O> IndexedConsumer<I> for MapConsumer<C, O>
where
C: IndexedConsumer<O::Output>,
O: Fn(I) -> T + Send + Sync + Copy,
T: Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapConsumer::new(left, self.operation),
MapConsumer::new(right, self.operation),
reducer,
)
}
}

/* MapFolder */

struct MapFolder<F, O> {
base: F,
operation: O,
}

impl<I, T, F, O> Folder<I> for MapFolder<F, O>
where
F: Folder<O::Output>,
O: Fn(I) -> T + Copy,
{
type Result = F::Result;

fn consume(self, item: I) -> Self {
let mapped_item = (self.operation)(item);
MapFolder {
base: self.base.consume(mapped_item),
operation: self.operation,
}
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = I>,
{
self.base = self.base.consume_iter(iter.into_iter().map(self.operation));

self
}

fn complete(self) -> F::Result {
self.base.complete()
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

+ 1
- 0
asparit/src/inner/mod.rs 查看文件

@@ -1,2 +1,3 @@
pub mod for_each;
pub mod map;
pub mod noop;

+ 3
- 3
asparit/src/lib.rs 查看文件

@@ -4,8 +4,8 @@ mod inner;
mod std;

pub use self::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedExecutor, IndexedParallelIterator,
IndexedProducer, IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator,
ParallelIterator, Producer, ProducerCallback, Reducer,
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator, ExecutorCallback,
IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer,
};
pub use self::executor::{DefaultExecutor, SequentialExecutor};

+ 12
- 7
asparit/src/std/range.rs 查看文件

@@ -1,8 +1,6 @@
use std::ops::Range;

use crate::{
Consumer, Executor, Folder, IntoParallelIterator, ParallelIterator, Producer, ProducerCallback,
};
use crate::{Consumer, Folder, IntoParallelIterator, ParallelIterator, Producer, ProducerCallback, Executor, Reducer, ExecutorCallback};

pub struct Iter {
range: Range<usize>,
@@ -24,12 +22,14 @@ impl IntoParallelIterator for Range<usize> {
impl ParallelIterator for Iter {
type Item = usize;

fn drive<E, C>(self, executor: E, consumer: C) -> E::Result
fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<Self, C>,
C: Consumer<Self::Item>,
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
{
executor.exec(self, consumer)
self.with_producer(ExecutorCallback::new(executor, consumer))
}

fn len_hint_opt(&self) -> Option<usize> {
@@ -46,6 +46,11 @@ impl ParallelIterator for Iter {

impl Producer for IterProducer {
type Item = usize;
type IntoIter = Range<usize>;

fn into_iter(self) -> Self::IntoIter {
self.range
}

fn split(mut self) -> (Self, Option<Self>) {
let index = self.range.len() / 2;


Loading…
取消
儲存