Преглед на файлове

Implemented 'flat_map_iter' and 'flatten_iter' operation

master
Bergmann89 преди 5 години
родител
ревизия
60b6b79711
променени са 3 файла, в които са добавени 356 реда и са изтрити 0 реда
  1. +74
    -0
      asparit/src/core/iterator.rs
  2. +280
    -0
      asparit/src/inner/flatten.rs
  3. +2
    -0
      asparit/src/inner/mod.rs

+ 74
- 0
asparit/src/core/iterator.rs Целия файл

@@ -1,3 +1,5 @@
use std::iter::IntoIterator;

use super::{
Consumer, Executor, FromParallelIterator, IndexedProducerCallback, ProducerCallback, Reducer,
};
@@ -9,6 +11,7 @@ use crate::{
copied::Copied,
filter::Filter,
filter_map::FilterMap,
flatten::{FlatMapIter, FlattenIter},
fold::{Fold, FoldWith},
for_each::ForEach,
inspect::Inspect,
@@ -543,6 +546,77 @@ pub trait ParallelIterator<'a>: Sized + Send {
FilterMap::new(self, operation)
}

/// Applies `operation` to each item of this iterator to get nested serial iterators,
/// producing a new parallel iterator that flattens these back into one.
///
/// # `flat_map_iter` versus `flat_map`
///
/// These two methods are similar but behave slightly differently. With [`flat_map`],
/// each of the nested iterators must be a parallel iterator, and they will be further
/// split up with nested parallelism. With `flat_map_iter`, each nested iterator is a
/// sequential `Iterator`, and we only parallelize _between_ them, while the items
/// produced by each nested iterator are processed sequentially.
///
/// When choosing between these methods, consider whether nested parallelism suits the
/// potential iterators at hand. If there's little computation involved, or its length
/// is much less than the outer parallel iterator, then it may perform better to avoid
/// the overhead of parallelism, just flattening sequentially with `flat_map_iter`.
/// If there is a lot of computation, potentially outweighing the outer parallel
/// iterator, then the nested parallelism of `flat_map` may be worthwhile.
///
/// [`flat_map`]: #method.flat_map
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// use std::cell::RefCell;
///
/// let a = [[1, 2], [3, 4], [5, 6], [7, 8]];
///
/// let par_iter = a.par_iter().flat_map_iter(|a| {
/// // The serial iterator doesn't have to be thread-safe, just its items.
/// let cell_iter = RefCell::new(a.iter().cloned());
/// std::iter::from_fn(move || cell_iter.borrow_mut().next())
/// });
///
/// let vec: Vec<_> = par_iter.collect();
///
/// assert_eq!(&vec[..], &[1, 2, 3, 4, 5, 6, 7, 8]);
/// ```
fn flat_map_iter<O, SI>(self, operation: O) -> FlatMapIter<Self, O>
where
O: Fn(Self::Item) -> SI + Clone + Send + 'a,
SI: IntoIterator,
SI::Item: Send,
{
FlatMapIter::new(self, operation)
}

/// An adaptor that flattens serial-iterable `Item`s into one large iterator.
///
/// See also [`flatten`](#method.flatten) and the analagous comparison of
/// [`flat_map_iter` versus `flat_map`](#flat_map_iter-versus-flat_map).
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let x: Vec<Vec<_>> = vec![vec![1, 2], vec![3, 4]];
/// let iters: Vec<_> = x.into_iter().map(Vec::into_iter).collect();
/// let y: Vec<_> = iters.into_par_iter().flatten_iter().collect();
///
/// assert_eq!(y, vec![1, 2, 3, 4]);
/// ```
fn flatten_iter(self) -> FlattenIter<Self>
where
Self::Item: IntoIterator + Send,
<Self::Item as IntoIterator>::Item: Send,
{
FlattenIter::new(self)
}

/// Reduces the items in the iterator into one item using `operation`.
/// The argument `identity` should be a closure that can produce
/// "identity" value which may be inserted into the sequence as


+ 280
- 0
asparit/src/inner/flatten.rs Целия файл

@@ -0,0 +1,280 @@
use std::iter::IntoIterator;

use crate::{Consumer, Executor, Folder, ParallelIterator, Producer, ProducerCallback, Reducer};

/* FlattenIter */

pub struct FlattenIter<X> {
base: X,
}

impl<X> FlattenIter<X> {
pub fn new(base: X) -> Self {
Self { base }
}
}

impl<'a, X, SI> ParallelIterator<'a> for FlattenIter<X>
where
X: ParallelIterator<'a, Item = SI>,
SI: IntoIterator + Send,
SI::Item: Send,
{
type Item = SI::Item;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.base.drive(
executor,
FlatMapIterConsumer {
base: consumer,
operation: |x| x,
},
)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(FlatMapIterCallback {
base: callback,
operation: |x| x,
})
}
}

/* FlatMapIter */

pub struct FlatMapIter<X, O> {
base: X,
operation: O,
}

impl<X, O> FlatMapIter<X, O> {
pub fn new(base: X, operation: O) -> Self {
Self { base, operation }
}
}

impl<'a, X, O, SI> ParallelIterator<'a> for FlatMapIter<X, O>
where
X: ParallelIterator<'a>,
O: Fn(X::Item) -> SI + Clone + Send + 'a,
SI: IntoIterator,
SI::Item: Send,
{
type Item = SI::Item;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.base.drive(
executor,
FlatMapIterConsumer {
base: consumer,
operation: self.operation,
},
)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
self.base.with_producer(FlatMapIterCallback {
base: callback,
operation: self.operation,
})
}
}

/* FlatMapIterConsumer */

struct FlatMapIterConsumer<C, O> {
base: C,
operation: O,
}

impl<'a, C, O, T, SI> Consumer<T> for FlatMapIterConsumer<C, O>
where
C: Consumer<SI::Item>,
O: Fn(T) -> SI + Clone + Send,
SI: IntoIterator,
{
type Folder = FlatMapIterFolder<C::Folder, O>;
type Reducer = C::Reducer;
type Result = C::Result;

fn split(self) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split();

let left = FlatMapIterConsumer {
base: left,
operation: self.operation.clone(),
};
let right = FlatMapIterConsumer {
base: right,
operation: self.operation,
};

(left, right, reducer)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

let left = FlatMapIterConsumer {
base: left,
operation: self.operation.clone(),
};
let right = FlatMapIterConsumer {
base: right,
operation: self.operation,
};

(left, right, reducer)
}

fn into_folder(self) -> Self::Folder {
FlatMapIterFolder {
base: self.base.into_folder(),
operation: self.operation,
}
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

/* FlatMapIterFolder */

struct FlatMapIterFolder<F, O> {
base: F,
operation: O,
}

impl<F, O, T, SI> Folder<T> for FlatMapIterFolder<F, O>
where
F: Folder<SI::Item>,
O: Fn(T) -> SI + Clone,
SI: IntoIterator,
{
type Result = F::Result;

fn consume(mut self, item: T) -> Self {
let iter = (self.operation)(item);

self.base = self.base.consume_iter(iter);

self
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = T>,
{
self.base = self
.base
.consume_iter(iter.into_iter().flat_map(self.operation.clone()));

self
}

fn complete(self) -> Self::Result {
self.base.complete()
}

fn is_full(&self) -> bool {
self.base.is_full()
}
}

/* FlatMapIterCallback */

struct FlatMapIterCallback<CB, O> {
base: CB,
operation: O,
}

impl<'a, CB, O, T, SI> ProducerCallback<'a, T> for FlatMapIterCallback<CB, O>
where
CB: ProducerCallback<'a, SI::Item>,
O: Fn(T) -> SI + Clone + Send + 'a,
SI: IntoIterator,
{
type Output = CB::Output;

fn callback<P>(self, producer: P) -> Self::Output
where
P: Producer<Item = T> + 'a,
{
self.base.callback(FlatMapIterProducer {
base: producer,
operation: self.operation,
})
}
}

/* FlatMapIterProducer */

struct FlatMapIterProducer<P, O> {
base: P,
operation: O,
}

impl<'a, P, O, T, SI> Producer for FlatMapIterProducer<P, O>
where
P: Producer<Item = T>,
O: Fn(T) -> SI + Clone + Send,
SI: IntoIterator,
{
type Item = SI::Item;
type IntoIter = std::iter::FlatMap<P::IntoIter, SI, O>;

fn into_iter(self) -> Self::IntoIter {
self.base.into_iter().flat_map(self.operation)
}

fn split(self) -> (Self, Option<Self>) {
let operation = self.operation;
let (left, right) = self.base.split();

let left = FlatMapIterProducer {
base: left,
operation: operation.clone(),
};
let right = right.map(move |right| FlatMapIterProducer {
base: right,
operation,
});

(left, right)
}

fn splits(&self) -> Option<usize> {
self.base.splits()
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
self.base
.fold_with(FlatMapIterFolder {
base: folder,
operation: self.operation,
})
.base
}
}

+ 2
- 0
asparit/src/inner/mod.rs Целия файл

@@ -3,6 +3,7 @@ pub mod collect;
pub mod copied;
pub mod filter;
pub mod filter_map;
pub mod flatten;
pub mod fold;
pub mod for_each;
pub mod inspect;
@@ -38,6 +39,7 @@ mod tests {
.par_iter()
.cloned()
.update(|x| x.push(0))
.flatten_iter()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| (item, *init),


Зареждане…
Отказ
Запис