diff --git a/async-ecs/Cargo.toml b/async-ecs/Cargo.toml index 1dab17d..58fd546 100644 --- a/async-ecs/Cargo.toml +++ b/async-ecs/Cargo.toml @@ -13,3 +13,4 @@ log = "0.4" mopa = "0.2" rand = "0.7" thiserror = "1.0" +tokio = { version = "0.3", features = [ "full", "net", "time", "rt-multi-thread" ] } diff --git a/async-ecs/src/system/accessor.rs b/async-ecs/src/access/accessor.rs similarity index 66% rename from async-ecs/src/system/accessor.rs rename to async-ecs/src/access/accessor.rs index d5ba637..1d4fbe9 100644 --- a/async-ecs/src/system/accessor.rs +++ b/async-ecs/src/access/accessor.rs @@ -1,16 +1,28 @@ use std::marker::PhantomData; use std::ops::Deref; -use crate::resource::ResourceId; - -use super::{DynamicSystemData, System, SystemData}; +use crate::{ + resource::ResourceId, + system::{DynamicSystemData, System, SystemData}, +}; pub trait Accessor: Sized { - fn try_new() -> Option; + fn reads(&self) -> Vec { + Vec::new() + } - fn reads(&self) -> Vec; + fn writes(&self) -> Vec { + Vec::new() + } - fn writes(&self) -> Vec; + fn try_new() -> Option { + None + } +} + +#[derive(Default)] +pub struct StaticAccessor { + marker: PhantomData T>, } pub enum AccessorCow<'a, 'b, T> @@ -19,49 +31,30 @@ where T: System<'a> + ?Sized, 'a: 'b, { - /// A reference to an accessor. - Ref(&'b AccessorType<'a, T>), - /// An owned accessor. + Borrow(&'b AccessorType<'a, T>), Owned(AccessorType<'a, T>), } -#[derive(Default)] -pub struct StaticAccessor { - marker: PhantomData T>, -} - pub type AccessorType<'a, T> = <>::SystemData as DynamicSystemData<'a>>::Accessor; -/* Accessor */ - -impl Accessor for () { - fn try_new() -> Option { - None - } - - fn reads(&self) -> Vec { - Vec::new() - } - - fn writes(&self) -> Vec { - Vec::new() - } -} +/* StaticAccessor */ -impl Accessor for PhantomData +impl<'a, T> Accessor for StaticAccessor where - T: ?Sized, + T: SystemData<'a>, { fn try_new() -> Option { - None + Some(StaticAccessor { + marker: PhantomData, + }) } fn reads(&self) -> Vec { - Vec::new() + T::reads() } fn writes(&self) -> Vec { - Vec::new() + T::writes() } } @@ -77,29 +70,8 @@ where fn deref(&self) -> &AccessorType<'a, T> { match *self { - AccessorCow::Ref(r) => &*r, + AccessorCow::Borrow(r) => &*r, AccessorCow::Owned(ref o) => o, } } } - -/* StaticAccessor */ - -impl<'a, T> Accessor for StaticAccessor -where - T: SystemData<'a>, -{ - fn try_new() -> Option { - Some(StaticAccessor { - marker: PhantomData, - }) - } - - fn reads(&self) -> Vec { - T::reads() - } - - fn writes(&self) -> Vec { - T::writes() - } -} diff --git a/async-ecs/src/access/mod.rs b/async-ecs/src/access/mod.rs index 8ec7313..1eb7463 100644 --- a/async-ecs/src/access/mod.rs +++ b/async-ecs/src/access/mod.rs @@ -1,8 +1,10 @@ +pub mod accessor; pub mod read; pub mod read_storage; pub mod write; pub mod write_storage; +pub use accessor::{Accessor, AccessorCow, AccessorType, StaticAccessor}; pub use read::Read; pub use read_storage::ReadStorage; pub use write::Write; diff --git a/async-ecs/src/dispatcher/builder.rs b/async-ecs/src/dispatcher/builder.rs new file mode 100644 index 0000000..9396b6c --- /dev/null +++ b/async-ecs/src/dispatcher/builder.rs @@ -0,0 +1,376 @@ +use std::collections::hash_map::{Entry, HashMap}; +use std::fmt::Debug; + +use tokio::{spawn, sync::watch::channel}; + +use crate::{access::Accessor, resource::ResourceId, system::System}; + +use super::{task::execute, BoxedDispatchable, Dispatcher, Error, Receiver, Sender, SharedWorld}; + +pub struct Builder { + next_id: SystemId, + items: HashMap, + names: HashMap, +} + +struct Item { + name: String, + system: BoxedDispatchable, + + sender: Sender, + receiver: Receiver, + receivers: Vec, + + reads: Vec, + writes: Vec, + dependencies: Vec, +} + +#[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct SystemId(pub usize); + +impl Builder { + pub fn build(self) -> Dispatcher { + let receivers = self + .final_systems() + .into_iter() + .map(|id| self.items.get(&id).unwrap().receiver.clone()) + .collect(); + + let world = SharedWorld::default(); + let (sender, receiver) = channel(()); + + for (_, item) in self.items.into_iter() { + let name = item.name; + let sender = item.sender; + let system = item.system; + let receivers = if item.dependencies.is_empty() { + vec![receiver.clone()] + } else { + item.receivers + }; + + spawn(execute(name, system, sender, receivers, world.clone())); + } + + Dispatcher { + sender, + receivers, + world, + } + } + + pub fn with(mut self, system: S, name: &str, dependencies: &[&str]) -> Result + where + S: for<'s> System<'s> + Send + 'static, + { + self.add(system, name, dependencies)?; + + Ok(self) + } + + pub fn add( + &mut self, + system: S, + name: &str, + dependencies: &[&str], + ) -> Result<&mut Self, Error> + where + S: for<'s> System<'s> + Send + 'static, + { + let name = name.to_owned(); + let id = self.next_id(); + let id = match self.names.entry(name.clone()) { + Entry::Vacant(e) => Ok(*e.insert(id)), + Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())), + }?; + + let mut reads = system.accessor().reads(); + let mut writes = system.accessor().writes(); + + reads.sort(); + writes.sort(); + + reads.dedup(); + writes.dedup(); + + let mut dependencies = dependencies + .iter() + .map(|name| { + self.names + .get(*name) + .map(Clone::clone) + .ok_or_else(|| Error::DependencyWasNotFound((*name).into())) + }) + .collect::, _>>()?; + + for read in &reads { + for (key, value) in &self.items { + if value.writes.contains(read) { + dependencies.push(*key); + } + } + } + + for write in &writes { + for (key, value) in &self.items { + if value.reads.contains(write) || value.writes.contains(write) { + dependencies.push(*key); + } + } + } + + self.reduce_dependencies(&mut dependencies); + + let receivers = dependencies + .iter() + .map(|id| self.items.get(id).unwrap().receiver.clone()) + .collect(); + + let item = match self.items.entry(id) { + Entry::Vacant(e) => e.insert(Item::new(name, system)), + Entry::Occupied(_) => panic!("Item was already created!"), + }; + + item.reads = reads; + item.writes = writes; + item.receivers = receivers; + item.dependencies = dependencies; + + Ok(self) + } + + fn final_systems(&self) -> Vec { + let mut ret = self.items.keys().map(Clone::clone).collect(); + + self.reduce_dependencies(&mut ret); + + ret + } + + fn reduce_dependencies(&self, dependencies: &mut Vec) { + dependencies.sort(); + dependencies.dedup(); + + let mut remove_indices = Vec::new(); + for (i, a) in dependencies.iter().enumerate() { + for (j, b) in dependencies.iter().enumerate() { + if self.depends_on(a, b) { + remove_indices.push(j); + } else if self.depends_on(b, a) { + remove_indices.push(i); + } + } + } + + remove_indices.sort(); + remove_indices.dedup(); + remove_indices.reverse(); + + for i in remove_indices { + dependencies.remove(i); + } + } + + fn depends_on(&self, a: &SystemId, b: &SystemId) -> bool { + let item = self.items.get(a).unwrap(); + + if item.dependencies.contains(b) { + return true; + } + + for d in &item.dependencies { + if self.depends_on(d, b) { + return true; + } + } + + false + } + + fn next_id(&mut self) -> SystemId { + self.next_id.0 += 1; + + self.next_id + } +} + +impl Default for Builder { + fn default() -> Self { + Self { + next_id: SystemId(0), + items: HashMap::new(), + names: HashMap::new(), + } + } +} + +impl Item { + fn new(name: String, system: S) -> Self + where + S: for<'s> System<'s> + Send + 'static, + { + let (sender, receiver) = channel(()); + + Self { + name, + system: Box::new(system), + + sender, + receiver, + receivers: Vec::new(), + + reads: Vec::new(), + writes: Vec::new(), + dependencies: Vec::new(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{access::AccessorCow, system::DynamicSystemData, world::World}; + + #[test] + fn dependencies_on_read_and_write() { + /* + - Systems ------------------------------------ + Id: 1 2 3 4 5 + + - Resources ---------------------------------- + Read: A A B C A + Write: B C D D BCD + + - Dependencies Total ------------------------- + | | | | | + |<--------------| | | + | | | | | + | |<--------------| | + | | |<------| | + | | | | | + |<------------------------------| + | |<----------------------| + | | |<--------------| + | | | |<------| + | | | | | + + - Dependencies Reduced ----------------------- + | | | | | + |<--------------| | | + | | | | | + | |<--------------| | + | | |<------| | + | | | | | + | | | |<------| + | | | | | + */ + + struct ResA; + struct ResB; + struct ResC; + struct ResD; + + let sys1 = TestSystem::new( + vec![ResourceId::new::()], + vec![ResourceId::new::()], + ); + let sys2 = TestSystem::new( + vec![ResourceId::new::()], + vec![ResourceId::new::()], + ); + let sys3 = TestSystem::new( + vec![ResourceId::new::()], + vec![ResourceId::new::()], + ); + let sys4 = TestSystem::new( + vec![ResourceId::new::()], + vec![ResourceId::new::()], + ); + let sys5 = TestSystem::new( + vec![ResourceId::new::()], + vec![ + ResourceId::new::(), + ResourceId::new::(), + ResourceId::new::(), + ], + ); + + let dispatcher = Dispatcher::builder() + .with(sys1, "sys1", &[]) + .unwrap() + .with(sys2, "sys2", &[]) + .unwrap() + .with(sys3, "sys3", &[]) + .unwrap() + .with(sys4, "sys4", &[]) + .unwrap() + .with(sys5, "sys5", &[]) + .unwrap(); + + let sys1 = dispatcher.items.get(&SystemId(1)).unwrap(); + let sys2 = dispatcher.items.get(&SystemId(2)).unwrap(); + let sys3 = dispatcher.items.get(&SystemId(3)).unwrap(); + let sys4 = dispatcher.items.get(&SystemId(4)).unwrap(); + let sys5 = dispatcher.items.get(&SystemId(5)).unwrap(); + + assert_eq!(sys1.dependencies, vec![]); + assert_eq!(sys2.dependencies, vec![]); + assert_eq!(sys3.dependencies, vec![SystemId(1)]); + assert_eq!(sys4.dependencies, vec![SystemId(2), SystemId(3)]); + assert_eq!(sys5.dependencies, vec![SystemId(4)]); + assert_eq!(dispatcher.final_systems(), vec![SystemId(5)]); + } + + struct TestSystem { + accessor: TestAccessor, + } + + impl TestSystem { + fn new(reads: Vec, writes: Vec) -> Self { + Self { + accessor: TestAccessor { reads, writes }, + } + } + } + + impl<'a> System<'a> for TestSystem { + type SystemData = TestData; + + fn run(&mut self, _data: Self::SystemData) { + unimplemented!() + } + + fn accessor<'b>(&'b self) -> AccessorCow<'a, 'b, Self> { + AccessorCow::Borrow(&self.accessor) + } + } + + struct TestData; + + impl<'a> DynamicSystemData<'a> for TestData { + type Accessor = TestAccessor; + + fn setup(_accessor: &Self::Accessor, _world: &mut World) {} + + fn fetch(_access: &Self::Accessor, _world: &'a World) -> Self { + TestData + } + } + + struct TestAccessor { + reads: Vec, + writes: Vec, + } + + impl Accessor for TestAccessor { + fn reads(&self) -> Vec { + self.reads.clone() + } + + fn writes(&self) -> Vec { + self.writes.clone() + } + } +} diff --git a/async-ecs/src/dispatcher/dispatchable.rs b/async-ecs/src/dispatcher/dispatchable.rs new file mode 100644 index 0000000..d94df2d --- /dev/null +++ b/async-ecs/src/dispatcher/dispatchable.rs @@ -0,0 +1,21 @@ +use crate::{ + system::{DynamicSystemData, System}, + world::World, +}; + +pub type BoxedDispatchable = Box Dispatchable<'a> + Send>; + +pub trait Dispatchable<'a> { + fn run(&mut self, world: &'a World); +} + +impl<'a, T> Dispatchable<'a> for T +where + T: System<'a>, +{ + fn run(&mut self, world: &'a World) { + let data = T::SystemData::fetch(&self.accessor(), world); + + self.run(data); + } +} diff --git a/async-ecs/src/dispatcher/error.rs b/async-ecs/src/dispatcher/error.rs new file mode 100644 index 0000000..3e9146e --- /dev/null +++ b/async-ecs/src/dispatcher/error.rs @@ -0,0 +1,18 @@ +use std::fmt::Debug; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("A System with this name was already registered: {0}!")] + NameAlreadyRegistered(String), + + #[error("Dependency of the given system was not found: {0}!")] + DependencyWasNotFound(String), + + #[error("Unable to start dispatching!")] + DispatchSend, + + #[error("Unable to wait for systems to finish!")] + DispatchReceive, +} diff --git a/async-ecs/src/dispatcher/mod.rs b/async-ecs/src/dispatcher/mod.rs new file mode 100644 index 0000000..daa509f --- /dev/null +++ b/async-ecs/src/dispatcher/mod.rs @@ -0,0 +1,63 @@ +pub mod builder; +pub mod dispatchable; +pub mod error; +pub mod task; + +pub use builder::Builder; +pub use dispatchable::{BoxedDispatchable, Dispatchable}; +pub use error::Error; + +use std::cell::RefCell; +use std::ops::Deref; +use std::sync::Arc; + +use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; + +use crate::world::World; + +pub struct Dispatcher { + sender: Sender, + receivers: Vec, + world: SharedWorld, +} + +#[derive(Clone, Default)] +pub struct SharedWorld(Arc>>); + +type Sender = WatchSender<()>; +type Receiver = WatchReceiver<()>; + +impl Dispatcher { + pub fn builder() -> Builder { + Builder::default() + } + + pub async fn dispatch(&mut self, world: World) -> Result { + *self.world.borrow_mut() = Some(world); + + match self.sender.send(()) { + Ok(()) => (), + Err(_) => return Err(Error::DispatchSend), + } + + for receiver in &mut self.receivers { + match receiver.changed().await { + Ok(()) => (), + Err(_) => return Err(Error::DispatchReceive), + } + } + + Ok(self.world.borrow_mut().take().unwrap()) + } +} + +impl Deref for SharedWorld { + type Target = RefCell>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +unsafe impl Send for SharedWorld {} +unsafe impl Sync for SharedWorld {} diff --git a/async-ecs/src/dispatcher/task.rs b/async-ecs/src/dispatcher/task.rs new file mode 100644 index 0000000..3761dd7 --- /dev/null +++ b/async-ecs/src/dispatcher/task.rs @@ -0,0 +1,46 @@ +use log::info; + +use super::{BoxedDispatchable, Receiver, Sender, SharedWorld}; + +pub async fn execute( + name: String, + dispatchable: BoxedDispatchable, + sender: Sender, + receivers: Vec, + world: SharedWorld, +) { + info!("System started: {}", &name); + + run(&name, dispatchable, sender, receivers, world).await; + + info!("System finished: {}", &name); +} + +async fn run( + name: &str, + mut dispatchable: BoxedDispatchable, + sender: Sender, + mut receivers: Vec, + world: SharedWorld, +) { + loop { + for receiver in &mut receivers { + match receiver.changed().await { + Ok(()) => (), + Err(_) => return, + } + } + + info!("Run system: {}", &name); + + let world = world.borrow(); + let world = world.as_ref().unwrap(); + + dispatchable.run(world); + + match sender.send(()) { + Ok(()) => (), + Err(_) => return, + } + } +} diff --git a/async-ecs/src/lib.rs b/async-ecs/src/lib.rs index 87ea763..6c088a9 100644 --- a/async-ecs/src/lib.rs +++ b/async-ecs/src/lib.rs @@ -1,5 +1,8 @@ +#![allow(dead_code)] + pub mod access; pub mod component; +pub mod dispatcher; pub mod entity; pub mod error; pub mod misc; @@ -8,6 +11,9 @@ pub mod storage; pub mod system; pub mod world; +pub use access::{ReadStorage, WriteStorage}; +pub use dispatcher::Dispatcher; pub use resource::Resources; pub use storage::VecStorage; +pub use system::System; pub use world::World; diff --git a/async-ecs/src/main.rs b/async-ecs/src/main.rs index 806fb08..023f935 100644 --- a/async-ecs/src/main.rs +++ b/async-ecs/src/main.rs @@ -1,15 +1,29 @@ -use log::info; +use std::io::Error as IoError; +use std::time::{Duration, Instant}; -use async_ecs::{VecStorage, World}; +use async_ecs::{ + dispatcher::Error as DispatcherError, Dispatcher, ReadStorage, System, VecStorage, World, + WriteStorage, +}; use async_ecs_derive::Component; +use log::info; use rand::random; +use thiserror::Error; +use tokio::runtime::Builder; -fn main() { +fn main() -> Result<(), Error> { env_logger::builder() .filter_level(log::LevelFilter::Info) .format_timestamp_nanos() .init(); + Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(run()) +} + +async fn run() -> Result<(), Error> { info!("Application started!"); let mut world = World::default(); @@ -28,12 +42,10 @@ fn main() { info!("World initialized!"); - /* - let mut dispatcher = DispatcherBuilder::new() - .with(Move, "move", &[]) - .with(Accelerate, "accelerate", &["move"]) + let mut dispatcher = Dispatcher::builder() + .with(Move, "move", &[])? + .with(Accelerate, "accelerate", &["move"])? .build(); - dispatcher.setup(&mut world); info!("Setup done!"); @@ -43,7 +55,7 @@ fn main() { let start = Instant::now(); - dispatcher.dispatch(&world); + world = dispatcher.dispatch(world).await?; let end = Instant::now(); @@ -55,7 +67,8 @@ fn main() { let delta = delta / REPEAT_COUNT; info!("Average time per dispatch: {:?}", delta); - */ + + Ok(()) } const ENTITY_COUNT: usize = 100_000; @@ -85,6 +98,15 @@ struct Acceleration { struct Move; struct Accelerate; +#[derive(Debug, Error)] +enum Error { + #[error("IO Error: {0}")] + IoError(IoError), + + #[error("Dispatcher Error: {0}")] + DispatcherError(DispatcherError), +} + impl Position { fn random() -> Self { Self { @@ -111,11 +133,14 @@ impl Acceleration { } } } -/* + impl<'a> System<'a> for Move { type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>); - fn run(&mut self, (mut position, velocity): Self::SystemData) { + fn run(&mut self, (position, velocity): Self::SystemData) { + let _position = position; + let _velocity = velocity; + /* use specs::{prelude::ParallelIterator, ParJoin}; @@ -132,7 +157,10 @@ impl<'a> System<'a> for Move { impl<'a> System<'a> for Accelerate { type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>); - fn run(&mut self, (mut velocity, acceleration): Self::SystemData) { + fn run(&mut self, (velocity, acceleration): Self::SystemData) { + let _velocity = velocity; + let _acceleration = acceleration; + /* use specs::{prelude::ParallelIterator, ParJoin}; @@ -145,4 +173,15 @@ impl<'a> System<'a> for Accelerate { */ } } -*/ + +impl From for Error { + fn from(err: IoError) -> Self { + Self::IoError(err) + } +} + +impl From for Error { + fn from(err: DispatcherError) -> Self { + Self::DispatcherError(err) + } +} diff --git a/async-ecs/src/resource/cell.rs b/async-ecs/src/resource/cell.rs index c5167f5..27a0d1f 100644 --- a/async-ecs/src/resource/cell.rs +++ b/async-ecs/src/resource/cell.rs @@ -121,6 +121,8 @@ impl Cell { } } +unsafe impl Sync for Cell where T: Sync {} + /* Ref */ impl<'a, T> Ref<'a, T> diff --git a/async-ecs/src/resource/mod.rs b/async-ecs/src/resource/mod.rs index fc1bc5e..bc1851e 100644 --- a/async-ecs/src/resource/mod.rs +++ b/async-ecs/src/resource/mod.rs @@ -10,7 +10,7 @@ use mopa::Any; pub trait Resource: Any + Send + Sync + 'static {} -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ResourceId(TypeId); impl ResourceId { @@ -21,3 +21,9 @@ impl ResourceId { Self(TypeId::of::()) } } + +impl From for ResourceId { + fn from(id: TypeId) -> Self { + Self(id) + } +} diff --git a/async-ecs/src/system/mod.rs b/async-ecs/src/system/mod.rs index d475f84..79bb413 100644 --- a/async-ecs/src/system/mod.rs +++ b/async-ecs/src/system/mod.rs @@ -1,12 +1,11 @@ -mod accessor; mod system_data; -pub use accessor::{Accessor, AccessorCow}; pub use system_data::{DynamicSystemData, SystemData}; -use crate::world::World; - -use accessor::AccessorType; +use crate::{ + access::{Accessor, AccessorCow, AccessorType}, + world::World, +}; pub trait System<'a> { type SystemData: DynamicSystemData<'a>; diff --git a/async-ecs/src/system/system_data.rs b/async-ecs/src/system/system_data.rs index 27fd1b9..b5959f9 100644 --- a/async-ecs/src/system/system_data.rs +++ b/async-ecs/src/system/system_data.rs @@ -1,8 +1,10 @@ use std::marker::PhantomData; -use crate::{resource::ResourceId, world::World}; - -use super::accessor::{Accessor, StaticAccessor}; +use crate::{ + access::{Accessor, StaticAccessor}, + resource::ResourceId, + world::World, +}; pub trait SystemData<'a> { fn setup(world: &mut World);