From 3d19f2c22cdbf4f3298653b5d1d21ad26505f2a6 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sun, 15 Nov 2020 22:33:18 +0100 Subject: [PATCH] Implemented 'with_splits' modificator --- asparit/src/core/iterator.rs | 20 ++++ asparit/src/iter/mod.rs | 11 +- asparit/src/iter/splits.rs | 201 +++++++++++++++++++++++++++++++++ asparit/src/iter/while_some.rs | 5 +- 4 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 asparit/src/iter/splits.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index ebbe376..01fcf1b 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -30,6 +30,7 @@ use crate::{ partition::{Partition, PartitionMap}, product::Product, reduce::{Reduce, ReduceWith}, + splits::Splits, sum::Sum, try_fold::{TryFold, TryFoldWith}, try_for_each::{TryForEach, TryForEachInit, TryForEachWith}, @@ -1727,6 +1728,25 @@ pub trait ParallelIterator<'a>: Sized + Send { { Intersperse::new(self, item) } + + /// Sets the number of splits that are processed in parallel. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let min = (0..1_000_000) + /// .into_par_iter() + /// .with_splits(8) + /// .for_each(|| println!("Thread ID: {:?}", std::thread::current().id)) + /// .exec(); + /// + /// assert!(min >= 1234); + /// ``` + fn with_splits(self, splits: usize) -> Splits { + Splits::new(self, splits) + } } /// An iterator that supports "random access" to its data, meaning diff --git a/asparit/src/iter/mod.rs b/asparit/src/iter/mod.rs index 6b16cdd..017a672 100644 --- a/asparit/src/iter/mod.rs +++ b/asparit/src/iter/mod.rs @@ -21,6 +21,7 @@ pub mod panic_fuse; pub mod partition; pub mod product; pub mod reduce; +pub mod splits; pub mod sum; pub mod try_fold; pub mod try_for_each; @@ -33,7 +34,7 @@ pub mod while_some; mod tests { use crate::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_for_each() { use ::std::sync::atomic::{AtomicUsize, Ordering}; use ::std::sync::Arc; @@ -70,6 +71,14 @@ mod tests { move || j.fetch_add(2, Ordering::Relaxed), |init, (init2, item)| (*init, init2, item), ) + .with_splits(4) + .inspect(|x| { + println!( + "Thread ID = {:?}; Item = {:?}", + ::std::thread::current().id(), + x + ) + }) .partition_map(|(i, j, k)| match j % 3 { 0 => (Some(i), None, None), 1 => (None, Some(j), None), diff --git a/asparit/src/iter/splits.rs b/asparit/src/iter/splits.rs new file mode 100644 index 0000000..41d8c2c --- /dev/null +++ b/asparit/src/iter/splits.rs @@ -0,0 +1,201 @@ +use crate::{ + Consumer, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer, + IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer, +}; + +pub struct Splits { + base: X, + splits: usize, +} + +impl Splits { + pub fn new(base: X, splits: usize) -> Self { + Self { base, splits } + } +} + +impl<'a, X> ParallelIterator<'a> for Splits +where + X: ParallelIterator<'a>, +{ + type Item = X::Item; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_producer(ExecutorCallback::new(executor, consumer)) + } + + fn with_producer(self, base: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + let splits = self.splits; + + self.base.with_producer(SplitsCallback { base, splits }) + } + + fn len_hint_opt(&self) -> Option { + self.base.len_hint_opt() + } +} + +impl<'a, X> IndexedParallelIterator<'a> for Splits +where + X: IndexedParallelIterator<'a>, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_producer_indexed(ExecutorCallback::new(executor, consumer)) + } + + fn with_producer_indexed(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + let splits = self.splits; + + self.base + .with_producer_indexed(SplitsCallback { base, splits }) + } + + fn len_hint(&self) -> usize { + self.base.len_hint() + } +} + +/* SplitsCallback */ + +struct SplitsCallback { + base: CB, + splits: usize, +} + +impl<'a, CB, I> ProducerCallback<'a, I> for SplitsCallback +where + CB: ProducerCallback<'a, I>, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> Self::Output + where + P: Producer + 'a, + { + let splits = self.splits; + + self.base.callback(SplitsProducer { base, splits }) + } +} + +impl<'a, CB, I> IndexedProducerCallback<'a, I> for SplitsCallback +where + CB: IndexedProducerCallback<'a, I>, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let splits = self.splits; + + self.base.callback(SplitsProducer { base, splits }) + } +} + +/* SplitsProducer */ + +struct SplitsProducer

{ + base: P, + splits: usize, +} + +impl

Producer for SplitsProducer

+where + P: Producer, +{ + type Item = P::Item; + type IntoIter = P::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.base.into_iter() + } + + fn split(self) -> (Self, Option) { + let splits = self.splits; + let (left, right) = self.base.split(); + + let left = Self { base: left, splits }; + let right = right.map(|base| Self { base, splits }); + + (left, right) + } + + fn splits(&self) -> Option { + Some(self.splits) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + self.base.fold_with(folder) + } +} + +impl

IndexedProducer for SplitsProducer

+where + P: IndexedProducer, +{ + type Item = P::Item; + type IntoIter = P::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.base.into_iter() + } + + fn len(&self) -> usize { + self.base.len() + } + + fn split_at(self, index: usize) -> (Self, Self) { + let splits = self.splits; + let (left, right) = self.base.split_at(index); + + let left = Self { base: left, splits }; + let right = Self { + base: right, + splits, + }; + + (left, right) + } + + fn splits(&self) -> Option { + Some(self.splits) + } + + fn min_len(&self) -> Option { + self.base.min_len() + } + + fn max_len(&self) -> Option { + self.base.max_len() + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + self.base.fold_with(folder) + } +} diff --git a/asparit/src/iter/while_some.rs b/asparit/src/iter/while_some.rs index 402950a..5a0a81f 100644 --- a/asparit/src/iter/while_some.rs +++ b/asparit/src/iter/while_some.rs @@ -142,7 +142,10 @@ where type IntoIter = WhileSomeIter; fn into_iter(self) -> Self::IntoIter { - unimplemented!() + WhileSomeIter { + base: self.base.into_iter(), + is_full: self.is_full, + } } fn split(self) -> (Self, Option) {