Explorar el Código

Implemented local spawned systems

master
Bergmann89 hace 5 años
padre
commit
76c04694aa
Se han modificado 6 ficheros con 134 adiciones y 33 borrados
  1. +1
    -1
      async-ecs/Cargo.toml
  2. +103
    -18
      async-ecs/src/dispatcher/builder.rs
  3. +2
    -2
      async-ecs/src/dispatcher/mod.rs
  4. +5
    -3
      async-ecs/src/dispatcher/run.rs
  5. +21
    -7
      async-ecs/src/dispatcher/task.rs
  6. +2
    -2
      async-ecs/src/main.rs

+ 1
- 1
async-ecs/Cargo.toml Ver fichero

@@ -9,7 +9,7 @@ async-ecs-derive = "0.1"
env_logger = "0.8" env_logger = "0.8"
futures = "0.3" futures = "0.3"
hashbrown = "0.9" hashbrown = "0.9"
hibitset = "0.6"
hibitset = { version = "0.6", default-features = false }
log = "0.4" log = "0.4"
mopa = "0.2" mopa = "0.2"
rand = "0.7" rand = "0.7"


+ 103
- 18
async-ecs/src/dispatcher/builder.rs Ver fichero

@@ -1,11 +1,17 @@
use std::collections::hash_map::{Entry, HashMap}; use std::collections::hash_map::{Entry, HashMap};
use std::fmt::Debug; use std::fmt::Debug;


use tokio::{spawn, sync::watch::channel};
use tokio::{
sync::watch::channel,
task::{spawn as spawn_thread, spawn_local},
};


use crate::{access::Accessor, resource::ResourceId, system::AsyncSystem}; use crate::{access::Accessor, resource::ResourceId, system::AsyncSystem};


use super::{task::execute, BoxedDispatchable, Dispatcher, Error, Receiver, Sender, SharedWorld};
use super::{
task::{execute_local, execute_thread},
Dispatcher, Error, LocalRun, Receiver, Sender, SharedWorld, ThreadRun,
};


pub struct Builder { pub struct Builder {
next_id: SystemId, next_id: SystemId,
@@ -15,7 +21,7 @@ pub struct Builder {


struct Item { struct Item {
name: String, name: String,
system: BoxedDispatchable,
run: RunType,


sender: Sender, sender: Sender,
receiver: Receiver, receiver: Receiver,
@@ -26,6 +32,11 @@ struct Item {
dependencies: Vec<SystemId>, dependencies: Vec<SystemId>,
} }


enum RunType {
Thread(ThreadRun),
Local(LocalRun),
}

#[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] #[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct SystemId(pub usize); struct SystemId(pub usize);


@@ -41,16 +52,23 @@ impl Builder {
let (sender, receiver) = channel(()); let (sender, receiver) = channel(());


for (_, item) in self.items.into_iter() { for (_, item) in self.items.into_iter() {
let run = item.run;
let name = item.name; let name = item.name;
let sender = item.sender; let sender = item.sender;
let system = item.system;
let receivers = if item.dependencies.is_empty() { let receivers = if item.dependencies.is_empty() {
vec![receiver.clone()] vec![receiver.clone()]
} else { } else {
item.receivers item.receivers
}; };


spawn(execute(name, system, sender, receivers, world.clone()));
match run {
RunType::Thread(run) => {
spawn_thread(execute_thread(name, run, sender, receivers, world.clone()))
}
RunType::Local(run) => {
spawn_local(execute_local(name, run, sender, receivers, world.clone()))
}
};
} }


Dispatcher { Dispatcher {
@@ -77,17 +95,76 @@ impl Builder {
) -> Result<&mut Self, Error> ) -> Result<&mut Self, Error>
where where
S: for<'s> AsyncSystem<'s> + Send + 'static, S: for<'s> AsyncSystem<'s> + Send + 'static,
{
self.add_inner(
name,
dependencies,
system.accessor().reads(),
system.accessor().writes(),
|this, id| match this.items.entry(id) {
Entry::Vacant(e) => e.insert(Item::thread(name.into(), system)),
Entry::Occupied(_) => panic!("Item was already created!"),
},
)?;

Ok(self)
}

pub fn with_local<S>(
mut self,
system: S,
name: &str,
dependencies: &[&str],
) -> Result<Self, Error>
where
S: for<'s> AsyncSystem<'s> + 'static,
{
self.add_local(system, name, dependencies)?;

Ok(self)
}

pub fn add_local<S>(
&mut self,
system: S,
name: &str,
dependencies: &[&str],
) -> Result<&mut Self, Error>
where
S: for<'s> AsyncSystem<'s> + 'static,
{
self.add_inner(
name,
dependencies,
system.accessor().reads(),
system.accessor().writes(),
|this, id| match this.items.entry(id) {
Entry::Vacant(e) => e.insert(Item::local(name.into(), system)),
Entry::Occupied(_) => panic!("Item was already created!"),
},
)?;

Ok(self)
}

fn add_inner<F>(
&mut self,
name: &str,
dependencies: &[&str],
mut reads: Vec<ResourceId>,
mut writes: Vec<ResourceId>,
f: F,
) -> Result<&mut Self, Error>
where
F: FnOnce(&mut Self, SystemId) -> &mut Item,
{ {
let name = name.to_owned(); let name = name.to_owned();
let id = self.next_id(); let id = self.next_id();
let id = match self.names.entry(name.clone()) {
let id = match self.names.entry(name) {
Entry::Vacant(e) => Ok(*e.insert(id)), Entry::Vacant(e) => Ok(*e.insert(id)),
Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())), Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())),
}?; }?;


let mut reads = system.accessor().reads();
let mut writes = system.accessor().writes();

reads.sort(); reads.sort();
writes.sort(); writes.sort();


@@ -127,10 +204,7 @@ impl Builder {
.map(|id| self.items.get(id).unwrap().receiver.clone()) .map(|id| self.items.get(id).unwrap().receiver.clone())
.collect(); .collect();


let item = match self.items.entry(id) {
Entry::Vacant(e) => e.insert(Item::new(name, system)),
Entry::Occupied(_) => panic!("Item was already created!"),
};
let item = f(self, id);


item.reads = reads; item.reads = reads;
item.writes = writes; item.writes = writes;
@@ -206,15 +280,12 @@ impl Default for Builder {
} }


impl Item { impl Item {
fn new<S>(name: String, system: S) -> Self
where
S: for<'s> AsyncSystem<'s> + Send + 'static,
{
fn new(name: String, run: RunType) -> Self {
let (sender, receiver) = channel(()); let (sender, receiver) = channel(());


Self { Self {
name, name,
system: Box::new(system),
run,


sender, sender,
receiver, receiver,
@@ -225,6 +296,20 @@ impl Item {
dependencies: Vec::new(), dependencies: Vec::new(),
} }
} }

fn thread<S>(name: String, system: S) -> Self
where
S: for<'s> AsyncSystem<'s> + Send + 'static,
{
Self::new(name, RunType::Thread(Box::new(system)))
}

fn local<S>(name: String, system: S) -> Self
where
S: for<'s> AsyncSystem<'s> + 'static,
{
Self::new(name, RunType::Local(Box::new(system)))
}
} }


#[cfg(test)] #[cfg(test)]


+ 2
- 2
async-ecs/src/dispatcher/mod.rs Ver fichero

@@ -1,11 +1,11 @@
pub mod builder; pub mod builder;
pub mod dispatchable;
pub mod error; pub mod error;
pub mod run;
pub mod task; pub mod task;


pub use builder::Builder; pub use builder::Builder;
pub use dispatchable::{BoxedDispatchable, Dispatchable};
pub use error::Error; pub use error::Error;
pub use run::{LocalRun, Run, ThreadRun};


use std::cell::RefCell; use std::cell::RefCell;
use std::ops::Deref; use std::ops::Deref;


async-ecs/src/dispatcher/dispatchable.rs → async-ecs/src/dispatcher/run.rs Ver fichero

@@ -8,14 +8,16 @@ use crate::{
world::World, world::World,
}; };


pub type ThreadRun = Box<dyn for<'a> Run<'a> + Send>;
pub type LocalRun = Box<dyn for<'a> Run<'a>>;

pub type BoxedFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>; pub type BoxedFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
pub type BoxedDispatchable = Box<dyn for<'a> Dispatchable<'a> + Send>;


pub trait Dispatchable<'a> {
pub trait Run<'a> {
fn run(&mut self, world: &'a World) -> BoxedFuture<'a>; fn run(&mut self, world: &'a World) -> BoxedFuture<'a>;
} }


impl<'a, T> Dispatchable<'a> for T
impl<'a, T> Run<'a> for T
where where
T: AsyncSystem<'a>, T: AsyncSystem<'a>,
<T as AsyncSystem<'a>>::Future: Send, <T as AsyncSystem<'a>>::Future: Send,

+ 21
- 7
async-ecs/src/dispatcher/task.rs Ver fichero

@@ -1,23 +1,37 @@
use log::info; use log::info;


use super::{BoxedDispatchable, Receiver, Sender, SharedWorld};
use super::{LocalRun, Receiver, Run, Sender, SharedWorld, ThreadRun};


pub async fn execute(
pub async fn execute_thread(
name: String, name: String,
dispatchable: BoxedDispatchable,
mut run: ThreadRun,
sender: Sender, sender: Sender,
receivers: Vec<Receiver>, receivers: Vec<Receiver>,
world: SharedWorld, world: SharedWorld,
) { ) {
info!("System started: {}", &name); info!("System started: {}", &name);


run(dispatchable, sender, receivers, world).await;
execute_inner(run.as_mut(), sender, receivers, world).await;


info!("System finished: {}", &name); info!("System finished: {}", &name);
} }


async fn run(
mut dispatchable: BoxedDispatchable,
pub async fn execute_local(
name: String,
mut run: LocalRun,
sender: Sender,
receivers: Vec<Receiver>,
world: SharedWorld,
) {
info!("System started (local): {}", &name);

execute_inner(run.as_mut(), sender, receivers, world).await;

info!("System finished (local): {}", &name);
}

async fn execute_inner<R: for<'a> Run<'a> + ?Sized>(
run: &mut R,
sender: Sender, sender: Sender,
mut receivers: Vec<Receiver>, mut receivers: Vec<Receiver>,
world: SharedWorld, world: SharedWorld,
@@ -33,7 +47,7 @@ async fn run(
let world = world.borrow(); let world = world.borrow();
let world = world.as_ref().unwrap(); let world = world.as_ref().unwrap();


dispatchable.run(world);
run.run(world);


match sender.send(()) { match sender.send(()) {
Ok(()) => (), Ok(()) => (),


+ 2
- 2
async-ecs/src/main.rs Ver fichero

@@ -9,7 +9,7 @@ use async_ecs_derive::Component;
use log::info; use log::info;
use rand::random; use rand::random;
use thiserror::Error; use thiserror::Error;
use tokio::runtime::Builder;
use tokio::{runtime::Builder, task::LocalSet};


fn main() -> Result<(), Error> { fn main() -> Result<(), Error> {
env_logger::builder() env_logger::builder()
@@ -20,7 +20,7 @@ fn main() -> Result<(), Error> {
Builder::new_multi_thread() Builder::new_multi_thread()
.enable_all() .enable_all()
.build()? .build()?
.block_on(run())
.block_on(async move { LocalSet::new().run_until(run()).await })
} }


async fn run() -> Result<(), Error> { async fn run() -> Result<(), Error> {


Cargando…
Cancelar
Guardar