From c6344d7da34ff6646943053bed5745caee796420 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Wed, 18 Nov 2020 01:48:03 +0100 Subject: [PATCH] Implemented 'zip' and 'zip_eq' operation --- asparit/src/core/iterator.rs | 62 +++++++++ asparit/src/iter/mod.rs | 3 + asparit/src/iter/zip.rs | 243 +++++++++++++++++++++++++++++++++++ 3 files changed, 308 insertions(+) create mode 100644 asparit/src/iter/zip.rs diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index cc6a21c..8363860 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -35,6 +35,7 @@ use crate::{ unzip::Unzip, update::Update, while_some::WhileSome, + zip::Zip, }, misc::Try, }; @@ -1769,4 +1770,65 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { /// assert_eq!(vec.len(), 10); /// ``` fn len_hint(&self) -> usize; + + /// Iterates over tuples `(A, B)`, where the items `A` are from + /// this iterator and `B` are from the iterator given as argument. + /// Like the `zip` method on ordinary iterators, if the two + /// iterators are of unequal length, you only get the items they + /// have in common. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let result: Vec<_> = (1..4) + /// .into_par_iter() + /// .zip(vec!['a', 'b', 'c']) + /// .collect(); + /// + /// assert_eq!(result, [(1, 'a'), (2, 'b'), (3, 'c')]); + /// ``` + fn zip(self, other: X) -> Zip + where + X: IntoParallelIterator<'a>, + X::Iter: IndexedParallelIterator<'a>, + { + Zip::new(self, other.into_par_iter()) + } + + /// The same as `Zip`, but requires that both iterators have the same length. + /// + /// # Panics + /// Will panic if `self` and `other` are not the same length. + /// + /// ```should_panic + /// use rayon::prelude::*; + /// + /// let one = [1u8]; + /// let two = [2u8, 2]; + /// let one_iter = one.par_iter(); + /// let two_iter = two.par_iter(); + /// + /// // this will panic + /// let zipped: Vec<(&u8, &u8)> = one_iter.zip_eq(two_iter).collect(); + /// + /// // we should never get here + /// assert_eq!(1, zipped.len()); + /// ``` + fn zip_eq(self, other: X) -> Zip + where + X: IntoParallelIterator<'a>, + X::Iter: IndexedParallelIterator<'a>, + { + let other = other.into_par_iter(); + + assert_eq!( + self.len_hint(), + other.len_hint(), + "Iterators of 'zip_eq' operation must hae the same length!" + ); + + Zip::new(self, other) + } } diff --git a/asparit/src/iter/mod.rs b/asparit/src/iter/mod.rs index 717adda..28b480e 100644 --- a/asparit/src/iter/mod.rs +++ b/asparit/src/iter/mod.rs @@ -29,6 +29,7 @@ pub mod try_reduce; pub mod unzip; pub mod update; pub mod while_some; +pub mod zip; #[cfg(test)] mod tests { @@ -58,6 +59,8 @@ mod tests { .cloned() .chain(b) .update(|x| x.push(0)) + .zip_eq(vec![10usize, 11usize, 12usize, 13usize, 14usize, 15usize]) + .map(|x| x.0) .flatten_iter() .intersperse(100) .panic_fuse() diff --git a/asparit/src/iter/zip.rs b/asparit/src/iter/zip.rs new file mode 100644 index 0000000..986b9d7 --- /dev/null +++ b/asparit/src/iter/zip.rs @@ -0,0 +1,243 @@ +use std::cmp::min; + +use crate::{ + Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, + IndexedProducerCallback, ParallelIterator, Producer, Reducer, Setup, WithIndexedProducer, + WithSetup, +}; + +pub struct Zip { + iterator_a: XA, + iterator_b: XB, +} + +impl Zip { + pub fn new(iterator_a: XA, iterator_b: XB) -> Self { + Self { + iterator_a, + iterator_b, + } + } +} + +impl<'a, XA, XB, A, B> ParallelIterator<'a> for Zip +where + XA: IndexedParallelIterator<'a, Item = A> + WithIndexedProducer<'a, Item = A>, + XB: IndexedParallelIterator<'a, Item = B> + WithIndexedProducer<'a, Item = B>, + A: Send + 'a, + B: Send + 'a, +{ + type Item = (A, B); + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint_opt(&self) -> Option { + match ( + self.iterator_a.len_hint_opt(), + self.iterator_b.len_hint_opt(), + ) { + (Some(a), Some(b)) => Some(min(a, b)), + (_, _) => None, + } + } +} + +impl<'a, XA, XB, A, B> IndexedParallelIterator<'a> for Zip +where + XA: IndexedParallelIterator<'a, Item = A> + WithIndexedProducer<'a, Item = A>, + XB: IndexedParallelIterator<'a, Item = B> + WithIndexedProducer<'a, Item = B>, + A: Send + 'a, + B: Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint(&self) -> usize { + min(self.iterator_a.len_hint(), self.iterator_b.len_hint()) + } +} + +impl<'a, XA, XB> WithIndexedProducer<'a> for Zip +where + XA: WithIndexedProducer<'a>, + XB: WithIndexedProducer<'a>, +{ + type Item = (XA::Item, XB::Item); + + fn with_indexed_producer(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + let iterator_a = self.iterator_a; + let iterator_b = self.iterator_b; + + iterator_a.with_indexed_producer(CallbackA { base, iterator_b }) + } +} + +/* CallbackA */ + +struct CallbackA { + base: CB, + iterator_b: XB, +} + +impl<'a, CB, XB, A, B> IndexedProducerCallback<'a, A> for CallbackA +where + CB: IndexedProducerCallback<'a, (A, B)>, + XB: WithIndexedProducer<'a, Item = B>, + A: Send + 'a, + B: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_a: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let CallbackA { base, iterator_b } = self; + + iterator_b.with_indexed_producer(CallbackB { base, producer_a }) + } +} + +/* CallbackB */ + +struct CallbackB { + base: CB, + producer_a: PA, +} + +impl<'a, CB, PA, A, B> IndexedProducerCallback<'a, B> for CallbackB +where + CB: IndexedProducerCallback<'a, (A, B)>, + PA: IndexedProducer + 'a, + A: Send + 'a, + B: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_b: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let CallbackB { base, producer_a } = self; + + let producer = ZipProducer { + producer_a, + producer_b, + }; + + base.callback(producer) + } +} + +/* ZipProducer */ + +struct ZipProducer { + producer_a: PA, + producer_b: PB, +} + +impl WithSetup for ZipProducer +where + PA: WithSetup, + PB: WithSetup, +{ + fn setup(&self) -> Setup { + let a = self.producer_a.setup(); + let b = self.producer_b.setup(); + + a.merge(b) + } +} + +impl Producer for ZipProducer +where + PA: IndexedProducer, + PB: IndexedProducer, +{ + type Item = (PA::Item, PB::Item); + type IntoIter = std::iter::Zip; + + fn into_iter(self) -> Self::IntoIter { + let a = self.producer_a.into_iter(); + let b = self.producer_b.into_iter(); + + a.zip(b) + } + + fn split(self) -> (Self, Option) { + let len = self.len(); + + if len < 2 { + return (self, None); + } + + let index = len / 2; + let (left_a, right_a) = self.producer_a.split_at(index); + let (left_b, right_b) = self.producer_b.split_at(index); + + let left = Self { + producer_a: left_a, + producer_b: left_b, + }; + let right = Self { + producer_a: right_a, + producer_b: right_b, + }; + + (left, Some(right)) + } +} + +impl IndexedProducer for ZipProducer +where + PA: IndexedProducer, + PB: IndexedProducer, +{ + type Item = (PA::Item, PB::Item); + type IntoIter = std::iter::Zip; + + fn into_iter(self) -> Self::IntoIter { + let a = self.producer_a.into_iter(); + let b = self.producer_b.into_iter(); + + a.zip(b) + } + + fn len(&self) -> usize { + min(self.producer_a.len(), self.producer_b.len()) + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left_a, right_a) = self.producer_a.split_at(index); + let (left_b, right_b) = self.producer_b.split_at(index); + + let left = Self { + producer_a: left_a, + producer_b: left_b, + }; + let right = Self { + producer_a: right_a, + producer_b: right_b, + }; + + (left, right) + } +}