| @@ -0,0 +1,74 @@ | |||||
| use super::{IntoParallelIterator, Executor}; | |||||
| /// `FromParallelIterator` implements the creation of a collection | |||||
| /// from a [`ParallelIterator`]. By implementing | |||||
| /// `FromParallelIterator` for a given type, you define how it will be | |||||
| /// created from an iterator. | |||||
| /// | |||||
| /// `FromParallelIterator` is used through [`ParallelIterator`]'s [`collect()`] method. | |||||
| /// | |||||
| /// [`ParallelIterator`]: trait.ParallelIterator.html | |||||
| /// [`collect()`]: trait.ParallelIterator.html#method.collect | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// Implementing `FromParallelIterator` for your type: | |||||
| /// | |||||
| /// ``` | |||||
| /// use rayon::prelude::*; | |||||
| /// use std::mem; | |||||
| /// | |||||
| /// struct BlackHole { | |||||
| /// mass: usize, | |||||
| /// } | |||||
| /// | |||||
| /// impl<T: Send> FromParallelIterator<T> for BlackHole { | |||||
| /// fn from_par_iter<I>(iterator: I) -> Self | |||||
| /// where I: IntoParallelIterator<Item = T> | |||||
| /// { | |||||
| /// let iterator = iterator.into_par_iter(); | |||||
| /// BlackHole { | |||||
| /// mass: iterator.count() * mem::size_of::<T>(), | |||||
| /// } | |||||
| /// } | |||||
| /// } | |||||
| /// | |||||
| /// let bh: BlackHole = (0i32..1000).into_par_iter().collect(); | |||||
| /// assert_eq!(bh.mass, 4000); | |||||
| /// ``` | |||||
| pub trait FromParallelIterator<T>: Send + Sized | |||||
| where | |||||
| T: Send, | |||||
| { | |||||
| /// Creates an instance of the collection from the parallel iterator `iterator`. | |||||
| /// | |||||
| /// If your collection is not naturally parallel, the easiest (and | |||||
| /// fastest) way to do this is often to collect `iterator` into a | |||||
| /// [`LinkedList`] or other intermediate data structure and then | |||||
| /// sequentially extend your collection. However, a more 'native' | |||||
| /// technique is to use the [`iterator.fold`] or | |||||
| /// [`iterator.fold_with`] methods to create the collection. | |||||
| /// Alternatively, if your collection is 'natively' parallel, you | |||||
| /// can use `iterator.for_each` to process each element in turn. | |||||
| /// | |||||
| /// [`LinkedList`]: https://doc.rust-lang.org/std/collections/struct.LinkedList.html | |||||
| /// [`iterator.fold`]: trait.ParallelIterator.html#method.fold | |||||
| /// [`iterator.fold_with`]: trait.ParallelIterator.html#method.fold_with | |||||
| /// [`iterator.for_each`]: trait.ParallelIterator.html#method.for_each | |||||
| fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result | |||||
| where | |||||
| E: Executor<'a, Self>, | |||||
| X: IntoParallelIterator<'a, Item = T>; | |||||
| } | |||||
| impl FromParallelIterator<()> for () { | |||||
| fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result | |||||
| where | |||||
| E: Executor<'a, Self>, | |||||
| X: IntoParallelIterator<'a, Item = ()>, | |||||
| { | |||||
| use crate::{ParallelIterator, inner::noop::NoOpConsumer}; | |||||
| iterator.into_par_iter().drive(executor, NoOpConsumer) | |||||
| } | |||||
| } | |||||
| @@ -1,6 +1,6 @@ | |||||
| use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer}; | |||||
| use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer, FromParallelIterator}; | |||||
| use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit}; | |||||
| use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit, collect::Collect}; | |||||
| /// Parallel version of the standard iterator trait. | /// Parallel version of the standard iterator trait. | ||||
| /// | /// | ||||
| @@ -80,7 +80,7 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| None | None | ||||
| } | } | ||||
| /// Executes `OP` on each item produced by the iterator, in parallel. | |||||
| /// Executes `operation` on each item produced by the iterator, in parallel. | |||||
| /// | /// | ||||
| /// # Examples | /// # Examples | ||||
| /// | /// | ||||
| @@ -96,6 +96,37 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| ForEach::new(self, operation) | ForEach::new(self, operation) | ||||
| } | } | ||||
| /// Executes `operation` on the given `init` value with each item produced by | |||||
| /// the iterator, in parallel. | |||||
| /// | |||||
| /// The `init` value will be cloned only as needed to be paired with | |||||
| /// the group of items in each rayon job. It does not require the type | |||||
| /// to be `Sync`. | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| /// use std::sync::mpsc::channel; | |||||
| /// use rayon::prelude::*; | |||||
| /// | |||||
| /// let (sender, receiver) = channel(); | |||||
| /// | |||||
| /// (0..5).into_par_iter().for_each_with(sender, |s, x| s.send(x).unwrap()); | |||||
| /// | |||||
| /// let mut res: Vec<_> = receiver.iter().collect(); | |||||
| /// | |||||
| /// res.sort(); | |||||
| /// | |||||
| /// assert_eq!(&res[..], &[0, 1, 2, 3, 4]) | |||||
| /// ``` | |||||
| fn for_each_with<O, T>(self, init: T, operation: O) -> Collect<MapWith<Self, T, O>, ()> | |||||
| where | |||||
| O: Fn(&mut T, Self::Item) + Clone + Send + Sync + 'a, | |||||
| T: Clone + Send + 'a, | |||||
| { | |||||
| self.map_with(init, operation).collect() | |||||
| } | |||||
| /// Applies `operation` to each item of this iterator, producing a new | /// Applies `operation` to each item of this iterator, producing a new | ||||
| /// iterator with the results. | /// iterator with the results. | ||||
| /// | /// | ||||
| @@ -150,7 +181,7 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| /// ``` | /// ``` | ||||
| fn map_with<O, T, S>(self, init: S, operation: O) -> MapWith<Self, S, O> | fn map_with<O, T, S>(self, init: S, operation: O) -> MapWith<Self, S, O> | ||||
| where | where | ||||
| O: Fn(&mut S, Self::Item) -> T + Sync + Send, | |||||
| O: Fn(&mut S, Self::Item) -> T + Clone + Sync + Send, | |||||
| S: Send + Clone, | S: Send + Clone, | ||||
| T: Send, | T: Send, | ||||
| { | { | ||||
| @@ -193,6 +224,37 @@ pub trait ParallelIterator<'a>: Sized + Send { | |||||
| { | { | ||||
| MapInit::new(self, init, operation) | MapInit::new(self, init, operation) | ||||
| } | } | ||||
| /// Creates a fresh collection containing all the elements produced | |||||
| /// by this parallel iterator. | |||||
| /// | |||||
| /// You may prefer [`collect_into_vec()`] implemented on | |||||
| /// [`IndexedParallelIterator`], if your underlying iterator also implements | |||||
| /// it. [`collect_into_vec()`] allocates efficiently with precise knowledge | |||||
| /// of how many elements the iterator contains, and even allows you to reuse | |||||
| /// an existing vector's backing store rather than allocating a fresh vector. | |||||
| /// | |||||
| /// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html | |||||
| /// [`collect_into_vec()`]: | |||||
| /// trait.IndexedParallelIterator.html#method.collect_into_vec | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| /// use rayon::prelude::*; | |||||
| /// | |||||
| /// let sync_vec: Vec<_> = (0..100).into_iter().collect(); | |||||
| /// | |||||
| /// let async_vec: Vec<_> = (0..100).into_par_iter().collect(); | |||||
| /// | |||||
| /// assert_eq!(sync_vec, async_vec); | |||||
| /// ``` | |||||
| fn collect<T>(self) -> Collect<Self, T> | |||||
| where | |||||
| T: FromParallelIterator<Self::Item>, | |||||
| { | |||||
| Collect::new(self) | |||||
| } | |||||
| } | } | ||||
| /// An iterator that supports "random access" to its data, meaning | /// An iterator that supports "random access" to its data, meaning | ||||
| @@ -6,7 +6,9 @@ mod into_iter; | |||||
| mod iterator; | mod iterator; | ||||
| mod producer; | mod producer; | ||||
| mod reducer; | mod reducer; | ||||
| mod from_iter; | |||||
| pub use from_iter::FromParallelIterator; | |||||
| pub use consumer::{Consumer, IndexedConsumer}; | pub use consumer::{Consumer, IndexedConsumer}; | ||||
| pub use driver::Driver; | pub use driver::Driver; | ||||
| pub use executor::{Executor, ExecutorCallback}; | pub use executor::{Executor, ExecutorCallback}; | ||||
| @@ -0,0 +1,29 @@ | |||||
| use std::marker::PhantomData; | |||||
| use crate::{core::{FromParallelIterator, Driver}, ParallelIterator, Executor}; | |||||
| pub struct Collect<X, T> { | |||||
| iterator: X, | |||||
| marker: PhantomData<T>, | |||||
| } | |||||
| impl<X, T> Collect<X, T> { | |||||
| pub fn new(iterator: X) -> Self { | |||||
| Self { | |||||
| iterator, | |||||
| marker: PhantomData, | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<'a, X, T> Driver<'a, T> for Collect<X, T> | |||||
| where | |||||
| X: ParallelIterator<'a>, | |||||
| T: FromParallelIterator<X::Item> + Send, | |||||
| { | |||||
| fn exec_with<E>(self, executor: E) -> E::Result | |||||
| where E: Executor<'a, T> | |||||
| { | |||||
| T::from_par_iter(executor, self.iterator) | |||||
| } | |||||
| } | |||||
| @@ -99,8 +99,8 @@ mod tests { | |||||
| let x = (0..10usize) | let x = (0..10usize) | ||||
| .into_par_iter() | .into_par_iter() | ||||
| .map_init(move || { i.fetch_add(1, Ordering::Relaxed) }, |init, item| Some((*init, item))) | .map_init(move || { i.fetch_add(1, Ordering::Relaxed) }, |init, item| Some((*init, item))) | ||||
| .for_each(|j| { | |||||
| println!("{:?}", j); | |||||
| .for_each_with(5usize, |x, j| { | |||||
| println!("{:?} {:?}", x, j); | |||||
| }) | }) | ||||
| .exec().await; | .exec().await; | ||||
| @@ -3,3 +3,4 @@ pub mod map; | |||||
| pub mod noop; | pub mod noop; | ||||
| pub mod map_with; | pub mod map_with; | ||||
| pub mod map_init; | pub mod map_init; | ||||
| pub mod collect; | |||||