浏览代码

Implemented dispatcher

master
Bergmann89 5 年前
父节点
当前提交
5d92798d59
共有 14 个文件被更改,包括 632 次插入79 次删除
  1. +1
    -0
      async-ecs/Cargo.toml
  2. +28
    -56
      async-ecs/src/access/accessor.rs
  3. +2
    -0
      async-ecs/src/access/mod.rs
  4. +376
    -0
      async-ecs/src/dispatcher/builder.rs
  5. +21
    -0
      async-ecs/src/dispatcher/dispatchable.rs
  6. +18
    -0
      async-ecs/src/dispatcher/error.rs
  7. +63
    -0
      async-ecs/src/dispatcher/mod.rs
  8. +46
    -0
      async-ecs/src/dispatcher/task.rs
  9. +6
    -0
      async-ecs/src/lib.rs
  10. +53
    -14
      async-ecs/src/main.rs
  11. +2
    -0
      async-ecs/src/resource/cell.rs
  12. +7
    -1
      async-ecs/src/resource/mod.rs
  13. +4
    -5
      async-ecs/src/system/mod.rs
  14. +5
    -3
      async-ecs/src/system/system_data.rs

+ 1
- 0
async-ecs/Cargo.toml 查看文件

@@ -13,3 +13,4 @@ log = "0.4"
mopa = "0.2"
rand = "0.7"
thiserror = "1.0"
tokio = { version = "0.3", features = [ "full", "net", "time", "rt-multi-thread" ] }

async-ecs/src/system/accessor.rs → async-ecs/src/access/accessor.rs 查看文件

@@ -1,16 +1,28 @@
use std::marker::PhantomData;
use std::ops::Deref;

use crate::resource::ResourceId;

use super::{DynamicSystemData, System, SystemData};
use crate::{
resource::ResourceId,
system::{DynamicSystemData, System, SystemData},
};

pub trait Accessor: Sized {
fn try_new() -> Option<Self>;
fn reads(&self) -> Vec<ResourceId> {
Vec::new()
}

fn reads(&self) -> Vec<ResourceId>;
fn writes(&self) -> Vec<ResourceId> {
Vec::new()
}

fn writes(&self) -> Vec<ResourceId>;
fn try_new() -> Option<Self> {
None
}
}

#[derive(Default)]
pub struct StaticAccessor<T> {
marker: PhantomData<fn() -> T>,
}

pub enum AccessorCow<'a, 'b, T>
@@ -19,49 +31,30 @@ where
T: System<'a> + ?Sized,
'a: 'b,
{
/// A reference to an accessor.
Ref(&'b AccessorType<'a, T>),
/// An owned accessor.
Borrow(&'b AccessorType<'a, T>),
Owned(AccessorType<'a, T>),
}

#[derive(Default)]
pub struct StaticAccessor<T> {
marker: PhantomData<fn() -> T>,
}

pub type AccessorType<'a, T> = <<T as System<'a>>::SystemData as DynamicSystemData<'a>>::Accessor;

/* Accessor */

impl Accessor for () {
fn try_new() -> Option<Self> {
None
}

fn reads(&self) -> Vec<ResourceId> {
Vec::new()
}

fn writes(&self) -> Vec<ResourceId> {
Vec::new()
}
}
/* StaticAccessor */

impl<T> Accessor for PhantomData<T>
impl<'a, T> Accessor for StaticAccessor<T>
where
T: ?Sized,
T: SystemData<'a>,
{
fn try_new() -> Option<Self> {
None
Some(StaticAccessor {
marker: PhantomData,
})
}

fn reads(&self) -> Vec<ResourceId> {
Vec::new()
T::reads()
}

fn writes(&self) -> Vec<ResourceId> {
Vec::new()
T::writes()
}
}

@@ -77,29 +70,8 @@ where

fn deref(&self) -> &AccessorType<'a, T> {
match *self {
AccessorCow::Ref(r) => &*r,
AccessorCow::Borrow(r) => &*r,
AccessorCow::Owned(ref o) => o,
}
}
}

/* StaticAccessor */

impl<'a, T> Accessor for StaticAccessor<T>
where
T: SystemData<'a>,
{
fn try_new() -> Option<Self> {
Some(StaticAccessor {
marker: PhantomData,
})
}

fn reads(&self) -> Vec<ResourceId> {
T::reads()
}

fn writes(&self) -> Vec<ResourceId> {
T::writes()
}
}

+ 2
- 0
async-ecs/src/access/mod.rs 查看文件

@@ -1,8 +1,10 @@
pub mod accessor;
pub mod read;
pub mod read_storage;
pub mod write;
pub mod write_storage;

pub use accessor::{Accessor, AccessorCow, AccessorType, StaticAccessor};
pub use read::Read;
pub use read_storage::ReadStorage;
pub use write::Write;


+ 376
- 0
async-ecs/src/dispatcher/builder.rs 查看文件

@@ -0,0 +1,376 @@
use std::collections::hash_map::{Entry, HashMap};
use std::fmt::Debug;

use tokio::{spawn, sync::watch::channel};

use crate::{access::Accessor, resource::ResourceId, system::System};

use super::{task::execute, BoxedDispatchable, Dispatcher, Error, Receiver, Sender, SharedWorld};

pub struct Builder {
next_id: SystemId,
items: HashMap<SystemId, Item>,
names: HashMap<String, SystemId>,
}

struct Item {
name: String,
system: BoxedDispatchable,

sender: Sender,
receiver: Receiver,
receivers: Vec<Receiver>,

reads: Vec<ResourceId>,
writes: Vec<ResourceId>,
dependencies: Vec<SystemId>,
}

#[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct SystemId(pub usize);

impl Builder {
pub fn build(self) -> Dispatcher {
let receivers = self
.final_systems()
.into_iter()
.map(|id| self.items.get(&id).unwrap().receiver.clone())
.collect();

let world = SharedWorld::default();
let (sender, receiver) = channel(());

for (_, item) in self.items.into_iter() {
let name = item.name;
let sender = item.sender;
let system = item.system;
let receivers = if item.dependencies.is_empty() {
vec![receiver.clone()]
} else {
item.receivers
};

spawn(execute(name, system, sender, receivers, world.clone()));
}

Dispatcher {
sender,
receivers,
world,
}
}

pub fn with<S>(mut self, system: S, name: &str, dependencies: &[&str]) -> Result<Self, Error>
where
S: for<'s> System<'s> + Send + 'static,
{
self.add(system, name, dependencies)?;

Ok(self)
}

pub fn add<S>(
&mut self,
system: S,
name: &str,
dependencies: &[&str],
) -> Result<&mut Self, Error>
where
S: for<'s> System<'s> + Send + 'static,
{
let name = name.to_owned();
let id = self.next_id();
let id = match self.names.entry(name.clone()) {
Entry::Vacant(e) => Ok(*e.insert(id)),
Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())),
}?;

let mut reads = system.accessor().reads();
let mut writes = system.accessor().writes();

reads.sort();
writes.sort();

reads.dedup();
writes.dedup();

let mut dependencies = dependencies
.iter()
.map(|name| {
self.names
.get(*name)
.map(Clone::clone)
.ok_or_else(|| Error::DependencyWasNotFound((*name).into()))
})
.collect::<Result<Vec<_>, _>>()?;

for read in &reads {
for (key, value) in &self.items {
if value.writes.contains(read) {
dependencies.push(*key);
}
}
}

for write in &writes {
for (key, value) in &self.items {
if value.reads.contains(write) || value.writes.contains(write) {
dependencies.push(*key);
}
}
}

self.reduce_dependencies(&mut dependencies);

let receivers = dependencies
.iter()
.map(|id| self.items.get(id).unwrap().receiver.clone())
.collect();

let item = match self.items.entry(id) {
Entry::Vacant(e) => e.insert(Item::new(name, system)),
Entry::Occupied(_) => panic!("Item was already created!"),
};

item.reads = reads;
item.writes = writes;
item.receivers = receivers;
item.dependencies = dependencies;

Ok(self)
}

fn final_systems(&self) -> Vec<SystemId> {
let mut ret = self.items.keys().map(Clone::clone).collect();

self.reduce_dependencies(&mut ret);

ret
}

fn reduce_dependencies(&self, dependencies: &mut Vec<SystemId>) {
dependencies.sort();
dependencies.dedup();

let mut remove_indices = Vec::new();
for (i, a) in dependencies.iter().enumerate() {
for (j, b) in dependencies.iter().enumerate() {
if self.depends_on(a, b) {
remove_indices.push(j);
} else if self.depends_on(b, a) {
remove_indices.push(i);
}
}
}

remove_indices.sort();
remove_indices.dedup();
remove_indices.reverse();

for i in remove_indices {
dependencies.remove(i);
}
}

fn depends_on(&self, a: &SystemId, b: &SystemId) -> bool {
let item = self.items.get(a).unwrap();

if item.dependencies.contains(b) {
return true;
}

for d in &item.dependencies {
if self.depends_on(d, b) {
return true;
}
}

false
}

fn next_id(&mut self) -> SystemId {
self.next_id.0 += 1;

self.next_id
}
}

impl Default for Builder {
fn default() -> Self {
Self {
next_id: SystemId(0),
items: HashMap::new(),
names: HashMap::new(),
}
}
}

impl Item {
fn new<S>(name: String, system: S) -> Self
where
S: for<'s> System<'s> + Send + 'static,
{
let (sender, receiver) = channel(());

Self {
name,
system: Box::new(system),

sender,
receiver,
receivers: Vec::new(),

reads: Vec::new(),
writes: Vec::new(),
dependencies: Vec::new(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

use crate::{access::AccessorCow, system::DynamicSystemData, world::World};

#[test]
fn dependencies_on_read_and_write() {
/*
- Systems ------------------------------------
Id: 1 2 3 4 5

- Resources ----------------------------------
Read: A A B C A
Write: B C D D BCD

- Dependencies Total -------------------------
| | | | |
|<--------------| | |
| | | | |
| |<--------------| |
| | |<------| |
| | | | |
|<------------------------------|
| |<----------------------|
| | |<--------------|
| | | |<------|
| | | | |

- Dependencies Reduced -----------------------
| | | | |
|<--------------| | |
| | | | |
| |<--------------| |
| | |<------| |
| | | | |
| | | |<------|
| | | | |
*/

struct ResA;
struct ResB;
struct ResC;
struct ResD;

let sys1 = TestSystem::new(
vec![ResourceId::new::<ResA>()],
vec![ResourceId::new::<ResB>()],
);
let sys2 = TestSystem::new(
vec![ResourceId::new::<ResA>()],
vec![ResourceId::new::<ResC>()],
);
let sys3 = TestSystem::new(
vec![ResourceId::new::<ResB>()],
vec![ResourceId::new::<ResD>()],
);
let sys4 = TestSystem::new(
vec![ResourceId::new::<ResC>()],
vec![ResourceId::new::<ResD>()],
);
let sys5 = TestSystem::new(
vec![ResourceId::new::<ResA>()],
vec![
ResourceId::new::<ResB>(),
ResourceId::new::<ResC>(),
ResourceId::new::<ResD>(),
],
);

let dispatcher = Dispatcher::builder()
.with(sys1, "sys1", &[])
.unwrap()
.with(sys2, "sys2", &[])
.unwrap()
.with(sys3, "sys3", &[])
.unwrap()
.with(sys4, "sys4", &[])
.unwrap()
.with(sys5, "sys5", &[])
.unwrap();

let sys1 = dispatcher.items.get(&SystemId(1)).unwrap();
let sys2 = dispatcher.items.get(&SystemId(2)).unwrap();
let sys3 = dispatcher.items.get(&SystemId(3)).unwrap();
let sys4 = dispatcher.items.get(&SystemId(4)).unwrap();
let sys5 = dispatcher.items.get(&SystemId(5)).unwrap();

assert_eq!(sys1.dependencies, vec![]);
assert_eq!(sys2.dependencies, vec![]);
assert_eq!(sys3.dependencies, vec![SystemId(1)]);
assert_eq!(sys4.dependencies, vec![SystemId(2), SystemId(3)]);
assert_eq!(sys5.dependencies, vec![SystemId(4)]);
assert_eq!(dispatcher.final_systems(), vec![SystemId(5)]);
}

struct TestSystem {
accessor: TestAccessor,
}

impl TestSystem {
fn new(reads: Vec<ResourceId>, writes: Vec<ResourceId>) -> Self {
Self {
accessor: TestAccessor { reads, writes },
}
}
}

impl<'a> System<'a> for TestSystem {
type SystemData = TestData;

fn run(&mut self, _data: Self::SystemData) {
unimplemented!()
}

fn accessor<'b>(&'b self) -> AccessorCow<'a, 'b, Self> {
AccessorCow::Borrow(&self.accessor)
}
}

struct TestData;

impl<'a> DynamicSystemData<'a> for TestData {
type Accessor = TestAccessor;

fn setup(_accessor: &Self::Accessor, _world: &mut World) {}

fn fetch(_access: &Self::Accessor, _world: &'a World) -> Self {
TestData
}
}

struct TestAccessor {
reads: Vec<ResourceId>,
writes: Vec<ResourceId>,
}

impl Accessor for TestAccessor {
fn reads(&self) -> Vec<ResourceId> {
self.reads.clone()
}

fn writes(&self) -> Vec<ResourceId> {
self.writes.clone()
}
}
}

+ 21
- 0
async-ecs/src/dispatcher/dispatchable.rs 查看文件

@@ -0,0 +1,21 @@
use crate::{
system::{DynamicSystemData, System},
world::World,
};

pub type BoxedDispatchable = Box<dyn for<'a> Dispatchable<'a> + Send>;

pub trait Dispatchable<'a> {
fn run(&mut self, world: &'a World);
}

impl<'a, T> Dispatchable<'a> for T
where
T: System<'a>,
{
fn run(&mut self, world: &'a World) {
let data = T::SystemData::fetch(&self.accessor(), world);

self.run(data);
}
}

+ 18
- 0
async-ecs/src/dispatcher/error.rs 查看文件

@@ -0,0 +1,18 @@
use std::fmt::Debug;

use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("A System with this name was already registered: {0}!")]
NameAlreadyRegistered(String),

#[error("Dependency of the given system was not found: {0}!")]
DependencyWasNotFound(String),

#[error("Unable to start dispatching!")]
DispatchSend,

#[error("Unable to wait for systems to finish!")]
DispatchReceive,
}

+ 63
- 0
async-ecs/src/dispatcher/mod.rs 查看文件

@@ -0,0 +1,63 @@
pub mod builder;
pub mod dispatchable;
pub mod error;
pub mod task;

pub use builder::Builder;
pub use dispatchable::{BoxedDispatchable, Dispatchable};
pub use error::Error;

use std::cell::RefCell;
use std::ops::Deref;
use std::sync::Arc;

use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};

use crate::world::World;

pub struct Dispatcher {
sender: Sender,
receivers: Vec<Receiver>,
world: SharedWorld,
}

#[derive(Clone, Default)]
pub struct SharedWorld(Arc<RefCell<Option<World>>>);

type Sender = WatchSender<()>;
type Receiver = WatchReceiver<()>;

impl Dispatcher {
pub fn builder() -> Builder {
Builder::default()
}

pub async fn dispatch(&mut self, world: World) -> Result<World, Error> {
*self.world.borrow_mut() = Some(world);

match self.sender.send(()) {
Ok(()) => (),
Err(_) => return Err(Error::DispatchSend),
}

for receiver in &mut self.receivers {
match receiver.changed().await {
Ok(()) => (),
Err(_) => return Err(Error::DispatchReceive),
}
}

Ok(self.world.borrow_mut().take().unwrap())
}
}

impl Deref for SharedWorld {
type Target = RefCell<Option<World>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

unsafe impl Send for SharedWorld {}
unsafe impl Sync for SharedWorld {}

+ 46
- 0
async-ecs/src/dispatcher/task.rs 查看文件

@@ -0,0 +1,46 @@
use log::info;

use super::{BoxedDispatchable, Receiver, Sender, SharedWorld};

pub async fn execute(
name: String,
dispatchable: BoxedDispatchable,
sender: Sender,
receivers: Vec<Receiver>,
world: SharedWorld,
) {
info!("System started: {}", &name);

run(&name, dispatchable, sender, receivers, world).await;

info!("System finished: {}", &name);
}

async fn run(
name: &str,
mut dispatchable: BoxedDispatchable,
sender: Sender,
mut receivers: Vec<Receiver>,
world: SharedWorld,
) {
loop {
for receiver in &mut receivers {
match receiver.changed().await {
Ok(()) => (),
Err(_) => return,
}
}

info!("Run system: {}", &name);

let world = world.borrow();
let world = world.as_ref().unwrap();

dispatchable.run(world);

match sender.send(()) {
Ok(()) => (),
Err(_) => return,
}
}
}

+ 6
- 0
async-ecs/src/lib.rs 查看文件

@@ -1,5 +1,8 @@
#![allow(dead_code)]

pub mod access;
pub mod component;
pub mod dispatcher;
pub mod entity;
pub mod error;
pub mod misc;
@@ -8,6 +11,9 @@ pub mod storage;
pub mod system;
pub mod world;

pub use access::{ReadStorage, WriteStorage};
pub use dispatcher::Dispatcher;
pub use resource::Resources;
pub use storage::VecStorage;
pub use system::System;
pub use world::World;

+ 53
- 14
async-ecs/src/main.rs 查看文件

@@ -1,15 +1,29 @@
use log::info;
use std::io::Error as IoError;
use std::time::{Duration, Instant};

use async_ecs::{VecStorage, World};
use async_ecs::{
dispatcher::Error as DispatcherError, Dispatcher, ReadStorage, System, VecStorage, World,
WriteStorage,
};
use async_ecs_derive::Component;
use log::info;
use rand::random;
use thiserror::Error;
use tokio::runtime::Builder;

fn main() {
fn main() -> Result<(), Error> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.init();

Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(run())
}

async fn run() -> Result<(), Error> {
info!("Application started!");

let mut world = World::default();
@@ -28,12 +42,10 @@ fn main() {

info!("World initialized!");

/*
let mut dispatcher = DispatcherBuilder::new()
.with(Move, "move", &[])
.with(Accelerate, "accelerate", &["move"])
let mut dispatcher = Dispatcher::builder()
.with(Move, "move", &[])?
.with(Accelerate, "accelerate", &["move"])?
.build();
dispatcher.setup(&mut world);

info!("Setup done!");

@@ -43,7 +55,7 @@ fn main() {

let start = Instant::now();

dispatcher.dispatch(&world);
world = dispatcher.dispatch(world).await?;

let end = Instant::now();

@@ -55,7 +67,8 @@ fn main() {
let delta = delta / REPEAT_COUNT;

info!("Average time per dispatch: {:?}", delta);
*/

Ok(())
}

const ENTITY_COUNT: usize = 100_000;
@@ -85,6 +98,15 @@ struct Acceleration {
struct Move;
struct Accelerate;

#[derive(Debug, Error)]
enum Error {
#[error("IO Error: {0}")]
IoError(IoError),

#[error("Dispatcher Error: {0}")]
DispatcherError(DispatcherError),
}

impl Position {
fn random() -> Self {
Self {
@@ -111,11 +133,14 @@ impl Acceleration {
}
}
}
/*
impl<'a> System<'a> for Move {
type SystemData = (WriteStorage<'a, Position>, ReadStorage<'a, Velocity>);

fn run(&mut self, (mut position, velocity): Self::SystemData) {
fn run(&mut self, (position, velocity): Self::SystemData) {
let _position = position;
let _velocity = velocity;

/*
use specs::{prelude::ParallelIterator, ParJoin};

@@ -132,7 +157,10 @@ impl<'a> System<'a> for Move {
impl<'a> System<'a> for Accelerate {
type SystemData = (WriteStorage<'a, Velocity>, ReadStorage<'a, Acceleration>);

fn run(&mut self, (mut velocity, acceleration): Self::SystemData) {
fn run(&mut self, (velocity, acceleration): Self::SystemData) {
let _velocity = velocity;
let _acceleration = acceleration;

/*
use specs::{prelude::ParallelIterator, ParJoin};

@@ -145,4 +173,15 @@ impl<'a> System<'a> for Accelerate {
*/
}
}
*/

impl From<IoError> for Error {
fn from(err: IoError) -> Self {
Self::IoError(err)
}
}

impl From<DispatcherError> for Error {
fn from(err: DispatcherError) -> Self {
Self::DispatcherError(err)
}
}

+ 2
- 0
async-ecs/src/resource/cell.rs 查看文件

@@ -121,6 +121,8 @@ impl<T> Cell<T> {
}
}

unsafe impl<T> Sync for Cell<T> where T: Sync {}

/* Ref */

impl<'a, T> Ref<'a, T>


+ 7
- 1
async-ecs/src/resource/mod.rs 查看文件

@@ -10,7 +10,7 @@ use mopa::Any;

pub trait Resource: Any + Send + Sync + 'static {}

#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ResourceId(TypeId);

impl ResourceId {
@@ -21,3 +21,9 @@ impl ResourceId {
Self(TypeId::of::<R>())
}
}

impl From<TypeId> for ResourceId {
fn from(id: TypeId) -> Self {
Self(id)
}
}

+ 4
- 5
async-ecs/src/system/mod.rs 查看文件

@@ -1,12 +1,11 @@
mod accessor;
mod system_data;

pub use accessor::{Accessor, AccessorCow};
pub use system_data::{DynamicSystemData, SystemData};

use crate::world::World;

use accessor::AccessorType;
use crate::{
access::{Accessor, AccessorCow, AccessorType},
world::World,
};

pub trait System<'a> {
type SystemData: DynamicSystemData<'a>;


+ 5
- 3
async-ecs/src/system/system_data.rs 查看文件

@@ -1,8 +1,10 @@
use std::marker::PhantomData;

use crate::{resource::ResourceId, world::World};

use super::accessor::{Accessor, StaticAccessor};
use crate::{
access::{Accessor, StaticAccessor},
resource::ResourceId,
world::World,
};

pub trait SystemData<'a> {
fn setup(world: &mut World);


正在加载...
取消
保存