Bläddra i källkod

Implemented 'unzip' operation

master
Bergmann89 5 år sedan
förälder
incheckning
1f789d179d
12 ändrade filer med 417 tillägg och 27 borttagningar
  1. +1
    -1
      asparit/src/core/executor.rs
  2. +47
    -0
      asparit/src/core/extend.rs
  3. +5
    -5
      asparit/src/core/from_iter.rs
  4. +40
    -1
      asparit/src/core/iterator.rs
  5. +2
    -0
      asparit/src/core/mod.rs
  6. +2
    -2
      asparit/src/executor/sequential.rs
  7. +2
    -2
      asparit/src/executor/tokio.rs
  8. +1
    -1
      asparit/src/iter/collect.rs
  9. +9
    -11
      asparit/src/iter/mod.rs
  10. +180
    -0
      asparit/src/iter/unzip.rs
  11. +2
    -2
      asparit/src/lib.rs
  12. +126
    -2
      asparit/src/std/vec.rs

+ 1
- 1
asparit/src/core/executor.rs Visa fil

@@ -37,7 +37,7 @@ where
operation: O,
) -> Self::Result
where
O: Fn(T2) -> T1 + Send + 'a;
O: FnMut(T2) -> T1 + Send + 'a;
}

pub struct ExecutorCallback<E, C> {


+ 47
- 0
asparit/src/core/extend.rs Visa fil

@@ -0,0 +1,47 @@
use super::{Consumer, Reducer};

/// `ParallelExtend` extends an existing collection with items from a [`ParallelIterator`].
///
/// [`ParallelIterator`]: trait.ParallelIterator.html
///
/// # Examples
///
/// Implementing `ParallelExtend` for your type:
///
/// ```
/// use rayon::prelude::*;
/// use std::mem;
///
/// struct BlackHole {
/// mass: usize,
/// }
///
/// impl<T: Send> ParallelExtend<T> for BlackHole {
/// fn par_extend<I>(&mut self, par_iter: I)
/// where I: IntoParallelIterator<Item = T>
/// {
/// let par_iter = par_iter.into_par_iter();
/// self.mass += par_iter.count() * mem::size_of::<T>();
/// }
/// }
///
/// let mut bh = BlackHole { mass: 0 };
/// bh.par_extend(0i32..1000);
/// assert_eq!(bh.mass, 4000);
/// bh.par_extend(0i64..10);
/// assert_eq!(bh.mass, 4080);
/// ```
pub trait ParallelExtend<'a, I, T>: Send + Sized
where
I: Send + 'a,
T: Send,
<Self::Consumer as Consumer<I>>::Reducer: Reducer<T>,
{
type Consumer: Consumer<I, Result = T> + 'a;

/// Creates a consumer that is used to handle the items from the iterator.
fn into_consumer(self) -> Self::Consumer;

/// Converts the result of the consumer into the final type
fn map_result(inner: T) -> Self;
}

+ 5
- 5
asparit/src/core/from_iter.rs Visa fil

@@ -36,9 +36,9 @@ use super::{Executor, IntoParallelIterator};
/// let bh: BlackHole = (0i32..1000).into_par_iter().collect();
/// assert_eq!(bh.mass, 4000);
/// ```
pub trait FromParallelIterator<T>: Send + Sized
pub trait FromParallelIterator<'a, T>: Send + Sized
where
T: Send,
T: Send + 'a,
{
/// Creates an instance of the collection from the parallel iterator `iterator`.
///
@@ -55,15 +55,15 @@ where
/// [`iterator.fold`]: trait.ParallelIterator.html#method.fold
/// [`iterator.fold_with`]: trait.ParallelIterator.html#method.fold_with
/// [`iterator.for_each`]: trait.ParallelIterator.html#method.for_each
fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result
fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result
where
E: Executor<'a, Self>,
X: IntoParallelIterator<'a, Item = T>,
T: 'a;
}

impl FromParallelIterator<()> for () {
fn from_par_iter<'a, E, X>(executor: E, iterator: X) -> E::Result
impl<'a> FromParallelIterator<'a, ()> for () {
fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result
where
E: Executor<'a, Self>,
X: IntoParallelIterator<'a, Item = ()>,


+ 40
- 1
asparit/src/core/iterator.rs Visa fil

@@ -32,6 +32,7 @@ use crate::{
try_fold::{TryFold, TryFoldWith},
try_for_each::{TryForEach, TryForEachInit, TryForEachWith},
try_reduce::{TryReduce, TryReduceWith},
unzip::Unzip,
update::Update,
while_some::WhileSome,
},
@@ -1590,10 +1591,48 @@ pub trait ParallelIterator<'a>: Sized + Send {
/// ```
fn collect<T>(self) -> Collect<Self, T>
where
T: FromParallelIterator<Self::Item>,
T: FromParallelIterator<'a, Self::Item>,
{
Collect::new(self)
}

/// Unzips the items of a parallel iterator into a pair of arbitrary
/// `ParallelExtend` containers.
///
/// You may prefer to use `unzip_into_vecs()`, which allocates more
/// efficiently with precise knowledge of how many elements the
/// iterator contains, and even allows you to reuse existing
/// vectors' backing stores rather than allocating fresh vectors.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let a = [(0, 1), (1, 2), (2, 3), (3, 4)];
///
/// let (left, right): (Vec<_>, Vec<_>) = a.par_iter().cloned().unzip();
///
/// assert_eq!(left, [0, 1, 2, 3]);
/// assert_eq!(right, [1, 2, 3, 4]);
/// ```
///
/// Nested pairs can be unzipped too.
///
/// ```
/// use rayon::prelude::*;
///
/// let (values, squares, cubes): (Vec<_>, Vec<_>, Vec<_>) = (0..4).into_par_iter()
/// .map(|i| (i, i * i, i * i * i))
/// .unzip();
///
/// assert_eq!(values, [0, 1, 2, 3]);
/// assert_eq!(squares, [0, 1, 4, 9]);
/// assert_eq!(cubes, [0, 1, 8, 27]);
/// ```
fn unzip(self) -> Unzip<Self> {
Unzip::new(self)
}
}

/// An iterator that supports "random access" to its data, meaning


+ 2
- 0
asparit/src/core/mod.rs Visa fil

@@ -2,6 +2,7 @@ mod consumer;
mod drain;
mod driver;
mod executor;
mod extend;
mod folder;
mod from_iter;
mod into_iter;
@@ -13,6 +14,7 @@ pub use consumer::Consumer;
pub use drain::{ParallelDrainFull, ParallelDrainRange};
pub use driver::Driver;
pub use executor::{Executor, ExecutorCallback};
pub use extend::ParallelExtend;
pub use folder::Folder;
pub use from_iter::FromParallelIterator;
pub use into_iter::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator};


+ 2
- 2
asparit/src/executor/sequential.rs Visa fil

@@ -55,10 +55,10 @@ where

fn map<O>(
inner: <Self::Inner as Executor<'a, T2, T3, ()>>::Result,
operation: O,
mut operation: O,
) -> Self::Result
where
O: Fn(T2) -> T1,
O: FnMut(T2) -> T1,
{
operation(inner)
}


+ 2
- 2
asparit/src/executor/tokio.rs Visa fil

@@ -96,10 +96,10 @@ where

fn map<O>(
inner: <Self::Inner as Executor<'a, T2, T3, ()>>::Result,
operation: O,
mut operation: O,
) -> Self::Result
where
O: Fn(T2) -> T1 + Send + 'a,
O: FnMut(T2) -> T1 + Send + 'a,
{
async move {
let value = inner.await;


+ 1
- 1
asparit/src/iter/collect.rs Visa fil

@@ -22,7 +22,7 @@ impl<X, T> Collect<X, T> {
impl<'a, X, T> Driver<'a, T> for Collect<X, T>
where
X: ParallelIterator<'a>,
T: FromParallelIterator<X::Item> + Send + 'a,
T: FromParallelIterator<'a, X::Item> + Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where


+ 9
- 11
asparit/src/iter/mod.rs Visa fil

@@ -23,6 +23,7 @@ pub mod sum;
pub mod try_fold;
pub mod try_for_each;
pub mod try_reduce;
pub mod unzip;
pub mod update;
pub mod while_some;

@@ -49,7 +50,7 @@ mod tests {
vec![11usize, 12usize],
];

let x = a
let (x, y, z): (Vec<_>, Vec<_>, Vec<_>) = a
.par_iter()
.cloned()
.chain(b)
@@ -60,22 +61,19 @@ mod tests {
.while_some()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| Some((item, *init)),
|init, item| (*init, item),
)
.try_fold_with(String::new(), |s, item| -> Result<String, ()> {
Ok(format!("{} + {:?}", s, item))
})
.try_for_each_init(
move || j.fetch_add(1, Ordering::Relaxed),
|init, item| -> Result<(), ()> {
println!("{:?} - {:?}", init, item);
Ok(())
},
.map_init(
move || j.fetch_add(2, Ordering::Relaxed),
|init, (init2, item)| (*init, init2, item),
)
.unzip()
.exec()
.await;

dbg!(&x);
dbg!(&y);
dbg!(&z);
}

#[tokio::test]


+ 180
- 0
asparit/src/iter/unzip.rs Visa fil

@@ -0,0 +1,180 @@
use crate::{core::Driver, Consumer, Executor, Folder, ParallelExtend, ParallelIterator, Reducer};

/* Unzip */

pub struct Unzip<X> {
iterator: X,
}

impl<X> Unzip<X> {
pub fn new(iterator: X) -> Self {
Self { iterator }
}
}

impl<'a, X, P, T> Driver<'a, P, T> for Unzip<X>
where
X: ParallelIterator<'a>,
P: Default + Send + 'a,
UnzipExtend<P>: ParallelExtend<'a, X::Item, T>,
<<UnzipExtend<P> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer<X::Item>>::Reducer:
Reducer<T> + Send,
T: Send + 'a,
{
fn exec_with<E>(self, executor: E) -> E::Result
where
E: Executor<'a, P, T>,
{
let executor = executor.into_inner();
let consumer = UnzipExtend(P::default()).into_consumer();

let inner = self.iterator.drive(executor, consumer);

E::map(inner, |inner| {
let UnzipExtend(ret) = UnzipExtend::map_result(inner);

ret
})
}
}

pub struct UnzipExtend<T>(T);
pub struct UnzipConsumer<T>(T);
pub struct UnzipFolder<T>(T);
pub struct UnzipReducer<T>(T);
pub struct UnzipResult<T>(T);

macro_rules! parallel_extend_tuple {
(($($E:ident),*), ($($I:ident),*), ($($T:ident),*), ($($C:ident),*), ($($F:ident),*), ($($R:ident),*)) => {

#[allow(non_snake_case)]
impl<'a, $($E,)+ $($I,)+ $($T,)+> ParallelExtend<'a, ($($I,)+), UnzipResult<($($T,)+)>> for UnzipExtend<($($E,)+)>
where
$($E: ParallelExtend<'a, $I, $T> + Send,)+
$(<$E::Consumer as Consumer<$I>>::Reducer: Reducer<$T>,)+
$($I: Send + 'a,)+
$($T: Send,)+
{
type Consumer = UnzipConsumer<($($E::Consumer,)+)>;

fn into_consumer(self) -> Self::Consumer {
let UnzipExtend(($($E,)+)) = self;

UnzipConsumer(($($E.into_consumer(),)+))
}

fn map_result(inner: UnzipResult<($($T,)+)>) -> Self {
let UnzipResult(($($T,)+)) = inner;

UnzipExtend(($($E::map_result($T),)+))
}
}

#[allow(non_snake_case)]
impl<$($C,)+ $($I,)+> Consumer<($($I,)+)> for UnzipConsumer<($($C,)+)>
where
$($C: Consumer<$I>,)+
{
type Folder = UnzipFolder<($($C::Folder,)+)>;
type Reducer = UnzipReducer<($($C::Reducer,)+)>;
type Result = UnzipResult<($($C::Result,)+)>;

fn split(self) -> (Self, Self, Self::Reducer) {
let UnzipConsumer(($($C,)+)) = self;
let ($($C,)+) = ($($C.split(),)+);

(
UnzipConsumer(($($C.0,)+)), UnzipConsumer(($($C.1,)+)), UnzipReducer(($($C.2,)+))
)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let UnzipConsumer(($($C,)+)) = self;
let ($($C,)+) = ($($C.split_at(index),)+);

(
UnzipConsumer(($($C.0,)+)), UnzipConsumer(($($C.1,)+)), UnzipReducer(($($C.2,)+))
)
}

fn into_folder(self) -> Self::Folder {
let UnzipConsumer(($($C,)+)) = self;

UnzipFolder(($($C.into_folder(),)+))
}

fn is_full(&self) -> bool {
let UnzipConsumer(($($C,)+)) = self;

true $(&& $C.is_full())+
}
}

#[allow(non_snake_case)]
impl<$($F,)+ $($I,)+> Folder<($($I,)+)> for UnzipFolder<($($F,)+)>
where
$($F: Folder<$I>,)+
{
type Result = UnzipResult<($($F::Result,)+)>;

fn consume(self, item: ($($I,)+)) -> Self {
let UnzipFolder(($($F,)+)) = self;
let ($($I,)+) = item;

UnzipFolder(($($F.consume($I),)+))
}

fn complete(self) -> Self::Result {
let UnzipFolder(($($F,)+)) = self;

UnzipResult(($($F.complete(),)+))
}

fn is_full(&self) -> bool {
let UnzipFolder(($($F,)+)) = self;

$($F.is_full() &&)+ true
}
}

#[allow(non_snake_case)]
impl<$($R,)+ $($T,)+> Reducer<UnzipResult<($($T,)+)>> for UnzipReducer<($($R,)+)>
where
$($R: Reducer<$T>,)+
{
fn reduce(self, left: UnzipResult<($($T,)+)>, right: UnzipResult<($($T,)+)>) -> UnzipResult<($($T,)+)> {
let UnzipReducer(($($R,)+)) = self;
let UnzipResult(($($T,)+)) = left;
let UnzipResult(($($E,)+)) = right;

UnzipResult(($($R.reduce($T, $E),)+))
}
}
};
}

parallel_extend_tuple!((E1, E2), (I1, I2), (T1, T2), (C1, C2), (F1, F2), (R1, R2));
parallel_extend_tuple!(
(E1, E2, E3),
(I1, I2, I3),
(T1, T2, T3),
(C1, C2, C3),
(F1, F2, F3),
(R1, R2, R3)
);
parallel_extend_tuple!(
(E1, E2, E3, E4),
(I1, I2, I3, I4),
(T1, T2, T3, T4),
(C1, C2, C3, C4),
(F1, F2, F3, F4),
(R1, R2, R3, R4)
);
parallel_extend_tuple!(
(E1, E2, E3, E4, E5),
(I1, I2, I3, I4, I5),
(T1, T2, T3, T4, T5),
(C1, C2, C3, C4, C5),
(F1, F2, F3, F4, F5),
(R1, R2, R3, R4, R5)
);

+ 2
- 2
asparit/src/lib.rs Visa fil

@@ -7,7 +7,7 @@ mod std;
pub use self::core::{
Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelDrainFull, ParallelDrainRange, ParallelIterator, Producer,
ProducerCallback, Reducer,
IntoParallelRefMutIterator, ParallelDrainFull, ParallelDrainRange, ParallelExtend,
ParallelIterator, Producer, ProducerCallback, Reducer,
};
pub use self::executor::{DefaultExecutor, SequentialExecutor};

+ 126
- 2
asparit/src/std/vec.rs Visa fil

@@ -1,3 +1,4 @@
use std::collections::LinkedList;
use std::iter::FusedIterator;
use std::mem::replace;
use std::ops::{Range, RangeBounds};
@@ -6,9 +7,9 @@ use std::slice::{from_raw_parts_mut, IterMut};
use std::sync::Arc;

use crate::{
misc::simplify_range, Consumer, Executor, ExecutorCallback, IndexedParallelIterator,
misc::simplify_range, Consumer, Executor, ExecutorCallback, Folder, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, ParallelDrainRange,
ParallelIterator, Producer, ProducerCallback, Reducer,
ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer,
};

/// Parallel iterator that moves out of a vector.
@@ -455,3 +456,126 @@ impl<'a, T, C> ExactSizeIterator for SliceIter<'a, T, C> {
}

impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {}

/* ParallelExtend */

impl<'a, I> ParallelExtend<'a, I, VecExtendResult<I>> for Vec<I>
where
I: Send + 'a,
{
type Consumer = VecConsumer<I>;

fn into_consumer(self) -> Self::Consumer {
VecConsumer { vec: Some(self) }
}

fn map_result(inner: VecExtendResult<I>) -> Self {
let mut vec = inner.vec.unwrap();

for mut items in inner.items {
vec.append(&mut items);
}

vec
}
}

/* VecConsumer */

pub struct VecConsumer<T> {
vec: Option<Vec<T>>,
}

impl<T> Consumer<T> for VecConsumer<T>
where
T: Send,
{
type Folder = VecFolder<T>;
type Reducer = VecReducer;
type Result = VecExtendResult<T>;

fn split(self) -> (Self, Self, Self::Reducer) {
let left = VecConsumer { vec: self.vec };
let right = VecConsumer { vec: None };
let reducer = VecReducer;

(left, right, reducer)
}

fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
self.split()
}

fn into_folder(self) -> Self::Folder {
VecFolder {
vec: self.vec,
items: Vec::new(),
}
}

fn is_full(&self) -> bool {
false
}
}

/* VecFolder */

pub struct VecFolder<T> {
vec: Option<Vec<T>>,
items: Vec<T>,
}

impl<T> Folder<T> for VecFolder<T> {
type Result = VecExtendResult<T>;

fn consume(mut self, item: T) -> Self {
self.items.push(item);

self
}

fn consume_iter<X>(mut self, iter: X) -> Self
where
X: IntoIterator<Item = T>,
{
self.items.extend(iter);

self
}

fn complete(self) -> Self::Result {
let mut items = LinkedList::new();
items.push_back(self.items);

VecExtendResult {
vec: self.vec,
items,
}
}

fn is_full(&self) -> bool {
false
}
}

/* VecReducer */

pub struct VecReducer;

impl<T> Reducer<VecExtendResult<T>> for VecReducer {
fn reduce(self, left: VecExtendResult<T>, mut right: VecExtendResult<T>) -> VecExtendResult<T> {
let mut items = left.items;
items.append(&mut right.items);

let vec = left.vec.or(right.vec);

VecExtendResult { vec, items }
}
}

/* VecExtendResult */

pub struct VecExtendResult<T> {
vec: Option<Vec<T>>,
items: LinkedList<Vec<T>>,
}

Laddar…
Avbryt
Spara