Browse Source

Merged 'Consumer' and 'IndexedConsumer'

master
Bergmann89 3 years ago
parent
commit
406218f0ae
16 changed files with 125 additions and 169 deletions
  1. +14
    -19
      asparit/src/core/consumer.rs
  2. +3
    -4
      asparit/src/core/executor.rs
  3. +2
    -3
      asparit/src/core/iterator.rs
  4. +1
    -1
      asparit/src/core/mod.rs
  5. +2
    -4
      asparit/src/executor/sequential.rs
  6. +4
    -7
      asparit/src/executor/tokio.rs
  7. +16
    -7
      asparit/src/inner/for_each.rs
  8. +18
    -23
      asparit/src/inner/map.rs
  9. +18
    -27
      asparit/src/inner/map_init.rs
  10. +18
    -27
      asparit/src/inner/map_with.rs
  11. +7
    -9
      asparit/src/inner/noop.rs
  12. +7
    -14
      asparit/src/inner/reduce.rs
  13. +7
    -16
      asparit/src/inner/try_reduce.rs
  14. +2
    -2
      asparit/src/lib.rs
  15. +4
    -4
      asparit/src/std/range.rs
  16. +2
    -2
      async-ecs/src/misc/tokio_executor.rs

+ 14
- 19
asparit/src/core/consumer.rs View File

@@ -24,14 +24,20 @@ pub trait Consumer<I>: Send + Sized {
/// The type of result that this consumer will ultimately produce.
type Result: Send;

/// Splits off a "left" consumer and returns it. The `self`
/// consumer should then be used to consume the "right" portion of
/// the data. (The ordering matters for methods like find_first --
/// values produced by the returned value are given precedence
/// over values produced by `self`.) Once the left and right
/// halves have been fully consumed, you should reduce the results
/// with the result of `to_reducer`.
fn split_off_left(&self) -> (Self, Self::Reducer);
/// Divide the consumer into two consumers, one processing the left items
/// and one processing the right items from. Also produces a reducer that
/// can be used to reduce the results at the end.
fn split(self) -> (Self, Self, Self::Reducer) {
panic!("Consumer could not be used in unindexed mode!");
}

/// Divide the consumer into two consumers, one processing items
/// `0..index` and one processing items from `index..`. Also
/// produces a reducer that can be used to reduce the results at
/// the end.
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
panic!("Consumer could not be used in indexed mode!");
}

/// Convert the consumer into a folder that can consume items
/// sequentially, eventually producing a final result.
@@ -43,14 +49,3 @@ pub trait Consumer<I>: Send + Sized {
false
}
}

/// A stateless consumer can be freely copied. These consumers can be
/// used like regular consumers, but they also support a
/// `split_at` method that does take an index to split.
pub trait IndexedConsumer<I>: Consumer<I> {
/// Divide the consumer into two consumers, one processing items
/// `0..index` and one processing items from `index..`. Also
/// produces a reducer that can be used to reduce the results at
/// the end.
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
}

+ 3
- 4
asparit/src/core/executor.rs View File

@@ -1,6 +1,5 @@
use super::{
Consumer, IndexedConsumer, IndexedProducer, IndexedProducerCallback, Producer,
ProducerCallback, Reducer,
Consumer, IndexedProducer, IndexedProducerCallback, Producer, ProducerCallback, Reducer,
};

pub trait Executor<'a, D>
@@ -18,7 +17,7 @@ where
fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item, Result = D, Reducer = R> + 'a,
C: Consumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send;
}

@@ -54,7 +53,7 @@ impl<'a, E, D, C, I, R> IndexedProducerCallback<'a, I> for ExecutorCallback<E, C
where
E: Executor<'a, D>,
D: Send,
C: IndexedConsumer<I, Result = D, Reducer = R> + 'a,
C: Consumer<I, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
type Output = E::Result;


+ 2
- 3
asparit/src/core/iterator.rs View File

@@ -1,6 +1,5 @@
use super::{
Consumer, Executor, FromParallelIterator, IndexedConsumer, IndexedProducerCallback,
ProducerCallback, Reducer,
Consumer, Executor, FromParallelIterator, IndexedProducerCallback, ProducerCallback, Reducer,
};

use crate::{
@@ -509,7 +508,7 @@ pub trait IndexedParallelIterator<'a>: ParallelIterator<'a> {
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R> + 'a,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send;



+ 1
- 1
asparit/src/core/mod.rs View File

@@ -8,7 +8,7 @@ mod iterator;
mod producer;
mod reducer;

pub use consumer::{Consumer, IndexedConsumer};
pub use consumer::Consumer;
pub use driver::Driver;
pub use executor::{Executor, ExecutorCallback};
pub use folder::Folder;


+ 2
- 4
asparit/src/executor/sequential.rs View File

@@ -1,6 +1,4 @@
use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};
use crate::core::{Consumer, Executor, Folder, IndexedProducer, Producer, Reducer};

#[derive(Default)]
pub struct Sequential;
@@ -27,7 +25,7 @@ where
fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer,
C: IndexedConsumer<P::Item, Result = D, Reducer = R>,
C: Consumer<P::Item, Result = D, Reducer = R>,
R: Reducer<D>,
{
if consumer.is_full() {


+ 4
- 7
asparit/src/executor/tokio.rs View File

@@ -7,9 +7,7 @@ use futures::{
};
use tokio::task::spawn;

use crate::core::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};
use crate::core::{Consumer, Executor, Folder, IndexedProducer, Producer, Reducer};

pub struct Tokio {
splits: usize,
@@ -50,7 +48,7 @@ where
fn exec_indexed<P, C, R>(self, producer: P, consumer: C) -> Self::Result
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item, Result = D, Reducer = R> + 'a,
C: Consumer<P::Item, Result = D, Reducer = R> + 'a,
R: Reducer<D> + Send,
{
let splits = producer.splits().unwrap_or(self.splits);
@@ -77,8 +75,7 @@ where
} else if splitter.try_split() {
match producer.split() {
(left_producer, Some(right_producer)) => {
let ((left_consumer, reducer), right_consumer) =
(consumer.split_off_left(), consumer);
let (left_consumer, right_consumer, reducer) = consumer.split();

let left = run_as_task(exec(splitter, left_producer, left_consumer));
let right = run_as_task(exec(splitter, right_producer, right_consumer));
@@ -103,7 +100,7 @@ fn exec_indexed<'a, P, C>(
) -> BoxFuture<'a, C::Result>
where
P: IndexedProducer + 'a,
C: IndexedConsumer<P::Item> + 'a,
C: Consumer<P::Item> + 'a,
C::Reducer: Send,
{
async move {


+ 16
- 7
asparit/src/inner/for_each.rs View File

@@ -46,13 +46,22 @@ where
type Reducer = NoOpReducer;
type Result = ();

fn split_off_left(&self) -> (Self, NoOpReducer) {
(
ForEachConsumer {
operation: self.operation.clone(),
},
NoOpReducer,
)
fn split(self) -> (Self, Self, NoOpReducer) {
let left = self;
let right = ForEachConsumer {
operation: left.operation.clone(),
};

(left, right, NoOpReducer)
}

fn split_at(self, _index: usize) -> (Self, Self, NoOpReducer) {
let left = self;
let right = ForEachConsumer {
operation: left.operation.clone(),
};

(left, right, NoOpReducer)
}

fn into_folder(self) -> Self {


+ 18
- 23
asparit/src/inner/map.rs View File

@@ -1,6 +1,6 @@
use crate::{
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback,
ParallelIterator, Producer, ProducerCallback, Reducer,
};

/* Map */
@@ -60,7 +60,7 @@ where
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R> + 'a,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
@@ -262,10 +262,22 @@ where
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();
fn split(self) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split();

let left = MapConsumer::new(left, self.operation.clone());
let right = MapConsumer::new(right, self.operation);

(left, right, reducer)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

let left = MapConsumer::new(left, self.operation.clone());
let right = MapConsumer::new(right, self.operation);

(MapConsumer::new(left, self.operation.clone()), reducer)
(left, right, reducer)
}

fn into_folder(self) -> Self::Folder {
@@ -280,23 +292,6 @@ where
}
}

impl<I, T, C, O> IndexedConsumer<I> for MapConsumer<C, O>
where
C: IndexedConsumer<O::Output>,
O: Fn(I) -> T + Clone + Send + Sync,
T: Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapConsumer::new(left, self.operation.clone()),
MapConsumer::new(right, self.operation),
reducer,
)
}
}

/* MapFolder */

struct MapFolder<F, O> {


+ 18
- 27
asparit/src/inner/map_init.rs View File

@@ -1,6 +1,6 @@
use crate::{
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback,
ParallelIterator, Producer, ProducerCallback, Reducer,
};

use super::map_with::{MapWithFolder, MapWithIter};
@@ -70,7 +70,7 @@ where
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R> + 'a,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
@@ -302,13 +302,22 @@ where
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();
fn split(self) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split();

(
MapInitConsumer::new(left, self.init.clone(), self.operation.clone()),
reducer,
)
let left = MapInitConsumer::new(left, self.init.clone(), self.operation.clone());
let right = MapInitConsumer::new(right, self.init, self.operation);

(left, right, reducer)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

let left = MapInitConsumer::new(left, self.init.clone(), self.operation.clone());
let right = MapInitConsumer::new(right, self.init, self.operation);

(left, right, reducer)
}

fn into_folder(self) -> Self::Folder {
@@ -323,21 +332,3 @@ where
self.base.is_full()
}
}

impl<I, T, C, S, U, O> IndexedConsumer<I> for MapInitConsumer<C, S, O>
where
C: IndexedConsumer<T>,
O: Fn(&mut U, I) -> T + Clone + Send + Sync,
T: Send,
S: Fn() -> U + Clone + Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapInitConsumer::new(left, self.init.clone(), self.operation.clone()),
MapInitConsumer::new(right, self.init, self.operation),
reducer,
)
}
}

+ 18
- 27
asparit/src/inner/map_with.rs View File

@@ -1,6 +1,6 @@
use crate::{
Consumer, Executor, Folder, IndexedConsumer, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, ParallelIterator, Producer, ProducerCallback, Reducer,
Consumer, Executor, Folder, IndexedParallelIterator, IndexedProducer, IndexedProducerCallback,
ParallelIterator, Producer, ProducerCallback, Reducer,
};

/* MapWith */
@@ -68,7 +68,7 @@ where
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R> + 'a,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{
@@ -345,13 +345,22 @@ where
type Reducer = C::Reducer;
type Result = C::Result;

fn split_off_left(&self) -> (Self, Self::Reducer) {
let (left, reducer) = self.base.split_off_left();
fn split(self) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split();

(
MapWithConsumer::new(left, self.item.clone(), self.operation.clone()),
reducer,
)
let left = MapWithConsumer::new(left, self.item.clone(), self.operation.clone());
let right = MapWithConsumer::new(right, self.item, self.operation);

(left, right, reducer)
}

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

let left = MapWithConsumer::new(left, self.item.clone(), self.operation.clone());
let right = MapWithConsumer::new(right, self.item, self.operation);

(left, right, reducer)
}

fn into_folder(self) -> Self::Folder {
@@ -367,24 +376,6 @@ where
}
}

impl<I, T, C, S, O> IndexedConsumer<I> for MapWithConsumer<C, S, O>
where
C: IndexedConsumer<T>,
O: Fn(&mut S, I) -> T + Clone + Send + Sync,
T: Send,
S: Clone + Send,
{
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);

(
MapWithConsumer::new(left, self.item.clone(), self.operation.clone()),
MapWithConsumer::new(right, self.item, self.operation),
reducer,
)
}
}

/* MapWithFolder */

pub struct MapWithFolder<F, S, O> {


+ 7
- 9
asparit/src/inner/noop.rs View File

@@ -1,4 +1,4 @@
use crate::{Consumer, Folder, IndexedConsumer, Reducer};
use crate::{Consumer, Folder, Reducer};

pub struct NoOpConsumer;

@@ -7,8 +7,12 @@ impl<T> Consumer<T> for NoOpConsumer {
type Reducer = NoOpReducer;
type Result = ();

fn split_off_left(&self) -> (Self, Self::Reducer) {
(NoOpConsumer, NoOpReducer)
fn split(self) -> (Self, Self, Self::Reducer) {
(NoOpConsumer, NoOpConsumer, NoOpReducer)
}

fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(NoOpConsumer, NoOpConsumer, NoOpReducer)
}

fn into_folder(self) -> Self {
@@ -38,12 +42,6 @@ impl<T> Folder<T> for NoOpConsumer {
fn complete(self) {}
}

impl<T> IndexedConsumer<T> for NoOpConsumer {
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(NoOpConsumer, NoOpConsumer, NoOpReducer)
}
}

pub struct NoOpReducer;

impl Reducer<()> for NoOpReducer {


+ 7
- 14
asparit/src/inner/reduce.rs View File

@@ -1,4 +1,4 @@
use crate::{core::Driver, Consumer, Executor, Folder, IndexedConsumer, ParallelIterator, Reducer};
use crate::{core::Driver, Consumer, Executor, Folder, ParallelIterator, Reducer};

pub struct Reduce<X, S, O> {
iterator: X,
@@ -69,8 +69,12 @@ where
type Reducer = Self;
type Result = T;

fn split_off_left(&self) -> (Self, Self::Reducer) {
(self.clone(), self.clone())
fn split(self) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}

fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}

fn into_folder(self) -> Self::Folder {
@@ -81,17 +85,6 @@ where
}
}

impl<S, O, T> IndexedConsumer<T> for ReduceConsumer<S, O>
where
S: Fn() -> T + Clone + Send,
O: Fn(T, T) -> T + Clone + Send,
T: Send,
{
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}
}

impl<S, O, T> Reducer<T> for ReduceConsumer<S, O>
where
O: Fn(T, T) -> T,


+ 7
- 16
asparit/src/inner/try_reduce.rs View File

@@ -3,9 +3,7 @@ use std::sync::{
Arc,
};

use crate::{
core::Driver, misc::Try, Consumer, Executor, Folder, IndexedConsumer, ParallelIterator, Reducer,
};
use crate::{core::Driver, misc::Try, Consumer, Executor, Folder, ParallelIterator, Reducer};

pub struct TryReduce<X, S, O> {
iterator: X,
@@ -80,8 +78,12 @@ where
type Reducer = Self;
type Result = T;

fn split_off_left(&self) -> (Self, Self::Reducer) {
(self.clone(), self.clone())
fn split(self) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}

fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}

fn into_folder(self) -> Self::Folder {
@@ -93,17 +95,6 @@ where
}
}

impl<S, O, T> IndexedConsumer<T> for TryReduceConsumer<S, O>
where
S: Fn() -> T::Ok + Clone + Send,
O: Fn(T::Ok, T::Ok) -> T + Clone + Send,
T: Try + Send,
{
fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
(self.clone(), self.clone(), self)
}
}

impl<S, O, T> Reducer<T> for TryReduceConsumer<S, O>
where
O: Fn(T::Ok, T::Ok) -> T,


+ 2
- 2
asparit/src/lib.rs View File

@@ -5,8 +5,8 @@ mod misc;
mod std;

pub use self::core::{
Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedConsumer, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
Consumer, Driver, Executor, ExecutorCallback, Folder, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, IntoParallelRefIterator,
IntoParallelRefMutIterator, ParallelIterator, Producer, ProducerCallback, Reducer,
};
pub use self::executor::{DefaultExecutor, SequentialExecutor};

+ 4
- 4
asparit/src/std/range.rs View File

@@ -1,9 +1,9 @@
use std::ops::Range;

use crate::{
Consumer, Executor, ExecutorCallback, IndexedConsumer, IndexedParallelIterator,
IndexedProducer, IndexedProducerCallback, IntoParallelIterator, ParallelIterator, Producer,
ProducerCallback, Reducer,
Consumer, Executor, ExecutorCallback, IndexedParallelIterator, IndexedProducer,
IndexedProducerCallback, IntoParallelIterator, ParallelIterator, Producer, ProducerCallback,
Reducer,
};

/// Parallel iterator over a range, implemented for all integer types.
@@ -143,7 +143,7 @@ macro_rules! indexed_parallel_iterator_impl {
fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
where
E: Executor<'a, D>,
C: IndexedConsumer<Self::Item, Result = D, Reducer = R> + 'a,
C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
D: Send,
R: Reducer<D> + Send,
{


+ 2
- 2
async-ecs/src/misc/tokio_executor.rs View File

@@ -7,7 +7,7 @@ use futures::{
};
use tokio::task::{block_in_place, spawn};

use asparit::{Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer};
use asparit::{Consumer, Executor, Folder, IndexedProducer, Producer, Reducer};

pub struct TokioExecutor;

@@ -23,7 +23,7 @@ impl Executor for TokioExecutor {
fn exec_indexed<P, C>(self, producer: P, consumer: C) -> C::Result
where
P: IndexedProducer,
C: IndexedConsumer<P::Item>,
C: Consumer<P::Item>,
{
let _producer = producer;
let _consumer = consumer;


Loading…
Cancel
Save