From 07cdb9564581b18b8e63d93c03d910ade8e17cf2 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sat, 7 Nov 2020 23:58:25 +0100 Subject: [PATCH] Implemented 'chain' operation --- asparit/src/core/drain.rs | 4 +- asparit/src/core/driver.rs | 2 +- asparit/src/core/executor.rs | 23 +- asparit/src/core/from_iter.rs | 3 +- asparit/src/core/into_iter.rs | 2 +- asparit/src/core/iterator.rs | 37 +- asparit/src/executor/sequential.rs | 13 +- asparit/src/executor/tokio.rs | 31 +- asparit/src/inner/chain.rs | 530 +++++++++++++++++++++++++++++ asparit/src/inner/cloned.rs | 8 +- asparit/src/inner/collect.rs | 2 +- asparit/src/inner/copied.rs | 8 +- asparit/src/inner/filter.rs | 4 +- asparit/src/inner/filter_map.rs | 6 +- asparit/src/inner/flatten.rs | 16 +- asparit/src/inner/fold.rs | 10 +- asparit/src/inner/inspect.rs | 8 +- asparit/src/inner/map.rs | 12 +- asparit/src/inner/map_init.rs | 12 +- asparit/src/inner/map_with.rs | 12 +- asparit/src/inner/mod.rs | 11 +- asparit/src/inner/try_fold.rs | 8 +- asparit/src/inner/try_reduce.rs | 4 +- asparit/src/inner/update.rs | 8 +- asparit/src/std/range.rs | 8 +- asparit/src/std/slice.rs | 16 +- asparit/src/std/vec.rs | 16 +- 27 files changed, 710 insertions(+), 104 deletions(-) create mode 100644 asparit/src/inner/chain.rs diff --git a/asparit/src/core/drain.rs b/asparit/src/core/drain.rs index 83c7cdf..e182302 100644 --- a/asparit/src/core/drain.rs +++ b/asparit/src/core/drain.rs @@ -15,7 +15,7 @@ pub trait ParallelDrainFull<'a> { /// The type of item that the parallel iterator will produce. /// This is usually the same as `IntoParallelIterator::Item`. - type Item: Send; + type Item: Send + 'a; /// Returns a draining parallel iterator over an entire collection. /// @@ -57,7 +57,7 @@ pub trait ParallelDrainRange<'a, Idx = usize> { /// The type of item that the parallel iterator will produce. /// This is usually the same as `IntoParallelIterator::Item`. - type Item: Send; + type Item: Send + 'a; /// Returns a draining parallel iterator over a range of the collection. /// diff --git a/asparit/src/core/driver.rs b/asparit/src/core/driver.rs index 1e3de3c..7e650ac 100644 --- a/asparit/src/core/driver.rs +++ b/asparit/src/core/driver.rs @@ -2,7 +2,7 @@ use crate::{DefaultExecutor, Executor}; pub trait Driver<'a, D>: Sized where - D: Send, + D: Send + 'a, { fn exec_with(self, executor: E) -> E::Result where diff --git a/asparit/src/core/executor.rs b/asparit/src/core/executor.rs index dfaeff0..e660abe 100644 --- a/asparit/src/core/executor.rs +++ b/asparit/src/core/executor.rs @@ -2,9 +2,9 @@ use super::{ Consumer, IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback, Reducer, }; -pub trait Executor<'a, D> +pub trait Executor<'a, D>: Sized where - D: Send, + D: Send + 'a, { type Result: Send; @@ -12,13 +12,20 @@ where where P: Producer + 'a, C: Consumer + 'a, - R: Reducer + Send; + R: Reducer + Send + 'a; fn exec_indexed(self, producer: P, consumer: C) -> Self::Result where P: IndexedProducer + 'a, C: Consumer + 'a, - R: Reducer + Send; + R: Reducer + Send + 'a; + + fn split(self) -> (Self, Self); + + fn join(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result + where + R: Reducer + Send + 'a, + D: 'a; } pub struct ExecutorCallback { @@ -35,9 +42,9 @@ impl ExecutorCallback { impl<'a, E, D, C, I, R> ProducerCallback<'a, I> for ExecutorCallback where E: Executor<'a, D>, - D: Send, + D: Send + 'a, C: Consumer + 'a, - R: Reducer + Send, + R: Reducer + Send + 'a, { type Output = E::Result; @@ -52,9 +59,9 @@ where impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback where E: Executor<'a, D>, - D: Send, + D: Send + 'a, C: Consumer + 'a, - R: Reducer + Send, + R: Reducer + Send + 'a, { type Output = E::Result; diff --git a/asparit/src/core/from_iter.rs b/asparit/src/core/from_iter.rs index 1c9927f..f07b3b5 100644 --- a/asparit/src/core/from_iter.rs +++ b/asparit/src/core/from_iter.rs @@ -58,7 +58,8 @@ where fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result where E: Executor<'a, Self>, - X: IntoParallelIterator<'a, Item = T>; + X: IntoParallelIterator<'a, Item = T>, + T: 'a; } impl FromParallelIterator<()> for () { diff --git a/asparit/src/core/into_iter.rs b/asparit/src/core/into_iter.rs index 5d10eb4..5194e52 100644 --- a/asparit/src/core/into_iter.rs +++ b/asparit/src/core/into_iter.rs @@ -13,7 +13,7 @@ pub trait IntoParallelIterator<'a> { type Iter: ParallelIterator<'a, Item = Self::Item>; /// The type of item that the parallel iterator will produce. - type Item: Send; + type Item: Send + 'a; /// Converts `self` into a parallel iterator. /// diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index ce495b0..84080d7 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -2,11 +2,13 @@ use std::cmp::{Ord, Ordering}; use std::iter::IntoIterator; use super::{ - Consumer, Executor, FromParallelIterator, IndexedProducerCallback, ProducerCallback, Reducer, + Consumer, Executor, FromParallelIterator, IndexedProducerCallback, IntoParallelIterator, + ProducerCallback, Reducer, }; use crate::{ inner::{ + chain::Chain, cloned::Cloned, collect::Collect, copied::Copied, @@ -53,7 +55,7 @@ pub trait ParallelIterator<'a>: Sized + Send { /// item that your closure will be invoked with. /// /// [`for_each`]: #method.for_each - type Item: Send; + type Item: Send + 'a; /// Internal method used to define the behavior of this parallel /// iterator. You should not need to call this directly. @@ -71,8 +73,8 @@ pub trait ParallelIterator<'a>: Sized + Send { where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send; + D: Send + 'a, + R: Reducer + Send + 'a; /// Internal method used to define the behavior of this parallel /// iterator. You should not need to call this directly. @@ -1194,6 +1196,29 @@ pub trait ParallelIterator<'a>: Sized + Send { MaxBy::new(self, operation) } + /// Takes two iterators and creates a new iterator over both. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let a = [0, 1, 2]; + /// let b = [9, 8, 7]; + /// + /// let par_iter = a.par_iter().chain(b.par_iter()); + /// + /// let chained: Vec<_> = par_iter.cloned().collect(); + /// + /// assert_eq!(&chained[..], &[0, 1, 2, 9, 8, 7]); + /// ``` + fn chain(self, chain: C) -> Chain + where + C: IntoParallelIterator<'a, Item = Self::Item>, + { + Chain::new(self, chain.into_par_iter()) + } + /// Creates a fresh collection containing all the elements produced /// by this parallel iterator. /// @@ -1250,8 +1275,8 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send; + D: Send + 'a, + R: Reducer + Send + 'a; /// Internal method used to define the behavior of this parallel /// iterator. You should not need to call this directly. diff --git a/asparit/src/executor/sequential.rs b/asparit/src/executor/sequential.rs index c592113..5d52000 100644 --- a/asparit/src/executor/sequential.rs +++ b/asparit/src/executor/sequential.rs @@ -5,7 +5,7 @@ pub struct Sequential; impl<'a, D> Executor<'a, D> for Sequential where - D: Send, + D: Send + 'a, { type Result = D; @@ -34,4 +34,15 @@ where producer.fold_with(consumer.into_folder()).complete() } } + + fn split(self) -> (Self, Self) { + (Self, Self) + } + + fn join(left: D, right: D, reducer: R) -> Self::Result + where + R: Reducer + Send, + { + reducer.reduce(left, right) + } } diff --git a/asparit/src/executor/tokio.rs b/asparit/src/executor/tokio.rs index 29f0875..d83f65c 100644 --- a/asparit/src/executor/tokio.rs +++ b/asparit/src/executor/tokio.rs @@ -29,7 +29,7 @@ impl Default for Tokio { impl<'a, D> Executor<'a, D> for Tokio where - D: Send, + D: Send + 'a, { type Result = BoxFuture<'a, D>; @@ -37,7 +37,7 @@ where where P: Producer + 'a, C: Consumer + 'a, - R: Reducer + Send, + R: Reducer + Send + 'a, { let splits = producer.splits().unwrap_or(self.splits); let splitter = Splitter::new(splits); @@ -49,7 +49,7 @@ where where P: IndexedProducer + 'a, C: Consumer + 'a, - R: Reducer + Send, + R: Reducer + Send + 'a, { let splits = producer.splits().unwrap_or(self.splits); let splitter = IndexedSplitter::new( @@ -61,6 +61,31 @@ where exec_indexed(splitter, producer, consumer) } + + fn split(self) -> (Self, Self) { + let mut left = self; + let right = Self { + splits: left.splits / 2, + }; + + left.splits -= right.splits; + + (left, right) + } + + fn join(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result + where + R: Reducer + Send + 'a, + D: 'a, + { + async move { + let left = left.await; + let right = right.await; + + reducer.reduce(left, right) + } + .boxed() + } } fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result> diff --git a/asparit/src/inner/chain.rs b/asparit/src/inner/chain.rs new file mode 100644 index 0000000..7f259f6 --- /dev/null +++ b/asparit/src/inner/chain.rs @@ -0,0 +1,530 @@ +use std::cmp::{max, min}; +use std::iter::{DoubleEndedIterator, ExactSizeIterator, Iterator}; + +use crate::{ + Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, + ParallelIterator, Producer, ProducerCallback, Reducer, +}; + +/* Chain */ + +pub struct Chain { + iterator_1: X1, + iterator_2: X2, +} + +impl Chain { + pub fn new(iterator_1: X1, iterator_2: X2) -> Self { + Self { + iterator_1, + iterator_2, + } + } +} + +impl<'a, X1, X2, T> ParallelIterator<'a> for Chain +where + X1: ParallelIterator<'a, Item = T>, + X2: ParallelIterator<'a, Item = T>, + T: Send + 'a, +{ + type Item = T; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + let left = self.iterator_1; + let right = self.iterator_2; + + let (left_executor, right_executor) = executor.split(); + + let (left_consumer, right_consumer, reducer) = match left.len_hint_opt() { + Some(len) => consumer.split_at(len), + None => consumer.split(), + }; + + let left = left.drive(left_executor, left_consumer); + let right = right.drive(right_executor, right_consumer); + + E::join(left, right, reducer) + } + + fn with_producer(self, base: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + self.iterator_1.with_producer(ChainCallback1 { + base, + iterator_2: self.iterator_2, + }) + } + + fn len_hint_opt(&self) -> Option { + let len_1 = self.iterator_1.len_hint_opt(); + let len_2 = self.iterator_2.len_hint_opt(); + + match (len_1, len_2) { + (Some(x1), Some(x2)) => Some(x1 + x2), + (_, _) => None, + } + } +} + +impl<'a, X1, X2, T> IndexedParallelIterator<'a> for Chain +where + X1: IndexedParallelIterator<'a, Item = T>, + X2: IndexedParallelIterator<'a, Item = T>, + T: Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + let left = self.iterator_1; + let right = self.iterator_2; + + let (left_executor, right_executor) = executor.split(); + let (left_consumer, right_consumer, reducer) = consumer.split_at(left.len_hint()); + + let left = left.drive(left_executor, left_consumer); + let right = right.drive(right_executor, right_consumer); + + E::join(left, right, reducer) + } + + fn with_producer_indexed(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + self.iterator_1.with_producer_indexed(ChainCallback1 { + base, + iterator_2: self.iterator_2, + }) + } + + fn len_hint(&self) -> usize { + self.iterator_1.len_hint() + self.iterator_2.len_hint() + } +} + +/* ChainCallback1 */ + +struct ChainCallback1 { + base: CB, + iterator_2: X2, +} + +impl<'a, CB, X2, T> ProducerCallback<'a, T> for ChainCallback1 +where + CB: ProducerCallback<'a, T>, + X2: ParallelIterator<'a, Item = T>, + T: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_1: P) -> Self::Output + where + P: Producer + 'a, + { + let base = self.base; + + self.iterator_2 + .with_producer(ChainCallback2 { base, producer_1 }) + } +} + +impl<'a, CB, X2, T> IndexedProducerCallback<'a, T> for ChainCallback1 +where + CB: IndexedProducerCallback<'a, T>, + X2: IndexedParallelIterator<'a, Item = T>, + T: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_1: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let base = self.base; + + self.iterator_2 + .with_producer_indexed(ChainCallback2 { base, producer_1 }) + } +} + +/* ChainCallback2 */ + +struct ChainCallback2 { + base: CB, + producer_1: P1, +} + +impl<'a, CB, P1, T> ProducerCallback<'a, T> for ChainCallback2 +where + CB: ProducerCallback<'a, T>, + P1: Producer + 'a, + T: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_2: P) -> Self::Output + where + P: Producer + 'a, + { + let producer_1 = self.producer_1; + + self.base.callback(ChainProducer { + producer_1: Some(producer_1), + producer_2: Some(producer_2), + }) + } +} + +impl<'a, CB, P1, T> IndexedProducerCallback<'a, T> for ChainCallback2 +where + CB: IndexedProducerCallback<'a, T>, + P1: IndexedProducer + 'a, + T: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_2: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let producer_1 = self.producer_1; + + self.base.callback(ChainProducer { + producer_1: Some(producer_1), + producer_2: Some(producer_2), + }) + } +} + +/* ChainProducer */ + +struct ChainProducer { + producer_1: Option, + producer_2: Option, +} + +impl<'a, P1, P2, T> Producer for ChainProducer +where + P1: Producer, + P2: Producer, +{ + type Item = T; + type IntoIter = ChainIter; + + fn into_iter(self) -> Self::IntoIter { + ChainIter::new( + self.producer_1.map(Producer::into_iter), + self.producer_2.map(Producer::into_iter), + ) + } + + fn split(self) -> (Self, Option) { + match (self.producer_1, self.producer_2) { + (Some(p1), Some(p2)) => { + let left = ChainProducer { + producer_1: Some(p1), + producer_2: None, + }; + let right = Some(ChainProducer { + producer_1: None, + producer_2: Some(p2), + }); + + (left, right) + } + (Some(p1), None) => { + let (left, right) = p1.split(); + + let left = ChainProducer { + producer_1: Some(left), + producer_2: None, + }; + let right = right.map(|right| ChainProducer { + producer_1: Some(right), + producer_2: None, + }); + + (left, right) + } + (None, Some(p2)) => { + let (left, right) = p2.split(); + + let left = ChainProducer { + producer_1: None, + producer_2: Some(left), + }; + let right = right.map(|right| ChainProducer { + producer_1: None, + producer_2: Some(right), + }); + + (left, right) + } + (None, None) => unreachable!(), + } + } + + fn splits(&self) -> Option { + let splits_1 = self.producer_1.as_ref().and_then(|p| p.splits()); + let splits_2 = self.producer_2.as_ref().and_then(|p| p.splits()); + + match (splits_1, splits_2) { + (Some(splits), _) => Some(splits), + (None, Some(splits)) => Some(splits), + (None, None) => None, + } + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let folder = match self.producer_1 { + Some(p) => p.fold_with(folder), + None => folder, + }; + + if folder.is_full() { + return folder; + } + + match self.producer_2 { + Some(p) => p.fold_with(folder), + None => folder, + } + } +} + +impl<'a, P1, P2, T> IndexedProducer for ChainProducer +where + P1: IndexedProducer, + P2: IndexedProducer, +{ + type Item = T; + type IntoIter = ChainIter; + + fn into_iter(self) -> Self::IntoIter { + ChainIter::new( + self.producer_1.map(IndexedProducer::into_iter), + self.producer_2.map(IndexedProducer::into_iter), + ) + } + + fn len(&self) -> usize { + let mut len = 0; + + if let Some(p) = &self.producer_1 { + len += p.len(); + } + + if let Some(p) = &self.producer_2 { + len += p.len(); + } + + len + } + + fn split_at(self, mut index: usize) -> (Self, Self) { + match (self.producer_1, self.producer_2) { + (Some(p1), Some(p2)) if index < p1.len() => { + let (left, right) = p1.split_at(index); + + let left = ChainProducer { + producer_1: Some(left), + producer_2: None, + }; + let right = ChainProducer { + producer_1: Some(right), + producer_2: Some(p2), + }; + + (left, right) + } + (Some(p1), Some(p2)) if index > p1.len() => { + index -= p1.len(); + + let (left, right) = p2.split_at(index); + + let left = ChainProducer { + producer_1: Some(p1), + producer_2: Some(left), + }; + let right = ChainProducer { + producer_1: None, + producer_2: Some(right), + }; + + (left, right) + } + (Some(p1), Some(p2)) => { + let left = ChainProducer { + producer_1: Some(p1), + producer_2: None, + }; + let right = ChainProducer { + producer_1: None, + producer_2: Some(p2), + }; + + (left, right) + } + (Some(p1), None) => { + let (left, right) = p1.split_at(index); + + let left = ChainProducer { + producer_1: Some(left), + producer_2: None, + }; + let right = ChainProducer { + producer_1: Some(right), + producer_2: None, + }; + + (left, right) + } + (None, Some(p2)) => { + let (left, right) = p2.split_at(index); + + let left = ChainProducer { + producer_1: None, + producer_2: Some(left), + }; + let right = ChainProducer { + producer_1: None, + producer_2: Some(right), + }; + + (left, right) + } + (None, None) => unreachable!(), + } + } + + fn splits(&self) -> Option { + let splits_1 = self.producer_1.as_ref().and_then(|p| p.splits()); + let splits_2 = self.producer_2.as_ref().and_then(|p| p.splits()); + + match (splits_1, splits_2) { + (Some(splits), _) => Some(splits), + (None, Some(splits)) => Some(splits), + (None, None) => None, + } + } + + fn min_len(&self) -> Option { + let min_1 = self.producer_1.as_ref().and_then(|p| p.min_len()); + let min_2 = self.producer_2.as_ref().and_then(|p| p.min_len()); + + min(min_1, min_2) + } + + fn max_len(&self) -> Option { + let max_1 = self.producer_1.as_ref().and_then(|p| p.max_len()); + let max_2 = self.producer_2.as_ref().and_then(|p| p.max_len()); + + max(max_1, max_2) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let folder = match self.producer_1 { + Some(p) => p.fold_with(folder), + None => folder, + }; + + if folder.is_full() { + return folder; + } + + match self.producer_2 { + Some(p) => p.fold_with(folder), + None => folder, + } + } +} + +/* ChainIter */ + +enum ChainIter { + Empty, + Iter1(I1), + Iter2(I2), + Chain(std::iter::Chain), +} + +impl ChainIter +where + I1: Iterator, + I2: Iterator, +{ + fn new(i1: Option, i2: Option) -> Self { + match (i1, i2) { + (Some(i1), Some(i2)) => Self::Chain(i1.chain(i2)), + (Some(i1), None) => Self::Iter1(i1), + (None, Some(i2)) => Self::Iter2(i2), + (None, None) => Self::Empty, + } + } +} + +impl Iterator for ChainIter +where + I1: Iterator, + I2: Iterator, +{ + type Item = T; + + fn next(&mut self) -> Option { + match self { + Self::Empty => None, + Self::Iter1(i) => i.next(), + Self::Iter2(i) => i.next(), + Self::Chain(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + Self::Empty => (0, Some(0)), + Self::Iter1(i) => i.size_hint(), + Self::Iter2(i) => i.size_hint(), + Self::Chain(i) => i.size_hint(), + } + } +} + +impl DoubleEndedIterator for ChainIter +where + I1: DoubleEndedIterator, + I2: DoubleEndedIterator, +{ + fn next_back(&mut self) -> Option { + match self { + Self::Empty => None, + Self::Iter1(i) => i.next_back(), + Self::Iter2(i) => i.next_back(), + Self::Chain(i) => i.next_back(), + } + } +} + +impl ExactSizeIterator for ChainIter +where + I1: ExactSizeIterator, + I2: ExactSizeIterator, +{ +} diff --git a/asparit/src/inner/cloned.rs b/asparit/src/inner/cloned.rs index 61e9de6..d561ba9 100644 --- a/asparit/src/inner/cloned.rs +++ b/asparit/src/inner/cloned.rs @@ -26,8 +26,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive(executor, ClonedConsumer { base: consumer }) } @@ -53,8 +53,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base .drive_indexed(executor, ClonedConsumer { base: consumer }) diff --git a/asparit/src/inner/collect.rs b/asparit/src/inner/collect.rs index 8b34980..a732576 100644 --- a/asparit/src/inner/collect.rs +++ b/asparit/src/inner/collect.rs @@ -22,7 +22,7 @@ impl Collect { impl<'a, X, T> Driver<'a, T> for Collect where X: ParallelIterator<'a>, - T: FromParallelIterator + Send, + T: FromParallelIterator + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where diff --git a/asparit/src/inner/copied.rs b/asparit/src/inner/copied.rs index c5508f6..75b83c4 100644 --- a/asparit/src/inner/copied.rs +++ b/asparit/src/inner/copied.rs @@ -26,8 +26,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive(executor, CopiedConsumer { base: consumer }) } @@ -53,8 +53,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base .drive_indexed(executor, CopiedConsumer { base: consumer }) diff --git a/asparit/src/inner/filter.rs b/asparit/src/inner/filter.rs index 021977c..a895094 100644 --- a/asparit/src/inner/filter.rs +++ b/asparit/src/inner/filter.rs @@ -24,8 +24,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, diff --git a/asparit/src/inner/filter_map.rs b/asparit/src/inner/filter_map.rs index 7f65648..0d131d5 100644 --- a/asparit/src/inner/filter_map.rs +++ b/asparit/src/inner/filter_map.rs @@ -17,7 +17,7 @@ impl<'a, X, O, S> ParallelIterator<'a> for FilterMap where X: ParallelIterator<'a>, O: Fn(X::Item) -> Option + Clone + Send + 'a, - S: Send, + S: Send + 'a, { type Item = S; @@ -25,8 +25,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, diff --git a/asparit/src/inner/flatten.rs b/asparit/src/inner/flatten.rs index 8f153b9..165c2a6 100644 --- a/asparit/src/inner/flatten.rs +++ b/asparit/src/inner/flatten.rs @@ -17,8 +17,8 @@ impl FlattenIter { impl<'a, X, SI> ParallelIterator<'a> for FlattenIter where X: ParallelIterator<'a, Item = SI>, - SI: IntoIterator + Send, - SI::Item: Send, + SI: IntoIterator + Send + 'a, + SI::Item: Send + 'a, { type Item = SI::Item; @@ -26,8 +26,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, @@ -66,8 +66,8 @@ impl<'a, X, O, SI> ParallelIterator<'a> for FlatMapIter where X: ParallelIterator<'a>, O: Fn(X::Item) -> SI + Clone + Send + 'a, - SI: IntoIterator, - SI::Item: Send, + SI: IntoIterator + 'a, + SI::Item: Send + 'a, { type Item = SI::Item; @@ -75,8 +75,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, diff --git a/asparit/src/inner/fold.rs b/asparit/src/inner/fold.rs index 2ecd921..399f743 100644 --- a/asparit/src/inner/fold.rs +++ b/asparit/src/inner/fold.rs @@ -23,7 +23,7 @@ where X: ParallelIterator<'a>, S: Fn() -> U + Clone + Send + 'a, O: Fn(U, X::Item) -> U + Clone + Send + 'a, - U: Send, + U: Send + 'a, { type Item = U; @@ -31,8 +31,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, @@ -86,8 +86,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let FoldWith { base, diff --git a/asparit/src/inner/inspect.rs b/asparit/src/inner/inspect.rs index 2b3f0a5..d908788 100644 --- a/asparit/src/inner/inspect.rs +++ b/asparit/src/inner/inspect.rs @@ -27,8 +27,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, @@ -63,8 +63,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive_indexed( executor, diff --git a/asparit/src/inner/map.rs b/asparit/src/inner/map.rs index 0cb74b5..4d2a50c 100644 --- a/asparit/src/inner/map.rs +++ b/asparit/src/inner/map.rs @@ -20,7 +20,7 @@ impl<'a, X, O, T> ParallelIterator<'a> for Map where X: ParallelIterator<'a>, O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, { type Item = O::Output; @@ -28,8 +28,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapConsumer::new(consumer, self.operation); @@ -55,14 +55,14 @@ impl<'a, X, O, T> IndexedParallelIterator<'a> for Map where X: IndexedParallelIterator<'a>, O: Fn(X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, { fn drive_indexed(self, executor: E, consumer: C) -> E::Result where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapConsumer::new(consumer, self.operation); diff --git a/asparit/src/inner/map_init.rs b/asparit/src/inner/map_init.rs index fef690f..3503142 100644 --- a/asparit/src/inner/map_init.rs +++ b/asparit/src/inner/map_init.rs @@ -27,7 +27,7 @@ impl<'a, X, O, T, S, U> ParallelIterator<'a> for MapInit where X: ParallelIterator<'a>, O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, S: Fn() -> U + Clone + Send + 'a, { type Item = T; @@ -36,8 +36,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapInitConsumer::new(consumer, self.init, self.operation); @@ -64,15 +64,15 @@ impl<'a, X, O, T, S, U> IndexedParallelIterator<'a> for MapInit where X: IndexedParallelIterator<'a>, O: Fn(&mut U, X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, S: Fn() -> U + Clone + Send + 'a, { fn drive_indexed(self, executor: E, consumer: C) -> E::Result where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapInitConsumer::new(consumer, self.init, self.operation); diff --git a/asparit/src/inner/map_with.rs b/asparit/src/inner/map_with.rs index 17d32f9..13d582e 100644 --- a/asparit/src/inner/map_with.rs +++ b/asparit/src/inner/map_with.rs @@ -25,7 +25,7 @@ impl<'a, X, O, T, S> ParallelIterator<'a> for MapWith where X: ParallelIterator<'a>, O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, S: Clone + Send + 'a, { type Item = T; @@ -34,8 +34,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapWithConsumer::new(consumer, self.item, self.operation); @@ -62,15 +62,15 @@ impl<'a, X, O, T, S> IndexedParallelIterator<'a> for MapWith where X: IndexedParallelIterator<'a>, O: Fn(&mut S, X::Item) -> T + Clone + Sync + Send + 'a, - T: Send, + T: Send + 'a, S: Clone + Send + 'a, { fn drive_indexed(self, executor: E, consumer: C) -> E::Result where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let consumer = MapWithConsumer::new(consumer, self.item, self.operation); diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs index f671984..d3f75ba 100644 --- a/asparit/src/inner/mod.rs +++ b/asparit/src/inner/mod.rs @@ -1,3 +1,4 @@ +pub mod chain; pub mod cloned; pub mod collect; pub mod copied; @@ -34,15 +35,21 @@ mod tests { let i = Arc::new(AtomicUsize::new(0)); let j = Arc::new(AtomicUsize::new(0)); - let x = vec![ + let a = vec![ vec![1usize, 2usize], vec![3usize, 4usize], vec![5usize, 6usize], ]; + let b = vec![ + vec![7usize, 8usize], + vec![9usize, 10usize], + vec![11usize, 12usize], + ]; - let x = x + let x = a .par_iter() .cloned() + .chain(b) .update(|x| x.push(0)) .flatten_iter() .map_init( diff --git a/asparit/src/inner/try_fold.rs b/asparit/src/inner/try_fold.rs index c211876..c510224 100644 --- a/asparit/src/inner/try_fold.rs +++ b/asparit/src/inner/try_fold.rs @@ -38,8 +38,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, @@ -98,8 +98,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { let TryFoldWith { base, diff --git a/asparit/src/inner/try_reduce.rs b/asparit/src/inner/try_reduce.rs index d1b00f1..fe8f6cf 100644 --- a/asparit/src/inner/try_reduce.rs +++ b/asparit/src/inner/try_reduce.rs @@ -28,7 +28,7 @@ where X: ParallelIterator<'a, Item = T>, S: Fn() -> T::Ok + Clone + Send + 'a, O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, - T: Try + Send, + T: Try + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where @@ -68,7 +68,7 @@ impl<'a, X, O, T> Driver<'a, Option> for TryReduceWith where X: ParallelIterator<'a, Item = T>, O: Fn(T::Ok, T::Ok) -> T + Clone + Send + 'a, - T: Try + Send, + T: Try + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where diff --git a/asparit/src/inner/update.rs b/asparit/src/inner/update.rs index 315d02a..4511721 100644 --- a/asparit/src/inner/update.rs +++ b/asparit/src/inner/update.rs @@ -27,8 +27,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive( executor, @@ -63,8 +63,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.base.drive_indexed( executor, diff --git a/asparit/src/std/range.rs b/asparit/src/std/range.rs index 95890f3..715c352 100644 --- a/asparit/src/std/range.rs +++ b/asparit/src/std/range.rs @@ -91,8 +91,8 @@ macro_rules! unindexed_parallel_iterator_impl { where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer(ExecutorCallback::new(executor, consumer)) } @@ -144,8 +144,8 @@ macro_rules! indexed_parallel_iterator_impl { where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } diff --git a/asparit/src/std/slice.rs b/asparit/src/std/slice.rs index b995395..4256503 100644 --- a/asparit/src/std/slice.rs +++ b/asparit/src/std/slice.rs @@ -68,8 +68,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -94,8 +94,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -182,8 +182,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -208,8 +208,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } diff --git a/asparit/src/std/vec.rs b/asparit/src/std/vec.rs index cb2f4c6..e1ee95e 100644 --- a/asparit/src/std/vec.rs +++ b/asparit/src/std/vec.rs @@ -39,8 +39,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -65,8 +65,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -279,8 +279,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) } @@ -305,8 +305,8 @@ where where E: Executor<'a, D>, C: Consumer + 'a, - D: Send, - R: Reducer + Send, + D: Send + 'a, + R: Reducer + Send + 'a, { self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) }