| @@ -36,10 +36,13 @@ use super::{Executor, IntoParallelIterator}; | |||||
| /// let bh: BlackHole = (0i32..1000).into_par_iter().collect(); | /// let bh: BlackHole = (0i32..1000).into_par_iter().collect(); | ||||
| /// assert_eq!(bh.mass, 4000); | /// assert_eq!(bh.mass, 4000); | ||||
| /// ``` | /// ``` | ||||
| pub trait FromParallelIterator<'a, T>: Send + Sized | |||||
| pub trait FromParallelIterator<'a, I>: Send + Sized | |||||
| where | where | ||||
| T: Send + 'a, | |||||
| I: Send + 'a, | |||||
| { | { | ||||
| type ExecutorItem2: Send + 'a; | |||||
| type ExecutorItem3: Send + 'a; | |||||
| /// Creates an instance of the collection from the parallel iterator `iterator`. | /// Creates an instance of the collection from the parallel iterator `iterator`. | ||||
| /// | /// | ||||
| /// If your collection is not naturally parallel, the easiest (and | /// If your collection is not naturally parallel, the easiest (and | ||||
| @@ -57,12 +60,14 @@ where | |||||
| /// [`iterator.for_each`]: trait.ParallelIterator.html#method.for_each | /// [`iterator.for_each`]: trait.ParallelIterator.html#method.for_each | ||||
| fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result | fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, Self>, | |||||
| X: IntoParallelIterator<'a, Item = T>, | |||||
| T: 'a; | |||||
| E: Executor<'a, Self, Self::ExecutorItem2, Self::ExecutorItem3>, | |||||
| X: IntoParallelIterator<'a, Item = I>; | |||||
| } | } | ||||
| impl<'a> FromParallelIterator<'a, ()> for () { | impl<'a> FromParallelIterator<'a, ()> for () { | ||||
| type ExecutorItem2 = (); | |||||
| type ExecutorItem3 = (); | |||||
| fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result | fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, Self>, | E: Executor<'a, Self>, | ||||
| @@ -34,6 +34,7 @@ use crate::{ | |||||
| position::Position, | position::Position, | ||||
| product::Product, | product::Product, | ||||
| reduce::{Reduce, ReduceWith}, | reduce::{Reduce, ReduceWith}, | ||||
| rev::Rev, | |||||
| skip::Skip, | skip::Skip, | ||||
| splits::Splits, | splits::Splits, | ||||
| step_by::StepBy, | step_by::StepBy, | ||||
| @@ -2205,4 +2206,23 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { | |||||
| { | { | ||||
| Position::new(self, operation, FindMatch::Last) | Position::new(self, operation, FindMatch::Last) | ||||
| } | } | ||||
| /// Produces a new iterator with the elements of this iterator in | |||||
| /// reverse order. | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| /// use rayon::prelude::*; | |||||
| /// | |||||
| /// let result: Vec<_> = (0..5) | |||||
| /// .into_par_iter() | |||||
| /// .rev() | |||||
| /// .collect(); | |||||
| /// | |||||
| /// assert_eq!(result, [4, 3, 2, 1, 0]); | |||||
| /// ``` | |||||
| fn rev(self) -> Rev<Self> { | |||||
| Rev::new(self) | |||||
| } | |||||
| } | } | ||||
| @@ -19,14 +19,14 @@ impl<X, T> Collect<X, T> { | |||||
| } | } | ||||
| } | } | ||||
| impl<'a, X, T> Driver<'a, T> for Collect<X, T> | |||||
| impl<'a, X, T> Driver<'a, T, T::ExecutorItem2, T::ExecutorItem3> for Collect<X, T> | |||||
| where | where | ||||
| X: ParallelIterator<'a>, | X: ParallelIterator<'a>, | ||||
| T: FromParallelIterator<'a, X::Item> + Send + 'a, | T: FromParallelIterator<'a, X::Item> + Send + 'a, | ||||
| { | { | ||||
| fn exec_with<E>(self, executor: E) -> E::Result | fn exec_with<E>(self, executor: E) -> E::Result | ||||
| where | where | ||||
| E: Executor<'a, T>, | |||||
| E: Executor<'a, T, T::ExecutorItem2, T::ExecutorItem3>, | |||||
| { | { | ||||
| T::from_par_iter(executor, self.iterator) | T::from_par_iter(executor, self.iterator) | ||||
| } | } | ||||
| @@ -26,6 +26,7 @@ pub mod partition; | |||||
| pub mod position; | pub mod position; | ||||
| pub mod product; | pub mod product; | ||||
| pub mod reduce; | pub mod reduce; | ||||
| pub mod rev; | |||||
| pub mod skip; | pub mod skip; | ||||
| pub mod splits; | pub mod splits; | ||||
| pub mod step_by; | pub mod step_by; | ||||
| @@ -45,9 +46,10 @@ mod tests { | |||||
| #[tokio::test(flavor = "multi_thread")] | #[tokio::test(flavor = "multi_thread")] | ||||
| async fn test_for_each() { | async fn test_for_each() { | ||||
| let x = vec![0usize, 1, 2, 3, 4, 5] | |||||
| let x: Vec<usize> = vec![0usize, 1, 2, 3, 4, 5] | |||||
| .into_par_iter() | .into_par_iter() | ||||
| .position_any(|x| x % 2 == 1) | |||||
| .rev() | |||||
| .collect() | |||||
| .exec() | .exec() | ||||
| .await; | .await; | ||||
| @@ -0,0 +1,151 @@ | |||||
| use crate::{ | |||||
| Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, | |||||
| IndexedProducerCallback, ParallelIterator, Producer, Reducer, Setup, WithIndexedProducer, | |||||
| WithSetup, | |||||
| }; | |||||
| pub struct Rev<X> { | |||||
| base: X, | |||||
| } | |||||
| impl<X> Rev<X> { | |||||
| pub fn new(base: X) -> Self { | |||||
| Self { base } | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> ParallelIterator<'a> for Rev<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| type Item = I; | |||||
| fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint_opt(&self) -> Option<usize> { | |||||
| self.base.len_hint_opt() | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> IndexedParallelIterator<'a> for Rev<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint(&self) -> usize { | |||||
| self.base.len_hint() | |||||
| } | |||||
| } | |||||
| impl<'a, X> WithIndexedProducer<'a> for Rev<X> | |||||
| where | |||||
| X: WithIndexedProducer<'a>, | |||||
| { | |||||
| type Item = X::Item; | |||||
| fn with_indexed_producer<CB>(self, base: CB) -> CB::Output | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, Self::Item>, | |||||
| { | |||||
| self.base.with_indexed_producer(RevCallback { base }) | |||||
| } | |||||
| } | |||||
| /* RevCallback */ | |||||
| struct RevCallback<CB> { | |||||
| base: CB, | |||||
| } | |||||
| impl<'a, CB, I> IndexedProducerCallback<'a, I> for RevCallback<CB> | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, I>, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, base: P) -> Self::Output | |||||
| where | |||||
| P: IndexedProducer<Item = I> + 'a, | |||||
| { | |||||
| self.base.callback(RevProducer { base }) | |||||
| } | |||||
| } | |||||
| /* RevProducer */ | |||||
| struct RevProducer<P> { | |||||
| base: P, | |||||
| } | |||||
| impl<P> WithSetup for RevProducer<P> | |||||
| where | |||||
| P: WithSetup, | |||||
| { | |||||
| fn setup(&self) -> Setup { | |||||
| self.base.setup() | |||||
| } | |||||
| } | |||||
| impl<P> Producer for RevProducer<P> | |||||
| where | |||||
| P: IndexedProducer, | |||||
| { | |||||
| type Item = P::Item; | |||||
| type IntoIter = std::iter::Rev<P::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| self.base.into_iter().rev() | |||||
| } | |||||
| fn split(self) -> (Self, Option<Self>) { | |||||
| let len = self.base.len(); | |||||
| if len < 2 { | |||||
| return (self, None); | |||||
| } | |||||
| let (left, right) = self.split_at(len / 2); | |||||
| (left, Some(right)) | |||||
| } | |||||
| } | |||||
| impl<P> IndexedProducer for RevProducer<P> | |||||
| where | |||||
| P: IndexedProducer, | |||||
| { | |||||
| type Item = P::Item; | |||||
| type IntoIter = std::iter::Rev<P::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| self.base.into_iter().rev() | |||||
| } | |||||
| fn len(&self) -> usize { | |||||
| self.base.len() | |||||
| } | |||||
| fn split_at(self, index: usize) -> (Self, Self) { | |||||
| let (left, right) = self.base.split_at(index); | |||||
| (Self { base: right }, Self { base: left }) | |||||
| } | |||||
| } | |||||
| @@ -5,10 +5,10 @@ mod misc; | |||||
| mod std; | mod std; | ||||
| pub use self::core::{ | pub use self::core::{ | ||||
| Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer, | |||||
| IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator, | |||||
| IntoParallelRefMutIterator, ParallelDrainFull, ParallelDrainRange, ParallelExtend, | |||||
| ParallelIterator, Producer, ProducerCallback, Reducer, Setup, WithIndexedProducer, | |||||
| WithProducer, WithSetup, | |||||
| Consumer, Driver, Executor, ExecutorCallback, Folder, FromParallelIterator, | |||||
| IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, IntoParallelIterator, | |||||
| IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelDrainFull, ParallelDrainRange, | |||||
| ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer, Setup, | |||||
| WithIndexedProducer, WithProducer, WithSetup, | |||||
| }; | }; | ||||
| pub use self::executor::{DefaultExecutor, SequentialExecutor}; | pub use self::executor::{DefaultExecutor, SequentialExecutor}; | ||||
| @@ -7,10 +7,10 @@ use std::slice::{from_raw_parts_mut, IterMut}; | |||||
| use std::sync::Arc; | use std::sync::Arc; | ||||
| use crate::{ | use crate::{ | ||||
| misc::simplify_range, Consumer, Executor, ExecutorCallback, Folder, IndexedParallelIterator, | |||||
| IndexedProducer, IndexedProducerCallback, IntoParallelIterator, ParallelDrainRange, | |||||
| ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer, WithIndexedProducer, | |||||
| WithProducer, WithSetup, | |||||
| misc::simplify_range, Consumer, Executor, ExecutorCallback, Folder, FromParallelIterator, | |||||
| IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, IntoParallelIterator, | |||||
| ParallelDrainRange, ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer, | |||||
| WithIndexedProducer, WithProducer, WithSetup, | |||||
| }; | }; | ||||
| /// Parallel iterator that moves out of a vector. | /// Parallel iterator that moves out of a vector. | ||||
| @@ -31,6 +31,69 @@ where | |||||
| } | } | ||||
| } | } | ||||
| impl<'a, I> ParallelExtend<'a, I, VecExtendResult<I>> for Vec<I> | |||||
| where | |||||
| I: Send + 'a, | |||||
| { | |||||
| type Consumer = VecConsumer<I>; | |||||
| fn into_consumer(self) -> Self::Consumer { | |||||
| VecConsumer { vec: Some(self) } | |||||
| } | |||||
| fn map_result(inner: VecExtendResult<I>) -> Self { | |||||
| let mut vec = inner.vec.unwrap(); | |||||
| for mut items in inner.items { | |||||
| vec.append(&mut items); | |||||
| } | |||||
| vec | |||||
| } | |||||
| } | |||||
| impl<'a, I> FromParallelIterator<'a, I> for Vec<I> | |||||
| where | |||||
| I: Send + 'a, | |||||
| { | |||||
| type ExecutorItem2 = VecExtendResult<I>; | |||||
| type ExecutorItem3 = (); | |||||
| fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result | |||||
| where | |||||
| E: Executor<'a, Self, VecExtendResult<I>>, | |||||
| X: IntoParallelIterator<'a, Item = I>, | |||||
| { | |||||
| let result = Self::default(); | |||||
| let consumer = result.into_consumer(); | |||||
| let iterator = iterator.into_par_iter(); | |||||
| let inner = iterator.drive(executor.into_inner(), consumer); | |||||
| E::map(inner, ParallelExtend::map_result) | |||||
| } | |||||
| } | |||||
| impl<'a, T> ParallelDrainRange<'a, usize> for &'a mut Vec<T> | |||||
| where | |||||
| T: Send, | |||||
| { | |||||
| type Iter = Drain<'a, T>; | |||||
| type Item = T; | |||||
| fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter { | |||||
| let length = self.len(); | |||||
| Drain { | |||||
| vec: self, | |||||
| range: simplify_range(range, length), | |||||
| length, | |||||
| } | |||||
| } | |||||
| } | |||||
| /* IntoIter */ | |||||
| impl<'a, T> ParallelIterator<'a> for IntoIter<T> | impl<'a, T> ParallelIterator<'a> for IntoIter<T> | ||||
| where | where | ||||
| T: Send + 'a, | T: Send + 'a, | ||||
| @@ -99,20 +162,16 @@ where | |||||
| } | } | ||||
| } | } | ||||
| impl<'a, T> ParallelDrainRange<'a, usize> for &'a mut Vec<T> | |||||
| where | |||||
| T: Send, | |||||
| { | |||||
| type Iter = Drain<'a, T>; | |||||
| type Item = T; | |||||
| /* VecContainer */ | |||||
| fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter { | |||||
| let length = self.len(); | |||||
| struct VecContainer<T>(Vec<T>); | |||||
| Drain { | |||||
| vec: self, | |||||
| range: simplify_range(range, length), | |||||
| length, | |||||
| unsafe impl<T> Sync for VecContainer<T> {} | |||||
| impl<T> Drop for VecContainer<T> { | |||||
| fn drop(&mut self) { | |||||
| unsafe { | |||||
| self.0.set_len(0); | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| @@ -124,18 +183,6 @@ struct VecProducer<'a, T> { | |||||
| slice: &'a mut [T], | slice: &'a mut [T], | ||||
| } | } | ||||
| struct VecContainer<T>(Vec<T>); | |||||
| impl<T> Drop for VecContainer<T> { | |||||
| fn drop(&mut self) { | |||||
| unsafe { | |||||
| self.0.set_len(0); | |||||
| } | |||||
| } | |||||
| } | |||||
| unsafe impl<T> Sync for VecContainer<T> {} | |||||
| impl<'a, T> VecProducer<'a, T> { | impl<'a, T> VecProducer<'a, T> { | ||||
| fn new(mut vec: Vec<T>) -> Self { | fn new(mut vec: Vec<T>) -> Self { | ||||
| unsafe { | unsafe { | ||||
| @@ -490,29 +537,6 @@ impl<'a, T, C> ExactSizeIterator for SliceIter<'a, T, C> { | |||||
| impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {} | impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {} | ||||
| /* ParallelExtend */ | |||||
| impl<'a, I> ParallelExtend<'a, I, VecExtendResult<I>> for Vec<I> | |||||
| where | |||||
| I: Send + 'a, | |||||
| { | |||||
| type Consumer = VecConsumer<I>; | |||||
| fn into_consumer(self) -> Self::Consumer { | |||||
| VecConsumer { vec: Some(self) } | |||||
| } | |||||
| fn map_result(inner: VecExtendResult<I>) -> Self { | |||||
| let mut vec = inner.vec.unwrap(); | |||||
| for mut items in inner.items { | |||||
| vec.append(&mut items); | |||||
| } | |||||
| vec | |||||
| } | |||||
| } | |||||
| /* VecConsumer */ | /* VecConsumer */ | ||||
| pub struct VecConsumer<T> { | pub struct VecConsumer<T> { | ||||