Implemented parallel iterator for 'Vec'

master
Bergmann89, 4 years ago
parent
commit a72cf2401b
8 changed files with 632 additions and 13 deletions
  1. asparit/src/core/drain.rs (+121, -0)
  2. asparit/src/core/mod.rs (+2, -0)
  3. asparit/src/core/producer.rs (+11, -11)
  4. asparit/src/inner/mod.rs (+10, -1)
  5. asparit/src/lib.rs (+2, -1)
  6. asparit/src/misc/mod.rs (+28, -0)
  7. asparit/src/std/mod.rs (+1, -0)
  8. asparit/src/std/vec.rs (+457, -0)

asparit/src/core/drain.rs (+121, -0)

@@ -0,0 +1,121 @@
use std::ops::RangeBounds;

use crate::ParallelIterator;

/// `ParallelDrainFull` creates a parallel iterator that moves all items
/// from a collection while retaining the original capacity.
///
/// Types which are indexable typically implement [`ParallelDrainRange`]
/// instead, where you can drain fully with `par_drain(..)`.
///
/// [`ParallelDrainRange`]: trait.ParallelDrainRange.html
pub trait ParallelDrainFull<'a> {
/// The draining parallel iterator type that will be created.
type Iter: ParallelIterator<'a, Item = Self::Item>;

/// The type of item that the parallel iterator will produce.
/// This is usually the same as `IntoParallelIterator::Item`.
type Item: Send;

/// Returns a draining parallel iterator over an entire collection.
///
/// When the iterator is dropped, all items are removed, even if the
/// iterator was not fully consumed. If the iterator is leaked, for example
/// using `std::mem::forget`, it is unspecified how many items are removed.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// use std::collections::{BinaryHeap, HashSet};
///
/// let squares: HashSet<i32> = (0..10).map(|x| x * x).collect();
///
/// let mut heap: BinaryHeap<_> = squares.iter().copied().collect();
/// assert_eq!(
///     // heaps are drained in arbitrary order
///     heap.par_drain()
///         .inspect(|x| assert!(squares.contains(x)))
///         .count(),
///     squares.len(),
/// );
/// assert!(heap.is_empty());
/// assert!(heap.capacity() >= squares.len());
/// ```
fn par_drain(self) -> Self::Iter;
}

/// `ParallelDrainRange` creates a parallel iterator that moves a range of items
/// from a collection while retaining the original capacity.
///
/// Types which are not indexable may implement [`ParallelDrainFull`] instead.
///
/// [`ParallelDrainFull`]: trait.ParallelDrainFull.html
pub trait ParallelDrainRange<'a, Idx = usize> {
/// The draining parallel iterator type that will be created.
type Iter: ParallelIterator<'a, Item = Self::Item>;

/// The type of item that the parallel iterator will produce.
/// This is usually the same as `IntoParallelIterator::Item`.
type Item: Send;

/// Returns a draining parallel iterator over a range of the collection.
///
/// When the iterator is dropped, all items in the range are removed, even
/// if the iterator was not fully consumed. If the iterator is leaked, for
/// example using `std::mem::forget`, it is unspecified how many items are
/// removed.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let squares: Vec<i32> = (0..10).map(|x| x * x).collect();
///
/// println!("RangeFull");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(..)
///     .eq(squares.par_iter().copied()));
/// assert!(vec.is_empty());
/// assert!(vec.capacity() >= squares.len());
///
/// println!("RangeFrom");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(5..)
///     .eq(squares[5..].par_iter().copied()));
/// assert_eq!(&vec[..], &squares[..5]);
/// assert!(vec.capacity() >= squares.len());
///
/// println!("RangeTo");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(..5)
///     .eq(squares[..5].par_iter().copied()));
/// assert_eq!(&vec[..], &squares[5..]);
/// assert!(vec.capacity() >= squares.len());
///
/// println!("RangeToInclusive");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(..=5)
///     .eq(squares[..=5].par_iter().copied()));
/// assert_eq!(&vec[..], &squares[6..]);
/// assert!(vec.capacity() >= squares.len());
///
/// println!("Range");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(3..7)
///     .eq(squares[3..7].par_iter().copied()));
/// assert_eq!(&vec[..3], &squares[..3]);
/// assert_eq!(&vec[3..], &squares[7..]);
/// assert!(vec.capacity() >= squares.len());
///
/// println!("RangeInclusive");
/// let mut vec = squares.clone();
/// assert!(vec.par_drain(3..=7)
///     .eq(squares[3..=7].par_iter().copied()));
/// assert_eq!(&vec[..3], &squares[..3]);
/// assert_eq!(&vec[3..], &squares[8..]);
/// assert!(vec.capacity() >= squares.len());
/// ```
fn par_drain<R: RangeBounds<Idx>>(self, range: R) -> Self::Iter;
}

asparit/src/core/mod.rs (+2, -0)

@@ -1,4 +1,5 @@
mod consumer;
mod drain;
mod driver;
mod executor;
mod folder;
@@ -9,6 +10,7 @@ mod producer;
mod reducer;

pub use consumer::Consumer;
pub use drain::{ParallelDrainFull, ParallelDrainRange};
pub use driver::Driver;
pub use executor::{Executor, ExecutorCallback};
pub use folder::Folder;


asparit/src/core/producer.rs (+11, -11)

@@ -19,14 +19,14 @@ pub trait Producer: Send + Sized {
/// are possible.
fn into_iter(self) -> Self::IntoIter;

/// Split midway into a new producer if possible, otherwise return `None`.
fn split(self) -> (Self, Option<Self>);

/// Number of splits/threads this iterator will use to proceed.
fn splits(&self) -> Option<usize> {
None
}

/// Split midway into a new producer if possible, otherwise return `None`.
fn split(self) -> (Self, Option<Self>);

/// Iterate the producer, feeding each element to `folder`, and
/// stop when the folder is full (or all elements have been consumed).
///
@@ -76,15 +76,19 @@ pub trait IndexedProducer: Send + Sized {
/// are possible.
fn into_iter(self) -> Self::IntoIter;

/// Produces an exact count of how many items this producer will
/// emit, presuming no panic occurs.
fn len(&self) -> usize;

/// Split into two producers; one produces items `0..index`, the
/// other `index..N`. Index must be less than or equal to `N`.
fn split_at(self, index: usize) -> (Self, Self);

/// Number of splits/threads this iterator will use to proceed.
fn splits(&self) -> Option<usize> {
None
}

/// Produces an exact count of how many items this producer will
/// emit, presuming no panic occurs.
fn len(&self) -> usize;

/// The minimum number of items that we will process
/// sequentially. Defaults to 1, which means that we will split
/// all the way down to a single item. This can be raised higher
@@ -112,10 +116,6 @@ pub trait IndexedProducer: Send + Sized {
None
}

/// Split into two producers; one produces items `0..index`, the
/// other `index..N`. Index must be less than or equal to `N`.
fn split_at(self, index: usize) -> (Self, Self);

/// Iterate the producer, feeding each element to `folder`, and
/// stop when the folder is full (or all elements have been consumed).
///
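
For orientation, here is an editorial sketch of a minimal IndexedProducer over a borrowed slice, showing how the reordered methods fit together. It is not part of this commit; `SliceProducer` is a made-up name, and the sketch assumes the remaining methods in this hunk (`splits`, the length hints, `fold_with`) keep their default implementations.

// Editorial sketch, not part of the commit.
use asparit::IndexedProducer;

struct SliceProducer<'a, T>(&'a [T]);

impl<'a, T: Sync> IndexedProducer for SliceProducer<'a, T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
    }

    fn len(&self) -> usize {
        // Exact number of items this producer will emit.
        self.0.len()
    }

    fn split_at(self, index: usize) -> (Self, Self) {
        // One half produces items 0..index, the other index..len.
        let (left, right) = self.0.split_at(index);
        (SliceProducer(left), SliceProducer(right))
    }
}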


asparit/src/inner/mod.rs (+10, -1)

@@ -20,7 +20,16 @@ mod tests {
let i = Arc::new(AtomicUsize::new(0));
let j = Arc::new(AtomicUsize::new(0));

let x = (0..10usize)
let x = vec![
vec![0usize],
vec![1usize],
vec![2usize],
vec![3usize],
vec![4usize],
vec![5usize],
];

let x = x
.into_par_iter()
.map_init(
move || i.fetch_add(1, Ordering::Relaxed),


asparit/src/lib.rs (+2, -1)

@@ -7,6 +7,7 @@ mod std;
pub use self::core::{
Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer,
IntoParallelRefMutIterator, ParallelDrainFull, ParallelDrainRange, ParallelIterator, Producer,
ProducerCallback, Reducer,
};
pub use self::executor::{DefaultExecutor, SequentialExecutor};

asparit/src/misc/mod.rs (+28, -0)

@@ -1,3 +1,31 @@
mod try_;

pub use try_::Try;

use std::ops::{Bound, Range, RangeBounds};

pub fn simplify_range(range: impl RangeBounds<usize>, len: usize) -> Range<usize> {
let start = match range.start_bound() {
Bound::Unbounded => 0,
Bound::Included(&i) if i <= len => i,
Bound::Excluded(&i) if i < len => i + 1,
bound => panic!("range start {:?} should be <= length {}", bound, len),
};

let end = match range.end_bound() {
Bound::Unbounded => len,
Bound::Excluded(&i) if i <= len => i,
Bound::Included(&i) if i < len => i + 1,
bound => panic!("range end {:?} should be <= length {}", bound, len),
};

if start > end {
panic!(
"range start {:?} should be <= range end {:?}",
range.start_bound(),
range.end_bound()
);
}

start..end
}
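
For illustration (editorial, not part of the commit), this is how the different range forms normalize against a collection of length 10:

// Editorial example: simplify_range against len == 10.
assert_eq!(simplify_range(.., 10), 0..10);    // RangeFull
assert_eq!(simplify_range(3.., 10), 3..10);   // RangeFrom
assert_eq!(simplify_range(..4, 10), 0..4);    // RangeTo
assert_eq!(simplify_range(..=4, 10), 0..5);   // RangeToInclusive
assert_eq!(simplify_range(2..6, 10), 2..6);   // Range
assert_eq!(simplify_range(2..=5, 10), 2..6);  // RangeInclusive
// Out-of-range bounds (or start > end) panic with the messages above.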

asparit/src/std/mod.rs (+1, -0)

@@ -1 +1,2 @@
mod range;
mod vec;

asparit/src/std/vec.rs (+457, -0)

@@ -0,0 +1,457 @@
use std::iter::FusedIterator;
use std::mem::replace;
use std::ops::{Range, RangeBounds};
use std::ptr::{copy, drop_in_place, read};
use std::slice::{from_raw_parts_mut, IterMut};
use std::sync::Arc;

use crate::{
misc::simplify_range, Consumer, Executor, ExecutorCallback, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, ParallelDrainRange,
ParallelIterator, Producer, ProducerCallback, Reducer,
};

/// Parallel iterator that moves out of a vector.
#[derive(Debug, Clone)]
pub struct IntoIter<T: Send> {
vec: Vec<T>,
}

impl<'a, T> IntoParallelIterator<'a> for Vec<T>
where
T: Send + 'a,
{
type Item = T;
type Iter = IntoIter<T>;

fn into_par_iter(self) -> Self::Iter {
IntoIter { vec: self }
}
}

impl<'a, T> ParallelIterator<'a> for IntoIter<T>
where
T: Send + 'a,
{
type Item = T;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.with_producer_indexed(ExecutorCallback::new(executor, consumer))
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
callback.callback(VecProducer::new(self.vec))
}

fn len_hint_opt(&self) -> Option<usize> {
Some(self.vec.len())
}
}

impl<'a, T> IndexedParallelIterator<'a> for IntoIter<T>
where
T: Send + 'a,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.with_producer_indexed(ExecutorCallback::new(executor, consumer))
}

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
callback.callback(VecProducer::new(self.vec))
}

fn len_hint(&self) -> usize {
self.vec.len()
}
}

impl<'a, T> ParallelDrainRange<'a, usize> for &'a mut Vec<T>
where
T: Send,
{
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
let length = self.len();

Drain {
vec: self,
range: simplify_range(range, length),
length,
}
}
}
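
A short editorial sketch of the two entry points this file adds; driving the iterators requires an executor, which is outside this diff, so the sketch only constructs them. The imports assume the re-exports shown in asparit/src/lib.rs above.

// Editorial sketch, not part of the commit.
use asparit::{IntoParallelIterator, ParallelDrainRange};

let v: Vec<i32> = (0..4).collect();
let _owned = v.into_par_iter();    // IntoIter<i32>; len_hint() reports 4

let mut v: Vec<i32> = (0..4).collect();
let _drained = v.par_drain(1..3);  // Drain over indices 1..3; when dropped,
                                   // the tail is compacted back down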

/* VecProducer */

struct VecProducer<'a, T> {
vec: Arc<VecContainer<T>>,
slice: &'a mut [T],
}

struct VecContainer<T>(Vec<T>);
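
// The container keeps the original allocation alive. Its Drop below only
// resets the length to zero, so the Vec frees its buffer without dropping
// elements; the elements themselves are either moved out via `ptr::read` in
// SliceIter::next or dropped in place by the producers' Drop impls.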

impl<T> Drop for VecContainer<T> {
fn drop(&mut self) {
unsafe {
self.0.set_len(0);
}
}
}

unsafe impl<T> Sync for VecContainer<T> {}

impl<'a, T> VecProducer<'a, T> {
fn new(mut vec: Vec<T>) -> Self {
unsafe {
let len = vec.len();
let ptr = vec.as_mut_ptr();
let slice = from_raw_parts_mut(ptr, len);

Self {
vec: Arc::new(VecContainer(vec)),
slice,
}
}
}
}

impl<'a, T> Drop for VecProducer<'a, T> {
fn drop(&mut self) {
unsafe {
drop_in_place(self.slice);
}
}
}

impl<'a, T> Producer for VecProducer<'a, T>
where
T: Send,
{
type Item = T;
type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;

fn into_iter(mut self) -> Self::IntoIter {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);

SliceIter {
_container: self.vec.clone(),
iter: slice.iter_mut(),
}
}

fn split(self) -> (Self, Option<Self>) {
if self.slice.len() < 2 {
(self, None)
} else {
let mid = self.slice.len() / 2;
let (left, right) = self.split_at(mid);

(left, Some(right))
}
}
}

impl<'a, T> IndexedProducer for VecProducer<'a, T>
where
T: Send,
{
type Item = T;
type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;

fn into_iter(mut self) -> Self::IntoIter {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);

SliceIter {
_container: self.vec.clone(),
iter: slice.iter_mut(),
}
}

fn len(&self) -> usize {
self.slice.len()
}

fn split_at(mut self, index: usize) -> (Self, Self) {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);
let (left, right) = slice.split_at_mut(index);

let left = VecProducer {
vec: self.vec.clone(),
slice: left,
};
let right = VecProducer {
vec: self.vec.clone(),
slice: right,
};

(left, right)
}
}

/* Drain */

#[derive(Debug)]
pub struct Drain<'a, T> {
vec: &'a mut Vec<T>,
range: Range<usize>,
length: usize,
}

impl<'a, T> Drain<'a, T>
where
Self: 'a,
{
fn into_producer(self) -> DrainProducer<'a, T> {
unsafe {
let mut drain = Arc::new(self);
let this = Arc::get_mut(&mut drain).unwrap();

// Make the vector forget about the drained items, and temporarily the tail too.
let start = this.range.start;
this.vec.set_len(start);

// Get slice of the processed data.
let ptr_start = this.vec.as_mut_ptr().add(start);
let slice = from_raw_parts_mut(ptr_start, this.range.len());

DrainProducer { drain, slice }
}
}
}

unsafe impl<'a, T> Sync for Drain<'a, T> {}

impl<'a, T> Drop for Drain<'a, T> {
fn drop(&mut self) {
if self.range.is_empty() {
return;
}

let Range { start, end } = self.range;
if self.vec.len() != start {
assert_eq!(self.vec.len(), self.length);

self.vec.drain(start..end);
} else if end < self.length {
unsafe {
let ptr_start = self.vec.as_mut_ptr().add(start);
let ptr_end = self.vec.as_ptr().add(end);
let count = self.length - end;

copy(ptr_end, ptr_start, count);

self.vec.set_len(start + count);
}
}
}
}
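
// Worked example (editorial): draining 3..7 from a vector of length 10.
// `into_producer` first calls `set_len(3)`, hiding both the drained range
// and the tail. Once the drained items are consumed, this Drop copies
// elements 7..10 down to index 3 and restores the length to 6. If
// `into_producer` was never called, the length is still 10 and the fallback
// `Vec::drain(3..7)` does the work instead.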

impl<'a, T> ParallelIterator<'a> for Drain<'a, T>
where
T: Send,
{
type Item = T;

fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.with_producer_indexed(ExecutorCallback::new(executor, consumer))
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<'a, Self::Item>,
{
callback.callback(self.into_producer())
}

fn len_hint_opt(&self) -> Option<usize> {
Some(self.range.len())
}
}

impl<'a, T> IndexedParallelIterator<'a> for Drain<'a, T>
where
T: Send,
{
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
self.with_producer_indexed(ExecutorCallback::new(executor, consumer))
}

fn with_producer_indexed<CB>(self, callback: CB) -> CB::Output
where
CB: IndexedProducerCallback<'a, Self::Item>,
{
callback.callback(self.into_producer())
}

fn len_hint(&self) -> usize {
self.range.len()
}
}

/* DrainProducer */

struct DrainProducer<'a, T> {
drain: Arc<Drain<'a, T>>,
slice: &'a mut [T],
}

impl<'a, T> Drop for DrainProducer<'a, T> {
fn drop(&mut self) {
unsafe {
drop_in_place(self.slice);
}
}
}

impl<'a, T> Producer for DrainProducer<'a, T>
where
T: Send,
{
type Item = T;
type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;

fn into_iter(mut self) -> Self::IntoIter {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);

SliceIter {
_container: self.drain.clone(),
iter: slice.iter_mut(),
}
}

fn split(self) -> (Self, Option<Self>) {
if self.slice.len() < 2 {
(self, None)
} else {
let mid = self.slice.len() / 2;
let (left, right) = self.split_at(mid);

(left, Some(right))
}
}
}

impl<'a, T> IndexedProducer for DrainProducer<'a, T>
where
T: Send,
{
type Item = T;
type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;

fn into_iter(mut self) -> Self::IntoIter {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);

SliceIter {
_container: self.drain.clone(),
iter: slice.iter_mut(),
}
}

fn len(&self) -> usize {
self.slice.len()
}

fn split_at(mut self, index: usize) -> (Self, Self) {
// replace the slice so we don't drop it twice
let slice = replace(&mut self.slice, &mut []);
let (left, right) = slice.split_at_mut(index);

let left = DrainProducer {
slice: left,
drain: self.drain.clone(),
};
let right = DrainProducer {
slice: right,
drain: self.drain.clone(),
};

(left, right)
}
}

/* SliceIter */

struct SliceIter<'a, T, C> {
_container: C,
iter: IterMut<'a, T>,
}
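
// `next`/`next_back` move items out of the slice with `ptr::read`, so only
// the not-yet-yielded items are still owned here; the Drop impl below drops
// exactly that remainder in place. `_container` keeps the backing storage
// (the Arc'd Vec or Drain) alive for as long as the borrowed slice is used.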

impl<'a, T, C> Drop for SliceIter<'a, T, C> {
fn drop(&mut self) {
// extract the iterator so we can use `Drop for [T]`
let iter = replace(&mut self.iter, [].iter_mut());

unsafe { drop_in_place(iter.into_slice()) };
}
}

impl<'a, T, C> Iterator for SliceIter<'a, T, C> {
type Item = T;

fn next(&mut self) -> Option<T> {
let ptr = self.iter.next()?;

Some(unsafe { read(ptr) })
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}

fn count(self) -> usize {
self.iter.len()
}
}

impl<'a, T, C> DoubleEndedIterator for SliceIter<'a, T, C> {
fn next_back(&mut self) -> Option<Self::Item> {
let ptr = self.iter.next_back()?;

Some(unsafe { read(ptr) })
}
}

impl<'a, T, C> ExactSizeIterator for SliceIter<'a, T, C> {
fn len(&self) -> usize {
self.iter.len()
}
}

impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {}
