| @@ -15,7 +15,7 @@ pub trait ParallelDrainFull<'a> { | |||||
| /// The type of item that the parallel iterator will produce. | /// The type of item that the parallel iterator will produce. | ||||
| /// This is usually the same as `IntoParallelIterator::Item`. | /// This is usually the same as `IntoParallelIterator::Item`. | ||||
| type Item: Send; | |||||
| type Item: Send + 'a; | |||||
| /// Returns a draining parallel iterator over an entire collection. | /// Returns a draining parallel iterator over an entire collection. | ||||
| /// | /// | ||||
| @@ -57,7 +57,7 @@ pub trait ParallelDrainRange<'a, Idx = usize> { | |||||
| /// The type of item that the parallel iterator will produce. | /// The type of item that the parallel iterator will produce. | ||||
| /// This is usually the same as `IntoParallelIterator::Item`. | /// This is usually the same as `IntoParallelIterator::Item`. | ||||
| type Item: Send; | |||||
| type Item: Send + 'a; | |||||
| /// Returns a draining parallel iterator over a range of the collection. | /// Returns a draining parallel iterator over a range of the collection. | ||||
| /// | /// | ||||
| @@ -2,7 +2,7 @@ use crate::{DefaultExecutor, Executor}; | |||||
| pub trait Driver<'a, D>: Sized | pub trait Driver<'a, D>: Sized | ||||
| where | where | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| { | { | ||||
| fn exec_with<E>(self, executor: E) -> E::Result | fn exec_with<E>(self, executor: E) -> E::Result | ||||
| where | where | ||||
| @@ -2,9 +2,9 @@ use super::{ | |||||
| Consumer, IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback, Reducer, | Consumer, IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback, Reducer, | ||||
| }; | }; | ||||
| pub trait Executor<'a, D> | |||||
| pub trait Executor<'a, D>: Sized | |||||
| where | where | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| { | { | ||||
| type Result: Send; | type Result: Send; | ||||
| @@ -12,13 +12,20 @@ where | |||||
| where | where | ||||
| P: Producer + 'a, | P: Producer + 'a, | ||||
| C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send; | |||||
| R: Reducer<D> + Send + 'a; | |||||
| fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result | fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result | ||||
| where | where | ||||
| P: IndexedProducer + 'a, | P: IndexedProducer + 'a, | ||||
| C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send; | |||||
| R: Reducer<D> + Send + 'a; | |||||
| fn split(self) -> (Self, Self); | |||||
| fn join<R>(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result | |||||
| where | |||||
| R: Reducer<D> + Send + 'a, | |||||
| D: 'a; | |||||
| } | } | ||||
| pub struct ExecutorCallback<E, C> { | pub struct ExecutorCallback<E, C> { | ||||
| @@ -35,9 +42,9 @@ impl<E, C> ExecutorCallback<E, C> { | |||||
| impl<'a, E, D, C, I, R> ProducerCallback<'a, I> for ExecutorCallback<E, C> | impl<'a, E, D, C, I, R> ProducerCallback<'a, I> for ExecutorCallback<E, C> | ||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| C: Consumer<I, Result = D, Reducer = R> + 'a, | C: Consumer<I, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| type Output = E::Result; | type Output = E::Result; | ||||
| @@ -52,9 +59,9 @@ where | |||||
| impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback<E, C> | impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback<E, C> | ||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| C: Consumer<I, Result = D, Reducer = R> + 'a, | C: Consumer<I, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| type Output = E::Result; | type Output = E::Result; | ||||
| @@ -58,7 +58,8 @@ where | |||||
| fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result | fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, Self>, | E: Executor<'a, Self>, | ||||
| X: IntoParallelIterator<'a, Item = T>; | |||||
| X: IntoParallelIterator<'a, Item = T>, | |||||
| T: 'a; | |||||
| } | } | ||||
| impl FromParallelIterator<()> for () { | impl FromParallelIterator<()> for () { | ||||
| @@ -13,7 +13,7 @@ pub trait IntoParallelIterator<'a> { | |||||
| type Iter: ParallelIterator<'a, Item = Self::Item>; | type Iter: ParallelIterator<'a, Item = Self::Item>; | ||||
| /// The type of item that the parallel iterator will produce. | /// The type of item that the parallel iterator will produce. | ||||
| type Item: Send; | |||||
| type Item: Send + 'a; | |||||
| /// Converts `self` into a parallel iterator. | /// Converts `self` into a parallel iterator. | ||||
| /// | /// | ||||
| @@ -2,11 +2,13 @@ use std::cmp::{Ord, Ordering}; | |||||
| use std::iter::IntoIterator; | use std::iter::IntoIterator; | ||||
| use super::{ | use super::{ | ||||
| Consumer, Executor, FromParallelIterator, IndexedProducerCallback, ProducerCallback, Reducer, | |||||
| Consumer, Executor, FromParallelIterator, IndexedProducerCallback, IntoParallelIterator, | |||||
| ProducerCallback, Reducer, | |||||
| }; | }; | ||||
| use crate::{ | use crate::{ | ||||
| inner::{ | inner::{ | ||||
| chain::Chain, | |||||
| cloned::Cloned, | cloned::Cloned, | ||||
| collect::Collect, | collect::Collect, | ||||
| copied::Copied, | copied::Copied, | ||||
| @@ -53,7 +55,7 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| /// item that your closure will be invoked with. | /// item that your closure will be invoked with. | ||||
| /// | /// | ||||
| /// [`for_each`]: #method.for_each | /// [`for_each`]: #method.for_each | ||||
| type Item: Send; | |||||
| type Item: Send + 'a; | |||||
| /// Internal method used to define the behavior of this parallel | /// Internal method used to define the behavior of this parallel | ||||
| /// iterator. You should not need to call this directly. | /// iterator. You should not need to call this directly. | ||||
| @@ -71,8 +73,8 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send; | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a; | |||||
| /// Internal method used to define the behavior of this parallel | /// Internal method used to define the behavior of this parallel | ||||
| /// iterator. You should not need to call this directly. | /// iterator. You should not need to call this directly. | ||||
| @@ -1194,6 +1196,29 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| MaxBy::new(self, operation) | MaxBy::new(self, operation) | ||||
| } | } | ||||
| /// Takes two iterators and creates a new iterator over both. | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| /// use rayon::prelude::*; | |||||
| /// | |||||
| /// let a = [0, 1, 2]; | |||||
| /// let b = [9, 8, 7]; | |||||
| /// | |||||
| /// let par_iter = a.par_iter().chain(b.par_iter()); | |||||
| /// | |||||
| /// let chained: Vec<_> = par_iter.cloned().collect(); | |||||
| /// | |||||
| /// assert_eq!(&chained[..], &[0, 1, 2, 9, 8, 7]); | |||||
| /// ``` | |||||
| fn chain<C>(self, chain: C) -> Chain<Self, C::Iter> | |||||
| where | |||||
| C: IntoParallelIterator<'a, Item = Self::Item>, | |||||
| { | |||||
| Chain::new(self, chain.into_par_iter()) | |||||
| } | |||||
| /// Creates a fresh collection containing all the elements produced | /// Creates a fresh collection containing all the elements produced | ||||
| /// by this parallel iterator. | /// by this parallel iterator. | ||||
| /// | /// | ||||
| @@ -1250,8 +1275,8 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send; | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a; | |||||
| /// Internal method used to define the behavior of this parallel | /// Internal method used to define the behavior of this parallel | ||||
| /// iterator. You should not need to call this directly. | /// iterator. You should not need to call this directly. | ||||
| @@ -5,7 +5,7 @@ pub struct Sequential; | |||||
| impl<'a, D> Executor<'a, D> for Sequential | impl<'a, D> Executor<'a, D> for Sequential | ||||
| where | where | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| { | { | ||||
| type Result = D; | type Result = D; | ||||
| @@ -34,4 +34,15 @@ where | |||||
| producer.fold_with(consumer.into_folder()).complete() | producer.fold_with(consumer.into_folder()).complete() | ||||
| } | } | ||||
| } | } | ||||
| fn split(self) -> (Self, Self) { | |||||
| (Self, Self) | |||||
| } | |||||
| fn join<R>(left: D, right: D, reducer: R) -> Self::Result | |||||
| where | |||||
| R: Reducer<D> + Send, | |||||
| { | |||||
| reducer.reduce(left, right) | |||||
| } | |||||
| } | } | ||||
| @@ -29,7 +29,7 @@ impl Default for Tokio { | |||||
| impl<'a, D> Executor<'a, D> for Tokio | impl<'a, D> Executor<'a, D> for Tokio | ||||
| where | where | ||||
| D: Send, | |||||
| D: Send + 'a, | |||||
| { | { | ||||
| type Result = BoxFuture<'a, D>; | type Result = BoxFuture<'a, D>; | ||||
| @@ -37,7 +37,7 @@ where | |||||
| where | where | ||||
| P: Producer + 'a, | P: Producer + 'a, | ||||
| C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let splits = producer.splits().unwrap_or(self.splits); | let splits = producer.splits().unwrap_or(self.splits); | ||||
| let splitter = Splitter::new(splits); | let splitter = Splitter::new(splits); | ||||
| @@ -49,7 +49,7 @@ where | |||||
| where | where | ||||
| P: IndexedProducer + 'a, | P: IndexedProducer + 'a, | ||||
| C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | C: Consumer<P::Item, Result = D, Reducer = R> + 'a, | ||||
| R: Reducer<D> + Send, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let splits = producer.splits().unwrap_or(self.splits); | let splits = producer.splits().unwrap_or(self.splits); | ||||
| let splitter = IndexedSplitter::new( | let splitter = IndexedSplitter::new( | ||||
| @@ -61,6 +61,31 @@ where | |||||
| exec_indexed(splitter, producer, consumer) | exec_indexed(splitter, producer, consumer) | ||||
| } | } | ||||
| fn split(self) -> (Self, Self) { | |||||
| let mut left = self; | |||||
| let right = Self { | |||||
| splits: left.splits / 2, | |||||
| }; | |||||
| left.splits -= right.splits; | |||||
| (left, right) | |||||
| } | |||||
| fn join<R>(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result | |||||
| where | |||||
| R: Reducer<D> + Send + 'a, | |||||
| D: 'a, | |||||
| { | |||||
| async move { | |||||
| let left = left.await; | |||||
| let right = right.await; | |||||
| reducer.reduce(left, right) | |||||
| } | |||||
| .boxed() | |||||
| } | |||||
| } | } | ||||
| fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result> | fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result> | ||||
| @@ -0,0 +1,530 @@ | |||||
| use std::cmp::{max, min}; | |||||
| use std::iter::{DoubleEndedIterator, ExactSizeIterator, Iterator}; | |||||
| use crate::{ | |||||
| Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, | |||||
| ParallelIterator, Producer, ProducerCallback, Reducer, | |||||
| }; | |||||
| /* Chain */ | |||||
| pub struct Chain<X1, X2> { | |||||
| iterator_1: X1, | |||||
| iterator_2: X2, | |||||
| } | |||||
| impl<X1, X2> Chain<X1, X2> { | |||||
| pub fn new(iterator_1: X1, iterator_2: X2) -> Self { | |||||
| Self { | |||||
| iterator_1, | |||||
| iterator_2, | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<'a, X1, X2, T> ParallelIterator<'a> for Chain<X1, X2> | |||||
| where | |||||
| X1: ParallelIterator<'a, Item = T>, | |||||
| X2: ParallelIterator<'a, Item = T>, | |||||
| T: Send + 'a, | |||||
| { | |||||
| type Item = T; | |||||
| fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| let left = self.iterator_1; | |||||
| let right = self.iterator_2; | |||||
| let (left_executor, right_executor) = executor.split(); | |||||
| let (left_consumer, right_consumer, reducer) = match left.len_hint_opt() { | |||||
| Some(len) => consumer.split_at(len), | |||||
| None => consumer.split(), | |||||
| }; | |||||
| let left = left.drive(left_executor, left_consumer); | |||||
| let right = right.drive(right_executor, right_consumer); | |||||
| E::join(left, right, reducer) | |||||
| } | |||||
| fn with_producer<CB>(self, base: CB) -> CB::Output | |||||
| where | |||||
| CB: ProducerCallback<'a, Self::Item>, | |||||
| { | |||||
| self.iterator_1.with_producer(ChainCallback1 { | |||||
| base, | |||||
| iterator_2: self.iterator_2, | |||||
| }) | |||||
| } | |||||
| fn len_hint_opt(&self) -> Option<usize> { | |||||
| let len_1 = self.iterator_1.len_hint_opt(); | |||||
| let len_2 = self.iterator_2.len_hint_opt(); | |||||
| match (len_1, len_2) { | |||||
| (Some(x1), Some(x2)) => Some(x1 + x2), | |||||
| (_, _) => None, | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<'a, X1, X2, T> IndexedParallelIterator<'a> for Chain<X1, X2> | |||||
| where | |||||
| X1: IndexedParallelIterator<'a, Item = T>, | |||||
| X2: IndexedParallelIterator<'a, Item = T>, | |||||
| T: Send + 'a, | |||||
| { | |||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| let left = self.iterator_1; | |||||
| let right = self.iterator_2; | |||||
| let (left_executor, right_executor) = executor.split(); | |||||
| let (left_consumer, right_consumer, reducer) = consumer.split_at(left.len_hint()); | |||||
| let left = left.drive(left_executor, left_consumer); | |||||
| let right = right.drive(right_executor, right_consumer); | |||||
| E::join(left, right, reducer) | |||||
| } | |||||
| fn with_producer_indexed<CB>(self, base: CB) -> CB::Output | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, Self::Item>, | |||||
| { | |||||
| self.iterator_1.with_producer_indexed(ChainCallback1 { | |||||
| base, | |||||
| iterator_2: self.iterator_2, | |||||
| }) | |||||
| } | |||||
| fn len_hint(&self) -> usize { | |||||
| self.iterator_1.len_hint() + self.iterator_2.len_hint() | |||||
| } | |||||
| } | |||||
| /* ChainCallback1 */ | |||||
| struct ChainCallback1<CB, X2> { | |||||
| base: CB, | |||||
| iterator_2: X2, | |||||
| } | |||||
| impl<'a, CB, X2, T> ProducerCallback<'a, T> for ChainCallback1<CB, X2> | |||||
| where | |||||
| CB: ProducerCallback<'a, T>, | |||||
| X2: ParallelIterator<'a, Item = T>, | |||||
| T: Send + 'a, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, producer_1: P) -> Self::Output | |||||
| where | |||||
| P: Producer<Item = T> + 'a, | |||||
| { | |||||
| let base = self.base; | |||||
| self.iterator_2 | |||||
| .with_producer(ChainCallback2 { base, producer_1 }) | |||||
| } | |||||
| } | |||||
| impl<'a, CB, X2, T> IndexedProducerCallback<'a, T> for ChainCallback1<CB, X2> | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, T>, | |||||
| X2: IndexedParallelIterator<'a, Item = T>, | |||||
| T: Send + 'a, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, producer_1: P) -> Self::Output | |||||
| where | |||||
| P: IndexedProducer<Item = T> + 'a, | |||||
| { | |||||
| let base = self.base; | |||||
| self.iterator_2 | |||||
| .with_producer_indexed(ChainCallback2 { base, producer_1 }) | |||||
| } | |||||
| } | |||||
| /* ChainCallback2 */ | |||||
| struct ChainCallback2<CB, P1> { | |||||
| base: CB, | |||||
| producer_1: P1, | |||||
| } | |||||
| impl<'a, CB, P1, T> ProducerCallback<'a, T> for ChainCallback2<CB, P1> | |||||
| where | |||||
| CB: ProducerCallback<'a, T>, | |||||
| P1: Producer<Item = T> + 'a, | |||||
| T: Send + 'a, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, producer_2: P) -> Self::Output | |||||
| where | |||||
| P: Producer<Item = T> + 'a, | |||||
| { | |||||
| let producer_1 = self.producer_1; | |||||
| self.base.callback(ChainProducer { | |||||
| producer_1: Some(producer_1), | |||||
| producer_2: Some(producer_2), | |||||
| }) | |||||
| } | |||||
| } | |||||
| impl<'a, CB, P1, T> IndexedProducerCallback<'a, T> for ChainCallback2<CB, P1> | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, T>, | |||||
| P1: IndexedProducer<Item = T> + 'a, | |||||
| T: Send + 'a, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, producer_2: P) -> Self::Output | |||||
| where | |||||
| P: IndexedProducer<Item = T> + 'a, | |||||
| { | |||||
| let producer_1 = self.producer_1; | |||||
| self.base.callback(ChainProducer { | |||||
| producer_1: Some(producer_1), | |||||
| producer_2: Some(producer_2), | |||||
| }) | |||||
| } | |||||
| } | |||||
| /* ChainProducer */ | |||||
| struct ChainProducer<P1, P2> { | |||||
| producer_1: Option<P1>, | |||||
| producer_2: Option<P2>, | |||||
| } | |||||
| impl<'a, P1, P2, T> Producer for ChainProducer<P1, P2> | |||||
| where | |||||
| P1: Producer<Item = T>, | |||||
| P2: Producer<Item = T>, | |||||
| { | |||||
| type Item = T; | |||||
| type IntoIter = ChainIter<P1::IntoIter, P2::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| ChainIter::new( | |||||
| self.producer_1.map(Producer::into_iter), | |||||
| self.producer_2.map(Producer::into_iter), | |||||
| ) | |||||
| } | |||||
| fn split(self) -> (Self, Option<Self>) { | |||||
| match (self.producer_1, self.producer_2) { | |||||
| (Some(p1), Some(p2)) => { | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(p1), | |||||
| producer_2: None, | |||||
| }; | |||||
| let right = Some(ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(p2), | |||||
| }); | |||||
| (left, right) | |||||
| } | |||||
| (Some(p1), None) => { | |||||
| let (left, right) = p1.split(); | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(left), | |||||
| producer_2: None, | |||||
| }; | |||||
| let right = right.map(|right| ChainProducer { | |||||
| producer_1: Some(right), | |||||
| producer_2: None, | |||||
| }); | |||||
| (left, right) | |||||
| } | |||||
| (None, Some(p2)) => { | |||||
| let (left, right) = p2.split(); | |||||
| let left = ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(left), | |||||
| }; | |||||
| let right = right.map(|right| ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(right), | |||||
| }); | |||||
| (left, right) | |||||
| } | |||||
| (None, None) => unreachable!(), | |||||
| } | |||||
| } | |||||
| fn splits(&self) -> Option<usize> { | |||||
| let splits_1 = self.producer_1.as_ref().and_then(|p| p.splits()); | |||||
| let splits_2 = self.producer_2.as_ref().and_then(|p| p.splits()); | |||||
| match (splits_1, splits_2) { | |||||
| (Some(splits), _) => Some(splits), | |||||
| (None, Some(splits)) => Some(splits), | |||||
| (None, None) => None, | |||||
| } | |||||
| } | |||||
| fn fold_with<F>(self, folder: F) -> F | |||||
| where | |||||
| F: Folder<Self::Item>, | |||||
| { | |||||
| let folder = match self.producer_1 { | |||||
| Some(p) => p.fold_with(folder), | |||||
| None => folder, | |||||
| }; | |||||
| if folder.is_full() { | |||||
| return folder; | |||||
| } | |||||
| match self.producer_2 { | |||||
| Some(p) => p.fold_with(folder), | |||||
| None => folder, | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<'a, P1, P2, T> IndexedProducer for ChainProducer<P1, P2> | |||||
| where | |||||
| P1: IndexedProducer<Item = T>, | |||||
| P2: IndexedProducer<Item = T>, | |||||
| { | |||||
| type Item = T; | |||||
| type IntoIter = ChainIter<P1::IntoIter, P2::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| ChainIter::new( | |||||
| self.producer_1.map(IndexedProducer::into_iter), | |||||
| self.producer_2.map(IndexedProducer::into_iter), | |||||
| ) | |||||
| } | |||||
| fn len(&self) -> usize { | |||||
| let mut len = 0; | |||||
| if let Some(p) = &self.producer_1 { | |||||
| len += p.len(); | |||||
| } | |||||
| if let Some(p) = &self.producer_2 { | |||||
| len += p.len(); | |||||
| } | |||||
| len | |||||
| } | |||||
| fn split_at(self, mut index: usize) -> (Self, Self) { | |||||
| match (self.producer_1, self.producer_2) { | |||||
| (Some(p1), Some(p2)) if index < p1.len() => { | |||||
| let (left, right) = p1.split_at(index); | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(left), | |||||
| producer_2: None, | |||||
| }; | |||||
| let right = ChainProducer { | |||||
| producer_1: Some(right), | |||||
| producer_2: Some(p2), | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| (Some(p1), Some(p2)) if index > p1.len() => { | |||||
| index -= p1.len(); | |||||
| let (left, right) = p2.split_at(index); | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(p1), | |||||
| producer_2: Some(left), | |||||
| }; | |||||
| let right = ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(right), | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| (Some(p1), Some(p2)) => { | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(p1), | |||||
| producer_2: None, | |||||
| }; | |||||
| let right = ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(p2), | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| (Some(p1), None) => { | |||||
| let (left, right) = p1.split_at(index); | |||||
| let left = ChainProducer { | |||||
| producer_1: Some(left), | |||||
| producer_2: None, | |||||
| }; | |||||
| let right = ChainProducer { | |||||
| producer_1: Some(right), | |||||
| producer_2: None, | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| (None, Some(p2)) => { | |||||
| let (left, right) = p2.split_at(index); | |||||
| let left = ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(left), | |||||
| }; | |||||
| let right = ChainProducer { | |||||
| producer_1: None, | |||||
| producer_2: Some(right), | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| (None, None) => unreachable!(), | |||||
| } | |||||
| } | |||||
| fn splits(&self) -> Option<usize> { | |||||
| let splits_1 = self.producer_1.as_ref().and_then(|p| p.splits()); | |||||
| let splits_2 = self.producer_2.as_ref().and_then(|p| p.splits()); | |||||
| match (splits_1, splits_2) { | |||||
| (Some(splits), _) => Some(splits), | |||||
| (None, Some(splits)) => Some(splits), | |||||
| (None, None) => None, | |||||
| } | |||||
| } | |||||
| fn min_len(&self) -> Option<usize> { | |||||
| let min_1 = self.producer_1.as_ref().and_then(|p| p.min_len()); | |||||
| let min_2 = self.producer_2.as_ref().and_then(|p| p.min_len()); | |||||
| min(min_1, min_2) | |||||
| } | |||||
| fn max_len(&self) -> Option<usize> { | |||||
| let max_1 = self.producer_1.as_ref().and_then(|p| p.max_len()); | |||||
| let max_2 = self.producer_2.as_ref().and_then(|p| p.max_len()); | |||||
| max(max_1, max_2) | |||||
| } | |||||
| fn fold_with<F>(self, folder: F) -> F | |||||
| where | |||||
| F: Folder<Self::Item>, | |||||
| { | |||||
| let folder = match self.producer_1 { | |||||
| Some(p) => p.fold_with(folder), | |||||
| None => folder, | |||||
| }; | |||||
| if folder.is_full() { | |||||
| return folder; | |||||
| } | |||||
| match self.producer_2 { | |||||
| Some(p) => p.fold_with(folder), | |||||
| None => folder, | |||||
| } | |||||
| } | |||||
| } | |||||
| /* ChainIter */ | |||||
| enum ChainIter<I1, I2> { | |||||
| Empty, | |||||
| Iter1(I1), | |||||
| Iter2(I2), | |||||
| Chain(std::iter::Chain<I1, I2>), | |||||
| } | |||||
| impl<I1, I2, T> ChainIter<I1, I2> | |||||
| where | |||||
| I1: Iterator<Item = T>, | |||||
| I2: Iterator<Item = T>, | |||||
| { | |||||
| fn new(i1: Option<I1>, i2: Option<I2>) -> Self { | |||||
| match (i1, i2) { | |||||
| (Some(i1), Some(i2)) => Self::Chain(i1.chain(i2)), | |||||
| (Some(i1), None) => Self::Iter1(i1), | |||||
| (None, Some(i2)) => Self::Iter2(i2), | |||||
| (None, None) => Self::Empty, | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<I1, I2, T> Iterator for ChainIter<I1, I2> | |||||
| where | |||||
| I1: Iterator<Item = T>, | |||||
| I2: Iterator<Item = T>, | |||||
| { | |||||
| type Item = T; | |||||
| fn next(&mut self) -> Option<Self::Item> { | |||||
| match self { | |||||
| Self::Empty => None, | |||||
| Self::Iter1(i) => i.next(), | |||||
| Self::Iter2(i) => i.next(), | |||||
| Self::Chain(i) => i.next(), | |||||
| } | |||||
| } | |||||
| fn size_hint(&self) -> (usize, Option<usize>) { | |||||
| match self { | |||||
| Self::Empty => (0, Some(0)), | |||||
| Self::Iter1(i) => i.size_hint(), | |||||
| Self::Iter2(i) => i.size_hint(), | |||||
| Self::Chain(i) => i.size_hint(), | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<I1, I2, T> DoubleEndedIterator for ChainIter<I1, I2> | |||||
| where | |||||
| I1: DoubleEndedIterator<Item = T>, | |||||
| I2: DoubleEndedIterator<Item = T>, | |||||
| { | |||||
| fn next_back(&mut self) -> Option<Self::Item> { | |||||
| match self { | |||||
| Self::Empty => None, | |||||
| Self::Iter1(i) => i.next_back(), | |||||
| Self::Iter2(i) => i.next_back(), | |||||
| Self::Chain(i) => i.next_back(), | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<I1, I2, T> ExactSizeIterator for ChainIter<I1, I2> | |||||
| where | |||||
| I1: ExactSizeIterator<Item = T>, | |||||
| I2: ExactSizeIterator<Item = T>, | |||||
| { | |||||
| } | |||||
| @@ -26,8 +26,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive(executor, ClonedConsumer { base: consumer }) | self.base.drive(executor, ClonedConsumer { base: consumer }) | ||||
| } | } | ||||
| @@ -53,8 +53,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base | self.base | ||||
| .drive_indexed(executor, ClonedConsumer { base: consumer }) | .drive_indexed(executor, ClonedConsumer { base: consumer }) | ||||
| @@ -22,7 +22,7 @@ impl<X, T> Collect<X, T> { | |||||
| impl<'a, X, T> Driver<'a, T> for Collect<X, T> | impl<'a, X, T> Driver<'a, T> for Collect<X, T> | ||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| T: FromParallelIterator<X::Item> + Send, | |||||
| T: FromParallelIterator<X::Item> + Send + 'a, | |||||
| { | { | ||||
| fn exec_with<E>(self, executor: E) -> E::Result | fn exec_with<E>(self, executor: E) -> E::Result | ||||
| where | where | ||||
| @@ -26,8 +26,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive(executor, CopiedConsumer { base: consumer }) | self.base.drive(executor, CopiedConsumer { base: consumer }) | ||||
| } | } | ||||
| @@ -53,8 +53,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base | self.base | ||||
| .drive_indexed(executor, CopiedConsumer { base: consumer }) | .drive_indexed(executor, CopiedConsumer { base: consumer }) | ||||
| @@ -24,8 +24,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -17,7 +17,7 @@ impl<'a, X, O, S> ParallelIterator<'a> for FilterMap<X, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| O: Fn(X::Item) -> Option<S> + Clone + Send + 'a, | O: Fn(X::Item) -> Option<S> + Clone + Send + 'a, | ||||
| S: Send, | |||||
| S: Send + 'a, | |||||
| { | { | ||||
| type Item = S; | type Item = S; | ||||
| @@ -25,8 +25,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -17,8 +17,8 @@ impl<X> FlattenIter<X> { | |||||
| impl<'a, X, SI> ParallelIterator<'a> for FlattenIter<X> | impl<'a, X, SI> ParallelIterator<'a> for FlattenIter<X> | ||||
| where | where | ||||
| X: ParallelIterator<'a, Item = SI>, | X: ParallelIterator<'a, Item = SI>, | ||||
| SI: IntoIterator + Send, | |||||
| SI::Item: Send, | |||||
| SI: IntoIterator + Send + 'a, | |||||
| SI::Item: Send + 'a, | |||||
| { | { | ||||
| type Item = SI::Item; | type Item = SI::Item; | ||||
| @@ -26,8 +26,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -66,8 +66,8 @@ impl<'a, X, O, SI> ParallelIterator<'a> for FlatMapIter<X, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| O: Fn(X::Item) -> SI + Clone + Send + 'a, | O: Fn(X::Item) -> SI + Clone + Send + 'a, | ||||
| SI: IntoIterator, | |||||
| SI::Item: Send, | |||||
| SI: IntoIterator + 'a, | |||||
| SI::Item: Send + 'a, | |||||
| { | { | ||||
| type Item = SI::Item; | type Item = SI::Item; | ||||
| @@ -75,8 +75,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -23,7 +23,7 @@ where | |||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| S: Fn() -> U + Clone + Send + 'a, | S: Fn() -> U + Clone + Send + 'a, | ||||
| O: Fn(U, X::Item) -> U + Clone + Send + 'a, | O: Fn(U, X::Item) -> U + Clone + Send + 'a, | ||||
| U: Send, | |||||
| U: Send + 'a, | |||||
| { | { | ||||
| type Item = U; | type Item = U; | ||||
| @@ -31,8 +31,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -86,8 +86,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let FoldWith { | let FoldWith { | ||||
| base, | base, | ||||
| @@ -27,8 +27,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -63,8 +63,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive_indexed( | self.base.drive_indexed( | ||||
| executor, | executor, | ||||
| @@ -20,7 +20,7 @@ impl<'a, X, O, T> ParallelIterator<'a> for Map<X, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| { | { | ||||
| type Item = O::Output; | type Item = O::Output; | ||||
| @@ -28,8 +28,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapConsumer::new(consumer, self.operation); | let consumer = MapConsumer::new(consumer, self.operation); | ||||
| @@ -55,14 +55,14 @@ impl<'a, X, O, T> IndexedParallelIterator<'a> for Map<X, O> | |||||
| where | where | ||||
| X: IndexedParallelIterator<'a>, | X: IndexedParallelIterator<'a>, | ||||
| O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| { | { | ||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapConsumer::new(consumer, self.operation); | let consumer = MapConsumer::new(consumer, self.operation); | ||||
| @@ -27,7 +27,7 @@ impl<'a, X, O, T, S, U> ParallelIterator<'a> for MapInit<X, S, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| S: Fn() -> U + Clone + Send + 'a, | S: Fn() -> U + Clone + Send + 'a, | ||||
| { | { | ||||
| type Item = T; | type Item = T; | ||||
| @@ -36,8 +36,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapInitConsumer::new(consumer, self.init, self.operation); | let consumer = MapInitConsumer::new(consumer, self.init, self.operation); | ||||
| @@ -64,15 +64,15 @@ impl<'a, X, O, T, S, U> IndexedParallelIterator<'a> for MapInit<X, S, O> | |||||
| where | where | ||||
| X: IndexedParallelIterator<'a>, | X: IndexedParallelIterator<'a>, | ||||
| O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| S: Fn() -> U + Clone + Send + 'a, | S: Fn() -> U + Clone + Send + 'a, | ||||
| { | { | ||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapInitConsumer::new(consumer, self.init, self.operation); | let consumer = MapInitConsumer::new(consumer, self.init, self.operation); | ||||
| @@ -25,7 +25,7 @@ impl<'a, X, O, T, S> ParallelIterator<'a> for MapWith<X, S, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| S: Clone + Send + 'a, | S: Clone + Send + 'a, | ||||
| { | { | ||||
| type Item = T; | type Item = T; | ||||
| @@ -34,8 +34,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapWithConsumer::new(consumer, self.item, self.operation); | let consumer = MapWithConsumer::new(consumer, self.item, self.operation); | ||||
| @@ -62,15 +62,15 @@ impl<'a, X, O, T, S> IndexedParallelIterator<'a> for MapWith<X, S, O> | |||||
| where | where | ||||
| X: IndexedParallelIterator<'a>, | X: IndexedParallelIterator<'a>, | ||||
| O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, | O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, | ||||
| T: Send, | |||||
| T: Send + 'a, | |||||
| S: Clone + Send + 'a, | S: Clone + Send + 'a, | ||||
| { | { | ||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let consumer = MapWithConsumer::new(consumer, self.item, self.operation); | let consumer = MapWithConsumer::new(consumer, self.item, self.operation); | ||||
| @@ -1,3 +1,4 @@ | |||||
| pub mod chain; | |||||
| pub mod cloned; | pub mod cloned; | ||||
| pub mod collect; | pub mod collect; | ||||
| pub mod copied; | pub mod copied; | ||||
| @@ -34,15 +35,21 @@ mod tests { | |||||
| let i = Arc::new(AtomicUsize::new(0)); | let i = Arc::new(AtomicUsize::new(0)); | ||||
| let j = Arc::new(AtomicUsize::new(0)); | let j = Arc::new(AtomicUsize::new(0)); | ||||
| let x = vec![ | |||||
| let a = vec![ | |||||
| vec![1usize, 2usize], | vec![1usize, 2usize], | ||||
| vec![3usize, 4usize], | vec![3usize, 4usize], | ||||
| vec![5usize, 6usize], | vec![5usize, 6usize], | ||||
| ]; | ]; | ||||
| let b = vec![ | |||||
| vec![7usize, 8usize], | |||||
| vec![9usize, 10usize], | |||||
| vec![11usize, 12usize], | |||||
| ]; | |||||
| let x = x | |||||
| let x = a | |||||
| .par_iter() | .par_iter() | ||||
| .cloned() | .cloned() | ||||
| .chain(b) | |||||
| .update(|x| x.push(0)) | .update(|x| x.push(0)) | ||||
| .flatten_iter() | .flatten_iter() | ||||
| .map_init( | .map_init( | ||||
| @@ -38,8 +38,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -98,8 +98,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| let TryFoldWith { | let TryFoldWith { | ||||
| base, | base, | ||||
| @@ -28,7 +28,7 @@ where | |||||
| X: ParallelIterator<'a, Item = T>, | X: ParallelIterator<'a, Item = T>, | ||||
| S: Fn() -> T::Ok + Clone + Send + 'a, | S: Fn() -> T::Ok + Clone + Send + 'a, | ||||
| O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, | O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, | ||||
| T: Try + Send, | |||||
| T: Try + Send + 'a, | |||||
| { | { | ||||
| fn exec_with<E>(self, executor: E) -> E::Result | fn exec_with<E>(self, executor: E) -> E::Result | ||||
| where | where | ||||
| @@ -68,7 +68,7 @@ impl<'a, X, O, T> Driver<'a, Option<T>> for TryReduceWith<X, O> | |||||
| where | where | ||||
| X: ParallelIterator<'a, Item = T>, | X: ParallelIterator<'a, Item = T>, | ||||
| O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, | O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, | ||||
| T: Try + Send, | |||||
| T: Try + Send + 'a, | |||||
| { | { | ||||
| fn exec_with<E>(self, executor: E) -> E::Result | fn exec_with<E>(self, executor: E) -> E::Result | ||||
| where | where | ||||
| @@ -27,8 +27,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive( | self.base.drive( | ||||
| executor, | executor, | ||||
| @@ -63,8 +63,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.base.drive_indexed( | self.base.drive_indexed( | ||||
| executor, | executor, | ||||
| @@ -91,8 +91,8 @@ macro_rules! unindexed_parallel_iterator_impl { | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer(ExecutorCallback::new(executor, consumer)) | self.with_producer(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -144,8 +144,8 @@ macro_rules! indexed_parallel_iterator_impl { | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -68,8 +68,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -94,8 +94,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -182,8 +182,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -208,8 +208,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -39,8 +39,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -65,8 +65,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -279,8 +279,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||
| @@ -305,8 +305,8 @@ where | |||||
| where | where | ||||
| E: Executor<'a, D>, | E: Executor<'a, D>, | ||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | ||||
| D: Send, | |||||
| R: Reducer<D> + Send, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | { | ||||
| self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) | ||||
| } | } | ||||