| @@ -33,7 +33,9 @@ use crate::{ | |||||
| partition::{Partition, PartitionMap}, | partition::{Partition, PartitionMap}, | ||||
| product::Product, | product::Product, | ||||
| reduce::{Reduce, ReduceWith}, | reduce::{Reduce, ReduceWith}, | ||||
| skip::Skip, | |||||
| splits::Splits, | splits::Splits, | ||||
| step_by::StepBy, | |||||
| sum::Sum, | sum::Sum, | ||||
| take::Take, | take::Take, | ||||
| try_fold::{TryFold, TryFoldWith}, | try_fold::{TryFold, TryFoldWith}, | ||||
| @@ -2061,6 +2063,43 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> { | |||||
| Enumerate::new(self) | Enumerate::new(self) | ||||
| } | } | ||||
| /// Creates an iterator that steps by the given amount | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| ///use rayon::prelude::*; | |||||
| /// | |||||
| /// let range = (3..10); | |||||
| /// let result: Vec<i32> = range | |||||
| /// .into_par_iter() | |||||
| /// .step_by(3) | |||||
| /// .collect(); | |||||
| /// | |||||
| /// assert_eq!(result, [3, 6, 9]) | |||||
| /// ``` | |||||
| fn step_by(self, step: usize) -> StepBy<Self> { | |||||
| StepBy::new(self, step) | |||||
| } | |||||
| /// Creates an iterator that skips the first `n` elements. | |||||
| /// | |||||
| /// # Examples | |||||
| /// | |||||
| /// ``` | |||||
| /// use rayon::prelude::*; | |||||
| /// | |||||
| /// let result: Vec<_> = (0..100) | |||||
| /// .into_par_iter() | |||||
| /// .skip(95) | |||||
| /// .collect(); | |||||
| /// | |||||
| /// assert_eq!(result, [95, 96, 97, 98, 99]); | |||||
| /// ``` | |||||
| fn skip(self, n: usize) -> Skip<Self> { | |||||
| Skip::new(self, n) | |||||
| } | |||||
| /// Creates an iterator that yields the first `n` elements. | /// Creates an iterator that yields the first `n` elements. | ||||
| /// | /// | ||||
| /// # Examples | /// # Examples | ||||
| @@ -137,7 +137,7 @@ where | |||||
| iterator_a | iterator_a | ||||
| .zip(iterator_b) | .zip(iterator_b) | ||||
| .all(move |(x, y)| dbg!(PartialEq::eq(&x, &y)) == expected) | |||||
| .all(move |(x, y)| PartialEq::eq(&x, &y) == expected) | |||||
| .exec_with(executor) | .exec_with(executor) | ||||
| } | } | ||||
| } | } | ||||
| @@ -7,6 +7,8 @@ use crate::{ | |||||
| WithSetup, | WithSetup, | ||||
| }; | }; | ||||
| /* Enumerate */ | |||||
| pub struct Enumerate<X> { | pub struct Enumerate<X> { | ||||
| base: X, | base: X, | ||||
| } | } | ||||
| @@ -25,7 +25,9 @@ pub mod panic_fuse; | |||||
| pub mod partition; | pub mod partition; | ||||
| pub mod product; | pub mod product; | ||||
| pub mod reduce; | pub mod reduce; | ||||
| pub mod skip; | |||||
| pub mod splits; | pub mod splits; | ||||
| pub mod step_by; | |||||
| pub mod sum; | pub mod sum; | ||||
| pub mod take; | pub mod take; | ||||
| pub mod try_fold; | pub mod try_fold; | ||||
| @@ -42,8 +44,10 @@ mod tests { | |||||
| #[tokio::test(flavor = "multi_thread")] | #[tokio::test(flavor = "multi_thread")] | ||||
| async fn test_for_each() { | async fn test_for_each() { | ||||
| let x = vec![0usize, 1, 2, 3, 4] | |||||
| let x = vec![0usize, 1, 2, 3, 4, 5, 6, 7, 8, 9] | |||||
| .into_par_iter() | .into_par_iter() | ||||
| .skip(1) | |||||
| .step_by(3) | |||||
| .enumerate() | .enumerate() | ||||
| .for_each(|x| { | .for_each(|x| { | ||||
| dbg!(x); | dbg!(x); | ||||
| @@ -0,0 +1,100 @@ | |||||
| use std::cmp::min; | |||||
| use crate::{ | |||||
| Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, | |||||
| IndexedProducerCallback, ParallelIterator, Reducer, WithIndexedProducer, | |||||
| }; | |||||
| pub struct Skip<X> { | |||||
| base: X, | |||||
| len: usize, | |||||
| } | |||||
| impl<X> Skip<X> { | |||||
| pub fn new(base: X, len: usize) -> Self { | |||||
| Self { base, len } | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> ParallelIterator<'a> for Skip<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| type Item = I; | |||||
| fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint_opt(&self) -> Option<usize> { | |||||
| self.base.len_hint_opt().map(|len| min(len, self.len)) | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> IndexedParallelIterator<'a> for Skip<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint(&self) -> usize { | |||||
| min(self.base.len_hint(), self.len) | |||||
| } | |||||
| } | |||||
| impl<'a, X> WithIndexedProducer<'a> for Skip<X> | |||||
| where | |||||
| X: WithIndexedProducer<'a>, | |||||
| { | |||||
| type Item = X::Item; | |||||
| fn with_indexed_producer<CB>(self, base: CB) -> CB::Output | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, Self::Item>, | |||||
| { | |||||
| self.base.with_indexed_producer(SkipCallback { | |||||
| base, | |||||
| len: self.len, | |||||
| }) | |||||
| } | |||||
| } | |||||
| /* SkipCallback */ | |||||
| struct SkipCallback<CB> { | |||||
| base: CB, | |||||
| len: usize, | |||||
| } | |||||
| impl<'a, CB, I> IndexedProducerCallback<'a, I> for SkipCallback<CB> | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, I>, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, producer: P) -> Self::Output | |||||
| where | |||||
| P: IndexedProducer<Item = I> + 'a, | |||||
| { | |||||
| let index = min(self.len, producer.len()); | |||||
| let (_, producer) = producer.split_at(index); | |||||
| self.base.callback(producer) | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,185 @@ | |||||
| use std::cmp::min; | |||||
| use crate::{ | |||||
| Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer, | |||||
| IndexedProducerCallback, ParallelIterator, Producer, Reducer, Setup, WithIndexedProducer, | |||||
| WithSetup, | |||||
| }; | |||||
| /* StepBy */ | |||||
| pub struct StepBy<X> { | |||||
| base: X, | |||||
| step: usize, | |||||
| } | |||||
| impl<X> StepBy<X> { | |||||
| pub fn new(base: X, step: usize) -> Self { | |||||
| Self { base, step } | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> ParallelIterator<'a> for StepBy<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| type Item = I; | |||||
| fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint_opt(&self) -> Option<usize> { | |||||
| self.base.len_hint_opt() | |||||
| } | |||||
| } | |||||
| impl<'a, X, I> IndexedParallelIterator<'a> for StepBy<X> | |||||
| where | |||||
| X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>, | |||||
| I: Send + 'a, | |||||
| { | |||||
| fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result | |||||
| where | |||||
| E: Executor<'a, D>, | |||||
| C: Consumer<Self::Item, Result = D, Reducer = R> + 'a, | |||||
| D: Send + 'a, | |||||
| R: Reducer<D> + Send + 'a, | |||||
| { | |||||
| self.with_indexed_producer(ExecutorCallback::new(executor, consumer)) | |||||
| } | |||||
| fn len_hint(&self) -> usize { | |||||
| self.base.len_hint() | |||||
| } | |||||
| } | |||||
| impl<'a, X> WithIndexedProducer<'a> for StepBy<X> | |||||
| where | |||||
| X: WithIndexedProducer<'a>, | |||||
| { | |||||
| type Item = X::Item; | |||||
| fn with_indexed_producer<CB>(self, base: CB) -> CB::Output | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, Self::Item>, | |||||
| { | |||||
| self.base.with_indexed_producer(StepByCallback { | |||||
| base, | |||||
| step: self.step, | |||||
| }) | |||||
| } | |||||
| } | |||||
| /* StepByCallback */ | |||||
| struct StepByCallback<CB> { | |||||
| base: CB, | |||||
| step: usize, | |||||
| } | |||||
| impl<'a, CB, I> IndexedProducerCallback<'a, I> for StepByCallback<CB> | |||||
| where | |||||
| CB: IndexedProducerCallback<'a, I>, | |||||
| { | |||||
| type Output = CB::Output; | |||||
| fn callback<P>(self, base: P) -> Self::Output | |||||
| where | |||||
| P: IndexedProducer<Item = I> + 'a, | |||||
| { | |||||
| self.base.callback(StepByProducer { | |||||
| base, | |||||
| step: self.step, | |||||
| }) | |||||
| } | |||||
| } | |||||
| /* StepByProducer */ | |||||
| struct StepByProducer<P> { | |||||
| base: P, | |||||
| step: usize, | |||||
| } | |||||
| impl<P> WithSetup for StepByProducer<P> | |||||
| where | |||||
| P: WithSetup, | |||||
| { | |||||
| fn setup(&self) -> Setup { | |||||
| let Setup { | |||||
| splits, | |||||
| min_len, | |||||
| max_len, | |||||
| } = self.base.setup(); | |||||
| Setup { | |||||
| splits, | |||||
| min_len: min_len.map(|x| if x > 0 { (x - 1) / self.step + 1 } else { x }), | |||||
| max_len: max_len.map(|x| x / self.step), | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<P> Producer for StepByProducer<P> | |||||
| where | |||||
| P: IndexedProducer, | |||||
| { | |||||
| type Item = P::Item; | |||||
| type IntoIter = std::iter::StepBy<P::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| self.base.into_iter().step_by(self.step) | |||||
| } | |||||
| fn split(self) -> (Self, Option<Self>) { | |||||
| let len = self.len(); | |||||
| if len < 2 { | |||||
| return (self, None); | |||||
| } | |||||
| let (left, right) = self.split_at(len / 2); | |||||
| (left, Some(right)) | |||||
| } | |||||
| } | |||||
| impl<P> IndexedProducer for StepByProducer<P> | |||||
| where | |||||
| P: IndexedProducer, | |||||
| { | |||||
| type Item = P::Item; | |||||
| type IntoIter = std::iter::StepBy<P::IntoIter>; | |||||
| fn into_iter(self) -> Self::IntoIter { | |||||
| self.base.into_iter().step_by(self.step) | |||||
| } | |||||
| fn len(&self) -> usize { | |||||
| self.base.len() / self.step | |||||
| } | |||||
| fn split_at(self, index: usize) -> (Self, Self) { | |||||
| let index = min(index * self.step, self.base.len()); | |||||
| let (left, right) = self.base.split_at(index); | |||||
| let left = Self { | |||||
| base: left, | |||||
| step: self.step, | |||||
| }; | |||||
| let right = Self { | |||||
| base: right, | |||||
| step: self.step, | |||||
| }; | |||||
| (left, right) | |||||
| } | |||||
| } | |||||
| @@ -92,8 +92,8 @@ where | |||||
| where | where | ||||
| P: IndexedProducer<Item = I> + 'a, | P: IndexedProducer<Item = I> + 'a, | ||||
| { | { | ||||
| let len = dbg!(min(self.len, producer.len())); | |||||
| let (producer, _) = producer.split_at(len); | |||||
| let index = min(self.len, producer.len()); | |||||
| let (producer, _) = producer.split_at(index); | |||||
| self.base.callback(producer) | self.base.callback(producer) | ||||
| } | } | ||||