From a685b6e85dee44e28a37042d4a2b5fbcd0bda647 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sun, 15 Nov 2020 01:46:59 +0100 Subject: [PATCH] Implemented 'partition' and 'partition_map' operation --- asparit/src/core/iterator.rs | 74 +++++++ asparit/src/iter/mod.rs | 8 +- asparit/src/iter/partition.rs | 369 ++++++++++++++++++++++++++++++++++ 3 files changed, 450 insertions(+), 1 deletion(-) create mode 100644 asparit/src/iter/partition.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 0741058..6a64cd6 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -26,6 +26,7 @@ use crate::{ max::{Max, MaxBy, MaxByKey}, min::{Min, MinBy, MinByKey}, panic_fuse::PanicFuse, + partition::{Partition, PartitionMap}, product::Product, reduce::{Reduce, ReduceWith}, sum::Sum, @@ -1633,6 +1634,79 @@ pub trait ParallelIterator<'a>: Sized + Send { fn unzip(self) -> Unzip { Unzip::new(self) } + + /// Partitions the items of a parallel iterator into a pair of arbitrary + /// `ParallelExtend` containers. Items for which the `operation` returns + /// true go into the first container, and the rest go into the second. + /// + /// Note: unlike the standard `Iterator::partition`, this allows distinct + /// collection types for the left and right items. This is more flexible, + /// but may require new type annotations when converting sequential code + /// that used type inferrence assuming the two were the same. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter().partition(|x| x % 2 == 0); + /// + /// assert_eq!(left, [0, 2, 4, 6]); + /// assert_eq!(right, [1, 3, 5, 7]); + /// ``` + fn partition(self, operation: O) -> Partition + where + O: Fn(&Self::Item) -> bool + Sync + Send, + { + Partition::new(self, operation) + } + + /// Partitions and maps the items of a parallel iterator into a pair of + /// arbitrary `ParallelExtend` containers. `Either::Left` items go into + /// the first container, and `Either::Right` items go into the second. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::Either; + /// + /// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter() + /// .partition_map(|x| { + /// if x % 2 == 0 { + /// Either::Left(x * 4) + /// } else { + /// Either::Right(x * 3) + /// } + /// }); + /// + /// assert_eq!(left, [0, 8, 16, 24]); + /// assert_eq!(right, [3, 9, 15, 21]); + /// ``` + /// + /// Nested `Either` enums can be split as well. + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::Either::*; + /// + /// let ((fizzbuzz, fizz), (buzz, other)): ((Vec<_>, Vec<_>), (Vec<_>, Vec<_>)) = (1..20) + /// .into_par_iter() + /// .partition_map(|x| match (x % 3, x % 5) { + /// (0, 0) => Left(Left(x)), + /// (0, _) => Left(Right(x)), + /// (_, 0) => Right(Left(x)), + /// (_, _) => Right(Right(x)), + /// }); + /// + /// assert_eq!(fizzbuzz, [15]); + /// assert_eq!(fizz, [3, 6, 9, 12, 18]); + /// assert_eq!(buzz, [5, 10]); + /// assert_eq!(other, [1, 2, 4, 7, 8, 11, 13, 14, 16, 17, 19]); + /// ``` + fn partition_map(self, operation: O) -> PartitionMap { + PartitionMap::new(self, operation) + } } /// An iterator that supports "random access" to its data, meaning diff --git a/asparit/src/iter/mod.rs b/asparit/src/iter/mod.rs index dfe3323..76f17e1 100644 --- a/asparit/src/iter/mod.rs +++ b/asparit/src/iter/mod.rs @@ -17,6 +17,7 @@ pub mod max; pub mod min; pub mod noop; pub mod panic_fuse; +pub mod partition; pub mod product; pub mod reduce; pub mod sum; @@ -67,7 +68,12 @@ mod tests { move || j.fetch_add(2, Ordering::Relaxed), |init, (init2, item)| (*init, init2, item), ) - .unzip() + .partition_map(|(i, j, k)| match j % 3 { + 0 => (Some(i), None, None), + 1 => (None, Some(j), None), + 2 => (None, None, Some(k)), + _ => unreachable!(), + }) .exec() .await; diff --git a/asparit/src/iter/partition.rs b/asparit/src/iter/partition.rs new file mode 100644 index 0000000..4247a48 --- /dev/null +++ b/asparit/src/iter/partition.rs @@ -0,0 +1,369 @@ +use crate::{core::Driver, Consumer, Executor, Folder, ParallelExtend, ParallelIterator, Reducer}; + +/* Partition */ + +pub struct Partition { + iterator: X, + operation: O, +} + +impl Partition { + pub fn new(iterator: X, operation: O) -> Self { + Self { + iterator, + operation, + } + } +} + +impl<'a, X, O, P, T> Driver<'a, P, T> for Partition +where + X: ParallelIterator<'a>, + O: Fn(&X::Item) -> bool + Clone, + P: Default + Send + 'a, + PartitionExtend>: ParallelExtend<'a, X::Item, T>, + <> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer< + X::Item, + >>::Reducer: Reducer + Send, + T: Send + 'a, +{ + fn exec_with(self, executor: E) -> E::Result + where + E: Executor<'a, P, T>, + { + let operation = self.operation; + let executor = executor.into_inner(); + let consumer = PartitionExtend { + base: P::default(), + operation: Some(MapTwoFn(operation)), + } + .into_consumer(); + + let inner = self.iterator.drive(executor, consumer); + + E::map(inner, |inner| { + let ret = PartitionExtend::map_result(inner); + + ret.base + }) + } +} + +/* PartitionMap */ + +pub struct PartitionMap { + iterator: X, + operation: O, +} + +impl PartitionMap { + pub fn new(iterator: X, operation: O) -> Self { + Self { + iterator, + operation, + } + } +} + +impl<'a, X, O, P, T, R> Driver<'a, P, T> for PartitionMap +where + X: ParallelIterator<'a>, + O: Fn(X::Item) -> R, + P: Default + Send + 'a, + PartitionExtend>: ParallelExtend<'a, X::Item, T>, + <> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer< + X::Item, + >>::Reducer: Reducer + Send, + T: Send + 'a, +{ + fn exec_with(self, executor: E) -> E::Result + where + E: Executor<'a, P, T>, + { + let operation = self.operation; + let executor = executor.into_inner(); + let consumer = PartitionExtend { + base: P::default(), + operation: Some(MapAnyFn(operation)), + } + .into_consumer(); + + let inner = self.iterator.drive(executor, consumer); + + E::map(inner, |inner| { + let ret = PartitionExtend::map_result(inner); + + ret.base + }) + } +} + +/* Misc */ + +pub struct PartitionExtend { + base: E, + operation: Option, +} + +pub struct PartitionConsumer { + base: C, + operation: O, +} + +pub struct PartitionFolder { + base: F, + operation: O, +} + +pub struct PartitionReducer { + base: R, +} + +pub struct PartitionResult { + base: T, +} + +pub trait PartitionFn { + type Output; + + fn call(&self, item: I) -> Self::Output; +} + +macro_rules! parallel_extend_tuple { + (($($E:ident),*), ($($I:ident),*), ($($T:ident),*), ($($C:ident),*), ($($F:ident),*), ($($R:ident),*)) => { + + #[allow(non_snake_case)] + impl<'a, O, I, $($E,)+ $($I,)+ $($T,)+> ParallelExtend<'a, I, PartitionResult<($($T,)+)>> for PartitionExtend<($($E,)+), O> + where + O: PartitionFn,)+)> + Clone + Send + 'a, + I: Send + 'a, + $($E: ParallelExtend<'a, $I, $T> + Send,)+ + $(<$E::Consumer as Consumer<$I>>::Reducer: Reducer<$T>,)+ + $($I: Send + 'a,)+ + $($T: Send,)+ + { + type Consumer = PartitionConsumer<($($E::Consumer,)+), O>; + + fn into_consumer(self) -> Self::Consumer { + let ($($E,)+) = self.base; + let operation = self.operation; + + PartitionConsumer { + base: ($($E.into_consumer(),)+), + operation: operation.unwrap(), + } + } + + fn map_result(inner: PartitionResult<($($T,)+)>) -> Self { + let ($($T,)+) = inner.base; + + PartitionExtend { + base: ($($E::map_result($T),)+), + operation: None, + } + } + } + + #[allow(non_snake_case)] + impl Consumer for PartitionConsumer<($($C,)+), O> + where + O: PartitionFn,)+)> + Clone + Send, + I: Send, + $($I: Send,)+ + $($C: Consumer<$I>,)+ + { + type Folder = PartitionFolder<($($C::Folder,)+), O>; + type Reducer = PartitionReducer<($($C::Reducer,)+)>; + type Result = PartitionResult<($($C::Result,)+)>; + + fn split(self) -> (Self, Self, Self::Reducer) { + let operation = self.operation; + let ($($C,)+) = self.base; + let ($($C,)+) = ($($C.split(),)+); + + let left = PartitionConsumer { + base: ($($C.0,)+), + operation: operation.clone(), + }; + let right = PartitionConsumer { + base: ($($C.1,)+), + operation, + }; + let reducer = PartitionReducer { + base: ($($C.2,)+), + }; + + (left, right, reducer) + } + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let operation = self.operation; + let ($($C,)+) = self.base; + let ($($C,)+) = ($($C.split_at(index),)+); + + let left = PartitionConsumer { + base: ($($C.0,)+), + operation: operation.clone(), + }; + let right = PartitionConsumer { + base: ($($C.1,)+), + operation, + }; + let reducer = PartitionReducer { + base: ($($C.2,)+), + }; + + (left, right, reducer) + } + + fn into_folder(self) -> Self::Folder { + let ($($C,)+) = self.base; + + PartitionFolder { + base: ($($C.into_folder(),)+), + operation: self.operation, + } + } + + fn is_full(&self) -> bool { + let ($($C,)+) = &self.base; + + true $(&& $C.is_full())+ + } + } + + #[allow(non_snake_case)] + impl Folder for PartitionFolder<($($F,)+), O> + where + O: PartitionFn,)+)>, + $($F: Folder<$I>,)+ + { + type Result = PartitionResult<($($F::Result,)+)>; + + fn consume(self, item: I) -> Self { + let operation = self.operation; + let ($($I,)+) = operation.call(item); + let ($(mut $F,)+) = self.base; + + $( + if let Some(item) = $I { + $F = $F.consume(item); + } + )+ + + PartitionFolder { + base: ($($F,)+), + operation, + } + } + + fn complete(self) -> Self::Result { + let ($($F,)+) = self.base; + + PartitionResult { + base: ($($F.complete(),)+), + } + } + + fn is_full(&self) -> bool { + let ($($F,)+) = &self.base; + + $($F.is_full() &&)+ true + } + } + + #[allow(non_snake_case)] + impl<$($R,)+ $($T,)+> Reducer> for PartitionReducer<($($R,)+)> + where + $($R: Reducer<$T>,)+ + { + fn reduce(self, left: PartitionResult<($($T,)+)>, right: PartitionResult<($($T,)+)>) -> PartitionResult<($($T,)+)> { + let ($($R,)+) = self.base; + let ($($T,)+) = left.base; + let ($($E,)+) = right.base; + + PartitionResult { + base: ($($R.reduce($T, $E),)+), + } + } + } + }; +} + +/* MapTwoFn */ + +pub struct MapTwoFn(O); + +impl Clone for MapTwoFn +where + O: Clone, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl PartitionFn for MapTwoFn +where + O: Fn(&I) -> bool, +{ + type Output = (Option, Option); + + fn call(&self, item: I) -> Self::Output { + if (self.0)(&item) { + (Some(item), None) + } else { + (None, Some(item)) + } + } +} + +/* MapAnyFn */ + +pub struct MapAnyFn(O); + +impl Clone for MapAnyFn +where + O: Clone, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl PartitionFn for MapAnyFn +where + O: Fn(I) -> T, +{ + type Output = T; + + fn call(&self, item: I) -> Self::Output { + (self.0)(item) + } +} + +parallel_extend_tuple!((E1, E2), (I1, I2), (T1, T2), (C1, C2), (F1, F2), (R1, R2)); +parallel_extend_tuple!( + (E1, E2, E3), + (I1, I2, I3), + (T1, T2, T3), + (C1, C2, C3), + (F1, F2, F3), + (R1, R2, R3) +); +parallel_extend_tuple!( + (E1, E2, E3, E4), + (I1, I2, I3, I4), + (T1, T2, T3, T4), + (C1, C2, C3, C4), + (F1, F2, F3, F4), + (R1, R2, R3, R4) +); +parallel_extend_tuple!( + (E1, E2, E3, E4, E5), + (I1, I2, I3, I4, I5), + (T1, T2, T3, T4, T5), + (C1, C2, C3, C4, C5), + (F1, F2, F3, F4, F5), + (R1, R2, R3, R4, R5) +);