From 0d84424d689fc1c4255d031a19cf086924643803 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Sun, 8 Nov 2020 23:20:15 +0100 Subject: [PATCH] Implemented 'min_by_key' and 'max_by_key' operation --- asparit/src/core/driver.rs | 10 +++--- asparit/src/core/executor.rs | 48 ++++++++++++++++---------- asparit/src/core/iterator.rs | 54 ++++++++++++++++++++++++++++-- asparit/src/executor/sequential.rs | 35 ++++++++++++++----- asparit/src/executor/tokio.rs | 40 +++++++++++++++++----- asparit/src/inner/max.rs | 42 +++++++++++++++++++++++ asparit/src/inner/min.rs | 42 +++++++++++++++++++++++ 7 files changed, 229 insertions(+), 42 deletions(-) diff --git a/asparit/src/core/driver.rs b/asparit/src/core/driver.rs index 7e650ac..4d2432a 100644 --- a/asparit/src/core/driver.rs +++ b/asparit/src/core/driver.rs @@ -1,14 +1,16 @@ use crate::{DefaultExecutor, Executor}; -pub trait Driver<'a, D>: Sized +pub trait Driver<'a, T1, T2 = (), T3 = ()>: Sized where - D: Send + 'a, + T1: Send + 'a, + T2: Send + 'a, + T3: Send + 'a, { fn exec_with(self, executor: E) -> E::Result where - E: Executor<'a, D>; + E: Executor<'a, T1, T2, T3>; - fn exec(self) -> >::Result { + fn exec(self) -> >::Result { self.exec_with(DefaultExecutor::default()) } } diff --git a/asparit/src/core/executor.rs b/asparit/src/core/executor.rs index e660abe..e575505 100644 --- a/asparit/src/core/executor.rs +++ b/asparit/src/core/executor.rs @@ -2,30 +2,42 @@ use super::{ Consumer, IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback, Reducer, }; -pub trait Executor<'a, D>: Sized +pub trait Executor<'a, T1, T2 = (), T3 = ()>: Sized where - D: Send + 'a, + T1: Send + 'a, + T2: Send + 'a, + T3: Send + 'a, { type Result: Send; + type Inner: Executor<'a, T2, T3, ()>; fn exec(self, producer: P, consumer: C) -> Self::Result where P: Producer + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a; + C: Consumer + 'a, + R: Reducer + Send + 'a; fn exec_indexed(self, producer: P, consumer: C) -> Self::Result where P: IndexedProducer + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a; + C: Consumer + 'a, + R: Reducer + Send + 'a; fn split(self) -> (Self, Self); fn join(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result where - R: Reducer + Send + 'a, - D: 'a; + R: Reducer + Send + 'a, + T1: 'a; + + fn into_inner(self) -> Self::Inner; + + fn map( + inner: >::Result, + operation: O, + ) -> Self::Result + where + O: Fn(T2) -> T1 + Send + 'a; } pub struct ExecutorCallback { @@ -39,12 +51,12 @@ impl ExecutorCallback { } } -impl<'a, E, D, C, I, R> ProducerCallback<'a, I> for ExecutorCallback +impl<'a, E, T1, C, I, R> ProducerCallback<'a, I> for ExecutorCallback where - E: Executor<'a, D>, - D: Send + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a, + E: Executor<'a, T1>, + T1: Send + 'a, + C: Consumer + 'a, + R: Reducer + Send + 'a, { type Output = E::Result; @@ -56,12 +68,12 @@ where } } -impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback +impl<'a, E, T1, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback where - E: Executor<'a, D>, - D: Send + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a, + E: Executor<'a, T1>, + T1: Send + 'a, + C: Consumer + 'a, + R: Reducer + Send + 'a, { type Output = E::Result; diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 84080d7..555433d 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -22,8 +22,8 @@ use crate::{ map::Map, map_init::MapInit, map_with::MapWith, - max::{Max, MaxBy}, - min::{Min, MinBy}, + max::{Max, MaxBy, MaxByKey}, + min::{Min, MinBy, MinByKey}, product::Product, reduce::{Reduce, ReduceWith}, sum::Sum, @@ -1142,6 +1142,31 @@ pub trait ParallelIterator<'a>: Sized + Send { MinBy::new(self, operation) } + /// Computes the item that yields the minimum value for the given + /// function. If the iterator is empty, `None` is returned; + /// otherwise, `Some(item)` is returned. + /// + /// Note that the order in which the items will be reduced is not + /// specified, so if the `Ord` impl is not truly associative, then + /// the results are not deterministic. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let a = [-3_i32, 34, 2, 5, -10, -3, -23]; + /// + /// assert_eq!(a.par_iter().min_by_key(|x| x.abs()), Some(&2)); + /// ``` + fn min_by_key(self, operation: O) -> MinByKey + where + O: Fn(&Self::Item) -> K + Clone + Send + 'a, + K: Ord + Send, + { + MinByKey::new(self, operation) + } + /// Computes the maximum of all the items in the iterator. If the /// iterator is empty, `None` is returned; otherwise, `Some(max)` /// is returned. @@ -1196,6 +1221,31 @@ pub trait ParallelIterator<'a>: Sized + Send { MaxBy::new(self, operation) } + /// Computes the item that yields the maximum value for the given + /// function. If the iterator is empty, `None` is returned; + /// otherwise, `Some(item)` is returned. + /// + /// Note that the order in which the items will be reduced is not + /// specified, so if the `Ord` impl is not truly associative, then + /// the results are not deterministic. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let a = [-3_i32, 34, 2, 5, -10, -3, -23]; + /// + /// assert_eq!(a.par_iter().max_by_key(|x| x.abs()), Some(&34)); + /// ``` + fn max_by_key(self, operation: O) -> MaxByKey + where + O: Fn(&Self::Item) -> K + Clone + Send + 'a, + K: Ord + Send, + { + MaxByKey::new(self, operation) + } + /// Takes two iterators and creates a new iterator over both. /// /// # Examples diff --git a/asparit/src/executor/sequential.rs b/asparit/src/executor/sequential.rs index 5d52000..87c8d00 100644 --- a/asparit/src/executor/sequential.rs +++ b/asparit/src/executor/sequential.rs @@ -3,17 +3,20 @@ use crate::core::{Consumer, Executor, Folder, IndexedProducer, Producer, Reducer #[derive(Default)] pub struct Sequential; -impl<'a, D> Executor<'a, D> for Sequential +impl<'a, T1, T2, T3> Executor<'a, T1, T2, T3> for Sequential where - D: Send + 'a, + T1: Send + 'a, + T2: Send + 'a, + T3: Send + 'a, { - type Result = D; + type Result = T1; + type Inner = Sequential; fn exec(self, producer: P, consumer: C) -> Self::Result where P: Producer + 'a, - C: Consumer + 'a, - R: Reducer, + C: Consumer + 'a, + R: Reducer, { if consumer.is_full() { consumer.into_folder().complete() @@ -25,8 +28,8 @@ where fn exec_indexed(self, producer: P, consumer: C) -> Self::Result where P: IndexedProducer, - C: Consumer, - R: Reducer, + C: Consumer, + R: Reducer, { if consumer.is_full() { consumer.into_folder().complete() @@ -39,10 +42,24 @@ where (Self, Self) } - fn join(left: D, right: D, reducer: R) -> Self::Result + fn join(left: T1, right: T1, reducer: R) -> Self::Result where - R: Reducer + Send, + R: Reducer + Send, { reducer.reduce(left, right) } + + fn into_inner(self) -> Self::Inner { + self + } + + fn map( + inner: >::Result, + operation: O, + ) -> Self::Result + where + O: Fn(T2) -> T1, + { + operation(inner) + } } diff --git a/asparit/src/executor/tokio.rs b/asparit/src/executor/tokio.rs index d83f65c..a2c624a 100644 --- a/asparit/src/executor/tokio.rs +++ b/asparit/src/executor/tokio.rs @@ -27,17 +27,20 @@ impl Default for Tokio { } } -impl<'a, D> Executor<'a, D> for Tokio +impl<'a, T1, T2, T3> Executor<'a, T1, T2, T3> for Tokio where - D: Send + 'a, + T1: Send + 'a, + T2: Send + 'a, + T3: Send + 'a, { - type Result = BoxFuture<'a, D>; + type Result = BoxFuture<'a, T1>; + type Inner = Tokio; fn exec(self, producer: P, consumer: C) -> Self::Result where P: Producer + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a, + C: Consumer + 'a, + R: Reducer + Send + 'a, { let splits = producer.splits().unwrap_or(self.splits); let splitter = Splitter::new(splits); @@ -48,8 +51,8 @@ where fn exec_indexed(self, producer: P, consumer: C) -> Self::Result where P: IndexedProducer + 'a, - C: Consumer + 'a, - R: Reducer + Send + 'a, + C: Consumer + 'a, + R: Reducer + Send + 'a, { let splits = producer.splits().unwrap_or(self.splits); let splitter = IndexedSplitter::new( @@ -75,8 +78,8 @@ where fn join(left: Self::Result, right: Self::Result, reducer: R) -> Self::Result where - R: Reducer + Send + 'a, - D: 'a, + R: Reducer + Send + 'a, + T1: 'a, { async move { let left = left.await; @@ -86,6 +89,25 @@ where } .boxed() } + + fn into_inner(self) -> Self::Inner { + self + } + + fn map( + inner: >::Result, + operation: O, + ) -> Self::Result + where + O: Fn(T2) -> T1 + Send + 'a, + { + async move { + let value = inner.await; + + operation(value) + } + .boxed() + } } fn exec<'a, P, C>(mut splitter: Splitter, producer: P, consumer: C) -> BoxFuture<'a, C::Result> diff --git a/asparit/src/inner/max.rs b/asparit/src/inner/max.rs index df7d5f1..bcacd88 100644 --- a/asparit/src/inner/max.rs +++ b/asparit/src/inner/max.rs @@ -63,3 +63,45 @@ where .exec_with(executor) } } + +/* MaxByKey */ + +pub struct MaxByKey { + iterator: X, + operation: O, +} + +impl MaxByKey { + pub fn new(iterator: X, operation: O) -> Self { + Self { + iterator, + operation, + } + } +} + +impl<'a, X, O, K> Driver<'a, Option, Option<(K, X::Item)>> for MaxByKey +where + X: ParallelIterator<'a>, + O: Fn(&X::Item) -> K + Clone + Send + Sync + 'a, + K: Send + Ord + 'a, +{ + fn exec_with(self, executor: E) -> E::Result + where + E: Executor<'a, Option, Option<(K, X::Item)>>, + { + let operation = self.operation; + let executor = executor.into_inner(); + + let ret = self + .iterator + .map(move |x| (operation(&x), x)) + .reduce_with(|a, b| match (a.0).cmp(&b.0) { + Ordering::Greater => a, + _ => b, + }) + .exec_with(executor); + + E::map(ret, |x| x.map(|x| x.1)) + } +} diff --git a/asparit/src/inner/min.rs b/asparit/src/inner/min.rs index 61759c2..7cc86c5 100644 --- a/asparit/src/inner/min.rs +++ b/asparit/src/inner/min.rs @@ -63,3 +63,45 @@ where .exec_with(executor) } } + +/* MinByKey */ + +pub struct MinByKey { + iterator: X, + operation: O, +} + +impl MinByKey { + pub fn new(iterator: X, operation: O) -> Self { + Self { + iterator, + operation, + } + } +} + +impl<'a, X, O, K> Driver<'a, Option, Option<(K, X::Item)>> for MinByKey +where + X: ParallelIterator<'a>, + O: Fn(&X::Item) -> K + Clone + Send + Sync + 'a, + K: Send + Ord + 'a, +{ + fn exec_with(self, executor: E) -> E::Result + where + E: Executor<'a, Option, Option<(K, X::Item)>>, + { + let operation = self.operation; + let executor = executor.into_inner(); + + let ret = self + .iterator + .map(move |x| (operation(&x), x)) + .reduce_with(|a, b| match (a.0).cmp(&b.0) { + Ordering::Less => a, + _ => b, + }) + .exec_with(executor); + + E::map(ret, |x| x.map(|x| x.1)) + } +}