From 5fb9e8f8a2546f5668c30ab85b26cc9e11394fd2 Mon Sep 17 00:00:00 2001 From: Bergmann89 Date: Mon, 26 Oct 2020 19:56:41 +0100 Subject: [PATCH] Implemented core features for async parallel iterator crate called asparit --- .gitignore | 1 + Cargo.toml | 4 +- {parallel-iterator => asparit}/Cargo.toml | 2 +- asparit/src/README.md | 315 ++++++++++++++++++++++ asparit/src/core/collector.rs | 30 +++ asparit/src/core/consumer.rs | 56 ++++ asparit/src/core/executor.rs | 21 ++ asparit/src/core/folder.rs | 43 +++ asparit/src/core/into_iter.rs | 148 ++++++++++ asparit/src/core/iterator.rs | 156 +++++++++++ asparit/src/core/mod.rs | 17 ++ asparit/src/core/producer.rs | 144 ++++++++++ asparit/src/core/reducer.rs | 12 + asparit/src/executor/mod.rs | 5 + asparit/src/executor/sequential.rs | 70 +++++ asparit/src/inner/for_each.rs | 107 ++++++++ asparit/src/inner/mod.rs | 2 + asparit/src/inner/noop.rs | 51 ++++ asparit/src/lib.rs | 11 + asparit/src/std/mod.rs | 1 + asparit/src/std/range.rs | 71 +++++ async-ecs/Cargo.toml | 2 +- async-ecs/src/access/par_join.rs | 2 +- async-ecs/src/lib.rs | 2 +- async-ecs/src/misc/bit_producer.rs | 2 +- async-ecs/src/misc/tokio_executor.rs | 4 +- parallel-iterator/src/for_each.rs | 61 ----- parallel-iterator/src/lib.rs | 121 --------- parallel-iterator/src/no_op.rs | 7 - 29 files changed, 1269 insertions(+), 199 deletions(-) rename {parallel-iterator => asparit}/Cargo.toml (79%) create mode 100644 asparit/src/README.md create mode 100644 asparit/src/core/collector.rs create mode 100644 asparit/src/core/consumer.rs create mode 100644 asparit/src/core/executor.rs create mode 100644 asparit/src/core/folder.rs create mode 100644 asparit/src/core/into_iter.rs create mode 100644 asparit/src/core/iterator.rs create mode 100644 asparit/src/core/mod.rs create mode 100644 asparit/src/core/producer.rs create mode 100644 asparit/src/core/reducer.rs create mode 100644 asparit/src/executor/mod.rs create mode 100644 asparit/src/executor/sequential.rs create mode 100644 asparit/src/inner/for_each.rs create mode 100644 asparit/src/inner/mod.rs create mode 100644 asparit/src/inner/noop.rs create mode 100644 asparit/src/lib.rs create mode 100644 asparit/src/std/mod.rs create mode 100644 asparit/src/std/range.rs delete mode 100644 parallel-iterator/src/for_each.rs delete mode 100644 parallel-iterator/src/lib.rs delete mode 100644 parallel-iterator/src/no_op.rs diff --git a/.gitignore b/.gitignore index 96ef6c0..ff0d847 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target +/.vscode Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 1658552..c96bd2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ + "asparit", "async-ecs", "async-ecs-derive", - "parallel-iterator" ] [patch.crates-io] +asparit = { path = "asparit" } async-ecs-derive = { path = "async-ecs-derive" } -parallel-iterator = { path = "parallel-iterator" } diff --git a/parallel-iterator/Cargo.toml b/asparit/Cargo.toml similarity index 79% rename from parallel-iterator/Cargo.toml rename to asparit/Cargo.toml index 103b40a..c8b779e 100644 --- a/parallel-iterator/Cargo.toml +++ b/asparit/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "parallel-iterator" +name = "asparit" version = "0.1.0" authors = ["Bergmann89 "] edition = "2018" diff --git a/asparit/src/README.md b/asparit/src/README.md new file mode 100644 index 0000000..cd94eae --- /dev/null +++ b/asparit/src/README.md @@ -0,0 +1,315 @@ +# Parallel Iterators + +These are some notes on the design of the parallel iterator traits. 
+This file does not describe how to **use** parallel iterators. + +## The challenge + +Parallel iterators are more complicated than sequential iterators. +The reason is that they have to be able to split themselves up and +operate in parallel across the two halves. + +The current design for parallel iterators has two distinct modes in +which they can be used; as we will see, not all iterators support both +modes (which is why there are two): + +- **Pull mode** (the `Producer` and `UnindexedProducer` traits): in this mode, + the iterator is asked to produce the next item using a call to `next`. This + is basically like a normal iterator, but with a twist: you can split the + iterator in half to produce disjoint items in separate threads. + - in the `Producer` trait, splitting is done with `split_at`, which accepts + an index where the split should be performed. Only indexed iterators can + work in this mode, as they know exactly how much data they will produce, + and how to locate the requested index. + - in the `UnindexedProducer` trait, splitting is done with `split`, which + simply requests that the producer divide itself *approximately* in half. + This is useful when the exact length and/or layout is unknown, as with + `String` characters, or when the length might exceed `usize`, as with + `Range` on 32-bit platforms. + - In theory, any `Producer` could act unindexed, but we don't currently + use that possibility. When you know the exact length, a `split` can + simply be implemented as `split_at(length/2)`. +- **Push mode** (the `Consumer` and `UnindexedConsumer` traits): in + this mode, the iterator instead is *given* each item in turn, which + is then processed. This is the opposite of a normal iterator. It's + more like a `for_each` call: each time a new item is produced, the + `consume` method is called with that item. (The traits themselves are + a bit more complex, as they support state that can be threaded + through and ultimately reduced.) Unlike producers, there are two + variants of consumers. The difference is how the split is performed: + - in the `Consumer` trait, splitting is done with `split_at`, which + accepts an index where the split should be performed. All + iterators can work in this mode. The resulting halves thus have an + idea about how much data they expect to consume. + - in the `UnindexedConsumer` trait, splitting is done with + `split_off_left`. There is no index: the resulting halves must be + prepared to process any amount of data, and they don't know where that + data falls in the overall stream. + - Not all consumers can operate in this mode. It works for + `for_each` and `reduce`, for example, but it does not work for + `collect_into_vec`, since in that case the position of each item is + important for knowing where it ends up in the target collection. + +## How iterator execution proceeds + +We'll walk through this example iterator chain to start. This chain +demonstrates more-or-less the full complexity of what can happen. + +```rust +vec1.par_iter() + .zip(vec2.par_iter()) + .flat_map(some_function) + .for_each(some_other_function) +``` + +To handle an iterator chain, we start by creating consumers. This +works from the end. So in this case, the call to `for_each` is the +final step, so it will create a `ForEachConsumer` that, given an item, +just calls `some_other_function` with that item. (`ForEachConsumer` is +a very simple consumer because it doesn't need to thread any state +between items at all.) 
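+
+To make the push side concrete, here is a trimmed-down sketch of such a
+consumer, written against the `Consumer`, `Folder` and `Reducer` traits of
+this crate. It mirrors the real implementation in `inner/for_each.rs`, but
+the generic parameters and bounds shown here are illustrative rather than a
+verbatim copy:
+
+```rust
+use asparit::{Consumer, Folder, Reducer};
+
+// The results are `()`, so there is nothing to combine after a split.
+struct NoOpReducer;
+
+impl Reducer<()> for NoOpReducer {
+    fn reduce(self, _left: (), _right: ()) {}
+}
+
+// A consumer that simply calls `operation` on every item it is fed.
+struct ForEachConsumer<F> {
+    operation: F,
+}
+
+impl<F, T> Consumer<T> for ForEachConsumer<F>
+where
+    F: Fn(T) + Sync + Send + Copy,
+{
+    type Folder = ForEachConsumer<F>;
+    type Reducer = NoOpReducer;
+    type Result = ();
+
+    // Splitting is trivial: both halves run the same operation and there
+    // is no state that would have to be merged afterwards.
+    fn split_off_left(&self) -> (Self, NoOpReducer) {
+        (
+            ForEachConsumer {
+                operation: self.operation,
+            },
+            NoOpReducer,
+        )
+    }
+
+    fn into_folder(self) -> Self::Folder {
+        self
+    }
+}
+
+impl<F, T> Folder<T> for ForEachConsumer<F>
+where
+    F: Fn(T) + Sync + Send + Copy,
+{
+    type Result = ();
+
+    // Each item is handed straight to the closure; no state is threaded through.
+    fn consume(self, item: T) -> Self {
+        (self.operation)(item);
+        self
+    }
+
+    fn complete(self) {}
+}
+```
+
+Because the result type is `()`, the reducer has nothing to do. A consumer
+that computes a value (a hypothetical `reduce` or `collect`, say) would
+instead return real state from `complete` and combine the two halves in its
+reducer.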
+
+Now, the `for_each` call will pass this consumer to the base iterator,
+which is the `flat_map`. It will do this by calling the `drive_unindexed`
+method on the `ParallelIterator` trait. `drive_unindexed` basically
+says "produce items for this iterator and feed them to this consumer";
+it only works for unindexed consumers.
+
+(As an aside, it is interesting that only some consumers can work in
+unindexed mode, but all producers can *drive* an unindexed consumer.
+In contrast, only some producers can drive an *indexed* consumer, but
+all consumers can be supplied indexes. Isn't variance neat.)
+
+As it happens, `FlatMap` only works with unindexed consumers anyway.
+This is because flat-map basically has no idea how many items it will
+produce. If you ask flat-map to produce the 22nd item, it can't do it,
+at least not without some intermediate state. It doesn't know whether
+processing the first item will create 1 item, 3 items, or 100;
+therefore, to produce an arbitrary item, it would basically just have
+to start at the beginning and execute sequentially, which is not what
+we want. But for unindexed consumers, this doesn't matter, since they
+don't need to know how much data they will get.
+
+Therefore, `FlatMap` can wrap the `ForEachConsumer` with a
+`FlatMapConsumer` that feeds to it. This `FlatMapConsumer` will be
+given one item. It will then invoke `some_function` to get a parallel
+iterator out. It will then ask this new parallel iterator to drive the
+`ForEachConsumer`. The `drive_unindexed` method on `flat_map` can then
+pass the `FlatMapConsumer` up the chain to the previous item, which is
+`zip`. At this point, something interesting happens.
+
+## Switching from push to pull mode
+
+If you think about `zip`, it can't really be implemented as a
+consumer, at least not without an intermediate thread and some
+channels or something (or maybe coroutines). The problem is that it
+has to walk two iterators *in lockstep*. Basically, it can't call two
+`drive` methods simultaneously, it can only call one at a time. So at
+this point, the `zip` iterator needs to switch from *push mode* into
+*pull mode*.
+
+You'll note that `Zip` is only usable if its inputs implement
+`IndexedParallelIterator`, meaning that they can produce data starting
+at random points in the stream. This need to switch to pull mode is
+exactly why. If we want to split a zip iterator at position 22, we
+need to be able to start zipping items from index 22 right away,
+without having to start from index 0.
+
+Anyway, so at this point, the `drive_unindexed` method for `Zip` stops
+creating consumers. Instead, it creates a *producer*, a `ZipProducer`,
+to be exact, and calls the `bridge` function in the `internals`
+module. Creating a `ZipProducer` will in turn create producers for
+the two iterators being zipped. This is possible because they both
+implement `IndexedParallelIterator`.
+
+The `bridge` function will then connect the consumer, which is
+handling the `flat_map` and `for_each`, with the producer, which is
+handling the `zip` and its predecessors. It will split down until the
+chunks seem reasonably small, then pull items from the producer and
+feed them to the consumer.
+
+## The base case
+
+The other time that `bridge` gets used is when we bottom out in an
+indexed producer, such as a slice or range. There is also a
+`bridge_unindexed` equivalent for - you guessed it - unindexed producers,
+such as string characters.
+
+## What on earth is `ProducerCallback`?
+ +We saw that when you call a parallel action method like +`par_iter.reduce()`, that will create a "reducing" consumer and then +invoke `par_iter.drive_unindexed()` (or `par_iter.drive()`) as +appropriate. This may create yet more consumers as we proceed up the +parallel iterator chain. But at some point we're going to get to the +start of the chain, or to a parallel iterator (like `zip()`) that has +to coordinate multiple inputs. At that point, we need to start +converting parallel iterators into producers. + +The way we do this is by invoking the method `with_producer()`, defined on +`IndexedParallelIterator`. This is a callback scheme. In an ideal world, +it would work like this: + +```rust +base_iter.with_producer(|base_producer| { + // here, `base_producer` is the producer for `base_iter` +}); +``` + +In that case, we could implement a combinator like `map()` by getting +the producer for the base iterator, wrapping it to make our own +`MapProducer`, and then passing that to the callback. Something like +this: + +```rust +struct MapProducer<'f, P, F: 'f> { + base: P, + map_op: &'f F, +} + +impl IndexedParallelIterator for Map + where I: IndexedParallelIterator, + F: MapOp, +{ + fn with_producer(self, callback: CB) -> CB::Output { + let map_op = &self.map_op; + self.base_iter.with_producer(|base_producer| { + // Here `producer` is the producer for `self.base_iter`. + // Wrap that to make a `MapProducer` + let map_producer = MapProducer { + base: base_producer, + map_op: map_op + }; + + // invoke the callback with the wrapped version + callback(map_producer) + }); + } +}); +``` + +This example demonstrates some of the power of the callback scheme. +It winds up being a very flexible setup. For one thing, it means we +can take ownership of `par_iter`; we can then in turn give ownership +away of its bits and pieces into the producer (this is very useful if +the iterator owns an `&mut` slice, for example), or create shared +references and put *those* in the producer. In the case of map, for +example, the parallel iterator owns the `map_op`, and we borrow +references to it which we then put into the `MapProducer` (this means +the `MapProducer` can easily split itself and share those references). +The `with_producer` method can also create resources that are needed +during the parallel execution, since the producer does not have to be +returned. + +Unfortunately there is a catch. We can't actually use closures the way +I showed you. To see why, think about the type that `map_producer` +would have to have. If we were going to write the `with_producer` +method using a closure, it would have to look something like this: + +```rust +pub trait IndexedParallelIterator: ParallelIterator { + type Producer; + fn with_producer(self, callback: CB) -> R + where CB: FnOnce(Self::Producer) -> R; + ... +} +``` + +Note that we had to add this associated type `Producer` so that +we could specify the argument of the callback to be `Self::Producer`. +Now, imagine trying to write that `MapProducer` impl using this style: + +```rust +impl IndexedParallelIterator for Map + where I: IndexedParallelIterator, + F: MapOp, +{ + type MapProducer = MapProducer<'f, P::Producer, F>; + // ^^ wait, what is this `'f`? + + fn with_producer(self, callback: CB) -> R + where CB: FnOnce(Self::Producer) -> R + { + let map_op = &self.map_op; + // ^^^^^^ `'f` is (conceptually) the lifetime of this reference, + // so it will be different for each call to `with_producer`! 
+    }
+}
+```
+
+This may look familiar to you: it's the same problem that we have
+trying to define an `Iterable` trait. Basically, the producer type
+needs to include a lifetime (here, `'f`) that refers to the body of
+`with_producer` and hence is not in scope at the impl level.
+
+If we had [associated type constructors][1598], we could solve this
+problem that way. But there is another solution. We can use a
+dedicated callback trait like `ProducerCallback`, instead of `FnOnce`:
+
+[1598]: https://github.com/rust-lang/rfcs/pull/1598
+
+```rust
+pub trait ProducerCallback<T> {
+    type Output;
+    fn callback
<P>
(self, producer: P) -> Self::Output
+        where P: Producer<Item = T>;
+}
+```
+
+Using this trait, the signature of `with_producer()` looks like this:
+
+```rust
+fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output;
+```
+
+Notice that this signature **never has to name the producer type** --
+there is no associated type `Producer` anymore. This is because the
+`callback()` method is generic over **all** producers `P`.
+
+The problem is that now the `||` sugar doesn't work anymore. So we
+have to manually create the callback struct, which is a mite tedious.
+So our `MapProducer` code looks like this:
+
+```rust
+impl<I, F> IndexedParallelIterator for Map<I, F>
+    where I: IndexedParallelIterator,
+          F: MapOp<I::Item>,
+{
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+        where CB: ProducerCallback<Self::Item>
+    {
+        return self.base.with_producer(Callback { callback: callback, map_op: self.map_op });
+        //                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+        //                             Manual version of the closure sugar: create an instance
+        //                             of a struct that implements `ProducerCallback`.
+
+        // The struct declaration. Each field is something we need to capture from the
+        // creating scope.
+        struct Callback<CB, F> {
+            callback: CB,
+            map_op: F,
+        }
+
+        // Implement the `ProducerCallback` trait. This is pure boilerplate.
+        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
+            where F: MapOp<T>,
+                  CB: ProducerCallback<F::Output>,
+        {
+            type Output = CB::Output;
+
+            fn callback
<P>
(self, base: P) -> CB::Output + where P: Producer + { + // The body of the closure is here: + let producer = MapProducer { base: base, + map_op: &self.map_op }; + self.callback.callback(producer) + } + } + } +} +``` + +OK, a bit tedious, but it works! diff --git a/asparit/src/core/collector.rs b/asparit/src/core/collector.rs new file mode 100644 index 0000000..211b1d7 --- /dev/null +++ b/asparit/src/core/collector.rs @@ -0,0 +1,30 @@ +use crate::{ + Consumer, DefaultExecutor, Executor, IndexedConsumer, IndexedExecutor, IndexedParallelIterator, + ParallelIterator, +}; + +pub trait Collector: Sized { + type Iterator: ParallelIterator; + type Consumer: Consumer<::Item>; + + fn exec_with(self, executor: E) -> E::Result + where + E: Executor; + + fn exec(self) -> >::Result { + self.exec_with(DefaultExecutor::default()) + } +} + +pub trait IndexedCollector: Sized { + type Iterator: IndexedParallelIterator; + type Consumer: IndexedConsumer<::Item>; + + fn exec_with(self, executor: E) -> E::Result + where + E: IndexedExecutor; + + fn exec(self) -> >::Result { + self.exec_with(DefaultExecutor::default()) + } +} diff --git a/asparit/src/core/consumer.rs b/asparit/src/core/consumer.rs new file mode 100644 index 0000000..fb9bb09 --- /dev/null +++ b/asparit/src/core/consumer.rs @@ -0,0 +1,56 @@ +use super::{Folder, Reducer}; + +/// A consumer is effectively a [generalized "fold" operation][fold], +/// and in fact each consumer will eventually be converted into a +/// [`Folder`]. What makes a consumer special is that, like a +/// [`Producer`], it can be **split** into multiple consumers using +/// the `split_off_left` method. When a consumer is split, it produces two +/// consumers, as well as a **reducer**. The two consumers can be fed +/// items independently, and when they are done the reducer is used to +/// combine their two results into one. See [the README][r] for further +/// details. +/// +/// [r]: README.md +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold +/// [`Folder`]: trait.Folder.html +/// [`Producer`]: trait.Producer.html +pub trait Consumer: Send + Sized { + /// The type of folder that this consumer can be converted into. + type Folder: Folder; + + /// The type of reducer that is produced if this consumer is split. + type Reducer: Reducer; + + /// The type of result that this consumer will ultimately produce. + type Result: Send; + + /// Splits off a "left" consumer and returns it. The `self` + /// consumer should then be used to consume the "right" portion of + /// the data. (The ordering matters for methods like find_first -- + /// values produced by the returned value are given precedence + /// over values produced by `self`.) Once the left and right + /// halves have been fully consumed, you should reduce the results + /// with the result of `to_reducer`. + fn split_off_left(&self) -> (Self, Self::Reducer); + + /// Convert the consumer into a folder that can consume items + /// sequentially, eventually producing a final result. + fn into_folder(self) -> Self::Folder; + + /// Hint whether this `Consumer` would like to stop processing + /// further items, e.g. if a search has been completed. + fn is_full(&self) -> bool { + false + } +} + +/// A stateless consumer can be freely copied. These consumers can be +/// used like regular consumers, but they also support a +/// `split_at` method that does take an index to split. 
+pub trait IndexedConsumer: Consumer { + /// Divide the consumer into two consumers, one processing items + /// `0..index` and one processing items from `index..`. Also + /// produces a reducer that can be used to reduce the results at + /// the end. + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); +} diff --git a/asparit/src/core/executor.rs b/asparit/src/core/executor.rs new file mode 100644 index 0000000..bcd4cf1 --- /dev/null +++ b/asparit/src/core/executor.rs @@ -0,0 +1,21 @@ +use super::{Consumer, IndexedConsumer, IndexedParallelIterator, ParallelIterator}; + +pub trait Executor +where + I: ParallelIterator, + C: Consumer, +{ + type Result; + + fn exec(self, iterator: I, consumer: C) -> Self::Result; +} + +pub trait IndexedExecutor +where + I: IndexedParallelIterator, + C: IndexedConsumer, +{ + type Result; + + fn exec_indexed(self, iterator: I, consumer: C) -> Self::Result; +} diff --git a/asparit/src/core/folder.rs b/asparit/src/core/folder.rs new file mode 100644 index 0000000..57bd24f --- /dev/null +++ b/asparit/src/core/folder.rs @@ -0,0 +1,43 @@ +/// The `Folder` trait encapsulates [the standard fold +/// operation][fold]. It can be fed many items using the `consume` +/// method. At the end, once all items have been consumed, it can then +/// be converted (using `complete`) into a final value. +/// +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold +pub trait Folder: Sized { + /// The type of result that will ultimately be produced by the folder. + type Result; + + /// Consume next item and return new sequential state. + fn consume(self, item: Item) -> Self; + + /// Consume items from the iterator until full, and return new sequential state. + /// + /// This method is **optional**. The default simply iterates over + /// `iter`, invoking `consume` and checking after each iteration + /// whether `full` returns false. + /// + /// The main reason to override it is if you can provide a more + /// specialized, efficient implementation. + fn consume_iter(mut self, iter: I) -> Self + where + I: IntoIterator, + { + for item in iter { + self = self.consume(item); + if self.is_full() { + break; + } + } + self + } + + /// Finish consuming items, produce final result. + fn complete(self) -> Self::Result; + + /// Hint whether this `Folder` would like to stop processing + /// further items, e.g. if a search has been completed. + fn is_full(&self) -> bool { + false + } +} diff --git a/asparit/src/core/into_iter.rs b/asparit/src/core/into_iter.rs new file mode 100644 index 0000000..d4c5558 --- /dev/null +++ b/asparit/src/core/into_iter.rs @@ -0,0 +1,148 @@ +use super::ParallelIterator; + +/// `IntoParallelIterator` implements the conversion to a [`ParallelIterator`]. +/// +/// By implementing `IntoParallelIterator` for a type, you define how it will +/// transformed into an iterator. This is a parallel version of the standard +/// library's [`std::iter::IntoIterator`] trait. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`std::iter::IntoIterator`]: https://doc.rust-lang.org/std/iter/trait.IntoIterator.html +pub trait IntoParallelIterator { + /// The parallel iterator type that will be created. + type Iter: ParallelIterator; + + /// The type of item that the parallel iterator will produce. + type Item: Send; + + /// Converts `self` into a parallel iterator. 
+ /// + /// # Examples + /// + /// ``` + /// use asparit::*; + /// + /// println!("counting in parallel:"); + /// (0..100).into_par_iter() + /// .for_each(|i| println!("{}", i)); + /// ``` + /// + /// This conversion is often implicit for arguments to methods like [`zip`]. + /// + /// ``` + /// use asparit::*; + /// + /// let v: Vec<_> = (0..5).into_par_iter().zip(5..10).collect(); + /// assert_eq!(v, [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)]); + /// ``` + /// + /// [`zip`]: trait.IndexedParallelIterator.html#method.zip + fn into_par_iter(self) -> Self::Iter; +} + +/// `IntoParallelRefIterator` implements the conversion to a +/// [`ParallelIterator`], providing shared references to the data. +/// +/// This is a parallel version of the `iter()` method +/// defined by various collections. +/// +/// This trait is automatically implemented +/// `for I where &I: IntoParallelIterator`. In most cases, users +/// will want to implement [`IntoParallelIterator`] rather than implement +/// this trait directly. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html +pub trait IntoParallelRefIterator<'data> { + /// The type of the parallel iterator that will be returned. + type Iter: ParallelIterator; + + /// The type of item that the parallel iterator will produce. + /// This will typically be an `&'data T` reference type. + type Item: Send + 'data; + + /// Converts `self` into a parallel iterator. + /// + /// # Examples + /// + /// ``` + /// use asparit::*; + /// + /// let v: Vec<_> = (0..100).collect(); + /// assert_eq!(v.par_iter().sum::(), 100 * 99 / 2); + /// + /// // `v.par_iter()` is shorthand for `(&v).into_par_iter()`, + /// // producing the exact same references. + /// assert!(v.par_iter().zip(&v) + /// .all(|(a, b)| std::ptr::eq(a, b))); + /// ``` + fn par_iter(&'data self) -> Self::Iter; +} + +/// `IntoParallelRefMutIterator` implements the conversion to a +/// [`ParallelIterator`], providing mutable references to the data. +/// +/// This is a parallel version of the `iter_mut()` method +/// defined by various collections. +/// +/// This trait is automatically implemented +/// `for I where &mut I: IntoParallelIterator`. In most cases, users +/// will want to implement [`IntoParallelIterator`] rather than implement +/// this trait directly. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html +pub trait IntoParallelRefMutIterator<'data> { + /// The type of iterator that will be created. + type Iter: ParallelIterator; + + /// The type of item that will be produced; this is typically an + /// `&'data mut T` reference. + type Item: Send + 'data; + + /// Creates the parallel iterator from `self`. 
+ /// + /// # Examples + /// + /// ``` + /// use asparit::*; + /// + /// let mut v = vec![0usize; 5]; + /// v.par_iter_mut().enumerate().for_each(|(i, x)| *x = i); + /// assert_eq!(v, [0, 1, 2, 3, 4]); + /// ``` + fn par_iter_mut(&'data mut self) -> Self::Iter; +} + +impl IntoParallelIterator for T { + type Iter = T; + type Item = T::Item; + + fn into_par_iter(self) -> T { + self + } +} + +impl<'data, I: 'data + ?Sized> IntoParallelRefIterator<'data> for I +where + &'data I: IntoParallelIterator, +{ + type Iter = <&'data I as IntoParallelIterator>::Iter; + type Item = <&'data I as IntoParallelIterator>::Item; + + fn par_iter(&'data self) -> Self::Iter { + self.into_par_iter() + } +} + +impl<'data, I: 'data + ?Sized> IntoParallelRefMutIterator<'data> for I +where + &'data mut I: IntoParallelIterator, +{ + type Iter = <&'data mut I as IntoParallelIterator>::Iter; + type Item = <&'data mut I as IntoParallelIterator>::Item; + + fn par_iter_mut(&'data mut self) -> Self::Iter { + self.into_par_iter() + } +} diff --git a/asparit/src/core/iterator.rs b/asparit/src/core/iterator.rs new file mode 100644 index 0000000..78a98ab --- /dev/null +++ b/asparit/src/core/iterator.rs @@ -0,0 +1,156 @@ +use super::{Consumer, Executor, IndexedConsumer, IndexedProducerCallback, ProducerCallback}; + +use crate::inner::for_each::ForEach; + +/// Parallel version of the standard iterator trait. +/// +/// The combinators on this trait are available on **all** parallel +/// iterators. Additional methods can be found on the +/// [`IndexedParallelIterator`] trait: those methods are only +/// available for parallel iterators where the number of items is +/// known in advance (so, e.g., after invoking `filter`, those methods +/// become unavailable). +/// +/// For examples of using parallel iterators, see [the docs on the +/// `iter` module][iter]. +/// +/// [iter]: index.html +/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html +pub trait ParallelIterator: Sized + Send { + /// The type of item that this parallel iterator produces. + /// For example, if you use the [`for_each`] method, this is the type of + /// item that your closure will be invoked with. + /// + /// [`for_each`]: #method.for_each + type Item: Send; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method causes the iterator `self` to start producing + /// items and to feed them to the consumer `consumer` one by one. + /// It may split the consumer before doing so to create the + /// opportunity to produce in parallel. + /// + /// See the [README] for more details on the internals of parallel + /// iterators. + /// + /// [README]: README.md + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor, + C: Consumer; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method converts the iterator into a producer P and then + /// invokes `callback.callback()` with P. Note that the type of + /// this producer is not defined as part of the API, since + /// `callback` must be defined generically for all producers. This + /// allows the producer type to contain references; it also means + /// that parallel iterators can adjust that type without causing a + /// breaking change. + /// + /// See the [README] for more details on the internals of parallel + /// iterators. 
+ /// + /// [README]: README.md + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// Returns the number of items produced by this iterator, if known + /// statically. This can be used by consumers to trigger special fast + /// paths. Therefore, if `Some(_)` is returned, this iterator must only + /// use the (indexed) `Consumer` methods when driving a consumer, such + /// as `split_at()`. Calling `UnindexedConsumer::split_off_left()` or + /// other `UnindexedConsumer` methods -- or returning an inaccurate + /// value -- may result in panics. + /// + /// This method is currently used to optimize `collect` for want + /// of true Rust specialization; it may be removed when + /// specialization is stable. + fn len_hint_opt(&self) -> Option { + None + } + + /// Executes `OP` on each item produced by the iterator, in parallel. + /// + /// # Examples + /// + /// ``` + /// use asparit::*; + /// + /// (0..100).into_par_iter().for_each(|x| println!("{:?}", x)); + /// ``` + fn for_each(self, operation: F) -> ForEach + where + F: Fn(Self::Item) + Sync + Send, + { + ForEach::new(self, operation) + } +} + +/// An iterator that supports "random access" to its data, meaning +/// that you can split it at arbitrary indices and draw data from +/// those points. +/// +/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges +pub trait IndexedParallelIterator: ParallelIterator { + /// Produces an exact count of how many items this iterator will + /// produce, presuming no panic occurs. + /// + /// # Examples + /// + /// ``` + /// use asparit::*; + /// + /// let par_iter = (0..100).into_par_iter().zip(vec![0; 10]); + /// assert_eq!(par_iter.len(), 10); + /// + /// let vec: Vec<_> = par_iter.collect(); + /// assert_eq!(vec.len(), 10); + /// ``` + fn len_hint(&self) -> usize; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method causes the iterator `self` to start producing + /// items and to feed them to the consumer `consumer` one by one. + /// It may split the consumer before doing so to create the + /// opportunity to produce in parallel. If a split does happen, it + /// will inform the consumer of the index where the split should + /// occur (unlike `ParallelIterator::drive_unindexed()`). + /// + /// See the [README] for more details on the internals of parallel + /// iterators. + /// + /// [README]: README.md + fn drive_indexed(self, consumer: C) -> C::Result + where + C: IndexedConsumer; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method converts the iterator into a producer P and then + /// invokes `callback.callback()` with P. Note that the type of + /// this producer is not defined as part of the API, since + /// `callback` must be defined generically for all producers. This + /// allows the producer type to contain references; it also means + /// that parallel iterators can adjust that type without causing a + /// breaking change. + /// + /// See the [README] for more details on the internals of parallel + /// iterators. 
+ /// + /// [README]: README.md + fn with_producer_indexed(self, callback: CB) -> CB::Output + where + CB: IndexedProducerCallback; +} diff --git a/asparit/src/core/mod.rs b/asparit/src/core/mod.rs new file mode 100644 index 0000000..9b07337 --- /dev/null +++ b/asparit/src/core/mod.rs @@ -0,0 +1,17 @@ +mod collector; +mod consumer; +mod executor; +mod folder; +mod into_iter; +mod iterator; +mod producer; +mod reducer; + +pub use collector::Collector; +pub use consumer::{Consumer, IndexedConsumer}; +pub use executor::{Executor, IndexedExecutor}; +pub use folder::Folder; +pub use into_iter::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator}; +pub use iterator::{IndexedParallelIterator, ParallelIterator}; +pub use producer::{IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback}; +pub use reducer::Reducer; diff --git a/asparit/src/core/producer.rs b/asparit/src/core/producer.rs new file mode 100644 index 0000000..a9b0a86 --- /dev/null +++ b/asparit/src/core/producer.rs @@ -0,0 +1,144 @@ +use super::Folder; + +/// A variant on `Producer` which does not know its exact length or +/// cannot represent it in a `usize`. These producers act like +/// ordinary producers except that they cannot be told to split at a +/// particular point. Instead, you just ask them to split 'somewhere'. +/// +/// (In principle, `Producer` could extend this trait; however, it +/// does not because to do so would require producers to carry their +/// own length with them.) +pub trait Producer: Send + Sized { + /// The type of item returned by this producer. + type Item; + + /// Split midway into a new producer if possible, otherwise return `None`. + fn split(self) -> (Self, Option); + + /// Iterate the producer, feeding each element to `folder`, and + /// stop when the folder is full (or all elements have been consumed). + fn fold_with(self, folder: F) -> F + where + F: Folder; +} + +/// A `Producer` is effectively a "splittable `IntoIterator`". That +/// is, a producer is a value which can be converted into an iterator +/// at any time: at that point, it simply produces items on demand, +/// like any iterator. But what makes a `Producer` special is that, +/// *before* we convert to an iterator, we can also **split** it at a +/// particular point using the `split_at` method. This will yield up +/// two producers, one producing the items before that point, and one +/// producing the items after that point (these two producers can then +/// independently be split further, or be converted into iterators). +/// In Rayon, this splitting is used to divide between threads. +/// See [the `plumbing` README][r] for further details. +/// +/// Note that each producer will always produce a fixed number of +/// items N. However, this number N is not queryable through the API; +/// the consumer is expected to track it. +/// +/// NB. You might expect `Producer` to extend the `IntoIterator` +/// trait. However, [rust-lang/rust#20671][20671] prevents us from +/// declaring the DoubleEndedIterator and ExactSizeIterator +/// constraints on a required IntoIterator trait, so we inline +/// IntoIterator here until that issue is fixed. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md +/// [20671]: https://github.com/rust-lang/rust/issues/20671 +pub trait IndexedProducer: Send + Sized { + /// The type of item that will be produced by this producer once + /// it is converted into an iterator. + type Item; + + /// The type of iterator we will become. 
+ type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; + + /// Convert `self` into an iterator; at this point, no more parallel splits + /// are possible. + fn into_iter(self) -> Self::IntoIter; + + /// The minimum number of items that we will process + /// sequentially. Defaults to 1, which means that we will split + /// all the way down to a single item. This can be raised higher + /// using the [`with_min_len`] method, which will force us to + /// create sequential tasks at a larger granularity. Note that + /// Rayon automatically normally attempts to adjust the size of + /// parallel splits to reduce overhead, so this should not be + /// needed. + /// + /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len + fn min_len(&self) -> usize { + 1 + } + + /// The maximum number of items that we will process + /// sequentially. Defaults to MAX, which means that we can choose + /// not to split at all. This can be lowered using the + /// [`with_max_len`] method, which will force us to create more + /// parallel tasks. Note that Rayon automatically normally + /// attempts to adjust the size of parallel splits to reduce + /// overhead, so this should not be needed. + /// + /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len + fn max_len(&self) -> usize { + usize::MAX + } + + /// Split into two producers; one produces items `0..index`, the + /// other `index..N`. Index must be less than or equal to `N`. + fn split_at(self, index: usize) -> (Self, Self); + + /// Iterate the producer, feeding each element to `folder`, and + /// stop when the folder is full (or all elements have been consumed). + /// + /// The provided implementation is sufficient for most iterables. + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + folder.consume_iter(self.into_iter()) + } +} + +/// The `ProducerCallback` trait is a kind of generic closure, +/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in +/// the plumbing README][r] for more details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback +/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html +pub trait ProducerCallback { + /// The type of value returned by this callback. Analogous to + /// [`Output` from the `FnOnce` trait][Output]. + /// + /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output + type Output; + + /// Invokes the callback with the given producer as argument. The + /// key point of this trait is that this method is generic over + /// `P`, and hence implementors must be defined for any producer. + fn callback
<P>
(self, producer: P) -> Self::Output + where + P: Producer; +} + +/// The `IndexedProducerCallback` trait is a kind of generic closure, +/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in +/// the plumbing README][r] for more details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback +/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html +pub trait IndexedProducerCallback { + /// The type of value returned by this callback. Analogous to + /// [`Output` from the `FnOnce` trait][Output]. + /// + /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output + type Output; + + /// Invokes the callback with the given producer as argument. The + /// key point of this trait is that this method is generic over + /// `P`, and hence implementors must be defined for any producer. + fn callback
<P>
(self, producer: P) -> Self::Output + where + P: IndexedProducer; +} diff --git a/asparit/src/core/reducer.rs b/asparit/src/core/reducer.rs new file mode 100644 index 0000000..2bb57e1 --- /dev/null +++ b/asparit/src/core/reducer.rs @@ -0,0 +1,12 @@ +/// The reducer is the final step of a `Consumer` -- after a consumer +/// has been split into two parts, and each of those parts has been +/// fully processed, we are left with two results. The reducer is then +/// used to combine those two results into one. See [the `plumbing` +/// README][r] for further details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md +pub trait Reducer { + /// Reduce two final results into one; this is executed after a + /// split. + fn reduce(self, left: Result, right: Result) -> Result; +} diff --git a/asparit/src/executor/mod.rs b/asparit/src/executor/mod.rs new file mode 100644 index 0000000..0ec9536 --- /dev/null +++ b/asparit/src/executor/mod.rs @@ -0,0 +1,5 @@ +mod sequential; + +pub use sequential::Sequential as SequentialExecutor; + +pub type DefaultExecutor = SequentialExecutor; diff --git a/asparit/src/executor/sequential.rs b/asparit/src/executor/sequential.rs new file mode 100644 index 0000000..316ee77 --- /dev/null +++ b/asparit/src/executor/sequential.rs @@ -0,0 +1,70 @@ +use crate::core::{ + Consumer, Executor, Folder, IndexedConsumer, IndexedExecutor, IndexedParallelIterator, + IndexedProducer, IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, +}; + +#[derive(Default)] +pub struct Sequential; + +struct Callback { + consumer: C, +} + +struct IndexedCallback { + consumer: C, +} + +impl Executor for Sequential +where + I: ParallelIterator, + C: Consumer, +{ + type Result = C::Result; + + fn exec(self, iterator: I, consumer: C) -> Self::Result { + iterator.with_producer(Callback { consumer }) + } +} + +impl IndexedExecutor for Sequential +where + I: IndexedParallelIterator, + C: IndexedConsumer, +{ + type Result = C::Result; + + fn exec_indexed(self, iterator: I, consumer: C) -> Self::Result { + iterator.with_producer_indexed(IndexedCallback { consumer }) + } +} + +impl ProducerCallback for Callback +where + C: Consumer, +{ + type Output = C::Result; + + fn callback
<P>
(self, producer: P) -> C::Result + where + P: Producer, + { + if self.consumer.is_full() { + self.consumer.into_folder().complete() + } else { + producer.fold_with(self.consumer.into_folder()).complete() + } + } +} + +impl IndexedProducerCallback for IndexedCallback +where + C: IndexedConsumer, +{ + type Output = C::Result; + fn callback
<P>
(self, _producer: P) -> C::Result + where + P: IndexedProducer, + { + self.consumer.into_folder().complete() + } +} diff --git a/asparit/src/inner/for_each.rs b/asparit/src/inner/for_each.rs new file mode 100644 index 0000000..65d018a --- /dev/null +++ b/asparit/src/inner/for_each.rs @@ -0,0 +1,107 @@ +use crate::{ + core::{Collector, Executor}, + Consumer, Folder, ParallelIterator, +}; + +use super::noop::NoOpReducer; + +pub struct ForEach { + iterator: I, + operation: F, +} + +impl ForEach { + pub fn new(iterator: I, operation: F) -> Self { + Self { + iterator, + operation, + } + } +} + +impl Collector for ForEach +where + I: ParallelIterator, + F: Fn(I::Item) + Sync + Send + Copy, +{ + type Iterator = I; + type Consumer = ForEachConsumer; + + fn exec_with(self, executor: E) -> E::Result + where + E: Executor, + { + let iterator = self.iterator; + let operation = self.operation; + + let consumer = ForEachConsumer { operation }; + + iterator.drive(executor, consumer) + } +} + +pub struct ForEachConsumer { + operation: F, +} + +impl Consumer for ForEachConsumer +where + F: Fn(T) + Sync + Send + Copy, +{ + type Folder = ForEachConsumer; + type Reducer = NoOpReducer; + type Result = (); + + fn split_off_left(&self) -> (Self, NoOpReducer) { + ( + ForEachConsumer { + operation: self.operation, + }, + NoOpReducer, + ) + } + + fn into_folder(self) -> Self { + self + } +} + +impl Folder for ForEachConsumer +where + F: Fn(T) + Sync + Send + Copy, +{ + type Result = (); + + fn consume(self, item: T) -> Self { + (self.operation)(item); + + self + } + + fn consume_iter(self, iter: I) -> Self + where + I: IntoIterator, + { + iter.into_iter().for_each(self.operation); + + self + } + + fn complete(self) {} +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::*; + + #[test] + fn test_for_each() { + (0..10usize) + .into_par_iter() + .for_each(&|j| { + println!("{}", j); + }) + .exec(); + } +} diff --git a/asparit/src/inner/mod.rs b/asparit/src/inner/mod.rs new file mode 100644 index 0000000..4ae4a5a --- /dev/null +++ b/asparit/src/inner/mod.rs @@ -0,0 +1,2 @@ +pub mod for_each; +pub mod noop; diff --git a/asparit/src/inner/noop.rs b/asparit/src/inner/noop.rs new file mode 100644 index 0000000..1b9759c --- /dev/null +++ b/asparit/src/inner/noop.rs @@ -0,0 +1,51 @@ +use crate::{Consumer, Folder, IndexedConsumer, Reducer}; + +pub struct NoOpConsumer; + +impl Consumer for NoOpConsumer { + type Folder = NoOpConsumer; + type Reducer = NoOpReducer; + type Result = (); + + fn split_off_left(&self) -> (Self, Self::Reducer) { + (NoOpConsumer, NoOpReducer) + } + + fn into_folder(self) -> Self { + self + } + + fn is_full(&self) -> bool { + false + } +} + +impl Folder for NoOpConsumer { + type Result = (); + + fn consume(self, _item: T) -> Self { + self + } + + fn consume_iter(self, iter: I) -> Self + where + I: IntoIterator, + { + iter.into_iter().for_each(drop); + self + } + + fn complete(self) {} +} + +impl IndexedConsumer for NoOpConsumer { + fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { + (NoOpConsumer, NoOpConsumer, NoOpReducer) + } +} + +pub struct NoOpReducer; + +impl Reducer<()> for NoOpReducer { + fn reduce(self, _left: (), _right: ()) {} +} diff --git a/asparit/src/lib.rs b/asparit/src/lib.rs new file mode 100644 index 0000000..2aea8c4 --- /dev/null +++ b/asparit/src/lib.rs @@ -0,0 +1,11 @@ +mod core; +mod executor; +mod inner; +mod std; + +pub use self::core::{ + Consumer, Executor, Folder, IndexedConsumer, IndexedExecutor, IndexedParallelIterator, + IndexedProducer, 
IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator, + ParallelIterator, Producer, ProducerCallback, Reducer, +}; +pub use self::executor::{DefaultExecutor, SequentialExecutor}; diff --git a/asparit/src/std/mod.rs b/asparit/src/std/mod.rs new file mode 100644 index 0000000..b2277ba --- /dev/null +++ b/asparit/src/std/mod.rs @@ -0,0 +1 @@ +mod range; diff --git a/asparit/src/std/range.rs b/asparit/src/std/range.rs new file mode 100644 index 0000000..292226b --- /dev/null +++ b/asparit/src/std/range.rs @@ -0,0 +1,71 @@ +use std::ops::Range; + +use crate::{ + Consumer, Executor, Folder, IntoParallelIterator, ParallelIterator, Producer, ProducerCallback, +}; + +pub struct Iter { + range: Range, +} + +struct IterProducer { + range: Range, +} + +impl IntoParallelIterator for Range { + type Iter = Iter; + type Item = usize; + + fn into_par_iter(self) -> Self::Iter { + Iter { range: self } + } +} + +impl ParallelIterator for Iter { + type Item = usize; + + fn drive(self, executor: E, consumer: C) -> E::Result + where + E: Executor, + C: Consumer, + { + executor.exec(self, consumer) + } + + fn len_hint_opt(&self) -> Option { + Some(self.range.end - self.range.start) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + callback.callback(IterProducer { range: self.range }) + } +} + +impl Producer for IterProducer { + type Item = usize; + + fn split(mut self) -> (Self, Option) { + let index = self.range.len() / 2; + + if index > 0 { + let mid = self.range.start.wrapping_add(index); + let right = mid..self.range.end; + + self.range.end = mid; + + (self, Some(IterProducer { range: right })) + } else { + (self, None) + } + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + folder.consume_iter(self.range) + } +} diff --git a/async-ecs/Cargo.toml b/async-ecs/Cargo.toml index ae7345e..f03f824 100644 --- a/async-ecs/Cargo.toml +++ b/async-ecs/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Bergmann89 "] edition = "2018" [dependencies] +asparit = "0.1" async-ecs-derive = "0.1" env_logger = "0.8" futures = "0.3" @@ -13,7 +14,6 @@ hibitset = { version = "0.6", default-features = false } log = "0.4" mopa = "0.2" num_cpus = "1.13" -parallel-iterator = "0.1" rand = "0.7" thiserror = "1.0" tokio = { version = "0.3", features = [ "full", "net", "time", "rt-multi-thread" ] } diff --git a/async-ecs/src/access/par_join.rs b/async-ecs/src/access/par_join.rs index d64dd4e..c035273 100644 --- a/async-ecs/src/access/par_join.rs +++ b/async-ecs/src/access/par_join.rs @@ -1,7 +1,7 @@ use std::cell::UnsafeCell; +use asparit::{Consumer, Executor, Folder, ParallelIterator, Producer}; use hibitset::BitSetLike; -use parallel_iterator::{Consumer, Executor, Folder, ParallelIterator, Producer}; use crate::misc::{BitIter, BitProducer, TokioExecutor}; diff --git a/async-ecs/src/lib.rs b/async-ecs/src/lib.rs index fbb66dc..511c251 100644 --- a/async-ecs/src/lib.rs +++ b/async-ecs/src/lib.rs @@ -11,7 +11,7 @@ pub mod storage; pub mod system; pub mod world; -pub use parallel_iterator::ParallelIterator; +pub use asparit::ParallelIterator; pub use access::{Join, ParJoin, ReadStorage, WriteStorage}; pub use dispatcher::Dispatcher; diff --git a/async-ecs/src/misc/bit_producer.rs b/async-ecs/src/misc/bit_producer.rs index 43859ad..b037ca9 100644 --- a/async-ecs/src/misc/bit_producer.rs +++ b/async-ecs/src/misc/bit_producer.rs @@ -1,5 +1,5 @@ +use asparit::{Folder, Producer}; use hibitset::BitSetLike; -use parallel_iterator::{Folder, Producer}; use 
crate::misc::bit_average; diff --git a/async-ecs/src/misc/tokio_executor.rs b/async-ecs/src/misc/tokio_executor.rs index f6b21ba..972016b 100644 --- a/async-ecs/src/misc/tokio_executor.rs +++ b/async-ecs/src/misc/tokio_executor.rs @@ -7,9 +7,7 @@ use futures::{ }; use tokio::task::{block_in_place, spawn}; -use parallel_iterator::{ - Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer, -}; +use asparit::{Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer}; pub struct TokioExecutor; diff --git a/parallel-iterator/src/for_each.rs b/parallel-iterator/src/for_each.rs deleted file mode 100644 index dc50eff..0000000 --- a/parallel-iterator/src/for_each.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::{no_op::NoOpReducer, Consumer, Folder, ParallelIterator}; - -pub fn for_each(iter: I, f: &F) -where - I: ParallelIterator, - F: Fn(T) + Sync, - T: Send, -{ - let consumer = ForEachConsumer { f }; - - iter.drive(consumer) -} - -struct ForEachConsumer<'f, F> { - f: &'f F, -} - -impl<'f, F, T> Consumer for ForEachConsumer<'f, F> -where - F: Fn(T) + Sync, -{ - type Folder = ForEachConsumer<'f, F>; - type Reducer = NoOpReducer; - type Result = (); - - fn split_off_left(&self) -> (Self, Self::Reducer) { - (Self { f: self.f }, NoOpReducer) - } - - fn into_folder(self) -> Self::Folder { - self - } -} - -impl<'f, F, T> Folder for ForEachConsumer<'f, F> -where - F: Fn(T) + Sync, -{ - type Result = (); - - fn is_full(&self) -> bool { - false - } - - fn complete(self) -> Self::Result {} - - fn consume(self, item: T) -> Self { - (self.f)(item); - - self - } - - fn consume_iter(self, iter: I) -> Self - where - I: IntoIterator, - { - iter.into_iter().for_each(self.f); - - self - } -} diff --git a/parallel-iterator/src/lib.rs b/parallel-iterator/src/lib.rs deleted file mode 100644 index e35506b..0000000 --- a/parallel-iterator/src/lib.rs +++ /dev/null @@ -1,121 +0,0 @@ -mod for_each; -mod no_op; - -pub trait ParallelIterator: Sized + Send { - type Item: Send; - - fn for_each(self, f: F) - where - F: Fn(Self::Item) + Sync + Send, - { - for_each::for_each(self, &f) - } - - fn drive(self, consumer: C) -> C::Result - where - C: Consumer; - - fn len_hint_opt(&self) -> Option { - None - } -} - -pub trait IndexedParallelIterator: ParallelIterator { - fn drive_indexed(self, consumer: C) -> C::Result - where - C: IndexedConsumer; - - fn len_hint(&self) -> usize; -} - -pub trait Producer: Send + Sized { - type Item; - - fn split(self) -> (Self, Option); - - fn fold_with(self, folder: F) -> F - where - F: Folder; -} - -pub trait IndexedProducer: Send + Sized { - type Item; - type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; - - fn into_iter(self) -> Self::IntoIter; - - fn min_len(&self) -> usize { - 1 - } - - fn max_len(&self) -> usize { - usize::MAX - } - - fn split_at(self, index: usize) -> (Self, Self); - - fn fold_with(self, folder: F) -> F - where - F: Folder, - { - folder.consume_iter(self.into_iter()) - } -} - -pub trait Consumer: Send + Sized { - type Folder: Folder; - type Reducer: Reducer + Send; - type Result: Send; - - fn split_off_left(&self) -> (Self, Self::Reducer); - - fn into_folder(self) -> Self::Folder; - - fn is_full(&self) -> bool { - false - } -} - -pub trait IndexedConsumer: Consumer { - fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); -} - -pub trait Folder: Sized { - type Result; - - fn is_full(&self) -> bool; - - fn complete(self) -> Self::Result; - - fn consume(self, item: Item) -> Self; - - fn consume_iter(mut 
self, iter: I) -> Self - where - I: IntoIterator, - { - for item in iter { - self = self.consume(item); - if self.is_full() { - break; - } - } - - self - } -} - -pub trait Reducer { - fn reduce(self, left: Result, right: Result) -> Result; -} - -pub trait Executor { - fn exec(self, producer: P, consumer: C) -> C::Result - where - P: Producer, - C: Consumer; - - fn exec_indexed(self, producer: P, consumer: C) -> C::Result - where - P: IndexedProducer, - C: IndexedConsumer; -} diff --git a/parallel-iterator/src/no_op.rs b/parallel-iterator/src/no_op.rs deleted file mode 100644 index 0f3f023..0000000 --- a/parallel-iterator/src/no_op.rs +++ /dev/null @@ -1,7 +0,0 @@ -use super::Reducer; - -pub struct NoOpReducer; - -impl Reducer<()> for NoOpReducer { - fn reduce(self, _left: (), _right: ()) {} -}