Browse Source

Implemented tokio executor

master
Bergmann89 3 years ago
parent
commit
a661d208b5
12 changed files with 340 additions and 97 deletions
  1. +7
    -0
      asparit/Cargo.toml
  2. +3
    -3
      asparit/src/core/driver.rs
  3. +17
    -17
      asparit/src/core/executor.rs
  4. +27
    -23
      asparit/src/core/into_iter.rs
  5. +8
    -8
      asparit/src/core/iterator.rs
  6. +23
    -8
      asparit/src/core/producer.rs
  7. +8
    -0
      asparit/src/executor/mod.rs
  8. +3
    -3
      asparit/src/executor/sequential.rs
  9. +201
    -0
      asparit/src/executor/tokio.rs
  10. +7
    -7
      asparit/src/inner/for_each.rs
  11. +30
    -22
      asparit/src/inner/map.rs
  12. +6
    -6
      asparit/src/std/range.rs

+ 7
- 0
asparit/Cargo.toml View File

@@ -4,4 +4,11 @@ version = "0.1.0"
authors = ["Bergmann89 <info@bergmann89.de>"]
edition = "2018"

[features]
default = [ "tokio-executor" ]
tokio-executor = [ "futures", "num_cpus", "tokio" ]

[dependencies]
futures = { version = "0.3", optional = true }
num_cpus = { version = "1.13", optional = true }
tokio = { version = "0.3", features = [ "macros", "rt-multi-thread" ], optional = true }

+ 3
- 3
asparit/src/core/driver.rs View File

@@ -1,11 +1,11 @@
use crate::{Executor, DefaultExecutor};

pub trait Driver<D>: Sized
pub trait Driver<'a, D>: Sized
where D: Send,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<D>;
where E: Executor<'a, D>;

fn exec(self) -> <DefaultExecutor as Executor<D>>::Result
fn exec(self) -> <DefaultExecutor as Executor<'a, D>>::Result
{ self.exec_with(DefaultExecutor::default()) }
}

+ 17
- 17
asparit/src/core/executor.rs View File

@@ -2,22 +2,22 @@ use super::{
Consumer, IndexedConsumer, Producer, IndexedProducer, Reducer, ProducerCallback, IndexedProducerCallback,
};

pub trait Executor<D>
pub trait Executor<'a, D>
where D: Send,
{
type Result: Send;

fn exec<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: Producer,
C: Consumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>;
P: Producer + 'a,
C: Consumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send;

fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer,
C: IndexedConsumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>;
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send;
}

pub struct ExecutorCallback<E, C> {
@@ -34,35 +34,35 @@ impl<E, C> ExecutorCallback<E, C> {
}
}

impl<E, D, C, I, R> ProducerCallback<I> for ExecutorCallback<E, C>
impl<'a, E, D, C, I, R> ProducerCallback<'a, I> for ExecutorCallback<E, C>
where
E: Executor<D>,
E: Executor<'a, D>,
D: Send,
C: Consumer<I, Result = D, Reducer = R>,
R: Reducer<D>,
C: Consumer<I, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
type Output = E::Result;

fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = I>
P: Producer<Item = I> + 'a
{
self.executor.exec(producer, self.consumer)
}
}

impl<E, D, C, I, R> IndexedProducerCallback<I> for ExecutorCallback<E, C>
impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback<E, C>
where
E: Executor<D>,
E: Executor<'a, D>,
D: Send,
C: IndexedConsumer<I, Result = D, Reducer = R>,
R: Reducer<D>,
C: IndexedConsumer<I, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
type Output = E::Result;

fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = I>
P: IndexedProducer<Item = I> + 'a
{
self.executor.exec_indexed(producer, self.consumer)
}


+ 27
- 23
asparit/src/core/into_iter.rs View File

@@ -8,9 +8,9 @@ use super::ParallelIterator;
///
/// [`ParallelIterator`]: trait.ParallelIterator.html
/// [`std::iter::IntoIterator`]: https://doc.rust-lang.org/std/iter/trait.IntoIterator.html
pub trait IntoParallelIterator {
pub trait IntoParallelIterator<'a> {
/// The parallel iterator type that will be created.
type Iter: ParallelIterator<Item = Self::Item>;
type Iter: ParallelIterator<'a, Item = Self::Item>;

/// The type of item that the parallel iterator will produce.
type Item: Send;
@@ -53,13 +53,13 @@ pub trait IntoParallelIterator {
///
/// [`ParallelIterator`]: trait.ParallelIterator.html
/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html
pub trait IntoParallelRefIterator<'data> {
pub trait IntoParallelRefIterator<'a> {
/// The type of the parallel iterator that will be returned.
type Iter: ParallelIterator<Item = Self::Item>;
type Iter: ParallelIterator<'a, Item = Self::Item>;

/// The type of item that the parallel iterator will produce.
/// This will typically be an `&'data T` reference type.
type Item: Send + 'data;
/// This will typically be an `&'a T` reference type.
type Item: Send + 'a;

/// Converts `self` into a parallel iterator.
///
@@ -76,7 +76,7 @@ pub trait IntoParallelRefIterator<'data> {
/// assert!(v.par_iter().zip(&v)
/// .all(|(a, b)| std::ptr::eq(a, b)));
/// ```
fn par_iter(&'data self) -> Self::Iter;
fn par_iter(&'a self) -> Self::Iter;
}

/// `IntoParallelRefMutIterator` implements the conversion to a
@@ -92,13 +92,13 @@ pub trait IntoParallelRefIterator<'data> {
///
/// [`ParallelIterator`]: trait.ParallelIterator.html
/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html
pub trait IntoParallelRefMutIterator<'data> {
pub trait IntoParallelRefMutIterator<'a> {
/// The type of iterator that will be created.
type Iter: ParallelIterator<Item = Self::Item>;
type Iter: ParallelIterator<'a, Item = Self::Item>;

/// The type of item that will be produced; this is typically an
/// `&'data mut T` reference.
type Item: Send + 'data;
/// `&'a mut T` reference.
type Item: Send + 'a;

/// Creates the parallel iterator from `self`.
///
@@ -111,10 +111,12 @@ pub trait IntoParallelRefMutIterator<'data> {
/// v.par_iter_mut().enumerate().for_each(|(i, x)| *x = i);
/// assert_eq!(v, [0, 1, 2, 3, 4]);
/// ```
fn par_iter_mut(&'data mut self) -> Self::Iter;
fn par_iter_mut(&'a mut self) -> Self::Iter;
}

impl<T: ParallelIterator> IntoParallelIterator for T {
impl<'a, T> IntoParallelIterator<'a> for T
where T: ParallelIterator<'a>
{
type Iter = T;
type Item = T::Item;

@@ -123,26 +125,28 @@ impl<T: ParallelIterator> IntoParallelIterator for T {
}
}

impl<'data, I: 'data + ?Sized> IntoParallelRefIterator<'data> for I
impl<'a, I> IntoParallelRefIterator<'a> for I
where
&'data I: IntoParallelIterator,
I: 'a + ?Sized,
&'a I: IntoParallelIterator<'a>,
{
type Iter = <&'data I as IntoParallelIterator>::Iter;
type Item = <&'data I as IntoParallelIterator>::Item;
type Iter = <&'a I as IntoParallelIterator<'a>>::Iter;
type Item = <&'a I as IntoParallelIterator<'a>>::Item;

fn par_iter(&'data self) -> Self::Iter {
fn par_iter(&'a self) -> Self::Iter {
self.into_par_iter()
}
}

impl<'data, I: 'data + ?Sized> IntoParallelRefMutIterator<'data> for I
impl<'a, I> IntoParallelRefMutIterator<'a> for I
where
&'data mut I: IntoParallelIterator,
I: 'a + ?Sized,
&'a mut I: IntoParallelIterator<'a>,
{
type Iter = <&'data mut I as IntoParallelIterator>::Iter;
type Item = <&'data mut I as IntoParallelIterator>::Item;
type Iter = <&'a mut I as IntoParallelIterator<'a>>::Iter;
type Item = <&'a mut I as IntoParallelIterator<'a>>::Item;

fn par_iter_mut(&'data mut self) -> Self::Iter {
fn par_iter_mut(&'a mut self) -> Self::Iter {
self.into_par_iter()
}
}

+ 8
- 8
asparit/src/core/iterator.rs View File

@@ -16,7 +16,7 @@ use crate::inner::{for_each::ForEach, map::Map};
///
/// [iter]: index.html
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
pub trait ParallelIterator: Sized + Send {
pub trait ParallelIterator<'a>: Sized + Send {
/// The type of item that this parallel iterator produces.
/// For example, if you use the [`for_each`] method, this is the type of
/// item that your closure will be invoked with.
@@ -38,10 +38,10 @@ pub trait ParallelIterator: Sized + Send {
/// [README]: README.md
fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D>;
R: Reducer<D> + Send;

/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
@@ -60,7 +60,7 @@ pub trait ParallelIterator: Sized + Send {
/// [README]: README.md
fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>;
CB: ProducerCallback<'a, Self::Item>;

/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
@@ -124,7 +124,7 @@ pub trait ParallelIterator: Sized + Send {
/// those points.
///
/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges
pub trait IndexedParallelIterator: ParallelIterator {
pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> {
/// Internal method used to define the behavior of this parallel
/// iterator. You should not need to call this directly.
///
@@ -141,7 +141,7 @@ pub trait IndexedParallelIterator: ParallelIterator {
/// [README]: README.md
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>;
@@ -163,7 +163,7 @@ pub trait IndexedParallelIterator: ParallelIterator {
/// [README]: README.md
fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<Self::Item>;
CB: IndexedProducerCallback<'a, Self::Item>;

/// Produces an exact count of how many items this iterator will
/// produce, presuming no panic occurs.


+ 23
- 8
asparit/src/core/producer.rs View File

@@ -19,6 +19,11 @@ pub trait Producer: Send + Sized {
/// are possible.
fn into_iter(self) -> Self::IntoIter;

/// Number of splits/threads this iterator will use to proceed.
fn splits(&self) -> Option<usize> {
None
}

/// Split midway into a new producer if possible, otherwise return `None`.
fn split(self) -> (Self, Option<Self>);

@@ -58,6 +63,7 @@ pub trait Producer: Send + Sized {
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
/// [20671]: https://github.com/rust-lang/rust/issues/20671
#[allow(clippy::len_without_is_empty)]
pub trait IndexedProducer: Send + Sized {
/// The type of item that will be produced by this producer once
/// it is converted into an iterator.
@@ -70,6 +76,15 @@ pub trait IndexedProducer: Send + Sized {
/// are possible.
fn into_iter(self) -> Self::IntoIter;

/// Number of splits/threads this iterator will use to proceed.
fn splits(&self) -> Option<usize> {
None
}

/// Produces an exact count of how many items this producer will
/// emit, presuming no panic occurs.
fn len(&self) -> usize;

/// The minimum number of items that we will process
/// sequentially. Defaults to 1, which means that we will split
/// all the way down to a single item. This can be raised higher
@@ -80,8 +95,8 @@ pub trait IndexedProducer: Send + Sized {
/// needed.
///
/// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len
fn min_len(&self) -> usize {
1
fn min_len(&self) -> Option<usize> {
None
}

/// The maximum number of items that we will process
@@ -93,8 +108,8 @@ pub trait IndexedProducer: Send + Sized {
/// overhead, so this should not be needed.
///
/// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len
fn max_len(&self) -> usize {
usize::MAX
fn max_len(&self) -> Option<usize> {
None
}

/// Split into two producers; one produces items `0..index`, the
@@ -119,7 +134,7 @@ pub trait IndexedProducer: Send + Sized {
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
pub trait ProducerCallback<I> {
pub trait ProducerCallback<'p, I> {
/// The type of value returned by this callback. Analogous to
/// [`Output` from the `FnOnce` trait][Output].
///
@@ -131,7 +146,7 @@ pub trait ProducerCallback<I> {
/// `P`, and hence implementors must be defined for any producer.
fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = I>;
P: Producer<Item = I> + 'p;
}

/// The `IndexedProducerCallback` trait is a kind of generic closure,
@@ -140,7 +155,7 @@ pub trait ProducerCallback<I> {
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
pub trait IndexedProducerCallback<I> {
pub trait IndexedProducerCallback<'p, I> {
/// The type of value returned by this callback. Analogous to
/// [`Output` from the `FnOnce` trait][Output].
///
@@ -152,5 +167,5 @@ pub trait IndexedProducerCallback<I> {
/// `P`, and hence implementors must be defined for any producer.
fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = I>;
P: IndexedProducer<Item = I> + 'p;
}

+ 8
- 0
asparit/src/executor/mod.rs View File

@@ -1,5 +1,13 @@
mod sequential;
#[cfg(feature = "tokio-executor")]
mod tokio;

pub use sequential::Sequential as SequentialExecutor;
#[cfg(feature = "tokio-executor")]
pub use self::tokio::Tokio as TokioExecutor;

#[cfg(feature = "tokio-executor")]
pub type DefaultExecutor = TokioExecutor;

#[cfg(not(feature = "tokio-executor"))]
pub type DefaultExecutor = SequentialExecutor;

+ 3
- 3
asparit/src/executor/sequential.rs View File

@@ -6,15 +6,15 @@ use crate::core::{
#[derive(Default)]
pub struct Sequential;

impl<D> Executor<D> for Sequential
impl<'a, D> Executor<'a, D> for Sequential
where D: Send,
{
type Result = D;

fn exec<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: Producer,
C: Consumer<P::Item, Result = D, Reducer = R>,
P: Producer + 'a,
C: Consumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D>,
{
if consumer.is_full() {


+ 201
- 0
asparit/src/executor/tokio.rs View File

@@ -0,0 +1,201 @@
use std::mem::transmute;
use std::cmp;

use futures::{future::{Future, BoxFuture, FutureExt}, join};
use tokio::task::{spawn};

use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};

pub struct Tokio {
splits: usize,
}

impl Tokio {
pub fn new(splits: usize) -> Self {
Self { splits }
}
}

impl Default for Tokio {
fn default() -> Self {
Self {
splits: 2 * num_cpus::get()
}
}
}

impl<'a, D> Executor<'a, D> for Tokio
where D: Send,
{
type Result = BoxFuture<'a, D>;

fn exec<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: Producer + 'a,
C: Consumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
let splits = producer.splits().unwrap_or(self.splits);
let splitter = Splitter::new(splits);

exec(splitter, producer, consumer)
}

fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
let splits = producer.splits().unwrap_or(self.splits);
let splitter = IndexedSplitter::new(splits, producer.len(), producer.min_len(), producer.max_len());

exec_indexed(splitter, producer, consumer)
}
}


fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result>
where
P: Producer + 'a,
C: Consumer<P::Item> + 'a,
C::Reducer: Send,
{
async move {
if consumer.is_full() {
consumer.into_folder().complete()
} else if splitter.try_split() {
match producer.split() {
(left_producer, Some(right_producer)) => {
let ((left_consumer, reducer), right_consumer) =
(consumer.split_off_left(), consumer);

let left = run_as_task(exec(splitter, left_producer, left_consumer));
let right = run_as_task(exec(splitter, right_producer, right_consumer));

let (left_result, right_result) = join!(left, right);

reducer.reduce(left_result, right_result)
}
(producer, None) => producer.fold_with(consumer.into_folder()).complete(),
}
} else {
producer.fold_with(consumer.into_folder()).complete()
}
}
.boxed()
}

fn exec_indexed<'a, P, C>(mut splitter: IndexedSplitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result>
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item> + 'a,
C::Reducer: Send,
{
async move {
if consumer.is_full() {
consumer.into_folder().complete()
} else {
let len = producer.len();
if splitter.try_split(len) {
let mid = len / 2;

let (left_producer, right_producer) = producer.split_at(mid);
let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);

let left = run_as_task(exec_indexed(splitter, left_producer, left_consumer));
let right = run_as_task(exec_indexed(splitter, right_producer, right_consumer));

let (left_result, right_result) = join!(left, right);

reducer.reduce(left_result, right_result)
} else {
producer.fold_with(consumer.into_folder()).complete()
}
}
}
.boxed()
}

async fn run_as_task<'a, T, F>(f: F) -> T
where
T: Send + 'a,
F: Future<Output = T> + Send + 'a,
{
struct Pointer<T>(*mut T);

unsafe impl<T> Send for Pointer<T> {}

let mut result = None;
let r = Pointer(&mut result as *mut _);

let task: BoxFuture<'a, ()> = async move {
unsafe {
*r.0 = Some(f.await);
}
}
.boxed();
let task: BoxFuture<'static, ()> = unsafe { transmute(task) };

spawn(task).await.expect("Error in tokio executor");

result.unwrap()
}

#[derive(Clone, Copy)]
struct Splitter {
splits: usize,
}

impl Splitter {
#[inline]
fn new(splits: usize) -> Self {
Self {
splits,
}
}

#[inline]
fn try_split(&mut self) -> bool {
if self.splits > 0 {
self.splits /= 2;

true
} else {
false
}
}
}

#[derive(Clone, Copy)]
struct IndexedSplitter {
inner: Splitter,
min: usize,
}

impl IndexedSplitter {
#[inline]
fn new(splits: usize, len: usize, min: Option<usize>, max: Option<usize>) -> Self {
let min = min.unwrap_or_default();
let mut ret = Self {
inner: Splitter::new(splits),
min: cmp::max(min, 1),
};

if let Some(max) = max {
let min_splits = len / cmp::max(max, 1);
if min_splits > ret.inner.splits {
ret.inner.splits = min_splits;
}
}

ret
}

#[inline]
fn try_split(&mut self, len: usize) -> bool {
len / 2 >= self.min && self.inner.try_split()
}
}

+ 7
- 7
asparit/src/inner/for_each.rs View File

@@ -17,13 +17,13 @@ impl<X, O> ForEach<X, O>
}
}

impl<X, O> Driver<()> for ForEach<X, O>
impl<'a, X, O> Driver<'a, ()> for ForEach<X, O>
where
X: ParallelIterator,
O: Fn(X::Item) + Clone + Send,
X: ParallelIterator<'a>,
O: Fn(X::Item) + Clone + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where E: Executor<()>
where E: Executor<'a, ()>
{
let iterator = self.iterator;
let operation = self.operation;
@@ -89,15 +89,15 @@ mod tests {
use super::*;
use crate::*;

#[test]
fn test_for_each() {
#[tokio::test]
async fn test_for_each() {
let x = (0..10usize)
.into_par_iter()
.map(Some)
.for_each(|j| {
println!("{:?}", j);
})
.exec();
.exec().await;

dbg!(x);
}


+ 30
- 22
asparit/src/inner/map.rs View File

@@ -16,20 +16,20 @@ impl<X, O> Map<X, O> {
}
}

impl<X, O, T> ParallelIterator for Map<X, O>
impl<'a, X, O, T> ParallelIterator<'a> for Map<X, O>
where
X: ParallelIterator,
O: Fn(X::Item) -> T + Sync + Send + Copy,
X: ParallelIterator<'a>,
O: Fn(X::Item) -> T + Sync + Send + Copy + 'a,
T: Send,
{
type Item = O::Output;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D>
R: Reducer<D> + Send,
{
let consumer = MapConsumer::new(consumer, self.operation);

@@ -38,7 +38,7 @@ where

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(MapCallback {
callback,
@@ -51,15 +51,15 @@ where
}
}

impl<X, O, T> IndexedParallelIterator for Map<X, O>
impl<'a, X, O, T> IndexedParallelIterator<'a> for Map<X, O>
where
X: IndexedParallelIterator,
O: Fn(X::Item) -> T + Sync + Send + Copy,
X: IndexedParallelIterator<'a>,
O: Fn(X::Item) -> T + Sync + Send + Copy + 'a,
T: Send,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R>,
D: Send,
R: Reducer<D>
@@ -71,7 +71,7 @@ where

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<Self::Item>,
CB: IndexedProducerCallback<'a, Self::Item>,
{
self.base.with_producer_indexed(MapCallback {
callback,
@@ -91,17 +91,17 @@ struct MapCallback<CB, O> {
operation: O,
}

impl<I, O, T, CB> ProducerCallback<I> for MapCallback<CB, O>
impl<'a, I, O, T, CB> ProducerCallback<'a, I> for MapCallback<CB, O>
where
CB: ProducerCallback<T>,
O: Fn(I) -> T + Sync + Send + Copy,
CB: ProducerCallback<'a, T>,
O: Fn(I) -> T + Sync + Send + Copy + 'a,
T: Send,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: Producer<Item = I>,
P: Producer<Item = I> + 'a,
{
let producer = MapProducer {
base,
@@ -112,17 +112,17 @@ where
}
}

impl<I, O, T, CB> IndexedProducerCallback<I> for MapCallback<CB, O>
impl<'a, I, O, T, CB> IndexedProducerCallback<'a, I> for MapCallback<CB, O>
where
CB: IndexedProducerCallback<T>,
O: Fn(I) -> T + Sync + Send + Copy,
CB: IndexedProducerCallback<'a, T>,
O: Fn(I) -> T + Sync + Send + Copy + 'a,
T: Send,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> CB::Output
where
P: IndexedProducer<Item = I>,
P: IndexedProducer<Item = I> + 'a,
{
let producer = MapProducer {
base,
@@ -196,11 +196,19 @@ where
self.base.into_iter().map(self.operation)
}

fn min_len(&self) -> usize {
fn splits(&self) -> Option<usize> {
self.base.splits()
}

fn len(&self) -> usize {
self.base.len()
}

fn min_len(&self) -> Option<usize> {
self.base.min_len()
}

fn max_len(&self) -> usize {
fn max_len(&self) -> Option<usize> {
self.base.max_len()
}



+ 6
- 6
asparit/src/std/range.rs View File

@@ -10,7 +10,7 @@ struct IterProducer {
range: Range<usize>,
}

impl IntoParallelIterator for Range<usize> {
impl<'a> IntoParallelIterator<'a> for Range<usize> {
type Iter = Iter;
type Item = usize;

@@ -19,15 +19,15 @@ impl IntoParallelIterator for Range<usize> {
}
}

impl ParallelIterator for Iter {
impl<'a> ParallelIterator<'a> for Iter {
type Item = usize;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<D>,
C: Consumer<Self::Item, Result = D, Reducer = R>,
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D>
R: Reducer<D> + Send,
{
self.with_producer(ExecutorCallback::new(executor, consumer))
}
@@ -38,7 +38,7 @@ impl ParallelIterator for Iter {

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
CB: ProducerCallback<'a, Self::Item>,
{
callback.callback(IterProducer { range: self.range })
}


Loading…
Cancel
Save