diff --git a/asparit/src/core/from_iter.rs b/asparit/src/core/from_iter.rs
new file mode 100644
index 0000000..ed71846
--- /dev/null
+++ b/asparit/src/core/from_iter.rs
@@ -0,0 +1,74 @@
+use super::{IntoParallelIterator, Executor};
+
+/// `FromParallelIterator` implements the creation of a collection
+/// from a [`ParallelIterator`]. By implementing
+/// `FromParallelIterator` for a given type, you define how it will be
+/// created from an iterator.
+///
+/// `FromParallelIterator` is used through [`ParallelIterator`]'s [`collect()`] method.
+///
+/// [`ParallelIterator`]: trait.ParallelIterator.html
+/// [`collect()`]: trait.ParallelIterator.html#method.collect
+///
+/// # Examples
+///
+/// Implementing `FromParallelIterator` for your type:
+///
+/// ```
+/// use rayon::prelude::*;
+/// use std::mem;
+///
+/// struct BlackHole {
+///     mass: usize,
+/// }
+///
+/// impl<T: Send> FromParallelIterator<T> for BlackHole {
+///     fn from_par_iter<I>(iterator: I) -> Self
+///         where I: IntoParallelIterator<Item = T>
+///     {
+///         let iterator = iterator.into_par_iter();
+///         BlackHole {
+///             mass: iterator.count() * mem::size_of::<T>(),
+///         }
+///     }
+/// }
+///
+/// let bh: BlackHole = (0i32..1000).into_par_iter().collect();
+/// assert_eq!(bh.mass, 4000);
+/// ```
+pub trait FromParallelIterator<T>: Send + Sized
+where
+    T: Send,
+{
+    /// Creates an instance of the collection from the parallel iterator `iterator`.
+    ///
+    /// If your collection is not naturally parallel, the easiest (and
+    /// fastest) way to do this is often to collect `iterator` into a
+    /// [`LinkedList`] or other intermediate data structure and then
+    /// sequentially extend your collection. However, a more 'native'
+    /// technique is to use the [`iterator.fold`] or
+    /// [`iterator.fold_with`] methods to create the collection.
+    /// Alternatively, if your collection is 'natively' parallel, you
+    /// can use `iterator.for_each` to process each element in turn.
+    ///
+    /// [`LinkedList`]: https://doc.rust-lang.org/std/collections/struct.LinkedList.html
+    /// [`iterator.fold`]: trait.ParallelIterator.html#method.fold
+    /// [`iterator.fold_with`]: trait.ParallelIterator.html#method.fold_with
+    /// [`iterator.for_each`]: trait.ParallelIterator.html#method.for_each
+    fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result
+    where
+        E: Executor<'a, Self>,
+        X: IntoParallelIterator<'a, Item = T>;
+}
+
+impl FromParallelIterator<()> for () {
+    fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result
+    where
+        E: Executor<'a, Self>,
+        X: IntoParallelIterator<'a, Item = ()>,
+    {
+        use crate::{ParallelIterator, inner::noop::NoOpConsumer};
+
+        iterator.into_par_iter().drive(executor, NoOpConsumer)
+    }
+}
diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs
index 0d5e8ca..c0b59ea 100644
--- a/asparit/src/core/iterator.rs
+++ b/asparit/src/core/iterator.rs
@@ -1,6 +1,6 @@
-use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer};
+use super::{Consumer, IndexedConsumer, IndexedProducerCallback, ProducerCallback, Executor, Reducer, FromParallelIterator};
 
-use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit};
+use crate::inner::{for_each::ForEach, map::Map, map_with::MapWith, map_init::MapInit, collect::Collect};
 
 /// Parallel version of the standard iterator trait.
 ///
@@ -80,7 +80,7 @@ pub trait ParallelIterator<'a>: Sized + Send {
         None
     }
 
-    /// Executes `OP` on each item produced by the iterator, in parallel.
+    /// Executes `operation` on each item produced by the iterator, in parallel.
     ///
     /// # Examples
     ///
@@ -96,6 +96,37 @@ pub trait ParallelIterator<'a>: Sized + Send {
         ForEach::new(self, operation)
     }
 
+    /// Executes `operation` on the given `init` value with each item produced by
+    /// the iterator, in parallel.
+    ///
+    /// The `init` value will be cloned only as needed to be paired with
+    /// the group of items in each rayon job. It does not require the type
+    /// to be `Sync`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::sync::mpsc::channel;
+    /// use rayon::prelude::*;
+    ///
+    /// let (sender, receiver) = channel();
+    ///
+    /// (0..5).into_par_iter().for_each_with(sender, |s, x| s.send(x).unwrap());
+    ///
+    /// let mut res: Vec<_> = receiver.iter().collect();
+    ///
+    /// res.sort();
+    ///
+    /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
+    /// ```
+    fn for_each_with<O, T>(self, init: T, operation: O) -> Collect<MapWith<Self, T, O>, ()>
+    where
+        O: Fn(&mut T, Self::Item) + Clone + Send + Sync + 'a,
+        T: Clone + Send + 'a,
+    {
+        self.map_with(init, operation).collect()
+    }
+
     /// Applies `operation` to each item of this iterator, producing a new
     /// iterator with the results.
     ///
@@ -150,7 +181,7 @@ pub trait ParallelIterator<'a>: Sized + Send {
     /// ```
     fn map_with<S, T, O>(self, init: S, operation: O) -> MapWith<Self, S, O>
     where
-        O: Fn(&mut S, Self::Item) -> T + Sync + Send,
+        O: Fn(&mut S, Self::Item) -> T + Clone + Sync + Send,
         S: Send + Clone,
         T: Send,
     {
@@ -193,6 +224,37 @@ pub trait ParallelIterator<'a>: Sized + Send {
         MapInit::new(self, init, operation)
     }
+
+    /// Creates a fresh collection containing all the elements produced
+    /// by this parallel iterator.
+    ///
+    /// You may prefer [`collect_into_vec()`] implemented on
+    /// [`IndexedParallelIterator`], if your underlying iterator also implements
+    /// it. [`collect_into_vec()`] allocates efficiently with precise knowledge
+    /// of how many elements the iterator contains, and even allows you to reuse
+    /// an existing vector's backing store rather than allocating a fresh vector.
+    ///
+    /// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
+    /// [`collect_into_vec()`]:
+    /// trait.IndexedParallelIterator.html#method.collect_into_vec
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use rayon::prelude::*;
+    ///
+    /// let sync_vec: Vec<_> = (0..100).into_iter().collect();
+    ///
+    /// let async_vec: Vec<_> = (0..100).into_par_iter().collect();
+    ///
+    /// assert_eq!(sync_vec, async_vec);
+    /// ```
+    fn collect<T>(self) -> Collect<Self, T>
+    where
+        T: FromParallelIterator<Self::Item>,
+    {
+        Collect::new(self)
+    }
 }
 
 /// An iterator that supports "random access" to its data, meaning
diff --git a/asparit/src/core/mod.rs b/asparit/src/core/mod.rs
index 772ef7d..01fd9bf 100644
--- a/asparit/src/core/mod.rs
+++ b/asparit/src/core/mod.rs
@@ -6,7 +6,9 @@ mod into_iter;
 mod iterator;
 mod producer;
 mod reducer;
+mod from_iter;
 
+pub use from_iter::FromParallelIterator;
 pub use consumer::{Consumer, IndexedConsumer};
 pub use driver::Driver;
 pub use executor::{Executor, ExecutorCallback};
diff --git a/asparit/src/inner/collect.rs b/asparit/src/inner/collect.rs
new file mode 100644
index 0000000..67dedf4
--- /dev/null
+++ b/asparit/src/inner/collect.rs
@@ -0,0 +1,29 @@
+use std::marker::PhantomData;
+
+use crate::{core::{FromParallelIterator, Driver}, ParallelIterator, Executor};
+
+pub struct Collect<X, T> {
+    iterator: X,
+    marker: PhantomData<T>,
+}
+
+impl<X, T> Collect<X, T> {
+    pub fn new(iterator: X) -> Self {
+        Self {
+            iterator,
+            marker: PhantomData,
+        }
+    }
+}
+
+impl<'a, X, T> Driver<'a, T> for Collect<X, T>
+where
+    X: ParallelIterator<'a>,
+    T: FromParallelIterator<X::Item> + Send,
+{
+    fn exec_with<E>(self, executor: E) -> E::Result
+    where E: Executor<'a, T>
+    {
+        T::from_par_iter(executor, self.iterator)
+    }
+}
diff --git a/asparit/src/inner/for_each.rs b/asparit/src/inner/for_each.rs
index 9799423..87c7183 100644
--- a/asparit/src/inner/for_each.rs
+++ b/asparit/src/inner/for_each.rs
@@ -99,8 +99,8 @@ mod tests {
         let x = (0..10usize)
             .into_par_iter()
             .map_init(move || { i.fetch_add(1,
Ordering::Relaxed) }, |init, item| Some((*init, item)))
-            .for_each(|j| {
-                println!("{:?}", j);
+            .for_each_with(5usize, |x, j| {
+                println!("{:?} {:?}", x, j);
             })
             .exec().await;
 
diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs
index 0c00835..bba4bbd 100644
--- a/asparit/src/inner/mod.rs
+++ b/asparit/src/inner/mod.rs
@@ -3,3 +3,4 @@ pub mod map;
 pub mod noop;
 pub mod map_with;
 pub mod map_init;
+pub mod collect;