Преглед изворни кода

Implemented 'interleave', 'interleave_shortest' and 'take' operation

master
Bergmann89 пре 5 година
родитељ
комит
70f70e4a27
4 измењених фајлова са 532 додато и 41 уклоњено
  1. +76
    -1
      asparit/src/core/iterator.rs
  2. +346
    -0
      asparit/src/iter/interleave.rs
  3. +10
    -40
      asparit/src/iter/mod.rs
  4. +100
    -0
      asparit/src/iter/take.rs

+ 76
- 1
asparit/src/core/iterator.rs Прегледај датотеку

@@ -1,7 +1,9 @@
use std::cmp::{Ord, Ordering};
use std::iter::IntoIterator;

use super::{Consumer, Executor, FromParallelIterator, IntoParallelIterator, Reducer};
use super::{
Consumer, Executor, FromParallelIterator, IntoParallelIterator, Reducer, WithIndexedProducer,
};

use crate::{
iter::{
@@ -17,6 +19,7 @@ use crate::{
fold::{Fold, FoldWith},
for_each::ForEach,
inspect::Inspect,
interleave::Interleave,
intersperse::Intersperse,
map::Map,
map_init::MapInit,
@@ -29,6 +32,7 @@ use crate::{
reduce::{Reduce, ReduceWith},
splits::Splits,
sum::Sum,
take::Take,
try_fold::{TryFold, TryFoldWith},
try_for_each::{TryForEach, TryForEachInit, TryForEachWith},
try_reduce::{TryReduce, TryReduceWith},
@@ -1831,4 +1835,75 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> {

Zip::new(self, other)
}

/// Interleaves elements of this iterator and the other given
/// iterator. Alternately yields elements from this iterator and
/// the given iterator, until both are exhausted. If one iterator
/// is exhausted before the other, the last elements are provided
/// from the other.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// let (x, y) = (vec![1, 2], vec![3, 4, 5, 6]);
/// let r: Vec<i32> = x.into_par_iter().interleave(y).collect();
/// assert_eq!(r, vec![1, 3, 2, 4, 5, 6]);
/// ```
fn interleave<X>(self, other: X) -> Interleave<Self, X::Iter>
where
X: IntoParallelIterator<'a, Item = Self::Item>,
X::Iter: IndexedParallelIterator<'a, Item = Self::Item>,
{
Interleave::new(self, other.into_par_iter())
}

/// Interleaves elements of this iterator and the other given
/// iterator, until one is exhausted.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// let (x, y) = (vec![1, 2, 3, 4], vec![5, 6]);
/// let r: Vec<i32> = x.into_par_iter().interleave_shortest(y).collect();
/// assert_eq!(r, vec![1, 5, 2, 6, 3]);
/// ```
fn interleave_shortest<X, I>(self, other: X) -> Interleave<Take<Self>, Take<X::Iter>>
where
X: IntoParallelIterator<'a, Item = I>,
X::Iter: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
Self: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
let a = self;
let b = other.into_par_iter();

let len_a = a.len_hint();
let len_b = b.len_hint();

if len_a <= len_b {
a.take(len_a).interleave(b.take(len_a))
} else {
a.take(len_b + 1).interleave(b.take(len_b))
}
}

/// Creates an iterator that yields the first `n` elements.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let result: Vec<_> = (0..100)
/// .into_par_iter()
/// .take(5)
/// .collect();
///
/// assert_eq!(result, [0, 1, 2, 3, 4]);
/// ```
fn take(self, n: usize) -> Take<Self> {
Take::new(self, n)
}
}

+ 346
- 0
asparit/src/iter/interleave.rs Прегледај датотеку

@@ -0,0 +1,346 @@
use std::cmp::Ordering;
use std::iter::{DoubleEndedIterator, ExactSizeIterator, Fuse, Iterator};

use crate::{
Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, Reducer, Setup, WithIndexedProducer,
WithSetup,
};

pub struct Interleave<XA, XB> {
iterator_a: XA,
iterator_b: XB,
}

impl<XA, XB> Interleave<XA, XB> {
pub fn new(iterator_a: XA, iterator_b: XB) -> Self {
Self {
iterator_a,
iterator_b,
}
}
}

impl<'a, XA, XB, I> ParallelIterator<'a> for Interleave<XA, XB>
where
XA: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
XB: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
type Item = I;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
}

fn len_hint_opt(&self) -> Option<usize> {
Some(self.len_hint())
}
}

impl<'a, XA, XB, I> IndexedParallelIterator<'a> for Interleave<XA, XB>
where
XA: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
XB: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
}

fn len_hint(&self) -> usize {
self.iterator_a.len_hint() + self.iterator_b.len_hint()
}
}

impl<'a, XA, XB, I> WithIndexedProducer<'a> for Interleave<XA, XB>
where
XA: WithIndexedProducer<'a, Item = I>,
XB: WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
type Item = I;

fn with_indexed_producer<CB>(self, base: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
let iterator_a = self.iterator_a;
let iterator_b = self.iterator_b;

iterator_a.with_indexed_producer(CallbackA { base, iterator_b })
}
}

/* CallbackA */

struct CallbackA<CB, XB> {
base: CB,
iterator_b: XB,
}

impl<'a, CB, XB, I> IndexedProducerCallback<'a, I> for CallbackA<CB, XB>
where
CB: IndexedProducerCallback<'a, I>,
XB: WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, producer_a: P) -> Self::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let CallbackA { base, iterator_b } = self;

iterator_b.with_indexed_producer(CallbackB { base, producer_a })
}
}

/* CallbackB */

struct CallbackB<CB, PA> {
base: CB,
producer_a: PA,
}

impl<'a, CB, PA, I> IndexedProducerCallback<'a, I> for CallbackB<CB, PA>
where
CB: IndexedProducerCallback<'a, I>,
PA: IndexedProducer<Item = I> + 'a,
I: Send + 'a,
{
type Output = CB::Output;

fn callback<P>(self, producer_b: P) -> Self::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let CallbackB { base, producer_a } = self;

let producer = InterleaveProducer {
producer_a,
producer_b,
a_is_next: false,
};

base.callback(producer)
}
}

/* InterleaveProducer */

struct InterleaveProducer<PA, PB> {
producer_a: PA,
producer_b: PB,
a_is_next: bool,
}

impl<PA, PB> WithSetup for InterleaveProducer<PA, PB>
where
PA: WithSetup,
PB: WithSetup,
{
fn setup(&self) -> Setup {
let a = self.producer_a.setup();
let b = self.producer_b.setup();

a.merge(b)
}
}

impl<PA, PB, I> Producer for InterleaveProducer<PA, PB>
where
PA: IndexedProducer<Item = I>,
PB: IndexedProducer<Item = I>,
{
type Item = I;
type IntoIter = InterleaveIter<PA::IntoIter, PB::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
InterleaveIter {
iter_a: self.producer_a.into_iter().fuse(),
iter_b: self.producer_b.into_iter().fuse(),
a_is_next: self.a_is_next,
}
}

fn split(self) -> (Self, Option<Self>) {
let len = self.len();

if len < 2 {
return (self, None);
}

let (left, right) = self.split_at(len / 2);

(left, Some(right))
}
}

impl<PA, PB, I> IndexedProducer for InterleaveProducer<PA, PB>
where
PA: IndexedProducer<Item = I>,
PB: IndexedProducer<Item = I>,
{
type Item = I;
type IntoIter = InterleaveIter<PA::IntoIter, PB::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
InterleaveIter {
iter_a: self.producer_a.into_iter().fuse(),
iter_b: self.producer_b.into_iter().fuse(),
a_is_next: self.a_is_next,
}
}

fn len(&self) -> usize {
self.producer_a.len() + self.producer_b.len()
}

/// We know 0 < index <= self.producer_a.len() + self.producer_a.len()
///
/// Find a, b satisfying:
///
/// (1) 0 < a <= self.producer_a.len()
/// (2) 0 < b <= self.producer_b.len()
/// (3) a + b == index
///
/// For even splits, set a = b = index/2.
/// For odd splits, set a = (index / 2) + 1, b = index / 2, if `a`
/// should yield the next element, otherwise, if `b` should yield
/// the next element, set a = index / 2 and b = (index/2) + 1
fn split_at(self, index: usize) -> (Self, Self) {
#[inline]
fn odd_offset(flag: bool) -> usize {
(!flag) as usize
}

let even = index % 2 == 0;
let idx = index >> 1;

let (index_a, index_b) = (
idx + odd_offset(even || self.a_is_next),
idx + odd_offset(even || !self.a_is_next),
);

let (index_a, index_b) =
if self.producer_a.len() >= index_a && self.producer_b.len() >= index_b {
(index_a, index_b)
} else if self.producer_a.len() >= index_a {
(index - self.producer_b.len(), self.producer_b.len())
} else {
(self.producer_a.len(), index - self.producer_a.len())
};

let trailing_a_is_next = even == self.a_is_next;
let (left_a, right_a) = self.producer_a.split_at(index_a);
let (left_b, right_b) = self.producer_b.split_at(index_b);

let left = Self {
producer_a: left_a,
producer_b: left_b,
a_is_next: self.a_is_next,
};
let right = Self {
producer_a: right_a,
producer_b: right_b,
a_is_next: trailing_a_is_next,
};

(left, right)
}
}

/* InterleaveIter */

struct InterleaveIter<IA, IB> {
iter_a: Fuse<IA>,
iter_b: Fuse<IB>,
a_is_next: bool,
}

impl<IA, IB, I> Iterator for InterleaveIter<IA, IB>
where
IA: Iterator<Item = I>,
IB: Iterator<Item = I>,
{
type Item = I;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.a_is_next = !self.a_is_next;

if self.a_is_next {
match self.iter_a.next() {
None => self.iter_b.next(),
r => r,
}
} else {
match self.iter_b.next() {
None => self.iter_a.next(),
r => r,
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (min_a, max_a) = self.iter_a.size_hint();
let (min_b, max_b) = self.iter_b.size_hint();

let min = min_a.saturating_add(min_b);
let max = match (max_a, max_b) {
(Some(a), Some(b)) => a.checked_add(b),
_ => None,
};

(min, max)
}
}

impl<IA, IB, I> DoubleEndedIterator for InterleaveIter<IA, IB>
where
IA: DoubleEndedIterator<Item = I> + ExactSizeIterator<Item = I>,
IB: DoubleEndedIterator<Item = I> + ExactSizeIterator<Item = I>,
{
#[inline]
fn next_back(&mut self) -> Option<I> {
let len_a = self.iter_a.len();
let len_b = self.iter_b.len();

match len_a.cmp(&len_b) {
Ordering::Less => self.iter_a.next_back(),
Ordering::Greater => self.iter_b.next_back(),
Ordering::Equal => {
if self.a_is_next {
self.iter_a.next_back()
} else {
self.iter_b.next_back()
}
}
}
}
}

impl<IA, IB, I> ExactSizeIterator for InterleaveIter<IA, IB>
where
IA: ExactSizeIterator<Item = I>,
IB: ExactSizeIterator<Item = I>,
{
#[inline]
fn len(&self) -> usize {
self.iter_a.len() + self.iter_b.len()
}
}

+ 10
- 40
asparit/src/iter/mod.rs Прегледај датотеку

@@ -10,6 +10,7 @@ pub mod flatten;
pub mod fold;
pub mod for_each;
pub mod inspect;
pub mod interleave;
pub mod intersperse;
pub mod map;
pub mod map_init;
@@ -23,6 +24,7 @@ pub mod product;
pub mod reduce;
pub mod splits;
pub mod sum;
pub mod take;
pub mod try_fold;
pub mod try_for_each;
pub mod try_reduce;
@@ -37,12 +39,6 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_for_each() {
use ::std::sync::atomic::{AtomicUsize, Ordering};
use ::std::sync::Arc;

let i = Arc::new(AtomicUsize::new(0));
let j = Arc::new(AtomicUsize::new(0));

let a = vec![
vec![1usize, 2usize],
vec![3usize, 4usize],
@@ -54,46 +50,20 @@ mod tests {
vec![11usize, 12usize],
];

let (x, y, z): (Vec<_>, Vec<_>, Vec<_>) = a
.par_iter()
a.par_iter()
.cloned()
.chain(b)
.update(|x| x.push(0))
.zip_eq(vec![10usize, 11usize, 12usize, 13usize, 14usize, 15usize])
.map(|x| x.0)
.flatten_iter()
.intersperse(100)
.panic_fuse()
.map(Some)
.while_some()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),
|init, item| (*init, item),
)
.map_init(
move || j.fetch_add(2, Ordering::Relaxed),
|init, (init2, item)| (*init, init2, item),
.with_splits(1)
.interleave_shortest(
vec![vec![50, 51], vec![52, 53], vec![54, 55]]
.into_par_iter()
.take(2),
)
.with_splits(16)
.inspect(|x| {
println!(
"Thread ID = {:?}; Item = {:?}",
::std::thread::current().id(),
x
)
})
.partition_map(|(i, j, k)| match j % 3 {
0 => (Some(i), None, None),
1 => (None, Some(j), None),
2 => (None, None, Some(k)),
_ => unreachable!(),
.for_each(|x| {
dbg!(x);
})
.exec()
.await;

dbg!(&x);
dbg!(&y);
dbg!(&z);
}

#[tokio::test]


+ 100
- 0
asparit/src/iter/take.rs Прегледај датотеку

@@ -0,0 +1,100 @@
use std::cmp::min;

use crate::{
Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Reducer, WithIndexedProducer,
};

pub struct Take<X> {
base: X,
len: usize,
}

impl<X> Take<X> {
pub fn new(base: X, len: usize) -> Self {
Self { base, len }
}
}

impl<'a, X, I> ParallelIterator<'a> for Take<X>
where
X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
type Item = I;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt().map(|len| min(len, self.len))
}
}

impl<'a, X, I> IndexedParallelIterator<'a> for Take<X>
where
X: IndexedParallelIterator<'a, Item = I> + WithIndexedProducer<'a, Item = I>,
I: Send + 'a,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
}

fn len_hint(&self) -> usize {
min(self.base.len_hint(), self.len)
}
}

impl<'a, X> WithIndexedProducer<'a> for Take<X>
where
X: WithIndexedProducer<'a>,
{
type Item = X::Item;

fn with_indexed_producer<CB>(self, base: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
self.base.with_indexed_producer(TakeCallback {
base,
len: self.len,
})
}
}

/* TakeCallback */

struct TakeCallback<CB> {
base: CB,
len: usize,
}

impl<'a, CB, I> IndexedProducerCallback<'a, I> for TakeCallback<CB>
where
CB: IndexedProducerCallback<'a, I>,
{
type Output = CB::Output;

fn callback<P>(self, producer: P) -> Self::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let len = dbg!(min(self.len, producer.len()));
let (producer, _) = producer.split_at(len);

self.base.callback(producer)
}
}

Loading…
Откажи
Сачувај