Ver a proveniência

Implemented 'for_each_init' operation

master
Bergmann89 há 5 anos
ascendente
cometimento
a714d58825
17 ficheiros alterados com 170 adições e 74 eliminações
  1. +8
    -5
      asparit/src/core/driver.rs
  2. +7
    -8
      asparit/src/core/executor.rs
  3. +2
    -2
      asparit/src/core/from_iter.rs
  4. +2
    -1
      asparit/src/core/into_iter.rs
  5. +41
    -2
      asparit/src/core/iterator.rs
  6. +2
    -2
      asparit/src/core/mod.rs
  7. +1
    -1
      asparit/src/executor/mod.rs
  8. +3
    -3
      asparit/src/executor/sequential.rs
  9. +22
    -12
      asparit/src/executor/tokio.rs
  10. +6
    -2
      asparit/src/inner/collect.rs
  11. +18
    -10
      asparit/src/inner/for_each.rs
  12. +6
    -4
      asparit/src/inner/map.rs
  13. +18
    -7
      asparit/src/inner/map_init.rs
  14. +25
    -9
      asparit/src/inner/map_with.rs
  15. +3
    -3
      asparit/src/inner/mod.rs
  16. +2
    -2
      asparit/src/lib.rs
  17. +4
    -1
      asparit/src/std/range.rs

+ 8
- 5
asparit/src/core/driver.rs Ver ficheiro

@@ -1,11 +1,14 @@
use crate::{Executor, DefaultExecutor};
use crate::{DefaultExecutor, Executor};

pub trait Driver<'a, D>: Sized
where D: Send,
where
D: Send,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<'a, D>;
where
E: Executor<'a, D>;

fn exec(self) -> <DefaultExecutor as Executor<'a, D>>::Result
{ self.exec_with(DefaultExecutor::default()) }
fn exec(self) -> <DefaultExecutor as Executor<'a, D>>::Result {
self.exec_with(DefaultExecutor::default())
}
}

+ 7
- 8
asparit/src/core/executor.rs Ver ficheiro

@@ -1,9 +1,11 @@
use super::{
Consumer, IndexedConsumer, Producer, IndexedProducer, Reducer, ProducerCallback, IndexedProducerCallback,
Consumer, IndexedConsumer, IndexedProducer, IndexedProducerCallback, Producer,
ProducerCallback, Reducer,
};

pub trait Executor<'a, D>
where D: Send,
where
D: Send,
{
type Result: Send;

@@ -27,10 +29,7 @@ pub struct ExecutorCallback<E, C> {

impl<E, C> ExecutorCallback<E, C> {
pub fn new(executor: E, consumer: C) -> Self {
Self {
executor,
consumer,
}
Self { executor, consumer }
}
}

@@ -45,7 +44,7 @@ where

fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = I> + 'a
P: Producer<Item = I> + 'a,
{
self.executor.exec(producer, self.consumer)
}
@@ -62,7 +61,7 @@ where

fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = I> + 'a
P: IndexedProducer<Item = I> + 'a,
{
self.executor.exec_indexed(producer, self.consumer)
}


+ 2
- 2
asparit/src/core/from_iter.rs Ver ficheiro

@@ -1,4 +1,4 @@
use super::{IntoParallelIterator, Executor};
use super::{Executor, IntoParallelIterator};

/// `FromParallelIterator` implements the creation of a collection
/// from a [`ParallelIterator`]. By implementing
@@ -67,7 +67,7 @@ impl FromParallelIterator<()> for () {
E: Executor<'a, Self>,
X: IntoParallelIterator<'a, Item = ()>,
{
use crate::{ParallelIterator, inner::noop::NoOpConsumer};
use crate::{inner::noop::NoOpConsumer, ParallelIterator};

iterator.into_par_iter().drive(executor, NoOpConsumer)
}


+ 2
- 1
asparit/src/core/into_iter.rs Ver ficheiro

@@ -115,7 +115,8 @@ pub trait IntoParallelRefMutIterator<'a> {
}

impl<'a, T> IntoParallelIterator<'a> for T
where T: ParallelIterator<'a>
where
T: ParallelIterator<'a>,
{
type Iter = T;
type Item = T::Item;


+ 41
- 2
asparit/src/core/iterator.rs Ver ficheiro

@@ -1,6 +1,11 @@
use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer, FromParallelIterator};
use super::{
Consumer, Executor, FromParallelIterator, IndexedConsumer, IndexedProducerCallback,
ProducerCallback, Reducer,
};

use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit, collect::Collect};
use crate::inner::{
collect::Collect, for_each::ForEach, map::Map, map_init::MapInit, map_with::MapWith,
};

/// Parallel version of the standard iterator trait.
///
@@ -127,6 +132,40 @@ pub trait ParallelIterator<'a>: Sized + Send {
self.map_with(init, operation).collect()
}

/// Executes `operation` on a value returned by `init` with each item produced by
/// the iterator, in parallel.
///
/// The `init` function will be called only as needed for a value to be
/// paired with the group of items in each rayon job. There is no
/// constraint on that returned type at all!
///
/// # Examples
///
/// ```
/// use rand::Rng;
/// use rayon::prelude::*;
///
/// let mut v = vec![0u8; 1_000_000];
///
/// v.par_chunks_mut(1000)
/// .for_each_init(
/// || rand::thread_rng(),
/// |rng, chunk| rng.fill(chunk),
/// );
///
/// // There's a remote chance that this will fail...
/// for i in 0u8..=255 {
/// assert!(v.contains(&i));
/// }
/// ```
fn for_each_init<O, S, T>(self, init: S, operation: O) -> Collect<MapInit<Self, S, O>, ()>
where
O: Fn(&mut T, Self::Item) + Clone + Sync + Send + 'a,
S: Fn() -> T + Clone + Sync + Send + 'a,
{
self.map_init(init, operation).collect()
}

/// Applies `operation` to each item of this iterator, producing a new
/// iterator with the results.
///


+ 2
- 2
asparit/src/core/mod.rs Ver ficheiro

@@ -2,17 +2,17 @@ mod consumer;
mod driver;
mod executor;
mod folder;
mod from_iter;
mod into_iter;
mod iterator;
mod producer;
mod reducer;
mod from_iter;

pub use from_iter::FromParallelIterator;
pub use consumer::{Consumer, IndexedConsumer};
pub use driver::Driver;
pub use executor::{Executor, ExecutorCallback};
pub use folder::Folder;
pub use from_iter::FromParallelIterator;
pub use into_iter::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator};
pub use iterator::{IndexedParallelIterator, ParallelIterator};
pub use producer::{IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback};


+ 1
- 1
asparit/src/executor/mod.rs Ver ficheiro

@@ -2,9 +2,9 @@ mod sequential;
#[cfg(feature = "tokio-executor")]
mod tokio;

pub use sequential::Sequential as SequentialExecutor;
#[cfg(feature = "tokio-executor")]
pub use self::tokio::Tokio as TokioExecutor;
pub use sequential::Sequential as SequentialExecutor;

#[cfg(feature = "tokio-executor")]
pub type DefaultExecutor = TokioExecutor;


+ 3
- 3
asparit/src/executor/sequential.rs Ver ficheiro

@@ -1,13 +1,13 @@
use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer,
Producer, Reducer,
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};

#[derive(Default)]
pub struct Sequential;

impl<'a, D> Executor<'a, D> for Sequential
where D: Send,
where
D: Send,
{
type Result = D;



+ 22
- 12
asparit/src/executor/tokio.rs Ver ficheiro

@@ -1,11 +1,14 @@
use std::mem::transmute;
use std::cmp;
use std::mem::transmute;

use futures::{future::{Future, BoxFuture, FutureExt}, join};
use tokio::task::{spawn};
use futures::{
future::{BoxFuture, Future, FutureExt},
join,
};
use tokio::task::spawn;

use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};

pub struct Tokio {
@@ -21,13 +24,14 @@ impl Tokio {
impl Default for Tokio {
fn default() -> Self {
Self {
splits: 2 * num_cpus::get()
splits: 2 * num_cpus::get(),
}
}
}

impl<'a, D> Executor<'a, D> for Tokio
where D: Send,
where
D: Send,
{
type Result = BoxFuture<'a, D>;

@@ -50,13 +54,17 @@ where D: Send,
R: Reducer<D> + Send,
{
let splits = producer.splits().unwrap_or(self.splits);
let splitter = IndexedSplitter::new(splits, producer.len(), producer.min_len(), producer.max_len());
let splitter = IndexedSplitter::new(
splits,
producer.len(),
producer.min_len(),
producer.max_len(),
);

exec_indexed(splitter, producer, consumer)
}
}


fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result>
where
P: Producer + 'a,
@@ -88,7 +96,11 @@ where
.boxed()
}

fn exec_indexed<'a, P, C>(mut splitter: IndexedSplitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result>
fn exec_indexed<'a, P, C>(
mut splitter: IndexedSplitter,
producer: P,
consumer: C,
) -> BoxFuture<'a, C::Result>
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item> + 'a,
@@ -152,9 +164,7 @@ struct Splitter {
impl Splitter {
#[inline]
fn new(splits: usize) -> Self {
Self {
splits,
}
Self { splits }
}

#[inline]


+ 6
- 2
asparit/src/inner/collect.rs Ver ficheiro

@@ -1,6 +1,9 @@
use std::marker::PhantomData;

use crate::{core::{FromParallelIterator, Driver}, ParallelIterator, Executor};
use crate::{
core::{Driver, FromParallelIterator},
Executor, ParallelIterator,
};

pub struct Collect<X, T> {
iterator: X,
@@ -22,7 +25,8 @@ where
T: FromParallelIterator<X::Item> + Send,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<'a, T>
where
E: Executor<'a, T>,
{
T::from_par_iter(executor, self.iterator)
}


+ 18
- 10
asparit/src/inner/for_each.rs Ver ficheiro

@@ -1,4 +1,4 @@
use crate::{core::Driver, Consumer, Folder, ParallelIterator, Executor};
use crate::{core::Driver, Consumer, Executor, Folder, ParallelIterator};

use super::noop::NoOpReducer;

@@ -7,8 +7,7 @@ pub struct ForEach<X, O> {
operation: O,
}

impl<X, O> ForEach<X, O>
{
impl<X, O> ForEach<X, O> {
pub fn new(iterator: X, operation: O) -> Self {
Self {
iterator,
@@ -23,7 +22,8 @@ where
O: Fn(X::Item) + Clone + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<'a, ()>
where
E: Executor<'a, ()>,
{
let iterator = self.iterator;
let operation = self.operation;
@@ -91,18 +91,26 @@ mod tests {

#[tokio::test]
async fn test_for_each() {
use ::std::sync::Arc;
use ::std::sync::atomic::{AtomicUsize, Ordering};
use ::std::sync::Arc;

let i = Arc::new(AtomicUsize::new(0));
let j = Arc::new(AtomicUsize::new(0));

let x = (0..10usize)
.into_par_iter()
.map_init(move || { i.fetch_add(1, Ordering::Relaxed) }, |init, item| Some((*init, item)))
.for_each_with(5usize, |x, j| {
println!("{:?} {:?}", x, j);
})
.exec().await;
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| Some((*init, item)),
)
.for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| {
println!("{:?} {:?}", init, item);
},
)
.exec()
.await;

dbg!(x);
}


+ 6
- 4
asparit/src/inner/map.rs Ver ficheiro

@@ -1,6 +1,6 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
};

/* Map */
@@ -62,7 +62,7 @@ where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
R: Reducer<D>,
{
let consumer = MapConsumer::new(consumer, self.operation);

@@ -323,7 +323,9 @@ where
where
X: IntoIterator<Item = I>,
{
self.base = self.base.consume_iter(iter.into_iter().map(self.operation.clone()));
self.base = self
.base
.consume_iter(iter.into_iter().map(self.operation.clone()));

self
}


+ 18
- 7
asparit/src/inner/map_init.rs Ver ficheiro

@@ -1,9 +1,9 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
};

use super::map_with::{MapWithIter, MapWithFolder};
use super::map_with::{MapWithFolder, MapWithIter};

/* MapInit */

@@ -15,7 +15,11 @@ pub struct MapInit<X, S, O> {

impl<X, S, O> MapInit<X, S, O> {
pub fn new(base: X, init: S, operation: O) -> Self {
Self { base, init, operation }
Self {
base,
init,
operation,
}
}
}

@@ -68,7 +72,7 @@ where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
R: Reducer<D>,
{
let consumer = MapInitConsumer::new(consumer, self.init, self.operation);

@@ -279,7 +283,11 @@ struct MapInitConsumer<C, S, O> {

impl<C, S, O> MapInitConsumer<C, S, O> {
fn new(base: C, init: S, operation: O) -> Self {
Self { base, init, operation }
Self {
base,
init,
operation,
}
}
}

@@ -297,7 +305,10 @@ where
fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapInitConsumer::new(left, self.init.clone(), self.operation.clone()), reducer)
(
MapInitConsumer::new(left, self.init.clone(), self.operation.clone()),
reducer,
)
}

fn into_folder(self) -> Self::Folder {


+ 25
- 9
asparit/src/inner/map_with.rs Ver ficheiro

@@ -1,6 +1,6 @@
use crate::{
Consumer, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer, Reducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Executor,
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
};

/* MapWith */
@@ -13,7 +13,11 @@ pub struct MapWith<X, S, O> {

impl<X, S, O> MapWith<X, S, O> {
pub fn new(base: X, item: S, operation: O) -> Self {
Self { base, item, operation }
Self {
base,
item,
operation,
}
}
}

@@ -66,7 +70,7 @@ where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
R: Reducer<D>,
{
let consumer = MapWithConsumer::new(consumer, self.item, self.operation);

@@ -309,7 +313,8 @@ impl<I, S, O, T> ExactSizeIterator for MapWithIter<I, S, O>
where
I: ExactSizeIterator,
O: Fn(&mut S, I::Item) -> T,
{ }
{
}

/* MapWithConsumer */

@@ -321,7 +326,11 @@ struct MapWithConsumer<C, S, O> {

impl<C, S, O> MapWithConsumer<C, S, O> {
fn new(base: C, item: S, operation: O) -> Self {
Self { base, item, operation }
Self {
base,
item,
operation,
}
}
}

@@ -339,7 +348,10 @@ where
fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();

(MapWithConsumer::new(left, self.item.clone(), self.operation.clone()), reducer)
(
MapWithConsumer::new(left, self.item.clone(), self.operation.clone()),
reducer,
)
}

fn into_folder(self) -> Self::Folder {
@@ -400,12 +412,16 @@ where
where
X: IntoIterator<Item = I>,
{
fn with<'f, I, S, T>(item: &'f mut S, operation: impl Fn(&mut S, I) -> T + 'f,
fn with<'f, I, S, T>(
item: &'f mut S,
operation: impl Fn(&mut S, I) -> T + 'f,
) -> impl FnMut(I) -> T + 'f {
move |x| operation(item, x)
}

let mapped_iter = iter.into_iter().map(with(&mut self.item, self.operation.clone()));
let mapped_iter = iter
.into_iter()
.map(with(&mut self.item, self.operation.clone()));

self.base = self.base.consume_iter(mapped_iter);



+ 3
- 3
asparit/src/inner/mod.rs Ver ficheiro

@@ -1,6 +1,6 @@
pub mod collect;
pub mod for_each;
pub mod map;
pub mod noop;
pub mod map_with;
pub mod map_init;
pub mod collect;
pub mod map_with;
pub mod noop;

+ 2
- 2
asparit/src/lib.rs Ver ficheiro

@@ -4,8 +4,8 @@ mod inner;
mod std;

pub use self::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator, ExecutorCallback,
Consumer, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer,
};
pub use self::executor::{DefaultExecutor, SequentialExecutor};

+ 4
- 1
asparit/src/std/range.rs Ver ficheiro

@@ -1,6 +1,9 @@
use std::ops::Range;

use crate::{Consumer, Folder, IntoParallelIterator, ParallelIterator, Producer, ProducerCallback, Executor, Reducer, ExecutorCallback};
use crate::{
Consumer, Executor, ExecutorCallback, Folder, IntoParallelIterator, ParallelIterator, Producer,
ProducerCallback, Reducer,
};

pub struct Iter {
range: Range<usize>,


Carregando…
Cancelar
Guardar