diff --git a/async-ecs/Cargo.toml b/async-ecs/Cargo.toml index d348b91..830b201 100644 --- a/async-ecs/Cargo.toml +++ b/async-ecs/Cargo.toml @@ -9,7 +9,7 @@ async-ecs-derive = "0.1" env_logger = "0.8" futures = "0.3" hashbrown = "0.9" -hibitset = "0.6" +hibitset = { version = "0.6", default-features = false } log = "0.4" mopa = "0.2" rand = "0.7" diff --git a/async-ecs/src/dispatcher/builder.rs b/async-ecs/src/dispatcher/builder.rs index 4d41495..dd63478 100644 --- a/async-ecs/src/dispatcher/builder.rs +++ b/async-ecs/src/dispatcher/builder.rs @@ -1,11 +1,17 @@ use std::collections::hash_map::{Entry, HashMap}; use std::fmt::Debug; -use tokio::{spawn, sync::watch::channel}; +use tokio::{ + sync::watch::channel, + task::{spawn as spawn_thread, spawn_local}, +}; use crate::{access::Accessor, resource::ResourceId, system::AsyncSystem}; -use super::{task::execute, BoxedDispatchable, Dispatcher, Error, Receiver, Sender, SharedWorld}; +use super::{ + task::{execute_local, execute_thread}, + Dispatcher, Error, LocalRun, Receiver, Sender, SharedWorld, ThreadRun, +}; pub struct Builder { next_id: SystemId, @@ -15,7 +21,7 @@ pub struct Builder { struct Item { name: String, - system: BoxedDispatchable, + run: RunType, sender: Sender, receiver: Receiver, @@ -26,6 +32,11 @@ struct Item { dependencies: Vec, } +enum RunType { + Thread(ThreadRun), + Local(LocalRun), +} + #[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] struct SystemId(pub usize); @@ -41,16 +52,23 @@ impl Builder { let (sender, receiver) = channel(()); for (_, item) in self.items.into_iter() { + let run = item.run; let name = item.name; let sender = item.sender; - let system = item.system; let receivers = if item.dependencies.is_empty() { vec![receiver.clone()] } else { item.receivers }; - spawn(execute(name, system, sender, receivers, world.clone())); + match run { + RunType::Thread(run) => { + spawn_thread(execute_thread(name, run, sender, receivers, world.clone())) + } + RunType::Local(run) => { + spawn_local(execute_local(name, run, sender, receivers, world.clone())) + } + }; } Dispatcher { @@ -77,17 +95,76 @@ impl Builder { ) -> Result<&mut Self, Error> where S: for<'s> AsyncSystem<'s> + Send + 'static, + { + self.add_inner( + name, + dependencies, + system.accessor().reads(), + system.accessor().writes(), + |this, id| match this.items.entry(id) { + Entry::Vacant(e) => e.insert(Item::thread(name.into(), system)), + Entry::Occupied(_) => panic!("Item was already created!"), + }, + )?; + + Ok(self) + } + + pub fn with_local( + mut self, + system: S, + name: &str, + dependencies: &[&str], + ) -> Result + where + S: for<'s> AsyncSystem<'s> + 'static, + { + self.add_local(system, name, dependencies)?; + + Ok(self) + } + + pub fn add_local( + &mut self, + system: S, + name: &str, + dependencies: &[&str], + ) -> Result<&mut Self, Error> + where + S: for<'s> AsyncSystem<'s> + 'static, + { + self.add_inner( + name, + dependencies, + system.accessor().reads(), + system.accessor().writes(), + |this, id| match this.items.entry(id) { + Entry::Vacant(e) => e.insert(Item::local(name.into(), system)), + Entry::Occupied(_) => panic!("Item was already created!"), + }, + )?; + + Ok(self) + } + + fn add_inner( + &mut self, + name: &str, + dependencies: &[&str], + mut reads: Vec, + mut writes: Vec, + f: F, + ) -> Result<&mut Self, Error> + where + F: FnOnce(&mut Self, SystemId) -> &mut Item, { let name = name.to_owned(); let id = self.next_id(); - let id = match self.names.entry(name.clone()) { + let id = match self.names.entry(name) { Entry::Vacant(e) => Ok(*e.insert(id)), Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())), }?; - let mut reads = system.accessor().reads(); - let mut writes = system.accessor().writes(); - reads.sort(); writes.sort(); @@ -127,10 +204,7 @@ impl Builder { .map(|id| self.items.get(id).unwrap().receiver.clone()) .collect(); - let item = match self.items.entry(id) { - Entry::Vacant(e) => e.insert(Item::new(name, system)), - Entry::Occupied(_) => panic!("Item was already created!"), - }; + let item = f(self, id); item.reads = reads; item.writes = writes; @@ -206,15 +280,12 @@ impl Default for Builder { } impl Item { - fn new(name: String, system: S) -> Self - where - S: for<'s> AsyncSystem<'s> + Send + 'static, - { + fn new(name: String, run: RunType) -> Self { let (sender, receiver) = channel(()); Self { name, - system: Box::new(system), + run, sender, receiver, @@ -225,6 +296,20 @@ impl Item { dependencies: Vec::new(), } } + + fn thread(name: String, system: S) -> Self + where + S: for<'s> AsyncSystem<'s> + Send + 'static, + { + Self::new(name, RunType::Thread(Box::new(system))) + } + + fn local(name: String, system: S) -> Self + where + S: for<'s> AsyncSystem<'s> + 'static, + { + Self::new(name, RunType::Local(Box::new(system))) + } } #[cfg(test)] diff --git a/async-ecs/src/dispatcher/mod.rs b/async-ecs/src/dispatcher/mod.rs index daa509f..9f18f2e 100644 --- a/async-ecs/src/dispatcher/mod.rs +++ b/async-ecs/src/dispatcher/mod.rs @@ -1,11 +1,11 @@ pub mod builder; -pub mod dispatchable; pub mod error; +pub mod run; pub mod task; pub use builder::Builder; -pub use dispatchable::{BoxedDispatchable, Dispatchable}; pub use error::Error; +pub use run::{LocalRun, Run, ThreadRun}; use std::cell::RefCell; use std::ops::Deref; diff --git a/async-ecs/src/dispatcher/dispatchable.rs b/async-ecs/src/dispatcher/run.rs similarity index 78% rename from async-ecs/src/dispatcher/dispatchable.rs rename to async-ecs/src/dispatcher/run.rs index 7b7cef4..2ccde66 100644 --- a/async-ecs/src/dispatcher/dispatchable.rs +++ b/async-ecs/src/dispatcher/run.rs @@ -8,14 +8,16 @@ use crate::{ world::World, }; +pub type ThreadRun = Box Run<'a> + Send>; +pub type LocalRun = Box Run<'a>>; + pub type BoxedFuture<'a> = Pin + Send + 'a>>; -pub type BoxedDispatchable = Box Dispatchable<'a> + Send>; -pub trait Dispatchable<'a> { +pub trait Run<'a> { fn run(&mut self, world: &'a World) -> BoxedFuture<'a>; } -impl<'a, T> Dispatchable<'a> for T +impl<'a, T> Run<'a> for T where T: AsyncSystem<'a>, >::Future: Send, diff --git a/async-ecs/src/dispatcher/task.rs b/async-ecs/src/dispatcher/task.rs index fd3fd66..559c2eb 100644 --- a/async-ecs/src/dispatcher/task.rs +++ b/async-ecs/src/dispatcher/task.rs @@ -1,23 +1,37 @@ use log::info; -use super::{BoxedDispatchable, Receiver, Sender, SharedWorld}; +use super::{LocalRun, Receiver, Run, Sender, SharedWorld, ThreadRun}; -pub async fn execute( +pub async fn execute_thread( name: String, - dispatchable: BoxedDispatchable, + mut run: ThreadRun, sender: Sender, receivers: Vec, world: SharedWorld, ) { info!("System started: {}", &name); - run(dispatchable, sender, receivers, world).await; + execute_inner(run.as_mut(), sender, receivers, world).await; info!("System finished: {}", &name); } -async fn run( - mut dispatchable: BoxedDispatchable, +pub async fn execute_local( + name: String, + mut run: LocalRun, + sender: Sender, + receivers: Vec, + world: SharedWorld, +) { + info!("System started (local): {}", &name); + + execute_inner(run.as_mut(), sender, receivers, world).await; + + info!("System finished (local): {}", &name); +} + +async fn execute_inner Run<'a> + ?Sized>( + run: &mut R, sender: Sender, mut receivers: Vec, world: SharedWorld, @@ -33,7 +47,7 @@ async fn run( let world = world.borrow(); let world = world.as_ref().unwrap(); - dispatchable.run(world); + run.run(world); match sender.send(()) { Ok(()) => (), diff --git a/async-ecs/src/main.rs b/async-ecs/src/main.rs index 32256c8..9cb1bd3 100644 --- a/async-ecs/src/main.rs +++ b/async-ecs/src/main.rs @@ -9,7 +9,7 @@ use async_ecs_derive::Component; use log::info; use rand::random; use thiserror::Error; -use tokio::runtime::Builder; +use tokio::{runtime::Builder, task::LocalSet}; fn main() -> Result<(), Error> { env_logger::builder() @@ -20,7 +20,7 @@ fn main() -> Result<(), Error> { Builder::new_multi_thread() .enable_all() .build()? - .block_on(run()) + .block_on(async move { LocalSet::new().run_until(run()).await }) } async fn run() -> Result<(), Error> {