diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 8363860..93e0382 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -1,7 +1,9 @@ use std::cmp::{Ord, Ordering}; use std::iter::IntoIterator; -use super::{Consumer, Executor, FromParallelIterator, IntoParallelIterator, Reducer}; +use super::{ + Consumer, Executor, FromParallelIterator, IntoParallelIterator, Reducer, WithIndexedProducer, +}; use crate::{ iter::{ @@ -17,6 +19,7 @@ use crate::{ fold::{Fold, FoldWith}, for_each::ForEach, inspect::Inspect, + interleave::Interleave, intersperse::Intersperse, map::Map, map_init::MapInit, @@ -29,6 +32,7 @@ use crate::{ reduce::{Reduce, ReduceWith}, splits::Splits, sum::Sum, + take::Take, try_fold::{TryFold, TryFoldWith}, try_for_each::{TryForEach, TryForEachInit, TryForEachWith}, try_reduce::{TryReduce, TryReduceWith}, @@ -1831,4 +1835,75 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { Zip::new(self, other) } + + /// Interleaves elements of this iterator and the other given + /// iterator. Alternately yields elements from this iterator and + /// the given iterator, until both are exhausted. If one iterator + /// is exhausted before the other, the last elements are provided + /// from the other. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// let (x, y) = (vec![1, 2], vec![3, 4, 5, 6]); + /// let r: Vec = x.into_par_iter().interleave(y).collect(); + /// assert_eq!(r, vec![1, 3, 2, 4, 5, 6]); + /// ``` + fn interleave(self, other: X) -> Interleave + where + X: IntoParallelIterator<'a, Item = Self::Item>, + X::Iter: IndexedParallelIterator<'a, Item = Self::Item>, + { + Interleave::new(self, other.into_par_iter()) + } + + /// Interleaves elements of this iterator and the other given + /// iterator, until one is exhausted. 
+ /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// let (x, y) = (vec![1, 2, 3, 4], vec![5, 6]); + /// let r: Vec = x.into_par_iter().interleave_shortest(y).collect(); + /// assert_eq!(r, vec![1, 5, 2, 6, 3]); + /// ``` + fn interleave_shortest(self, other: X) -> Interleave, Take> + where + X: IntoParallelIterator<'a, Item = I>, + X::Iter: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + Self: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + I: Send + 'a, + { + let a = self; + let b = other.into_par_iter(); + + let len_a = a.len_hint(); + let len_b = b.len_hint(); + + if len_a <= len_b { + a.take(len_a).interleave(b.take(len_a)) + } else { + a.take(len_b + 1).interleave(b.take(len_b)) + } + } + + /// Creates an iterator that yields the first `n` elements. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let result: Vec<_> = (0..100) + /// .into_par_iter() + /// .take(5) + /// .collect(); + /// + /// assert_eq!(result, [0, 1, 2, 3, 4]); + /// ``` + fn take(self, n: usize) -> Take { + Take::new(self, n) + } } diff --git a/asparit/src/iter/interleave.rs b/asparit/src/iter/interleave.rs new file mode 100644 index 0000000..d00d541 --- /dev/null +++ b/asparit/src/iter/interleave.rs @@ -0,0 +1,346 @@ +use std::cmp::Ordering; +use std::iter::{DoubleEndedIterator, ExactSizeIterator, Fuse, Iterator}; + +use crate::{ + Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, + IndexedProducerCallback, ParallelIterator, Producer, Reducer, Setup, WithIndexedProducer, + WithSetup, +}; + +pub struct Interleave { + iterator_a: XA, + iterator_b: XB, +} + +impl Interleave { + pub fn new(iterator_a: XA, iterator_b: XB) -> Self { + Self { + iterator_a, + iterator_b, + } + } +} + +impl<'a, XA, XB, I> ParallelIterator<'a> for Interleave +where + XA: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + XB: IndexedParallelIterator<'a, 
Item = I> + WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + type Item = I; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint_opt(&self) -> Option { + Some(self.len_hint()) + } +} + +impl<'a, XA, XB, I> IndexedParallelIterator<'a> for Interleave +where + XA: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + XB: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint(&self) -> usize { + self.iterator_a.len_hint() + self.iterator_b.len_hint() + } +} + +impl<'a, XA, XB, I> WithIndexedProducer<'a> for Interleave +where + XA: WithIndexedProducer<'a, Item = I>, + XB: WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + type Item = I; + + fn with_indexed_producer(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + let iterator_a = self.iterator_a; + let iterator_b = self.iterator_b; + + iterator_a.with_indexed_producer(CallbackA { base, iterator_b }) + } +} + +/* CallbackA */ + +struct CallbackA { + base: CB, + iterator_b: XB, +} + +impl<'a, CB, XB, I> IndexedProducerCallback<'a, I> for CallbackA +where + CB: IndexedProducerCallback<'a, I>, + XB: WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_a: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let CallbackA { base, iterator_b } = self; + + iterator_b.with_indexed_producer(CallbackB { base, producer_a }) + } +} + +/* CallbackB */ + +struct CallbackB { + base: CB, + producer_a: PA, +} + +impl<'a, CB, PA, I> IndexedProducerCallback<'a, I> for CallbackB +where + CB: IndexedProducerCallback<'a, I>, + PA: IndexedProducer + 'a, + I: Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, producer_b: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let CallbackB { base, producer_a } = self; + + let producer = InterleaveProducer { + producer_a, + producer_b, + a_is_next: false, + }; + + base.callback(producer) + } +} + +/* InterleaveProducer */ + +struct InterleaveProducer { + producer_a: PA, + producer_b: PB, + a_is_next: bool, +} + +impl WithSetup for InterleaveProducer +where + PA: WithSetup, + PB: WithSetup, +{ + fn setup(&self) -> Setup { + let a = self.producer_a.setup(); + let b = self.producer_b.setup(); + + a.merge(b) + } +} + +impl Producer for InterleaveProducer +where + PA: IndexedProducer, + PB: IndexedProducer, +{ + type Item = I; + type IntoIter = InterleaveIter; + + fn into_iter(self) -> Self::IntoIter { + InterleaveIter { + iter_a: self.producer_a.into_iter().fuse(), + iter_b: self.producer_b.into_iter().fuse(), + a_is_next: self.a_is_next, + } + } + + fn split(self) -> (Self, Option) { + let len = self.len(); + + if len < 2 { + return (self, None); + } + + let (left, right) = self.split_at(len / 2); + + (left, Some(right)) + } +} + +impl IndexedProducer for InterleaveProducer +where + PA: IndexedProducer, + PB: IndexedProducer, +{ + type Item = I; + type IntoIter = InterleaveIter; + + fn into_iter(self) -> Self::IntoIter { + InterleaveIter { + iter_a: self.producer_a.into_iter().fuse(), + iter_b: self.producer_b.into_iter().fuse(), + a_is_next: self.a_is_next, + } + } + + fn len(&self) -> usize { + self.producer_a.len() + self.producer_b.len() + } + + /// We know 0 < index <= self.producer_a.len() + self.producer_b.len() + /// + /// Find a, b satisfying: + /// + /// (1) 0 < a <= self.producer_a.len() + /// (2) 0 < b <= self.producer_b.len() + /// (3) a + b == index + /// + /// For even splits, set a = b = index/2. 
+ /// For odd splits, set a = (index / 2) + 1, b = index / 2, if `a` + /// should yield the next element, otherwise, if `b` should yield + /// the next element, set a = index / 2 and b = (index/2) + 1 + fn split_at(self, index: usize) -> (Self, Self) { + #[inline] + fn odd_offset(flag: bool) -> usize { + (!flag) as usize + } + + let even = index % 2 == 0; + let idx = index >> 1; + + let (index_a, index_b) = ( + idx + odd_offset(even || self.a_is_next), + idx + odd_offset(even || !self.a_is_next), + ); + + let (index_a, index_b) = + if self.producer_a.len() >= index_a && self.producer_b.len() >= index_b { + (index_a, index_b) + } else if self.producer_a.len() >= index_a { + (index - self.producer_b.len(), self.producer_b.len()) + } else { + (self.producer_a.len(), index - self.producer_a.len()) + }; + + let trailing_a_is_next = even == self.a_is_next; + let (left_a, right_a) = self.producer_a.split_at(index_a); + let (left_b, right_b) = self.producer_b.split_at(index_b); + + let left = Self { + producer_a: left_a, + producer_b: left_b, + a_is_next: self.a_is_next, + }; + let right = Self { + producer_a: right_a, + producer_b: right_b, + a_is_next: trailing_a_is_next, + }; + + (left, right) + } +} + +/* InterleaveIter */ + +struct InterleaveIter { + iter_a: Fuse, + iter_b: Fuse, + a_is_next: bool, +} + +impl Iterator for InterleaveIter +where + IA: Iterator, + IB: Iterator, +{ + type Item = I; + + #[inline] + fn next(&mut self) -> Option { + self.a_is_next = !self.a_is_next; + + if self.a_is_next { + match self.iter_a.next() { + None => self.iter_b.next(), + r => r, + } + } else { + match self.iter_b.next() { + None => self.iter_a.next(), + r => r, + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let (min_a, max_a) = self.iter_a.size_hint(); + let (min_b, max_b) = self.iter_b.size_hint(); + + let min = min_a.saturating_add(min_b); + let max = match (max_a, max_b) { + (Some(a), Some(b)) => a.checked_add(b), + _ => None, + }; + + (min, max) + } +} + +impl 
DoubleEndedIterator for InterleaveIter +where + IA: DoubleEndedIterator + ExactSizeIterator, + IB: DoubleEndedIterator + ExactSizeIterator, +{ + #[inline] + fn next_back(&mut self) -> Option { + let len_a = self.iter_a.len(); + let len_b = self.iter_b.len(); + + match len_a.cmp(&len_b) { + Ordering::Less => self.iter_b.next_back(), /* a is shorter: the tail of the sequence comes from b */ + Ordering::Greater => self.iter_a.next_back(), /* b is shorter: the tail of the sequence comes from a */ + Ordering::Equal => { + if self.a_is_next { + self.iter_a.next_back() + } else { + self.iter_b.next_back() + } + } + } + } +} + +impl ExactSizeIterator for InterleaveIter +where + IA: ExactSizeIterator, + IB: ExactSizeIterator, +{ + #[inline] + fn len(&self) -> usize { + self.iter_a.len() + self.iter_b.len() + } +} diff --git a/asparit/src/iter/mod.rs b/asparit/src/iter/mod.rs index 28b480e..ac9bbe4 100644 --- a/asparit/src/iter/mod.rs +++ b/asparit/src/iter/mod.rs @@ -10,6 +10,7 @@ pub mod flatten; pub mod fold; pub mod for_each; pub mod inspect; +pub mod interleave; pub mod intersperse; pub mod map; pub mod map_init; @@ -23,6 +24,7 @@ pub mod product; pub mod reduce; pub mod splits; pub mod sum; +pub mod take; pub mod try_fold; pub mod try_for_each; pub mod try_reduce; @@ -37,12 +39,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_for_each() { - use ::std::sync::atomic::{AtomicUsize, Ordering}; - use ::std::sync::Arc; - - let i = Arc::new(AtomicUsize::new(0)); - let j = Arc::new(AtomicUsize::new(0)); - let a = vec![ vec![1usize, 2usize], vec![3usize, 4usize], @@ -54,46 +50,20 @@ mod tests { vec![11usize, 12usize], ]; - let (x, y, z): (Vec<_>, Vec<_>, Vec<_>) = a - .par_iter() + a.par_iter() .cloned() .chain(b) - .update(|x| x.push(0)) - .zip_eq(vec![10usize, 11usize, 12usize, 13usize, 14usize, 15usize]) - .map(|x| x.0) - .flatten_iter() - .intersperse(100) - .panic_fuse() - .map(Some) - .while_some() - .map_init( - move || i.fetch_add(1, Ordering::Relaxed), - |init, item| (*init, item), - ) - .map_init( - move || j.fetch_add(2, Ordering::Relaxed), - |init, (init2, 
item)| (*init, init2, item), + .with_splits(1) + .interleave_shortest( + vec![vec![50, 51], vec![52, 53], vec![54, 55]] + .into_par_iter() + .take(2), ) - .with_splits(16) - .inspect(|x| { - println!( - "Thread ID = {:?}; Item = {:?}", - ::std::thread::current().id(), - x - ) - }) - .partition_map(|(i, j, k)| match j % 3 { - 0 => (Some(i), None, None), - 1 => (None, Some(j), None), - 2 => (None, None, Some(k)), - _ => unreachable!(), + .for_each(|x| { + dbg!(x); }) .exec() .await; - - dbg!(&x); - dbg!(&y); - dbg!(&z); } #[tokio::test] diff --git a/asparit/src/iter/take.rs b/asparit/src/iter/take.rs new file mode 100644 index 0000000..3127b4a --- /dev/null +++ b/asparit/src/iter/take.rs @@ -0,0 +1,100 @@ +use std::cmp::min; + +use crate::{ + Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, + IndexedProducerCallback, ParallelIterator, Reducer, WithIndexedProducer, +}; + +pub struct Take { + base: X, + len: usize, +} + +impl Take { + pub fn new(base: X, len: usize) -> Self { + Self { base, len } + } +} + +impl<'a, X, I> ParallelIterator<'a> for Take +where + X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + type Item = I; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint_opt(&self) -> Option { + self.base.len_hint_opt().map(|len| min(len, self.len)) + } +} + +impl<'a, X, I> IndexedParallelIterator<'a> for Take +where + X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, + I: Send + 'a, +{ + fn drive_indexed(self, executor: E, consumer: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) + } + + fn len_hint(&self) -> usize { + 
min(self.base.len_hint(), self.len) + } +} + +impl<'a, X> WithIndexedProducer<'a> for Take +where + X: WithIndexedProducer<'a>, +{ + type Item = X::Item; + + fn with_indexed_producer(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + self.base.with_indexed_producer(TakeCallback { + base, + len: self.len, + }) + } +} + +/* TakeCallback */ + +struct TakeCallback { + base: CB, + len: usize, +} + +impl<'a, CB, I> IndexedProducerCallback<'a, I> for TakeCallback +where + CB: IndexedProducerCallback<'a, I>, +{ + type Output = CB::Output; + + fn callback

(self, producer: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let len = min(self.len, producer.len()); /* clamp so split_at never goes past the producer's end */ + let (producer, _) = producer.split_at(len); + + self.base.callback(producer) + } +}