diff --git a/Cargo.toml b/Cargo.toml index d63df5c..1658552 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,10 @@ [workspace] members = [ "async-ecs", - "async-ecs-derive" + "async-ecs-derive", + "parallel-iterator" ] [patch.crates-io] async-ecs-derive = { path = "async-ecs-derive" } +parallel-iterator = { path = "parallel-iterator" } diff --git a/async-ecs/Cargo.toml b/async-ecs/Cargo.toml index 830b201..ae7345e 100644 --- a/async-ecs/Cargo.toml +++ b/async-ecs/Cargo.toml @@ -12,6 +12,8 @@ hashbrown = "0.9" hibitset = { version = "0.6", default-features = false } log = "0.4" mopa = "0.2" +num_cpus = "1.13" +parallel-iterator = "0.1" rand = "0.7" thiserror = "1.0" tokio = { version = "0.3", features = [ "full", "net", "time", "rt-multi-thread" ] } diff --git a/async-ecs/src/access/join.rs b/async-ecs/src/access/join.rs index 535a3e6..8ee9714 100644 --- a/async-ecs/src/access/join.rs +++ b/async-ecs/src/access/join.rs @@ -9,7 +9,7 @@ use crate::{ resource::{Ref, RefMut, Resource}, }; -use super::{read::Read, write::Write}; +use super::{read::Read, write::Write, ParJoin}; pub trait Join { type Type; @@ -69,6 +69,8 @@ where } } +impl ParJoin for MaybeJoin where T: ParJoin {} + pub struct JoinIter { keys: BitIter, values: J::Value, @@ -157,6 +159,11 @@ macro_rules! define_tuple_join { unconstrained } } + + impl<$($from,)*> ParJoin for ($($from),*,) + where $($from: ParJoin),*, + ($(<$from as Join>::Mask,)*): BitAnd, + {} } } @@ -201,6 +208,13 @@ macro_rules! define_mutable_join { <&'a mut T as Join>::is_unconstrained() } } + + impl<'a, 'b, T> ParJoin for &'a mut $ty + where + &'a mut T: ParJoin, + T: Resource, + { + } }; } @@ -231,6 +245,13 @@ macro_rules! define_immutable_join { <&'a T as Join>::is_unconstrained() } } + + impl<'a, 'b, T> ParJoin for &'a $ty + where + &'a T: ParJoin, + T: Resource, + { + } }; } diff --git a/async-ecs/src/access/mod.rs b/async-ecs/src/access/mod.rs index 65c5d6b..9de219f 100644 --- a/async-ecs/src/access/mod.rs +++ b/async-ecs/src/access/mod.rs @@ -1,5 +1,6 @@ pub mod accessor; pub mod join; +pub mod par_join; pub mod read; pub mod read_storage; pub mod write; @@ -7,6 +8,7 @@ pub mod write_storage; pub use accessor::{Accessor, AccessorCow, AccessorType, StaticAccessor}; pub use join::Join; +pub use par_join::{JoinParIter, ParJoin}; pub use read::Read; pub use read_storage::ReadStorage; pub use write::Write; diff --git a/async-ecs/src/access/par_join.rs b/async-ecs/src/access/par_join.rs new file mode 100644 index 0000000..d64dd4e --- /dev/null +++ b/async-ecs/src/access/par_join.rs @@ -0,0 +1,113 @@ +use std::cell::UnsafeCell; + +use hibitset::BitSetLike; +use parallel_iterator::{Consumer, Executor, Folder, ParallelIterator, Producer}; + +use crate::misc::{BitIter, BitProducer, TokioExecutor}; + +use super::Join; + +pub trait ParJoin: Join { + fn par_join(self) -> JoinParIter + where + Self: Sized, + { + if ::is_unconstrained() { + log::warn!( + "`ParJoin` possibly iterating through all indices, you might've made a join with all `MaybeJoin`s, which is unbounded in length." + ); + } + + JoinParIter(self) + } +} + +pub struct JoinParIter(J); + +impl ParallelIterator for JoinParIter +where + J: Join + Send, + J::Type: Send, + J::Mask: Send + Sync, + J::Value: Send, +{ + type Item = J::Type; + + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + let (keys, values) = self.0.open(); + let values = UnsafeCell::new(values); + + let keys = BitIter::new(&keys, [0, 0, 0, keys.layer3()], [0; 3]); + + let producer = BitProducer::new(keys, 3); + let producer = JoinProducer::::new(producer, &values); + + TokioExecutor.exec(producer, consumer) + } +} + +struct JoinProducer<'a, J> +where + J: Join + Send, + J::Type: Send, + J::Mask: Send, + J::Value: Send, +{ + pub(crate) keys: BitProducer<'a, J::Mask>, + pub(crate) values: &'a UnsafeCell, +} + +impl<'a, J> JoinProducer<'a, J> +where + J: Join + Send, + J::Type: Send, + J::Mask: Send, + J::Value: Send, +{ + fn new(keys: BitProducer<'a, J::Mask>, values: &'a UnsafeCell) -> Self { + JoinProducer { keys, values } + } +} + +unsafe impl<'a, J> Send for JoinProducer<'a, J> +where + J: Join + Send, + J::Type: Send, + J::Value: Send + 'a, + J::Mask: Send + Sync + 'a, +{ +} + +impl<'a, J> Producer for JoinProducer<'a, J> +where + J: Join + Send, + J::Type: Send, + J::Value: Send + 'a, + J::Mask: Send + Sync + 'a, +{ + type Item = J::Type; + + fn split(self) -> (Self, Option) { + let (cur, other) = self.keys.split(); + let values = self.values; + let first = JoinProducer::new(cur, values); + let second = other.map(|o| JoinProducer::new(o, values)); + + (first, second) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + let JoinProducer { values, keys, .. } = self; + let iter = keys + .iter + .map(|idx| unsafe { J::get(&mut *values.get(), idx) }); + + folder.consume_iter(iter) + } +} diff --git a/async-ecs/src/lib.rs b/async-ecs/src/lib.rs index 2dfa45d..fbb66dc 100644 --- a/async-ecs/src/lib.rs +++ b/async-ecs/src/lib.rs @@ -11,7 +11,9 @@ pub mod storage; pub mod system; pub mod world; -pub use access::{Join, ReadStorage, WriteStorage}; +pub use parallel_iterator::ParallelIterator; + +pub use access::{Join, ParJoin, ReadStorage, WriteStorage}; pub use dispatcher::Dispatcher; pub use resource::Resources; pub use storage::VecStorage; diff --git a/async-ecs/src/main.rs b/async-ecs/src/main.rs index 9cb1bd3..78ce9a5 100644 --- a/async-ecs/src/main.rs +++ b/async-ecs/src/main.rs @@ -2,8 +2,8 @@ use std::io::Error as IoError; use std::time::{Duration, Instant}; use async_ecs::{ - dispatcher::Error as DispatcherError, AsyncSystem, Dispatcher, Join, ReadStorage, System, - VecStorage, World, WriteStorage, + dispatcher::Error as DispatcherError, AsyncSystem, Dispatcher, Join, ParJoin, ParallelIterator, + ReadStorage, System, VecStorage, World, WriteStorage, }; use async_ecs_derive::Component; use log::info; @@ -17,14 +17,17 @@ fn main() -> Result<(), Error> { .format_timestamp_nanos() .init(); - Builder::new_multi_thread() + let rt = Builder::new_multi_thread() + .worker_threads(num_cpus::get() + 4) .enable_all() - .build()? - .block_on(async move { LocalSet::new().run_until(run()).await }) + .build()?; + + rt.block_on(async move { LocalSet::new().run_until(run()).await }) } async fn run() -> Result<(), Error> { info!("Application started!"); + info!(" num_cpus = {}", num_cpus::get()); let mut world = World::default(); world.register_component::(); @@ -43,8 +46,8 @@ async fn run() -> Result<(), Error> { info!("World initialized!"); let mut dispatcher = Dispatcher::builder() - .with(Move, "move", &[])? - .with(Accelerate, "accelerate", &["move"])? + .with(ParMove, "move", &[])? + .with(ParAccelerate, "accelerate", &["move"])? .build(); info!("Setup done!"); @@ -72,7 +75,7 @@ async fn run() -> Result<(), Error> { } const ENTITY_COUNT: usize = 1_000_000; -const REPEAT_COUNT: u32 = 10; +const REPEAT_COUNT: u32 = 100; #[derive(Debug, Component)] #[storage(VecStorage)] @@ -95,8 +98,11 @@ struct Acceleration { y: f64, } -struct Move; -struct Accelerate; +struct SeqMove; +struct SeqAccelerate; + +struct ParMove; +struct ParAccelerate; #[derive(Debug, Error)] enum Error { @@ -134,7 +140,7 @@ impl Acceleration { } } -impl<'a> System<'a> for Move { +impl<'a> System<'a> for SeqMove { type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>); fn run(&mut self, (mut position, velocity): Self::SystemData) { @@ -142,21 +148,10 @@ impl<'a> System<'a> for Move { position.x += velocity.x; position.y += velocity.y; } - - /* - use specs::{prelude::ParallelIterator, ParJoin}; - - (&mut position, &velocity) - .par_join() - .for_each(|(position, velocity)| { - position.x += velocity.x; - position.y += velocity.y; - }); - */ } } -impl<'a> AsyncSystem<'a> for Accelerate { +impl<'a> AsyncSystem<'a> for SeqAccelerate { type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>); type Future = futures::future::Ready<()>; @@ -167,17 +162,35 @@ impl<'a> AsyncSystem<'a> for Accelerate { } futures::future::ready(()) + } +} - /* - use specs::{prelude::ParallelIterator, ParJoin}; +impl<'a> System<'a> for ParMove { + type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>); + fn run(&mut self, (mut position, velocity): Self::SystemData) { + (&mut position, &velocity) + .par_join() + .for_each(|(position, velocity)| { + position.x += velocity.x; + position.y += velocity.y; + }); + } +} + +impl<'a> AsyncSystem<'a> for ParAccelerate { + type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>); + type Future = futures::future::Ready<()>; + + fn run(&mut self, (mut velocity, acceleration): Self::SystemData) -> Self::Future { (&mut velocity, &acceleration) .par_join() .for_each(|(velocity, acceleration)| { velocity.x += acceleration.x; velocity.y += acceleration.y; }); - */ + + futures::future::ready(()) } } diff --git a/async-ecs/src/misc/bit_average.rs b/async-ecs/src/misc/bit_average.rs new file mode 100644 index 0000000..25586e2 --- /dev/null +++ b/async-ecs/src/misc/bit_average.rs @@ -0,0 +1,302 @@ +pub fn bit_average(n: usize) -> Option { + #[cfg(target_pointer_width = "64")] + let average = bit_average_u64(n as u64).map(|n| n as usize); + + #[cfg(target_pointer_width = "32")] + let average = bit_average_u32(n as u32).map(|n| n as usize); + + average +} + +#[allow(clippy::many_single_char_names)] +#[cfg(any(test, target_pointer_width = "32"))] +fn bit_average_u32(n: u32) -> Option { + const PAR: [u32; 5] = [!0 / 0x3, !0 / 0x5, !0 / 0x11, !0 / 0x101, !0 / 0x10001]; + + let a = n - ((n >> 1) & PAR[0]); + let b = (a & PAR[1]) + ((a >> 2) & PAR[1]); + let c = (b + (b >> 4)) & PAR[2]; + let d = (c + (c >> 8)) & PAR[3]; + + let mut cur = d >> 16; + let count = (d + cur) & PAR[4]; + + if count <= 1 { + return None; + } + + let mut target = count / 2; + let mut result = 32; + + { + let mut descend = |child, child_stride, child_mask| { + if cur < target { + result -= 2 * child_stride; + target -= cur; + } + + cur = (child >> (result - child_stride)) & child_mask; + }; + + descend(c, 8, 16 - 1); // PAR[3] + descend(b, 4, 8 - 1); // PAR[2] + descend(a, 2, 4 - 1); // PAR[1] + descend(n, 1, 2 - 1); // PAR[0] + } + + if cur < target { + result -= 1; + } + + Some(result - 1) +} + +#[allow(clippy::many_single_char_names)] +#[cfg(any(test, target_pointer_width = "64"))] +fn bit_average_u64(n: u64) -> Option { + const PAR: [u64; 6] = [ + !0 / 0x3, + !0 / 0x5, + !0 / 0x11, + !0 / 0x101, + !0 / 0x10001, + !0 / 0x100000001, + ]; + + let a = n - ((n >> 1) & PAR[0]); + let b = (a & PAR[1]) + ((a >> 2) & PAR[1]); + let c = (b + (b >> 4)) & PAR[2]; + let d = (c + (c >> 8)) & PAR[3]; + let e = (d + (d >> 16)) & PAR[4]; + + let mut cur = e >> 32; + let count = (e + cur) & PAR[5]; + + if count <= 1 { + return None; + } + + let mut target = count / 2; + let mut result = 64; + + { + let mut descend = |child, child_stride, child_mask| { + if cur < target { + result -= 2 * child_stride; + target -= cur; + } + + cur = (child >> (result - child_stride)) & child_mask; + }; + + descend(d, 16, 256 - 1); // PAR[4] + descend(c, 8, 16 - 1); // PAR[3] + descend(b, 4, 8 - 1); // PAR[2] + descend(a, 2, 4 - 1); // PAR[1] + descend(n, 1, 2 - 1); // PAR[0] + } + + if cur < target { + result -= 1; + } + + Some(result - 1) +} + +#[cfg(test)] +mod test_bit_average { + use super::*; + + #[test] + fn parity_0_bit_average_u32() { + struct EvenParity(u32); + + impl Iterator for EvenParity { + type Item = u32; + fn next(&mut self) -> Option { + if self.0 == u32::max_value() { + return None; + } + self.0 += 1; + while self.0.count_ones() & 1 != 0 { + if self.0 == u32::max_value() { + return None; + } + self.0 += 1; + } + Some(self.0) + } + } + + let steps = 1000; + for i in 0..steps { + let pos = i * (u32::max_value() / steps); + for i in EvenParity(pos).take(steps as usize) { + let mask = (1 << bit_average_u32(i).unwrap_or(31)) - 1; + assert_eq!((i & mask).count_ones(), (i & !mask).count_ones(), "{:x}", i); + } + } + } + + #[test] + fn parity_1_bit_average_u32() { + struct OddParity(u32); + + impl Iterator for OddParity { + type Item = u32; + fn next(&mut self) -> Option { + if self.0 == u32::max_value() { + return None; + } + self.0 += 1; + while self.0.count_ones() & 1 == 0 { + if self.0 == u32::max_value() { + return None; + } + self.0 += 1; + } + Some(self.0) + } + } + + let steps = 1000; + for i in 0..steps { + let pos = i * (u32::max_value() / steps); + for i in OddParity(pos).take(steps as usize) { + let mask = (1 << bit_average_u32(i).unwrap_or(31)) - 1; + let a = (i & mask).count_ones(); + let b = (i & !mask).count_ones(); + if a < b { + assert_eq!(a + 1, b, "{:x}", i); + } else if b < a { + assert_eq!(a, b + 1, "{:x}", i); + } else { + panic!("Odd parity shouldn't split in exactly half"); + } + } + } + } + + #[test] + fn empty_bit_average_u32() { + assert_eq!(None, bit_average_u32(0)); + } + + #[test] + fn singleton_bit_average_u32() { + for i in 0..32 { + assert_eq!(None, bit_average_u32(1 << i), "{:x}", i); + } + } + + #[test] + fn parity_0_bit_average_u64() { + struct EvenParity(u64); + + impl Iterator for EvenParity { + type Item = u64; + fn next(&mut self) -> Option { + if self.0 == u64::max_value() { + return None; + } + self.0 += 1; + while self.0.count_ones() & 1 != 0 { + if self.0 == u64::max_value() { + return None; + } + self.0 += 1; + } + Some(self.0) + } + } + + let steps = 1000; + for i in 0..steps { + let pos = i * (u64::max_value() / steps); + for i in EvenParity(pos).take(steps as usize) { + let mask = (1 << bit_average_u64(i).unwrap_or(63)) - 1; + assert_eq!((i & mask).count_ones(), (i & !mask).count_ones(), "{:x}", i); + } + } + } + + #[test] + fn parity_1_bit_average_u64() { + struct OddParity(u64); + + impl Iterator for OddParity { + type Item = u64; + fn next(&mut self) -> Option { + if self.0 == u64::max_value() { + return None; + } + self.0 += 1; + while self.0.count_ones() & 1 == 0 { + if self.0 == u64::max_value() { + return None; + } + self.0 += 1; + } + Some(self.0) + } + } + + let steps = 1000; + for i in 0..steps { + let pos = i * (u64::max_value() / steps); + for i in OddParity(pos).take(steps as usize) { + let mask = (1 << bit_average_u64(i).unwrap_or(63)) - 1; + let a = (i & mask).count_ones(); + let b = (i & !mask).count_ones(); + if a < b { + assert_eq!(a + 1, b, "{:x}", i); + } else if b < a { + assert_eq!(a, b + 1, "{:x}", i); + } else { + panic!("Odd parity shouldn't split in exactly half"); + } + } + } + } + + #[test] + fn empty_bit_average_u64() { + assert_eq!(None, bit_average_u64(0)); + } + + #[test] + fn singleton_bit_average_u64() { + for i in 0..64 { + assert_eq!(None, bit_average_u64(1 << i), "{:x}", i); + } + } + + #[test] + fn bit_average_agree_u32_u64() { + let steps = 1000; + for i in 0..steps { + let pos = i * (u32::max_value() / steps); + for i in pos..steps { + assert_eq!( + bit_average_u32(i), + bit_average_u64(i as u64).map(|n| n as u32), + "{:x}", + i + ); + } + } + } + + #[test] + fn specific_values() { + assert_eq!(Some(4), bit_average_u32(0b10110)); + assert_eq!(Some(5), bit_average_u32(0b100010)); + assert_eq!(None, bit_average_u32(0)); + assert_eq!(None, bit_average_u32(1)); + + assert_eq!(Some(4), bit_average_u64(0b10110)); + assert_eq!(Some(5), bit_average_u64(0b100010)); + assert_eq!(None, bit_average_u64(0)); + assert_eq!(None, bit_average_u64(1)); + } +} diff --git a/async-ecs/src/misc/bit_iter.rs b/async-ecs/src/misc/bit_iter.rs new file mode 100644 index 0000000..9cc7364 --- /dev/null +++ b/async-ecs/src/misc/bit_iter.rs @@ -0,0 +1,100 @@ +use hibitset::BitSetLike; + +#[derive(Debug, Clone)] +pub struct BitIter { + pub(crate) set: T, + pub(crate) masks: [usize; LAYERS], + pub(crate) prefix: [u32; LAYERS - 1], +} + +impl BitIter { + pub fn new(set: T, masks: [usize; LAYERS], prefix: [u32; LAYERS - 1]) -> Self { + Self { set, masks, prefix } + } +} + +impl BitIter { + pub fn contains(&self, i: u32) -> bool { + self.set.contains(i) + } +} + +#[derive(PartialEq)] +pub(crate) enum State { + Empty, + Continue, + Value(u32), +} + +impl Iterator for BitIter +where + T: BitSetLike, +{ + type Item = u32; + + fn next(&mut self) -> Option { + use self::State::*; + + 'find: loop { + for level in 0..LAYERS { + match self.handle_level(level) { + Value(v) => return Some(v), + Continue => continue 'find, + Empty => {} + } + } + + return None; + } + } +} + +impl BitIter { + pub(crate) fn handle_level(&mut self, level: usize) -> State { + use self::State::*; + + if self.masks[level] == 0 { + Empty + } else { + let first_bit = self.masks[level].trailing_zeros(); + self.masks[level] &= !(1 << first_bit); + + let idx = self.prefix.get(level).cloned().unwrap_or(0) | first_bit; + + if level == 0 { + Value(idx) + } else { + self.masks[level - 1] = self.set.get_from_layer(level - 1, idx as usize); + self.prefix[level - 1] = idx << BITS; + + Continue + } + } + } +} + +const LAYERS: usize = 4; + +#[cfg(target_pointer_width = "64")] +pub const BITS: usize = 6; + +#[cfg(target_pointer_width = "32")] +pub const BITS: usize = 5; + +#[cfg(test)] +mod tests { + use hibitset::{BitSet, BitSetLike}; + + #[test] + fn iterator_clone() { + let mut set = BitSet::new(); + + set.add(1); + set.add(3); + + let iter = set.iter().skip(1); + for (a, b) in iter.clone().zip(iter) { + assert_eq!(a, b); + } + } +} diff --git a/async-ecs/src/misc/bit_producer.rs b/async-ecs/src/misc/bit_producer.rs new file mode 100644 index 0000000..43859ad --- /dev/null +++ b/async-ecs/src/misc/bit_producer.rs @@ -0,0 +1,93 @@ +use hibitset::BitSetLike; +use parallel_iterator::{Folder, Producer}; + +use crate::misc::bit_average; + +use super::BitIter; + +pub struct BitProducer<'a, T> { + pub(crate) iter: BitIter<&'a T>, + pub(crate) splits: u8, +} + +impl<'a, T> BitProducer<'a, T> +where + T: BitSetLike, +{ + pub fn new(iter: BitIter<&'a T>, splits: u8) -> Self { + Self { iter, splits } + } + + fn handle_level(&mut self, level: usize) -> Option { + if self.iter.masks[level] == 0 { + None + } else { + let level_prefix = self.iter.prefix.get(level).cloned().unwrap_or(0); + let first_bit = self.iter.masks[level].trailing_zeros(); + + bit_average(self.iter.masks[level]) + .map(|average_bit| { + let mask = (1 << average_bit) - 1; + let mut other = BitProducer::new( + BitIter::new(self.iter.set, [0; LAYERS], [0; LAYERS - 1]), + self.splits, + ); + + other.iter.masks[level] = self.iter.masks[level] & !mask; + other.iter.prefix[level - 1] = (level_prefix | average_bit as u32) << BITS; + other.iter.prefix[level..].copy_from_slice(&self.iter.prefix[level..]); + + self.iter.masks[level] &= mask; + self.iter.prefix[level - 1] = (level_prefix | first_bit) << BITS; + + other + }) + .or_else(|| { + let idx = level_prefix as usize | first_bit as usize; + + self.iter.prefix[level - 1] = (idx as u32) << BITS; + self.iter.masks[level] = 0; + self.iter.masks[level - 1] = self.iter.set.get_from_layer(level - 1, idx); + + None + }) + } + } +} + +impl<'a, T> Producer for BitProducer<'a, T> +where + T: BitSetLike + Send + Sync, +{ + type Item = u32; + + fn split(mut self) -> (Self, Option) { + let other = { + let top_layer = LAYERS - 1; + let mut h = self.handle_level(top_layer); + + for i in 1..self.splits { + h = h.or_else(|| self.handle_level(top_layer - i as usize)); + } + + h + }; + + (self, other) + } + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + folder.consume_iter(self.iter) + } +} + +const LAYERS: usize = 4; + +#[cfg(target_pointer_width = "64")] +pub const BITS: usize = 6; + +#[cfg(target_pointer_width = "32")] +pub const BITS: usize = 5; diff --git a/async-ecs/src/misc/mod.rs b/async-ecs/src/misc/mod.rs index 061f3c2..ae21076 100644 --- a/async-ecs/src/misc/mod.rs +++ b/async-ecs/src/misc/mod.rs @@ -1,7 +1,15 @@ pub mod bit_and; +pub mod bit_average; +pub mod bit_iter; +pub mod bit_producer; pub mod split; +pub mod tokio_executor; pub mod try_default; pub use bit_and::BitAnd; +pub use bit_average::bit_average; +pub use bit_iter::BitIter; +pub use bit_producer::BitProducer; pub use split::Split; +pub use tokio_executor::TokioExecutor; pub use try_default::TryDefault; diff --git a/async-ecs/src/misc/tokio_executor.rs b/async-ecs/src/misc/tokio_executor.rs new file mode 100644 index 0000000..f6b21ba --- /dev/null +++ b/async-ecs/src/misc/tokio_executor.rs @@ -0,0 +1,90 @@ +use std::mem::transmute; + +use futures::{ + executor::block_on, + future::{BoxFuture, Future, FutureExt}, + join, +}; +use tokio::task::{block_in_place, spawn}; + +use parallel_iterator::{ + Consumer, Executor, Folder, IndexedConsumer, IndexedProducer, Producer, Reducer, +}; + +pub struct TokioExecutor; + +impl Executor for TokioExecutor { + fn exec(self, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer, + { + block_in_place(|| block_on(exec(2 * num_cpus::get(), producer, consumer))) + } + + fn exec_indexed(self, producer: P, consumer: C) -> C::Result + where + P: IndexedProducer, + C: IndexedConsumer, + { + let _producer = producer; + let _consumer = consumer; + + unimplemented!() + } +} + +fn exec<'a, P, C>(splits: usize, producer: P, consumer: C) -> BoxFuture<'a, C::Result> +where + P: Producer + 'a, + C: Consumer + 'a, +{ + async move { + if consumer.is_full() { + consumer.into_folder().complete() + } else if splits > 0 { + match producer.split() { + (left_producer, Some(right_producer)) => { + let ((left_consumer, reducer), right_consumer) = + (consumer.split_off_left(), consumer); + + let left = run_as_task(exec(splits / 2, left_producer, left_consumer)); + let right = run_as_task(exec(splits / 2, right_producer, right_consumer)); + + let (left_result, right_result) = join!(left, right); + + reducer.reduce(left_result, right_result) + } + (producer, None) => producer.fold_with(consumer.into_folder()).complete(), + } + } else { + producer.fold_with(consumer.into_folder()).complete() + } + } + .boxed() +} + +struct Pointer(*mut T); + +unsafe impl Send for Pointer {} + +async fn run_as_task<'a, T, F>(f: F) -> T +where + T: Send + 'a, + F: Future + Send + 'a, +{ + let mut result = None; + let r = Pointer(&mut result as *mut _); + + let task: BoxFuture<'a, ()> = async move { + unsafe { + *r.0 = Some(f.await); + } + } + .boxed(); + let task: BoxFuture<'static, ()> = unsafe { transmute(task) }; + + spawn(task).await.expect("Error in iterator executor"); + + result.unwrap() +} diff --git a/async-ecs/src/storage/anti_storage.rs b/async-ecs/src/storage/anti_storage.rs new file mode 100644 index 0000000..7bf67fe --- /dev/null +++ b/async-ecs/src/storage/anti_storage.rs @@ -0,0 +1,26 @@ +use hibitset::{BitSet, BitSetNot}; + +use crate::{ + access::{Join, ParJoin}, + entity::Index, +}; + +use super::DistinctStorage; + +pub struct AntiStorage<'a>(pub &'a BitSet); + +impl<'a> DistinctStorage for AntiStorage<'a> {} + +impl<'a> Join for AntiStorage<'a> { + type Mask = BitSetNot<&'a BitSet>; + type Type = (); + type Value = (); + + fn open(self) -> (Self::Mask, ()) { + (BitSetNot(self.0), ()) + } + + fn get(_: &mut (), _: Index) {} +} + +impl<'a> ParJoin for AntiStorage<'a> {} diff --git a/async-ecs/src/storage/mod.rs b/async-ecs/src/storage/mod.rs index 4d5ba04..f7a18df 100644 --- a/async-ecs/src/storage/mod.rs +++ b/async-ecs/src/storage/mod.rs @@ -1,7 +1,9 @@ +mod anti_storage; mod masked_storage; mod storage_wrapper; mod vec_storage; +pub use anti_storage::AntiStorage; pub use masked_storage::MaskedStorage; pub use storage_wrapper::StorageWrapper; pub use vec_storage::VecStorage; @@ -13,3 +15,5 @@ pub trait Storage: TryDefault { fn get_mut(&mut self, index: Index) -> &mut T; fn insert(&mut self, index: Index, value: T); } + +pub trait DistinctStorage {} diff --git a/async-ecs/src/storage/storage_wrapper.rs b/async-ecs/src/storage/storage_wrapper.rs index 807121d..96b3bdf 100644 --- a/async-ecs/src/storage/storage_wrapper.rs +++ b/async-ecs/src/storage/storage_wrapper.rs @@ -1,10 +1,10 @@ use std::marker::PhantomData; use std::ops::{Deref, DerefMut, Not}; -use hibitset::{BitSet, BitSetNot}; +use hibitset::BitSet; use crate::{ - access::Join, + access::{Join, ParJoin}, component::Component, entity::{Entities, Entity, Index}, error::Error, @@ -12,7 +12,7 @@ use crate::{ storage::MaskedStorage, }; -use super::Storage; +use super::{AntiStorage, DistinctStorage, Storage}; pub struct StorageWrapper<'a, T, D> { data: D, @@ -20,8 +20,6 @@ pub struct StorageWrapper<'a, T, D> { phantom: PhantomData, } -pub struct AntiStorage<'a>(pub &'a BitSet); - impl<'a, T, D> StorageWrapper<'a, T, D> { pub fn new(data: D, entities: Ref<'a, Entities>) -> Self { Self { @@ -50,6 +48,11 @@ where } } +impl<'a, T: Component, D> DistinctStorage for StorageWrapper<'a, T, D> where + T::Storage: DistinctStorage +{ +} + impl<'a, 'e, T, D> Not for &'a StorageWrapper<'e, T, D> where T: Component, @@ -101,14 +104,18 @@ where } } -impl<'a> Join for AntiStorage<'a> { - type Mask = BitSetNot<&'a BitSet>; - type Type = (); - type Value = (); - - fn open(self) -> (Self::Mask, ()) { - (BitSetNot(self.0), ()) - } +impl<'a, 'e, T, D> ParJoin for &'a StorageWrapper<'e, T, D> +where + T: Component, + D: Deref>, + T::Storage: Sync, +{ +} - fn get(_: &mut (), _: Index) {} +impl<'a, 'e, T, D> ParJoin for &'a mut StorageWrapper<'e, T, D> +where + T: Component, + D: DerefMut>, + T::Storage: Sync + DistinctStorage, +{ } diff --git a/async-ecs/src/storage/vec_storage.rs b/async-ecs/src/storage/vec_storage.rs index 2e3972f..39b491f 100644 --- a/async-ecs/src/storage/vec_storage.rs +++ b/async-ecs/src/storage/vec_storage.rs @@ -2,7 +2,7 @@ use std::mem::MaybeUninit; use crate::entity::Index; -use super::Storage; +use super::{DistinctStorage, Storage}; pub struct VecStorage(Vec>); @@ -34,6 +34,8 @@ impl Storage for VecStorage { } } +impl DistinctStorage for VecStorage {} + impl Default for VecStorage { fn default() -> Self { Self(Vec::new()) diff --git a/parallel-iterator/Cargo.toml b/parallel-iterator/Cargo.toml new file mode 100644 index 0000000..103b40a --- /dev/null +++ b/parallel-iterator/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "parallel-iterator" +version = "0.1.0" +authors = ["Bergmann89 "] +edition = "2018" + +[dependencies] diff --git a/parallel-iterator/src/for_each.rs b/parallel-iterator/src/for_each.rs new file mode 100644 index 0000000..dc50eff --- /dev/null +++ b/parallel-iterator/src/for_each.rs @@ -0,0 +1,61 @@ +use super::{no_op::NoOpReducer, Consumer, Folder, ParallelIterator}; + +pub fn for_each(iter: I, f: &F) +where + I: ParallelIterator, + F: Fn(T) + Sync, + T: Send, +{ + let consumer = ForEachConsumer { f }; + + iter.drive(consumer) +} + +struct ForEachConsumer<'f, F> { + f: &'f F, +} + +impl<'f, F, T> Consumer for ForEachConsumer<'f, F> +where + F: Fn(T) + Sync, +{ + type Folder = ForEachConsumer<'f, F>; + type Reducer = NoOpReducer; + type Result = (); + + fn split_off_left(&self) -> (Self, Self::Reducer) { + (Self { f: self.f }, NoOpReducer) + } + + fn into_folder(self) -> Self::Folder { + self + } +} + +impl<'f, F, T> Folder for ForEachConsumer<'f, F> +where + F: Fn(T) + Sync, +{ + type Result = (); + + fn is_full(&self) -> bool { + false + } + + fn complete(self) -> Self::Result {} + + fn consume(self, item: T) -> Self { + (self.f)(item); + + self + } + + fn consume_iter(self, iter: I) -> Self + where + I: IntoIterator, + { + iter.into_iter().for_each(self.f); + + self + } +} diff --git a/parallel-iterator/src/lib.rs b/parallel-iterator/src/lib.rs new file mode 100644 index 0000000..e35506b --- /dev/null +++ b/parallel-iterator/src/lib.rs @@ -0,0 +1,121 @@ +mod for_each; +mod no_op; + +pub trait ParallelIterator: Sized + Send { + type Item: Send; + + fn for_each(self, f: F) + where + F: Fn(Self::Item) + Sync + Send, + { + for_each::for_each(self, &f) + } + + fn drive(self, consumer: C) -> C::Result + where + C: Consumer; + + fn len_hint_opt(&self) -> Option { + None + } +} + +pub trait IndexedParallelIterator: ParallelIterator { + fn drive_indexed(self, consumer: C) -> C::Result + where + C: IndexedConsumer; + + fn len_hint(&self) -> usize; +} + +pub trait Producer: Send + Sized { + type Item; + + fn split(self) -> (Self, Option); + + fn fold_with(self, folder: F) -> F + where + F: Folder; +} + +pub trait IndexedProducer: Send + Sized { + type Item; + type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; + + fn into_iter(self) -> Self::IntoIter; + + fn min_len(&self) -> usize { + 1 + } + + fn max_len(&self) -> usize { + usize::MAX + } + + fn split_at(self, index: usize) -> (Self, Self); + + fn fold_with(self, folder: F) -> F + where + F: Folder, + { + folder.consume_iter(self.into_iter()) + } +} + +pub trait Consumer: Send + Sized { + type Folder: Folder; + type Reducer: Reducer + Send; + type Result: Send; + + fn split_off_left(&self) -> (Self, Self::Reducer); + + fn into_folder(self) -> Self::Folder; + + fn is_full(&self) -> bool { + false + } +} + +pub trait IndexedConsumer: Consumer { + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); +} + +pub trait Folder: Sized { + type Result; + + fn is_full(&self) -> bool; + + fn complete(self) -> Self::Result; + + fn consume(self, item: Item) -> Self; + + fn consume_iter(mut self, iter: I) -> Self + where + I: IntoIterator, + { + for item in iter { + self = self.consume(item); + if self.is_full() { + break; + } + } + + self + } +} + +pub trait Reducer { + fn reduce(self, left: Result, right: Result) -> Result; +} + +pub trait Executor { + fn exec(self, producer: P, consumer: C) -> C::Result + where + P: Producer, + C: Consumer; + + fn exec_indexed(self, producer: P, consumer: C) -> C::Result + where + P: IndexedProducer, + C: IndexedConsumer; +} diff --git a/parallel-iterator/src/no_op.rs b/parallel-iterator/src/no_op.rs new file mode 100644 index 0000000..0f3f023 --- /dev/null +++ b/parallel-iterator/src/no_op.rs @@ -0,0 +1,7 @@ +use super::Reducer; + +pub struct NoOpReducer; + +impl Reducer<()> for NoOpReducer { + fn reduce(self, _left: (), _right: ()) {} +}