diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs index 6a64cd6..ebbe376 100644 --- a/asparit/src/core/iterator.rs +++ b/asparit/src/core/iterator.rs @@ -20,6 +20,7 @@ use crate::{ fold::{Fold, FoldWith}, for_each::ForEach, inspect::Inspect, + intersperse::Intersperse, map::Map, map_init::MapInit, map_with::MapWith, @@ -1707,6 +1708,25 @@ pub trait ParallelIterator<'a>: Sized + Send { fn partition_map(self, operation: O) -> PartitionMap { PartitionMap::new(self, operation) } + + /// Intersperses clones of an element between items of this iterator. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let x = vec![1, 2, 3]; + /// let r: Vec<_> = x.into_par_iter().intersperse(-1).collect(); + /// + /// assert_eq!(r, vec![1, -1, 2, -1, 3]); + /// ``` + fn intersperse(self, item: Self::Item) -> Intersperse + where + Self::Item: Clone, + { + Intersperse::new(self, item) + } } /// An iterator that supports "random access" to its data, meaning diff --git a/asparit/src/executor/tokio.rs b/asparit/src/executor/tokio.rs index 995d3db..fb16ea3 100644 --- a/asparit/src/executor/tokio.rs +++ b/asparit/src/executor/tokio.rs @@ -213,7 +213,7 @@ impl Splitter { #[inline] fn try_split(&mut self) -> bool { - if self.splits > 0 { + if self.splits > 1 { self.splits /= 2; true diff --git a/asparit/src/iter/intersperse.rs b/asparit/src/iter/intersperse.rs new file mode 100644 index 0000000..279611e --- /dev/null +++ b/asparit/src/iter/intersperse.rs @@ -0,0 +1,582 @@ +use std::iter::{once, DoubleEndedIterator, ExactSizeIterator, Fuse, Iterator}; + +use crate::{ + Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, + ParallelIterator, Producer, ProducerCallback, Reducer, +}; + +/* Intersperse */ + +pub struct Intersperse { + base: X, + item: I, +} + +impl Intersperse { + pub fn new(base: X, item: I) -> Self { + Self { base, item } + } +} + +impl<'a, X, I> ParallelIterator<'a> for Intersperse +where + X: ParallelIterator<'a, Item = I>, + I: Clone + Send + 'a, +{ + type Item = I; + + fn drive(self, executor: E, base: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.base.drive( + executor, + IntersperseConsumer { + base, + item: self.item, + clone_first: false, + }, + ) + } + + fn with_producer(self, base: CB) -> CB::Output + where + CB: ProducerCallback<'a, Self::Item>, + { + let item = self.item; + + self.base.with_producer(IntersperseCallback { base, item }) + } + + fn len_hint_opt(&self) -> Option { + match self.base.len_hint_opt()? { + 0 => Some(0), + len => len.checked_add(len - 1), + } + } +} + +impl<'a, X, I> IndexedParallelIterator<'a> for Intersperse +where + X: IndexedParallelIterator<'a, Item = I>, + I: Clone + Send + 'a, +{ + fn drive_indexed(self, executor: E, base: C) -> E::Result + where + E: Executor<'a, D>, + C: Consumer + 'a, + D: Send + 'a, + R: Reducer + Send + 'a, + { + self.base.drive_indexed( + executor, + IntersperseConsumer { + base, + item: self.item, + clone_first: false, + }, + ) + } + + fn with_producer_indexed(self, base: CB) -> CB::Output + where + CB: IndexedProducerCallback<'a, Self::Item>, + { + let item = self.item; + + self.base + .with_producer_indexed(IntersperseCallback { base, item }) + } + + fn len_hint(&self) -> usize { + match self.base.len_hint() { + 0 => 0, + len => len - 1, + } + } +} + +/* IntersperseCallback */ + +struct IntersperseCallback { + base: CB, + item: I, +} + +impl<'a, CB, I> ProducerCallback<'a, I> for IntersperseCallback +where + CB: ProducerCallback<'a, I>, + I: Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> Self::Output + where + P: Producer + 'a, + { + let item = self.item; + let len = None; + + self.base.callback(IntersperseProducer { + base, + item, + len, + clone_first: false, + }) + } +} + +impl<'a, CB, I> IndexedProducerCallback<'a, I> for IntersperseCallback +where + CB: IndexedProducerCallback<'a, I>, + I: Clone + Send + 'a, +{ + type Output = CB::Output; + + fn callback

(self, base: P) -> Self::Output + where + P: IndexedProducer + 'a, + { + let item = self.item; + let len = Some(base.len()); + + self.base.callback(IntersperseProducer { + base, + item, + len, + clone_first: false, + }) + } +} + +/* IntersperseProducer */ + +struct IntersperseProducer { + base: P, + item: I, + len: Option, + clone_first: bool, +} + +impl Producer for IntersperseProducer +where + P: Producer, + I: Clone + Send, +{ + type Item = I; + type IntoIter = IntersperseIter; + + fn into_iter(self) -> Self::IntoIter { + IntersperseIter { + base: self.base.into_iter().fuse(), + item: self.item, + item_front: None, + item_back: None, + clone_first: self.clone_first, + clone_last: false, + } + } + + fn split(self) -> (Self, Option) { + let item = self.item; + let (left, right) = self.base.split(); + + let left = Self { + base: left, + item: item.clone(), + len: None, + clone_first: self.clone_first, + }; + let right = right.map(move |base| Self { + base, + item, + len: None, + clone_first: true, + }); + + (left, right) + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn fold_with(self, base: F) -> F + where + F: Folder, + { + let item = self.item; + let clone_first = self.clone_first; + let folder = IntersperseFolder { + base, + item, + clone_first, + }; + + self.base.fold_with(folder).base + } +} + +impl IndexedProducer for IntersperseProducer +where + P: IndexedProducer, + I: Clone + Send, +{ + type Item = I; + type IntoIter = IntersperseIter; + + fn into_iter(self) -> Self::IntoIter { + let len = self.len.unwrap(); + + IntersperseIter { + base: self.base.into_iter().fuse(), + item: self.item, + item_front: None, + item_back: None, + clone_first: len > 0 && self.clone_first, + clone_last: len > 1 && ((len & 1 == 0) ^ self.clone_first), + } + } + + #[allow(clippy::let_and_return)] + fn len(&self) -> usize { + let len = self.len.unwrap(); + + let clone_first = len > 0 && self.clone_first; + let clone_last = len > 1 && ((len & 1 == 0) ^ self.clone_first); + + let len = 2 * self.len.unwrap(); + let len = len + clone_first as usize; + let len = len - (len > 0 && !clone_last) as usize; + + len + } + + fn split_at(self, index: usize) -> (Self, Self) { + let len = self.len.unwrap(); + + debug_assert!(index <= len); + + // The left needs half of the items from the base producer, and the + // other half will be our interspersed item. If we're not leading with + // a cloned item, then we need to round up the base number of items, + // otherwise round down. + let base_index = (index + !self.clone_first as usize) / 2; + let (left_base, right_base) = self.base.split_at(base_index); + + let left = IntersperseProducer { + base: left_base, + item: self.item.clone(), + len: Some(index), + clone_first: self.clone_first, + }; + + let right = IntersperseProducer { + base: right_base, + item: self.item, + len: Some(len - index), + clone_first: (index & 1 == 1) ^ self.clone_first, + }; + + (left, right) + } + + fn splits(&self) -> Option { + self.base.splits() + } + + fn min_len(&self) -> Option { + self.base.min_len() + } + + fn max_len(&self) -> Option { + self.base.max_len() + } + + fn fold_with(self, base: F) -> F + where + F: Folder, + { + let folder = IntersperseFolder { + base, + item: self.item, + clone_first: self.clone_first, + }; + + self.base.fold_with(folder).base + } +} + +/* IntersperseIter */ + +struct IntersperseIter { + base: Fuse, + item: I, + item_front: Option, + item_back: Option, + clone_first: bool, + clone_last: bool, +} + +impl IntersperseIter { + fn calc_len(&self, len: usize) -> usize { + let mut len = 2 * len; + + if self.clone_first { + len += 1 + } + + if len > 0 && !self.clone_last { + len -= 1; + } + + len + } +} + +impl IntersperseIter +where + X: Iterator, +{ + fn get_next_front(&mut self) -> Option { + if let Some(item) = self.item_front.take() { + return Some(item); + } + + if let Some(item) = self.base.next() { + return Some(item); + } + + if let Some(item) = self.item_back.take() { + return Some(item); + } + + None + } +} + +impl IntersperseIter +where + X: DoubleEndedIterator, +{ + fn get_next_back(&mut self) -> Option { + if let Some(item) = self.item_back.take() { + return Some(item); + } + + if let Some(item) = self.base.next_back() { + return Some(item); + } + + if let Some(item) = self.item_front.take() { + return Some(item); + } + + None + } +} + +impl Iterator for IntersperseIter +where + X: Iterator, + I: Clone, +{ + type Item = I; + + fn next(&mut self) -> Option { + if self.clone_first { + self.clone_first = false; + + Some(self.item.clone()) + } else if let Some(next) = self.get_next_front() { + if let Some(item) = self.get_next_front() { + self.clone_first = true; + self.item_front = Some(item); + } + + Some(next) + } else if self.clone_last { + self.clone_last = false; + + Some(self.item.clone()) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let (beg, end) = self.base.size_hint(); + + let beg = self.calc_len(beg); + let end = end.map(|end| self.calc_len(end)); + + (beg, end) + } +} + +impl DoubleEndedIterator for IntersperseIter +where + X: DoubleEndedIterator, + I: Clone, +{ + fn next_back(&mut self) -> Option { + if self.clone_last { + self.clone_last = false; + + Some(self.item.clone()) + } else if let Some(next) = self.get_next_back() { + if let Some(item) = self.get_next_back() { + self.clone_last = true; + self.item_back = Some(item); + } + + Some(next) + } else if self.clone_first { + self.clone_first = false; + + Some(self.item.clone()) + } else { + None + } + } +} + +impl ExactSizeIterator for IntersperseIter +where + X: ExactSizeIterator, + I: Clone, +{ + fn len(&self) -> usize { + self.calc_len(self.base.len()) + } +} + +/* IntersperseConsumer */ + +struct IntersperseConsumer { + base: C, + item: I, + clone_first: bool, +} + +impl Consumer for IntersperseConsumer +where + C: Consumer, + I: Clone + Send, +{ + type Folder = IntersperseFolder; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split(self) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split(); + + let left = Self { + base: left, + item: self.item.clone(), + clone_first: self.clone_first, + }; + let right = Self { + base: right, + item: self.item, + clone_first: true, + }; + + (left, right, reducer) + } + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let base_index = index + index.saturating_sub(!self.clone_first as usize); + let (left, right, reducer) = self.base.split_at(base_index); + + let left = Self { + base: left, + item: self.item.clone(), + clone_first: self.clone_first, + }; + let right = Self { + base: right, + item: self.item, + clone_first: true, + }; + + (left, right, reducer) + } + + fn into_folder(self) -> Self::Folder { + IntersperseFolder { + base: self.base.into_folder(), + item: self.item, + clone_first: self.clone_first, + } + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} + +/* IntersperseFolder */ + +struct IntersperseFolder { + base: F, + item: I, + clone_first: bool, +} + +impl Folder for IntersperseFolder +where + F: Folder, + I: Clone, +{ + type Result = F::Result; + + fn consume(mut self, item: I) -> Self { + if self.clone_first { + self.base = self.base.consume(self.item.clone()); + } else { + self.clone_first = true; + } + + self.base = self.base.consume(item); + + self + } + + fn consume_iter(self, iter: X) -> Self + where + X: IntoIterator, + { + let mut clone_first = self.clone_first; + let item = self.item; + let iter = iter.into_iter().flat_map(|x| { + let first = if clone_first { + Some(item.clone()) + } else { + clone_first = true; + + None + }; + + first.into_iter().chain(once(x)) + }); + let base = self.base.consume_iter(iter); + + IntersperseFolder { + base, + item, + clone_first, + } + } + + fn complete(self) -> Self::Result { + self.base.complete() + } + + fn is_full(&self) -> bool { + self.base.is_full() + } +} diff --git a/asparit/src/iter/mod.rs b/asparit/src/iter/mod.rs index 76f17e1..6b16cdd 100644 --- a/asparit/src/iter/mod.rs +++ b/asparit/src/iter/mod.rs @@ -10,6 +10,7 @@ pub mod flatten; pub mod fold; pub mod for_each; pub mod inspect; +pub mod intersperse; pub mod map; pub mod map_init; pub mod map_with; @@ -57,6 +58,7 @@ mod tests { .chain(b) .update(|x| x.push(0)) .flatten_iter() + .intersperse(100) .panic_fuse() .map(Some) .while_some()