Browse Source

Implemented basics of parallel iterator

master
Bergmann89 3 years ago
parent
commit
f89258cbfe
20 changed files with 1027 additions and 44 deletions
  1. +3
    -1
      Cargo.toml
  2. +2
    -0
      async-ecs/Cargo.toml
  3. +22
    -1
      async-ecs/src/access/join.rs
  4. +2
    -0
      async-ecs/src/access/mod.rs
  5. +113
    -0
      async-ecs/src/access/par_join.rs
  6. +3
    -1
      async-ecs/src/lib.rs
  7. +39
    -26
      async-ecs/src/main.rs
  8. +302
    -0
      async-ecs/src/misc/bit_average.rs
  9. +100
    -0
      async-ecs/src/misc/bit_iter.rs
  10. +93
    -0
      async-ecs/src/misc/bit_producer.rs
  11. +8
    -0
      async-ecs/src/misc/mod.rs
  12. +90
    -0
      async-ecs/src/misc/tokio_executor.rs
  13. +26
    -0
      async-ecs/src/storage/anti_storage.rs
  14. +4
    -0
      async-ecs/src/storage/mod.rs
  15. +21
    -14
      async-ecs/src/storage/storage_wrapper.rs
  16. +3
    -1
      async-ecs/src/storage/vec_storage.rs
  17. +7
    -0
      parallel-iterator/Cargo.toml
  18. +61
    -0
      parallel-iterator/src/for_each.rs
  19. +121
    -0
      parallel-iterator/src/lib.rs
  20. +7
    -0
      parallel-iterator/src/no_op.rs

+ 3
- 1
Cargo.toml View File

@@ -1,8 +1,10 @@
[workspace]
members = [
"async-ecs",
"async-ecs-derive"
"async-ecs-derive",
"parallel-iterator"
]

[patch.crates-io]
async-ecs-derive = { path = "async-ecs-derive" }
parallel-iterator = { path = "parallel-iterator" }

+ 2
- 0
async-ecs/Cargo.toml View File

@@ -12,6 +12,8 @@ hashbrown = "0.9"
hibitset = { version = "0.6", default-features = false }
log = "0.4"
mopa = "0.2"
num_cpus = "1.13"
parallel-iterator = "0.1"
rand = "0.7"
thiserror = "1.0"
tokio = { version = "0.3", features = [ "full", "net", "time", "rt-multi-thread" ] }

+ 22
- 1
async-ecs/src/access/join.rs View File

@@ -9,7 +9,7 @@ use crate::{
resource::{Ref, RefMut, Resource},
};

use super::{read::Read, write::Write};
use super::{read::Read, write::Write, ParJoin};

pub trait Join {
type Type;
@@ -69,6 +69,8 @@ where
}
}

impl<T> ParJoin for MaybeJoin<T> where T: ParJoin {}

pub struct JoinIter<J: Join> {
keys: BitIter<J::Mask>,
values: J::Value,
@@ -157,6 +159,11 @@ macro_rules! define_tuple_join {
unconstrained
}
}

impl<$($from,)*> ParJoin for ($($from),*,)
where $($from: ParJoin),*,
($(<$from as Join>::Mask,)*): BitAnd,
{}
}
}

@@ -201,6 +208,13 @@ macro_rules! define_mutable_join {
<&'a mut T as Join>::is_unconstrained()
}
}

impl<'a, 'b, T> ParJoin for &'a mut $ty
where
&'a mut T: ParJoin,
T: Resource,
{
}
};
}

@@ -231,6 +245,13 @@ macro_rules! define_immutable_join {
<&'a T as Join>::is_unconstrained()
}
}

impl<'a, 'b, T> ParJoin for &'a $ty
where
&'a T: ParJoin,
T: Resource,
{
}
};
}



+ 2
- 0
async-ecs/src/access/mod.rs View File

@@ -1,5 +1,6 @@
pub mod accessor;
pub mod join;
pub mod par_join;
pub mod read;
pub mod read_storage;
pub mod write;
@@ -7,6 +8,7 @@ pub mod write_storage;

pub use accessor::{Accessor, AccessorCow, AccessorType, StaticAccessor};
pub use join::Join;
pub use par_join::{JoinParIter, ParJoin};
pub use read::Read;
pub use read_storage::ReadStorage;
pub use write::Write;


+ 113
- 0
async-ecs/src/access/par_join.rs View File

@@ -0,0 +1,113 @@
use std::cell::UnsafeCell;

use hibitset::BitSetLike;
use parallel_iterator::{Consumer, Executor, Folder, ParallelIterator, Producer};

use crate::misc::{BitIter, BitProducer, TokioExecutor};

use super::Join;

/// Extension of [`Join`] for joins whose elements may be visited in
/// parallel. Implementors guarantee that concurrent access per index is
/// safe (see the storage-side `DistinctStorage` bounds).
pub trait ParJoin: Join {
    /// Creates a parallel iterator over the elements of this join.
    fn par_join(self) -> JoinParIter<Self>
    where
        Self: Sized,
    {
        // An unconstrained join (e.g. all `MaybeJoin`s) has no finite mask,
        // which would make the iteration unbounded — warn instead of failing.
        if <Self as Join>::is_unconstrained() {
            log::warn!(
                "`ParJoin` possibly iterating through all indices, you might've made a join with all `MaybeJoin`s, which is unbounded in length."
            );
        }

        JoinParIter(self)
    }
}

/// Parallel iterator over the elements of a join; created by
/// [`ParJoin::par_join`].
pub struct JoinParIter<J>(J);

impl<J> ParallelIterator for JoinParIter<J>
where
    J: Join + Send,
    J::Type: Send,
    J::Mask: Send + Sync,
    J::Value: Send,
{
    type Item = J::Type;

    fn drive<C>(self, consumer: C) -> C::Result
    where
        C: Consumer<Self::Item>,
    {
        let (keys, values) = self.0.open();
        // The value storage is shared by all split producers through an
        // `UnsafeCell`. NOTE(review): soundness relies on each index being
        // produced by exactly one producer after splitting, so the mutable
        // accesses in `JoinProducer::fold_with` never alias — confirm
        // `BitProducer::split` guarantees disjoint key ranges.
        let values = UnsafeCell::new(values);

        // Seed iteration with only the topmost bit-set layer populated; the
        // lower layers are filled in while descending.
        let keys = BitIter::new(&keys, [0, 0, 0, keys.layer3()], [0; 3]);

        let producer = BitProducer::new(keys, 3);
        let producer = JoinProducer::<J>::new(producer, &values);

        TokioExecutor.exec(producer, consumer)
    }
}

/// [`Producer`] half of a parallel join: pairs a splittable key producer
/// with a shared reference to the join's value storage.
struct JoinProducer<'a, J>
where
    J: Join + Send,
    J::Type: Send,
    J::Mask: Send,
    J::Value: Send,
{
    pub(crate) keys: BitProducer<'a, J::Mask>,
    pub(crate) values: &'a UnsafeCell<J::Value>,
}

impl<'a, J> JoinProducer<'a, J>
where
    J: Join + Send,
    J::Type: Send,
    J::Mask: Send,
    J::Value: Send,
{
    fn new(keys: BitProducer<'a, J::Mask>, values: &'a UnsafeCell<J::Value>) -> Self {
        JoinProducer { keys, values }
    }
}

// SAFETY: NOTE(review) — `values` is a shared `&UnsafeCell` moved across
// threads. This is sound only if every index yielded by `keys` is visited by
// exactly one producer after splitting, so the `J::get` calls in `fold_with`
// never alias; confirm the split producers cover disjoint key ranges.
unsafe impl<'a, J> Send for JoinProducer<'a, J>
where
    J: Join + Send,
    J::Type: Send,
    J::Value: Send + 'a,
    J::Mask: Send + Sync + 'a,
{
}

impl<'a, J> Producer for JoinProducer<'a, J>
where
    J: Join + Send,
    J::Type: Send,
    J::Value: Send + 'a,
    J::Mask: Send + Sync + 'a,
{
    type Item = J::Type;

    /// Splits the key space; both halves keep the same shared value storage.
    fn split(self) -> (Self, Option<Self>) {
        let (cur, other) = self.keys.split();
        let values = self.values;
        let first = JoinProducer::new(cur, values);
        let second = other.map(|o| JoinProducer::new(o, values));

        (first, second)
    }

    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        let JoinProducer { values, keys, .. } = self;
        let iter = keys
            .iter
            // SAFETY: see the `Send` impl above — assuming each index is
            // visited by exactly one producer, the mutable accesses into
            // `values` are disjoint per index.
            .map(|idx| unsafe { J::get(&mut *values.get(), idx) });

        folder.consume_iter(iter)
    }
}

+ 3
- 1
async-ecs/src/lib.rs View File

@@ -11,7 +11,9 @@ pub mod storage;
pub mod system;
pub mod world;

pub use access::{Join, ReadStorage, WriteStorage};
pub use parallel_iterator::ParallelIterator;

pub use access::{Join, ParJoin, ReadStorage, WriteStorage};
pub use dispatcher::Dispatcher;
pub use resource::Resources;
pub use storage::VecStorage;


+ 39
- 26
async-ecs/src/main.rs View File

@@ -2,8 +2,8 @@ use std::io::Error as IoError;
use std::time::{Duration, Instant};

use async_ecs::{
dispatcher::Error as DispatcherError, AsyncSystem, Dispatcher, Join, ReadStorage, System,
VecStorage, World, WriteStorage,
dispatcher::Error as DispatcherError, AsyncSystem, Dispatcher, Join, ParJoin, ParallelIterator,
ReadStorage, System, VecStorage, World, WriteStorage,
};
use async_ecs_derive::Component;
use log::info;
@@ -17,14 +17,17 @@ fn main() -> Result<(), Error> {
.format_timestamp_nanos()
.init();

Builder::new_multi_thread()
let rt = Builder::new_multi_thread()
.worker_threads(num_cpus::get() + 4)
.enable_all()
.build()?
.block_on(async move { LocalSet::new().run_until(run()).await })
.build()?;

rt.block_on(async move { LocalSet::new().run_until(run()).await })
}

async fn run() -> Result<(), Error> {
info!("Application started!");
info!(" num_cpus = {}", num_cpus::get());

let mut world = World::default();
world.register_component::<Position>();
@@ -43,8 +46,8 @@ async fn run() -> Result<(), Error> {
info!("World initialized!");

let mut dispatcher = Dispatcher::builder()
.with(Move, "move", &[])?
.with(Accelerate, "accelerate", &["move"])?
.with(ParMove, "move", &[])?
.with(ParAccelerate, "accelerate", &["move"])?
.build();

info!("Setup done!");
@@ -72,7 +75,7 @@ async fn run() -> Result<(), Error> {
}

const ENTITY_COUNT: usize = 1_000_000;
const REPEAT_COUNT: u32 = 10;
const REPEAT_COUNT: u32 = 100;

#[derive(Debug, Component)]
#[storage(VecStorage)]
@@ -95,8 +98,11 @@ struct Acceleration {
y: f64,
}

struct Move;
struct Accelerate;
struct SeqMove;
struct SeqAccelerate;

struct ParMove;
struct ParAccelerate;

#[derive(Debug, Error)]
enum Error {
@@ -134,7 +140,7 @@ impl Acceleration {
}
}

impl<'a> System<'a> for Move {
impl<'a> System<'a> for SeqMove {
type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>);

fn run(&mut self, (mut position, velocity): Self::SystemData) {
@@ -142,21 +148,10 @@ impl<'a> System<'a> for Move {
position.x += velocity.x;
position.y += velocity.y;
}

/*
use specs::{prelude::ParallelIterator, ParJoin};

(&mut position, &velocity)
.par_join()
.for_each(|(position, velocity)| {
position.x += velocity.x;
position.y += velocity.y;
});
*/
}
}

impl<'a> AsyncSystem<'a> for Accelerate {
impl<'a> AsyncSystem<'a> for SeqAccelerate {
type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>);
type Future = futures::future::Ready<()>;

@@ -167,17 +162,35 @@ impl<'a> AsyncSystem<'a> for Accelerate {
}

futures::future::ready(())
}
}

/*
use specs::{prelude::ParallelIterator, ParJoin};
impl<'a> System<'a> for ParMove {
type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>);

fn run(&mut self, (mut position, velocity): Self::SystemData) {
(&mut position, &velocity)
.par_join()
.for_each(|(position, velocity)| {
position.x += velocity.x;
position.y += velocity.y;
});
}
}

impl<'a> AsyncSystem<'a> for ParAccelerate {
type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>);
type Future = futures::future::Ready<()>;

fn run(&mut self, (mut velocity, acceleration): Self::SystemData) -> Self::Future {
(&mut velocity, &acceleration)
.par_join()
.for_each(|(velocity, acceleration)| {
velocity.x += acceleration.x;
velocity.y += acceleration.y;
});
*/

futures::future::ready(())
}
}



+ 302
- 0
async-ecs/src/misc/bit_average.rs View File

@@ -0,0 +1,302 @@
/// Returns the bit position that splits the set bits of `n` into two groups
/// of (nearly) equal population count, or `None` when `n` has fewer than two
/// bits set (so there is nothing to split).
///
/// Dispatches to the implementation matching the width of `usize`.
pub fn bit_average(n: usize) -> Option<usize> {
    #[cfg(target_pointer_width = "64")]
    fn imp(n: usize) -> Option<usize> {
        bit_average_u64(n as u64).map(|avg| avg as usize)
    }

    #[cfg(target_pointer_width = "32")]
    fn imp(n: usize) -> Option<usize> {
        bit_average_u32(n as u32).map(|avg| avg as usize)
    }

    imp(n)
}

/// 32-bit worker for [`bit_average`]: finds a bit position such that the set
/// bits of `n` below it and at/above it have (as near as possible) equal
/// population counts. Returns `None` when `n` has zero or one bits set.
#[allow(clippy::many_single_char_names)]
#[cfg(any(test, target_pointer_width = "32"))]
fn bit_average_u32(n: u32) -> Option<u32> {
    // Alternating-field masks: 0x5555…, 0x3333…, 0x0f0f…, 0x00ff…, 0x0000ffff.
    const PAR: [u32; 5] = [!0 / 0x3, !0 / 0x5, !0 / 0x11, !0 / 0x101, !0 / 0x10001];

    // SWAR popcount ladder: `a`..`d` hold per-field population counts for
    // fields of width 2, 4, 8 and 16 bits respectively.
    let a = n - ((n >> 1) & PAR[0]);
    let b = (a & PAR[1]) + ((a >> 2) & PAR[1]);
    let c = (b + (b >> 4)) & PAR[2];
    let d = (c + (c >> 8)) & PAR[3];

    // `cur` is the count in the upper 16-bit half; `count` the total popcount.
    let mut cur = d >> 16;
    let count = (d + cur) & PAR[4];

    if count <= 1 {
        return None;
    }

    // Binary descent through the partial-count layers: keep narrowing
    // `result` towards the position where half of the set bits lie below.
    let mut target = count / 2;
    let mut result = 32;

    {
        // `child` is the next-finer count layer, `child_stride` its field
        // half-width, `child_mask` extracts one field's count.
        let mut descend = |child, child_stride, child_mask| {
            if cur < target {
                // Not enough bits in the upper half: move down and carry the
                // deficit into `target`.
                result -= 2 * child_stride;
                target -= cur;
            }

            cur = (child >> (result - child_stride)) & child_mask;
        };

        descend(c, 8, 16 - 1); // PAR[3]
        descend(b, 4, 8 - 1); // PAR[2]
        descend(a, 2, 4 - 1); // PAR[1]
        descend(n, 1, 2 - 1); // PAR[0]
    }

    if cur < target {
        result -= 1;
    }

    Some(result - 1)
}

/// 64-bit worker for [`bit_average`]; same algorithm as [`bit_average_u32`]
/// with one extra popcount layer for the wider word.
/// Returns `None` when `n` has zero or one bits set.
#[allow(clippy::many_single_char_names)]
#[cfg(any(test, target_pointer_width = "64"))]
fn bit_average_u64(n: u64) -> Option<u64> {
    // Alternating-field masks for field widths 1, 2, 4, 8, 16 and 32 bits.
    const PAR: [u64; 6] = [
        !0 / 0x3,
        !0 / 0x5,
        !0 / 0x11,
        !0 / 0x101,
        !0 / 0x10001,
        !0 / 0x100000001,
    ];

    // SWAR popcount ladder: `a`..`e` hold per-field population counts for
    // fields of width 2, 4, 8, 16 and 32 bits respectively.
    let a = n - ((n >> 1) & PAR[0]);
    let b = (a & PAR[1]) + ((a >> 2) & PAR[1]);
    let c = (b + (b >> 4)) & PAR[2];
    let d = (c + (c >> 8)) & PAR[3];
    let e = (d + (d >> 16)) & PAR[4];

    // `cur` is the count in the upper 32-bit half; `count` the total popcount.
    let mut cur = e >> 32;
    let count = (e + cur) & PAR[5];

    if count <= 1 {
        return None;
    }

    // Binary descent through the partial-count layers (see bit_average_u32).
    let mut target = count / 2;
    let mut result = 64;

    {
        let mut descend = |child, child_stride, child_mask| {
            if cur < target {
                result -= 2 * child_stride;
                target -= cur;
            }

            cur = (child >> (result - child_stride)) & child_mask;
        };

        descend(d, 16, 256 - 1); // PAR[4]
        descend(c, 8, 16 - 1); // PAR[3]
        descend(b, 4, 8 - 1); // PAR[2]
        descend(a, 2, 4 - 1); // PAR[1]
        descend(n, 1, 2 - 1); // PAR[0]
    }

    if cur < target {
        result -= 1;
    }

    Some(result - 1)
}

#[cfg(test)]
mod test_bit_average {
    use super::*;

    /// For even-popcount inputs the average bit must split the set bits
    /// exactly in half.
    #[test]
    fn parity_0_bit_average_u32() {
        struct EvenParity(u32);

        impl Iterator for EvenParity {
            type Item = u32;
            fn next(&mut self) -> Option<Self::Item> {
                if self.0 == u32::max_value() {
                    return None;
                }
                self.0 += 1;
                while self.0.count_ones() & 1 != 0 {
                    if self.0 == u32::max_value() {
                        return None;
                    }
                    self.0 += 1;
                }
                Some(self.0)
            }
        }

        let steps = 1000;
        for i in 0..steps {
            let pos = i * (u32::max_value() / steps);
            for i in EvenParity(pos).take(steps as usize) {
                let mask = (1 << bit_average_u32(i).unwrap_or(31)) - 1;
                assert_eq!((i & mask).count_ones(), (i & !mask).count_ones(), "{:x}", i);
            }
        }
    }

    /// For odd-popcount inputs the two halves must differ by exactly one bit.
    #[test]
    fn parity_1_bit_average_u32() {
        struct OddParity(u32);

        impl Iterator for OddParity {
            type Item = u32;
            fn next(&mut self) -> Option<Self::Item> {
                if self.0 == u32::max_value() {
                    return None;
                }
                self.0 += 1;
                while self.0.count_ones() & 1 == 0 {
                    if self.0 == u32::max_value() {
                        return None;
                    }
                    self.0 += 1;
                }
                Some(self.0)
            }
        }

        let steps = 1000;
        for i in 0..steps {
            let pos = i * (u32::max_value() / steps);
            for i in OddParity(pos).take(steps as usize) {
                let mask = (1 << bit_average_u32(i).unwrap_or(31)) - 1;
                let a = (i & mask).count_ones();
                let b = (i & !mask).count_ones();
                if a < b {
                    assert_eq!(a + 1, b, "{:x}", i);
                } else if b < a {
                    assert_eq!(a, b + 1, "{:x}", i);
                } else {
                    panic!("Odd parity shouldn't split in exactly half");
                }
            }
        }
    }

    /// Zero bits set: nothing to split.
    #[test]
    fn empty_bit_average_u32() {
        assert_eq!(None, bit_average_u32(0));
    }

    /// A single set bit cannot be split either.
    #[test]
    fn singleton_bit_average_u32() {
        for i in 0..32 {
            assert_eq!(None, bit_average_u32(1 << i), "{:x}", i);
        }
    }

    #[test]
    fn parity_0_bit_average_u64() {
        struct EvenParity(u64);

        impl Iterator for EvenParity {
            type Item = u64;
            fn next(&mut self) -> Option<Self::Item> {
                if self.0 == u64::max_value() {
                    return None;
                }
                self.0 += 1;
                while self.0.count_ones() & 1 != 0 {
                    if self.0 == u64::max_value() {
                        return None;
                    }
                    self.0 += 1;
                }
                Some(self.0)
            }
        }

        let steps = 1000;
        for i in 0..steps {
            let pos = i * (u64::max_value() / steps);
            for i in EvenParity(pos).take(steps as usize) {
                let mask = (1 << bit_average_u64(i).unwrap_or(63)) - 1;
                assert_eq!((i & mask).count_ones(), (i & !mask).count_ones(), "{:x}", i);
            }
        }
    }

    #[test]
    fn parity_1_bit_average_u64() {
        struct OddParity(u64);

        impl Iterator for OddParity {
            type Item = u64;
            fn next(&mut self) -> Option<Self::Item> {
                if self.0 == u64::max_value() {
                    return None;
                }
                self.0 += 1;
                while self.0.count_ones() & 1 == 0 {
                    if self.0 == u64::max_value() {
                        return None;
                    }
                    self.0 += 1;
                }
                Some(self.0)
            }
        }

        let steps = 1000;
        for i in 0..steps {
            let pos = i * (u64::max_value() / steps);
            for i in OddParity(pos).take(steps as usize) {
                let mask = (1 << bit_average_u64(i).unwrap_or(63)) - 1;
                let a = (i & mask).count_ones();
                let b = (i & !mask).count_ones();
                if a < b {
                    assert_eq!(a + 1, b, "{:x}", i);
                } else if b < a {
                    assert_eq!(a, b + 1, "{:x}", i);
                } else {
                    panic!("Odd parity shouldn't split in exactly half");
                }
            }
        }
    }

    #[test]
    fn empty_bit_average_u64() {
        assert_eq!(None, bit_average_u64(0));
    }

    #[test]
    fn singleton_bit_average_u64() {
        for i in 0..64 {
            assert_eq!(None, bit_average_u64(1 << i), "{:x}", i);
        }
    }

    /// The two width-specific implementations must agree on all shared inputs.
    #[test]
    fn bit_average_agree_u32_u64() {
        let steps = 1000;
        for i in 0..steps {
            let pos = i * (u32::max_value() / steps);
            // Check `steps` consecutive values starting at `pos`. The previous
            // range `pos..steps` was empty for every `pos > steps`, so only the
            // very first chunk (i == 0) was actually exercised.
            for i in pos..pos.saturating_add(steps) {
                assert_eq!(
                    bit_average_u32(i),
                    bit_average_u64(i as u64).map(|n| n as u32),
                    "{:x}",
                    i
                );
            }
        }
    }

    /// Hand-computed spot checks.
    #[test]
    fn specific_values() {
        assert_eq!(Some(4), bit_average_u32(0b10110));
        assert_eq!(Some(5), bit_average_u32(0b100010));
        assert_eq!(None, bit_average_u32(0));
        assert_eq!(None, bit_average_u32(1));

        assert_eq!(Some(4), bit_average_u64(0b10110));
        assert_eq!(Some(5), bit_average_u64(0b100010));
        assert_eq!(None, bit_average_u64(0));
        assert_eq!(None, bit_average_u64(1));
    }
}

+ 100
- 0
async-ecs/src/misc/bit_iter.rs View File

@@ -0,0 +1,100 @@
use hibitset::BitSetLike;

/// Iterator over the indices of the set bits of a layered (hierarchical)
/// bit set.
#[derive(Debug, Clone)]
pub struct BitIter<T> {
    /// The underlying bit set.
    pub(crate) set: T,
    /// Remaining (not yet consumed) bits per layer; `masks[0]` is the
    /// lowest layer (the one that yields final indices).
    pub(crate) masks: [usize; LAYERS],
    /// Index prefix contributed by the layers above, one entry per
    /// non-top layer.
    pub(crate) prefix: [u32; LAYERS - 1],
}

impl<T> BitIter<T> {
    /// Creates an iterator with explicit per-layer start state.
    pub fn new(set: T, masks: [usize; LAYERS], prefix: [u32; LAYERS - 1]) -> Self {
        Self { set, masks, prefix }
    }
}

impl<T: BitSetLike> BitIter<T> {
    /// Returns whether index `i` is contained in the underlying set.
    pub fn contains(&self, i: u32) -> bool {
        self.set.contains(i)
    }
}

/// Outcome of processing a single layer in [`BitIter::handle_level`].
#[derive(PartialEq)]
pub(crate) enum State {
    /// The layer has no remaining bits.
    Empty,
    /// A higher layer was expanded into a lower one; restart the scan.
    Continue,
    /// A concrete index was produced (only possible at layer 0).
    Value(u32),
}

impl<T> Iterator for BitIter<T>
where
    T: BitSetLike,
{
    type Item = u32;

    /// Finds the next set bit by scanning the layer masks bottom-up;
    /// `Continue` restarts the scan at layer 0 after a higher layer has
    /// been expanded into the layer below it.
    fn next(&mut self) -> Option<Self::Item> {
        use self::State::*;

        'find: loop {
            for level in 0..LAYERS {
                match self.handle_level(level) {
                    Value(v) => return Some(v),
                    Continue => continue 'find,
                    Empty => {}
                }
            }

            // Every layer is exhausted: iteration is done.
            return None;
        }
    }
}

impl<T: BitSetLike> BitIter<T> {
    /// Consumes the lowest remaining bit of `masks[level]`.
    ///
    /// At level 0 the bit directly yields the final index; at any higher
    /// level it selects a chunk of the layer below, which is loaded into
    /// `masks[level - 1]` together with its index prefix.
    pub(crate) fn handle_level(&mut self, level: usize) -> State {
        use self::State::*;

        if self.masks[level] == 0 {
            Empty
        } else {
            // Take (and clear) the lowest remaining bit of this layer.
            let first_bit = self.masks[level].trailing_zeros();
            self.masks[level] &= !(1 << first_bit);

            // Combine with the prefix from the layers above; the top layer
            // has no prefix entry and contributes 0.
            let idx = self.prefix.get(level).cloned().unwrap_or(0) | first_bit;

            if level == 0 {
                Value(idx)
            } else {
                // Descend: fetch the child mask and remember its prefix.
                self.masks[level - 1] = self.set.get_from_layer(level - 1, idx as usize);
                self.prefix[level - 1] = idx << BITS;

                Continue
            }
        }
    }
}

/// Number of layers in the hierarchical bit set.
const LAYERS: usize = 4;

/// log2 of the number of bits in one `usize` mask word (64-bit targets),
/// i.e. the shift between adjacent layers' indices.
#[cfg(target_pointer_width = "64")]
pub const BITS: usize = 6;

/// log2 of the number of bits in one `usize` mask word (32-bit targets).
#[cfg(target_pointer_width = "32")]
pub const BITS: usize = 5;

#[cfg(test)]
mod tests {
    use hibitset::{BitSet, BitSetLike};

    /// A cloned iterator must yield the same sequence as the original.
    #[test]
    fn iterator_clone() {
        let mut set = BitSet::new();

        set.add(1);
        set.add(3);

        let iter = set.iter().skip(1);
        for (a, b) in iter.clone().zip(iter) {
            assert_eq!(a, b);
        }
    }
}

+ 93
- 0
async-ecs/src/misc/bit_producer.rs View File

@@ -0,0 +1,93 @@
use hibitset::BitSetLike;
use parallel_iterator::{Folder, Producer};

use crate::misc::bit_average;

use super::BitIter;

/// Splittable producer over the set bits of a hierarchical bit set; used to
/// divide a join's key space between parallel tasks.
pub struct BitProducer<'a, T> {
    /// Iteration state (remaining masks and prefixes per layer).
    pub(crate) iter: BitIter<&'a T>,
    /// How many of the top layers may still be used for splitting.
    pub(crate) splits: u8,
}

impl<'a, T> BitProducer<'a, T>
where
    T: BitSetLike,
{
    /// Creates a producer from an iterator plus the number of layers that
    /// may be used for splitting.
    pub fn new(iter: BitIter<&'a T>, splits: u8) -> Self {
        Self { iter, splits }
    }

    /// Tries to split the remaining bits of `level` into two halves.
    ///
    /// Returns the upper half as a new producer when the layer's mask holds
    /// more than one set bit. With exactly one bit there is nothing to
    /// split, so that bit is expanded one layer down instead and `None` is
    /// returned; an empty mask also returns `None`.
    fn handle_level(&mut self, level: usize) -> Option<Self> {
        if self.iter.masks[level] == 0 {
            None
        } else {
            let level_prefix = self.iter.prefix.get(level).cloned().unwrap_or(0);
            let first_bit = self.iter.masks[level].trailing_zeros();

            bit_average(self.iter.masks[level])
                .map(|average_bit| {
                    // Cut the mask at the population-average bit so both
                    // halves contain roughly the same number of set bits.
                    let mask = (1 << average_bit) - 1;
                    let mut other = BitProducer::new(
                        BitIter::new(self.iter.set, [0; LAYERS], [0; LAYERS - 1]),
                        self.splits,
                    );

                    // The upper half of the mask goes to the new producer…
                    other.iter.masks[level] = self.iter.masks[level] & !mask;
                    other.iter.prefix[level - 1] = (level_prefix | average_bit as u32) << BITS;
                    other.iter.prefix[level..].copy_from_slice(&self.iter.prefix[level..]);

                    // …while the lower half stays with `self`.
                    self.iter.masks[level] &= mask;
                    self.iter.prefix[level - 1] = (level_prefix | first_bit) << BITS;

                    other
                })
                .or_else(|| {
                    // Single remaining bit: descend into its child mask one
                    // layer below so a later call may split there.
                    let idx = level_prefix as usize | first_bit as usize;

                    self.iter.prefix[level - 1] = (idx as u32) << BITS;
                    self.iter.masks[level] = 0;
                    self.iter.masks[level - 1] = self.iter.set.get_from_layer(level - 1, idx);

                    None
                })
        }
    }
}

impl<'a, T> Producer for BitProducer<'a, T>
where
    T: BitSetLike + Send + Sync,
{
    type Item = u32;

    /// Attempts the split at the topmost layer first, then works downward
    /// through as many layers as the `splits` budget allows.
    fn split(mut self) -> (Self, Option<Self>) {
        let other = {
            let top_layer = LAYERS - 1;
            let mut h = self.handle_level(top_layer);

            for i in 1..self.splits {
                h = h.or_else(|| self.handle_level(top_layer - i as usize));
            }

            h
        };

        (self, other)
    }

    /// Sequentially folds all remaining indices into `folder`.
    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        folder.consume_iter(self.iter)
    }
}

/// Number of layers in the hierarchical bit set.
const LAYERS: usize = 4;

/// log2 of the number of bits in one `usize` mask word (64-bit targets).
#[cfg(target_pointer_width = "64")]
pub const BITS: usize = 6;

/// log2 of the number of bits in one `usize` mask word (32-bit targets).
#[cfg(target_pointer_width = "32")]
pub const BITS: usize = 5;

+ 8
- 0
async-ecs/src/misc/mod.rs View File

@@ -1,7 +1,15 @@
pub mod bit_and;
pub mod bit_average;
pub mod bit_iter;
pub mod bit_producer;
pub mod split;
pub mod tokio_executor;
pub mod try_default;

pub use bit_and::BitAnd;
pub use bit_average::bit_average;
pub use bit_iter::BitIter;
pub use bit_producer::BitProducer;
pub use split::Split;
pub use tokio_executor::TokioExecutor;
pub use try_default::TryDefault;

+ 90
- 0
async-ecs/src/misc/tokio_executor.rs View File

@@ -0,0 +1,90 @@
use std::mem::transmute;

use futures::{
executor::block_on,
future::{BoxFuture, Future, FutureExt},
join,
};
use tokio::task::{block_in_place, spawn};

use parallel_iterator::{
Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer,
};

/// [`Executor`] that runs producer/consumer splits as tasks on the current
/// tokio runtime.
pub struct TokioExecutor;

impl Executor for TokioExecutor {
    fn exec<P, C>(self, producer: P, consumer: C) -> C::Result
    where
        P: Producer,
        C: Consumer<P::Item>,
    {
        // `block_in_place` marks this worker thread as blocking while we
        // synchronously wait for the recursive, task-spawning execution.
        // The split budget of `2 * num_cpus` bounds the number of leaf tasks.
        block_in_place(|| block_on(exec(2 * num_cpus::get(), producer, consumer)))
    }

    fn exec_indexed<P, C>(self, producer: P, consumer: C) -> C::Result
    where
        P: IndexedProducer,
        C: IndexedConsumer<P::Item>,
    {
        let _producer = producer;
        let _consumer = consumer;

        // Indexed execution is not implemented yet.
        unimplemented!()
    }
}

/// Recursively splits `producer` while the `splits` budget lasts, running the
/// two halves as concurrent tasks and reducing their results; once the budget
/// is exhausted (or the producer can no longer split) the remainder is folded
/// sequentially. Returns a boxed future because the function is recursive.
fn exec<'a, P, C>(splits: usize, producer: P, consumer: C) -> BoxFuture<'a, C::Result>
where
    P: Producer + 'a,
    C: Consumer<P::Item> + 'a,
{
    async move {
        if consumer.is_full() {
            // The consumer will not accept more items: short-circuit.
            consumer.into_folder().complete()
        } else if splits > 0 {
            match producer.split() {
                (left_producer, Some(right_producer)) => {
                    let ((left_consumer, reducer), right_consumer) =
                        (consumer.split_off_left(), consumer);

                    // Each half gets half of the remaining split budget.
                    let left = run_as_task(exec(splits / 2, left_producer, left_consumer));
                    let right = run_as_task(exec(splits / 2, right_producer, right_consumer));

                    let (left_result, right_result) = join!(left, right);

                    reducer.reduce(left_result, right_result)
                }
                // Could not split: fold the remainder sequentially.
                (producer, None) => producer.fold_with(consumer.into_folder()).complete(),
            }
        } else {
            producer.fold_with(consumer.into_folder()).complete()
        }
    }
    .boxed()
}

/// `Send`-able wrapper around a raw pointer so the result slot on this
/// function's stack can be written from inside a spawned task.
struct Pointer<T>(*mut T);

// SAFETY: NOTE(review) — this makes any `Pointer<T>` `Send` regardless of
// `T`. It is only used in `run_as_task` below, where the pointee outlives
// the spawned task because the task is awaited before `result` is read.
unsafe impl<T> Send for Pointer<T> {}

/// Runs `f` as a spawned tokio task and returns its output.
///
/// NOTE(review): the `transmute` below erases the `'a` lifetime to
/// `'static` so the future can be handed to `tokio::spawn`. This is sound
/// only if the spawned task is always driven to completion before the
/// borrowed data dies; if this future were dropped between `spawn` and the
/// `await` completing, the detached task could still reference freed stack
/// data — confirm callers can never cancel at that point.
async fn run_as_task<'a, T, F>(f: F) -> T
where
    T: Send + 'a,
    F: Future<Output = T> + Send + 'a,
{
    let mut result = None;
    let r = Pointer(&mut result as *mut _);

    let task: BoxFuture<'a, ()> = async move {
        unsafe {
            *r.0 = Some(f.await);
        }
    }
    .boxed();
    let task: BoxFuture<'static, ()> = unsafe { transmute(task) };

    spawn(task).await.expect("Error in iterator executor");

    result.unwrap()
}

+ 26
- 0
async-ecs/src/storage/anti_storage.rs View File

@@ -0,0 +1,26 @@
use hibitset::{BitSet, BitSetNot};

use crate::{
access::{Join, ParJoin},
entity::Index,
};

use super::DistinctStorage;

/// Joinable view over the entities that do *not* have a given component:
/// wraps the component's mask and joins on its complement.
pub struct AntiStorage<'a>(pub &'a BitSet);

// An anti-storage yields `()` per entity, so per-index access is trivially
// disjoint.
impl<'a> DistinctStorage for AntiStorage<'a> {}

impl<'a> Join for AntiStorage<'a> {
    type Mask = BitSetNot<&'a BitSet>;
    type Type = ();
    type Value = ();

    fn open(self) -> (Self::Mask, ()) {
        (BitSetNot(self.0), ())
    }

    /// Nothing to fetch — an anti-join carries no data.
    fn get(_: &mut (), _: Index) {}
}

impl<'a> ParJoin for AntiStorage<'a> {}

+ 4
- 0
async-ecs/src/storage/mod.rs View File

@@ -1,7 +1,9 @@
mod anti_storage;
mod masked_storage;
mod storage_wrapper;
mod vec_storage;

pub use anti_storage::AntiStorage;
pub use masked_storage::MaskedStorage;
pub use storage_wrapper::StorageWrapper;
pub use vec_storage::VecStorage;
@@ -13,3 +15,5 @@ pub trait Storage<T>: TryDefault {
fn get_mut(&mut self, index: Index) -> &mut T;
fn insert(&mut self, index: Index, value: T);
}

pub trait DistinctStorage {}

+ 21
- 14
async-ecs/src/storage/storage_wrapper.rs View File

@@ -1,10 +1,10 @@
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Not};

use hibitset::{BitSet, BitSetNot};
use hibitset::BitSet;

use crate::{
access::Join,
access::{Join, ParJoin},
component::Component,
entity::{Entities, Entity, Index},
error::Error,
@@ -12,7 +12,7 @@ use crate::{
storage::MaskedStorage,
};

use super::Storage;
use super::{AntiStorage, DistinctStorage, Storage};

pub struct StorageWrapper<'a, T, D> {
data: D,
@@ -20,8 +20,6 @@ pub struct StorageWrapper<'a, T, D> {
phantom: PhantomData<T>,
}

pub struct AntiStorage<'a>(pub &'a BitSet);

impl<'a, T, D> StorageWrapper<'a, T, D> {
pub fn new(data: D, entities: Ref<'a, Entities>) -> Self {
Self {
@@ -50,6 +48,11 @@ where
}
}

impl<'a, T: Component, D> DistinctStorage for StorageWrapper<'a, T, D> where
T::Storage: DistinctStorage
{
}

impl<'a, 'e, T, D> Not for &'a StorageWrapper<'e, T, D>
where
T: Component,
@@ -101,14 +104,18 @@ where
}
}

impl<'a> Join for AntiStorage<'a> {
type Mask = BitSetNot<&'a BitSet>;
type Type = ();
type Value = ();

fn open(self) -> (Self::Mask, ()) {
(BitSetNot(self.0), ())
}
impl<'a, 'e, T, D> ParJoin for &'a StorageWrapper<'e, T, D>
where
T: Component,
D: Deref<Target = MaskedStorage<T>>,
T::Storage: Sync,
{
}

fn get(_: &mut (), _: Index) {}
impl<'a, 'e, T, D> ParJoin for &'a mut StorageWrapper<'e, T, D>
where
T: Component,
D: DerefMut<Target = MaskedStorage<T>>,
T::Storage: Sync + DistinctStorage,
{
}

+ 3
- 1
async-ecs/src/storage/vec_storage.rs View File

@@ -2,7 +2,7 @@ use std::mem::MaybeUninit;

use crate::entity::Index;

use super::Storage;
use super::{DistinctStorage, Storage};

pub struct VecStorage<T>(Vec<MaybeUninit<T>>);

@@ -34,6 +34,8 @@ impl<T> Storage<T> for VecStorage<T> {
}
}

impl<T> DistinctStorage for VecStorage<T> {}

impl<T> Default for VecStorage<T> {
fn default() -> Self {
Self(Vec::new())


+ 7
- 0
parallel-iterator/Cargo.toml View File

@@ -0,0 +1,7 @@
[package]
name = "parallel-iterator"
version = "0.1.0"
authors = ["Bergmann89 <info@bergmann89.de>"]
edition = "2018"

[dependencies]

+ 61
- 0
parallel-iterator/src/for_each.rs View File

@@ -0,0 +1,61 @@
use super::{no_op::NoOpReducer, Consumer, Folder, ParallelIterator};

/// Drives `iter` to completion, invoking `f` on every produced item.
///
/// The callback is shared by reference so the consumer can be split across
/// tasks without cloning `f`.
pub fn for_each<I, F, T>(iter: I, f: &F)
where
    I: ParallelIterator<Item = T>,
    F: Fn(T) + Sync,
    T: Send,
{
    iter.drive(ForEachConsumer { f })
}

/// Consumer that applies a callback to every item. It carries no state other
/// than the shared callback reference, so it acts as its own folder.
struct ForEachConsumer<'f, F> {
    f: &'f F,
}

impl<'f, F, T> Consumer<T> for ForEachConsumer<'f, F>
where
    F: Fn(T) + Sync,
{
    type Folder = ForEachConsumer<'f, F>;
    type Reducer = NoOpReducer;
    type Result = ();

    /// Splitting just duplicates the callback reference; combining two `()`
    /// results needs no work, hence the no-op reducer.
    fn split_off_left(&self) -> (Self, Self::Reducer) {
        (Self { f: self.f }, NoOpReducer)
    }

    fn into_folder(self) -> Self::Folder {
        self
    }
}

impl<'f, F, T> Folder<T> for ForEachConsumer<'f, F>
where
    F: Fn(T) + Sync,
{
    type Result = ();

    /// A for-each never short-circuits.
    fn is_full(&self) -> bool {
        false
    }

    fn complete(self) -> Self::Result {}

    fn consume(self, item: T) -> Self {
        (self.f)(item);

        self
    }

    /// Batch consumption: forward the whole iterator to the callback.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = T>,
    {
        iter.into_iter().for_each(self.f);

        self
    }
}

+ 121
- 0
parallel-iterator/src/lib.rs View File

@@ -0,0 +1,121 @@
mod for_each;
mod no_op;

/// A splittable, parallel counterpart of [`Iterator`].
pub trait ParallelIterator: Sized + Send {
    /// Item type produced by the iterator.
    type Item: Send;

    /// Runs `f` on every item of the iterator, potentially in parallel.
    fn for_each<F>(self, f: F)
    where
        F: Fn(Self::Item) + Sync + Send,
    {
        for_each::for_each(self, &f)
    }

    /// Drives the iterator with the given consumer and returns its result.
    fn drive<C>(self, consumer: C) -> C::Result
    where
        C: Consumer<Self::Item>;

    /// Optional item-count hint; `None` when the length is unknown.
    fn len_hint_opt(&self) -> Option<usize> {
        None
    }
}

/// A parallel iterator with a known length that supports index-aware driving.
pub trait IndexedParallelIterator: ParallelIterator {
    /// Drives the iterator with an index-aware consumer.
    fn drive_indexed<C>(self, consumer: C) -> C::Result
    where
        C: IndexedConsumer<Self::Item>;

    /// Returns the number of items this iterator will produce.
    fn len_hint(&self) -> usize;
}

/// Source side of parallel execution: a producer can either be split into
/// two producers (enabling parallelism) or folded sequentially.
pub trait Producer: Send + Sized {
    /// Item type this producer yields.
    type Item;

    /// Splits into two halves; the second half is `None` when no further
    /// split is possible.
    fn split(self) -> (Self, Option<Self>);

    /// Sequentially feeds all remaining items into `folder` and returns it.
    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>;
}

/// A producer with a known length that can be split at an exact index.
pub trait IndexedProducer: Send + Sized {
    /// Item type this producer yields.
    type Item;
    /// Sequential iterator over the remaining items.
    type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator;

    /// Converts the producer into a sequential iterator.
    fn into_iter(self) -> Self::IntoIter;

    /// Lower-bound hint for the size of split pieces.
    fn min_len(&self) -> usize {
        1
    }

    /// Upper-bound hint for the size of split pieces.
    fn max_len(&self) -> usize {
        usize::MAX
    }

    /// Splits the producer in two at `index`.
    fn split_at(self, index: usize) -> (Self, Self);

    /// Default sequential fold via [`IndexedProducer::into_iter`].
    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        folder.consume_iter(self.into_iter())
    }
}

/// Sink side of parallel execution: a consumer can be split to mirror a
/// producer split, and turned into a [`Folder`] for sequential consumption.
pub trait Consumer<Item>: Send + Sized {
    /// Folder type used for sequential consumption.
    type Folder: Folder<Item, Result = Self::Result>;
    /// Reducer that merges the results of the two split halves.
    type Reducer: Reducer<Self::Result> + Send;
    /// Final result type.
    type Result: Send;

    /// Splits off a consumer for the left half, together with the reducer
    /// that will combine both halves' results.
    fn split_off_left(&self) -> (Self, Self::Reducer);

    /// Converts into a folder for sequential consumption.
    fn into_folder(self) -> Self::Folder;

    /// Whether the consumer will accept no more items (enables early exit);
    /// `false` by default.
    fn is_full(&self) -> bool {
        false
    }
}

/// A consumer that can be split at an exact index.
pub trait IndexedConsumer<Item>: Consumer<Item> {
    /// Splits into left and right consumers plus the reducer combining
    /// their results.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
}

/// Sequentially accumulates items into a final result.
pub trait Folder<Item>: Sized {
    /// Final result type.
    type Result;

    /// Whether the folder needs no further items (lets the default
    /// `consume_iter` stop early).
    fn is_full(&self) -> bool;

    /// Finishes folding and produces the result.
    fn complete(self) -> Self::Result;

    /// Consumes a single item, returning the updated folder.
    fn consume(self, item: Item) -> Self;

    /// Consumes all items of `iter`, stopping early once full.
    fn consume_iter<I>(mut self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        for item in iter {
            self = self.consume(item);
            if self.is_full() {
                break;
            }
        }

        self
    }
}

/// Combines the results of two split halves into a single result.
pub trait Reducer<Result> {
    /// Merges `left` and `right`.
    fn reduce(self, left: Result, right: Result) -> Result;
}

/// Abstraction over the runtime that actually executes a producer/consumer
/// pair (e.g. a thread pool or an async runtime).
pub trait Executor {
    /// Executes an unindexed producer with the given consumer.
    fn exec<P, C>(self, producer: P, consumer: C) -> C::Result
    where
        P: Producer,
        C: Consumer<P::Item>;

    /// Executes an indexed producer with an index-aware consumer.
    fn exec_indexed<P, C>(self, producer: P, consumer: C) -> C::Result
    where
        P: IndexedProducer,
        C: IndexedConsumer<P::Item>;
}

+ 7
- 0
parallel-iterator/src/no_op.rs View File

@@ -0,0 +1,7 @@
use super::Reducer;

/// [`Reducer`] for consumers whose result type is `()`: merging two unit
/// results requires no work.
pub struct NoOpReducer;

impl Reducer<()> for NoOpReducer {
    fn reduce(self, _left: (), _right: ()) {}
}

Loading…
Cancel
Save