瀏覽代碼

Implemented 'with_splits' modificator

master
Bergmann89 5 年之前
父節點
當前提交
3d19f2c22c
共有 4 個文件被更改,包括 235 次插入2 次删除
  1. +20
    -0
      asparit/src/core/iterator.rs
  2. +10
    -1
      asparit/src/iter/mod.rs
  3. +201
    -0
      asparit/src/iter/splits.rs
  4. +4
    -1
      asparit/src/iter/while_some.rs

+ 20
- 0
asparit/src/core/iterator.rs 查看文件

@@ -30,6 +30,7 @@ use crate::{
partition::{Partition, PartitionMap},
product::Product,
reduce::{Reduce, ReduceWith},
splits::Splits,
sum::Sum,
try_fold::{TryFold, TryFoldWith},
try_for_each::{TryForEach, TryForEachInit, TryForEachWith},
@@ -1727,6 +1728,25 @@ pub trait ParallelIterator<'a>: Sized + Send {
{
Intersperse::new(self, item)
}

/// Sets the number of splits that are processed in parallel.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let min = (0..1_000_000)
/// .into_par_iter()
/// .with_splits(8)
/// .for_each(|| println!("Thread ID: {:?}", std::thread::current().id))
/// .exec();
///
/// assert!(min >= 1234);
/// ```
fn with_splits(self, splits: usize) -> Splits<Self> {
Splits::new(self, splits)
}
}

/// An iterator that supports "random access" to its data, meaning


+ 10
- 1
asparit/src/iter/mod.rs 查看文件

@@ -21,6 +21,7 @@ pub mod panic_fuse;
pub mod partition;
pub mod product;
pub mod reduce;
pub mod splits;
pub mod sum;
pub mod try_fold;
pub mod try_for_each;
@@ -33,7 +34,7 @@ pub mod while_some;
mod tests {
use crate::*;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_for_each() {
use ::std::sync::atomic::{AtomicUsize, Ordering};
use ::std::sync::Arc;
@@ -70,6 +71,14 @@ mod tests {
move || j.fetch_add(2, Ordering::Relaxed),
|init, (init2, item)| (*init, init2, item),
)
.with_splits(4)
.inspect(|x| {
println!(
"Thread ID = {:?}; Item = {:?}",
::std::thread::current().id(),
x
)
})
.partition_map(|(i, j, k)| match j % 3 {
0 => (Some(i), None, None),
1 => (None, Some(j), None),


+ 201
- 0
asparit/src/iter/splits.rs 查看文件

@@ -0,0 +1,201 @@
use crate::{
Consumer, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
};

pub struct Splits<X> {
base: X,
splits: usize,
}

impl<X> Splits<X> {
pub fn new(base: X, splits: usize) -> Self {
Self { base, splits }
}
}

impl<'a, X> ParallelIterator<'a> for Splits<X>
where
X: ParallelIterator<'a>,
{
type Item = X::Item;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_producer(ExecutorCallback::new(executor, consumer))
}

fn with_producer<CB>(self, base: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
let splits = self.splits;

self.base.with_producer(SplitsCallback { base, splits })
}

fn len_hint_opt(&self) -> Option<usize> {
self.base.len_hint_opt()
}
}

impl<'a, X> IndexedParallelIterator<'a> for Splits<X>
where
X: IndexedParallelIterator<'a>,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send + 'a,
R: Reducer<D> + Send + 'a,
{
self.with_producer_indexed(ExecutorCallback::new(executor, consumer))
}

fn with_producer_indexed<CB>(self, base: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
let splits = self.splits;

self.base
.with_producer_indexed(SplitsCallback { base, splits })
}

fn len_hint(&self) -> usize {
self.base.len_hint()
}
}

/* SplitsCallback */

struct SplitsCallback<CB> {
base: CB,
splits: usize,
}

impl<'a, CB, I> ProducerCallback<'a, I> for SplitsCallback<CB>
where
CB: ProducerCallback<'a, I>,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> Self::Output
where
P: Producer<Item = I> + 'a,
{
let splits = self.splits;

self.base.callback(SplitsProducer { base, splits })
}
}

impl<'a, CB, I> IndexedProducerCallback<'a, I> for SplitsCallback<CB>
where
CB: IndexedProducerCallback<'a, I>,
{
type Output = CB::Output;

fn callback<P>(self, base: P) -> Self::Output
where
P: IndexedProducer<Item = I> + 'a,
{
let splits = self.splits;

self.base.callback(SplitsProducer { base, splits })
}
}

/* SplitsProducer */

struct SplitsProducer<P> {
base: P,
splits: usize,
}

impl<P> Producer for SplitsProducer<P>
where
P: Producer,
{
type Item = P::Item;
type IntoIter = P::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.base.into_iter()
}

fn split(self) -> (Self, Option<Self>) {
let splits = self.splits;
let (left, right) = self.base.split();

let left = Self { base: left, splits };
let right = right.map(|base| Self { base, splits });

(left, right)
}

fn splits(&self) -> Option<usize> {
Some(self.splits)
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
self.base.fold_with(folder)
}
}

impl<P> IndexedProducer for SplitsProducer<P>
where
P: IndexedProducer,
{
type Item = P::Item;
type IntoIter = P::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.base.into_iter()
}

fn len(&self) -> usize {
self.base.len()
}

fn split_at(self, index: usize) -> (Self, Self) {
let splits = self.splits;
let (left, right) = self.base.split_at(index);

let left = Self { base: left, splits };
let right = Self {
base: right,
splits,
};

(left, right)
}

fn splits(&self) -> Option<usize> {
Some(self.splits)
}

fn min_len(&self) -> Option<usize> {
self.base.min_len()
}

fn max_len(&self) -> Option<usize> {
self.base.max_len()
}

fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
self.base.fold_with(folder)
}
}

+ 4
- 1
asparit/src/iter/while_some.rs 查看文件

@@ -142,7 +142,10 @@ where
type IntoIter = WhileSomeIter<P::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
unimplemented!()
WhileSomeIter {
base: self.base.into_iter(),
is_full: self.is_full,
}
}

fn split(self) -> (Self, Option<Self>) {


Loading…
取消
儲存