Compare commits

...

5 Commits
master ... wip

44 changed files with 1182 additions and 197 deletions
Split View
  1. +3
    -0
      TODO
  2. +22
    -4
      include/ecs/context/base.h
  3. +23
    -8
      include/ecs/context/base.inl
  4. +76
    -1
      include/ecs/context/context.h
  5. +1
    -0
      include/ecs/context/defer_proxy.h
  6. +7
    -0
      include/ecs/core/entity/storage/base.h
  7. +8
    -1
      include/ecs/core/entity/storage/base.inl
  8. +2
    -0
      include/ecs/core/system/data_proxy.h
  9. +112
    -0
      include/ecs/core/system/data_proxy/multi.h
  10. +96
    -0
      include/ecs/core/system/data_proxy/multi.inl
  11. +3
    -3
      include/ecs/core/system/data_proxy/single.h
  12. +4
    -4
      include/ecs/core/system/data_proxy/single.inl
  13. +39
    -30
      include/ecs/core/system/instance.h
  14. +1
    -1
      include/ecs/core/system/parallelism/composer/fixed_threshold.h
  15. +28
    -2
      include/ecs/core/system/parallelism/strategy/none.h
  16. +66
    -0
      include/ecs/core/system/parallelism/strategy/split_base.h
  17. +8
    -2
      include/ecs/core/system/parallelism/strategy/split_evenly.h
  18. +8
    -2
      include/ecs/core/system/parallelism/strategy/split_every.h
  19. +42
    -4
      include/ecs/core/system/scheduler/atomic_counter.h
  20. +111
    -16
      include/ecs/core/system/scheduler/atomic_counter.inl
  21. +2
    -2
      include/ecs/core/system/storage/tuple.h
  22. +20
    -1
      include/ecs/core/utils/bitset.h
  23. +22
    -3
      include/ecs/core/utils/bitset.inl
  24. +18
    -5
      include/ecs/core/utils/counter_blocker.h
  25. +14
    -1
      include/ecs/core/utils/counter_blocker.inl
  26. +4
    -1
      include/ecs/core/utils/ordered_vector.h
  27. +2
    -1
      include/ecs/core/utils/thread_pool.h
  28. +8
    -6
      include/ecs/core/utils/thread_pool/pool.h
  29. +18
    -2
      include/ecs/core/utils/thread_pool/pool.inl
  30. +68
    -0
      include/ecs/core/utils/thread_pool/task_queue.h
  31. +15
    -0
      include/ecs/core/utils/thread_pool/task_queue.inl
  32. +0
    -22
      include/ecs/core/utils/thread_pool/types.h
  33. +37
    -5
      include/ecs/core/utils/thread_pool/worker.h
  34. +43
    -3
      include/ecs/core/utils/thread_pool/worker.inl
  35. +2
    -0
      include/ecs/signature/system/parallelism/strategie.h
  36. +36
    -0
      include/ecs/signature/system/parallelism/strategy/bound.h
  37. +36
    -0
      include/ecs/signature/system/parallelism/strategy/main.h
  38. +8
    -2
      include/ecs/signature/system/parallelism/strategy/none.h
  39. +1
    -1
      include/ecs/signature/system/parallelism/strategy/split_every.h
  40. +1
    -1
      include/ecs/signature/system/signature.h
  41. +8
    -17
      src/core/utils/thread_pool/pool.cpp
  42. +38
    -0
      src/core/utils/thread_pool/task_queue.cpp
  43. +15
    -2
      src/core/utils/thread_pool/worker.cpp
  44. +106
    -44
      test/dummy.cpp

+ 3
- 0
TODO View File

@@ -0,0 +1,3 @@
- check if the thread pool can deadlock if all threads are waiting for their counter_blocker
- check if we could implement some kind of progress_messages for the worker threads while waiting for counter_blocker
- check if the conditional variables are used correctly inside counter_blockers

+ 22
- 4
include/ecs/context/base.h View File

@@ -3,6 +3,7 @@
#include <ecs/config.h>
#include <ecs/core/utils/thread_pool.h>
#include <ecs/core/utils/counter_blocker.h>
#include <ecs/core/utils/ordered_vector.h>
#include <ecs/core/component/manager.h>

#include "./context.fwd.h"
@@ -44,7 +45,7 @@ namespace context {
using handle_type = entity_handle_type;

private:
using handle_vector_type = std::vector<handle_type>;
using handle_set_type = core::utils::ordered_vector<handle_type>;

protected:
context_type& _context; //!< reference to its own child class
@@ -56,8 +57,8 @@ namespace context {

core::utils::thread_pool _thread_pool; //!< thread pool to execute tasks parallel

handle_vector_type _to_match; //!< handles of entities that were created in the last tick
handle_vector_type _to_kill; //!< handles of entities that were destroyed in the last tick
handle_set_type _to_match; //!< handles of entities that were created in the last tick
handle_set_type _to_kill; //!< handles of entities that were destroyed in the last tick

public:

@@ -76,9 +77,10 @@ namespace context {
* @tparam T_func type of the function to execute
*
* @param func function to execute
* @param thread_id ID of the thread to add task to
*/
template<typename T_func>
inline void _post_in_thread_pool(T_func&& func);
inline void _post_in_thread_pool(T_func&& func, ssize_t thread_id = -1);

protected: /* entity */

@@ -111,6 +113,22 @@ namespace context {
*/
inline bool _is_alive(const handle_type& handle) const;

/**
* destroy the passed entity
*
* @param handle handle of the entity to destroy
*/
inline void _destroy_entity(const handle_type& handle);

/**
* get the meta data for the passed entity
*
* @param handle handle to get meta data for
*
* @return meta data of the passed entity
*/
inline const auto& _entity_meta_data(const handle_type& handle);

protected: /* component */

/**


+ 23
- 8
include/ecs/context/base.inl View File

@@ -26,9 +26,9 @@ namespace detail {
template<typename T_settings>
template<typename T_func>
inline void base_t<T_settings>
::_post_in_thread_pool(T_func&& func)
::_post_in_thread_pool(T_func&& func, ssize_t thread_id)
{
_thread_pool.post(std::forward<T_func>(func));
_thread_pool.post(std::forward<T_func>(func), thread_id);
}

/* entity */
@@ -38,15 +38,16 @@ namespace detail {
inline decltype(auto) base_t<T_settings>
::_create_entity(T_args&&... args)
{
_to_match.emplace_back(_entities.claim(std::forward<T_args>(args)...));
return _to_match.back();
auto handle = _entities.claim(std::forward<T_args>(args)...);
_to_match.insert(handle);
return handle;
}

template<typename T_settings>
inline void base_t<T_settings>
::_kill_entity(const handle_type& handle)
{
_to_kill.emplace_back(handle);
_to_kill.insert(handle);
}

template<typename T_settings>
@@ -56,6 +57,20 @@ namespace detail {
return _entities.is_valid(handle);
}

template<typename T_settings>
inline void base_t<T_settings>
::_destroy_entity(const handle_type& handle)
{
_entities.reclaim(handle);
}

template<typename T_settings>
inline const auto& base_t<T_settings>
::_entity_meta_data(const handle_type& handle)
{
return _entities.meta_data(handle);
}

/* components */

template<typename T_settings>
@@ -63,7 +78,7 @@ namespace detail {
inline decltype(auto) base_t<T_settings>
::_add_component(const handle_type& handle, T_component_tag ct)
{
_to_match.emplace_back(handle);
_to_match.insert(handle);
auto& meta = _entities.meta_data(handle);
auto& cmd = meta.storage_meta_data();
auto& c = _components.add(ct, handle, cmd);
@@ -91,7 +106,7 @@ namespace detail {
auto& bitset = meta.bitset();
auto& cmd = meta.storage_meta_data();
if (!bitset.has_component(ct))
throw std::invalid_argument("entity does not contain the requested compnent");
throw std::invalid_argument("entity does not contain the requested component");
return _components.get(ct, handle, cmd);
}

@@ -154,7 +169,7 @@ namespace detail {
::_for_systems_parallel(T_func&& func)
{
core::utils::counter_blocker counter(_systems.size());
counter.execute_and_wait_until_zero([this, &counter, func = std::forward<T_func>(func)]{
counter.execute_and_wait([this, &counter, func = std::forward<T_func>(func)]{
_systems.for_each([this, &counter, &func](auto& instance){
this->_post_in_thread_pool([&counter, &func, &instance]{
ecs_make_scope_guard([&counter](){


+ 76
- 1
include/ecs/context/context.h View File

@@ -18,16 +18,91 @@ namespace context {
private:
using settings_type = T_settings;
using this_type = context_t<settings_type>;
using base_type = step_proxy_t<settings_type>;
using step_proxy = step_proxy_t<settings_type>;
using defer_proxy = defer_proxy_t<settings_type>;
using base_type = step_proxy;

private:
inline void refresh()
{
refresh_execute_deferred();
refresh_kill_entities();
refresh_match_entities();
}

inline void refresh_match_entities()
{
/* match new/changed entities to system instances */
for_systems_dispatch([this](auto& instance){
auto& system_bitset = instance.bitset();
for (const auto& handle : this->_to_match)
{
auto& entity_bitset = this->entity_meta_data(handle).bitset();
if (entity_bitset.contains(system_bitset))
{
if (instance.subscribe(handle))
{
// TODO send event
}
}
else if (instance.unsubscribe(handle))
{
// TODO send event
}
}
});

this->_to_match.clear();
}

inline void refresh_kill_entities()
{
/* copy entities to kill to the main vector */
for_systems_sequential([this](auto& instance){
instance.for_states([this](auto& state){
for (auto& handle : state.to_kill)
{
this->_to_kill.insert(handle);
}
});
});

/* unsubscribe dead entities */
for_systems_dispatch([this](auto& instance){
for (const auto& handle : this->_to_kill)
{
if (instance.unsubscribe(handle))
{
// TODO send event
}
}
});

/* reclaim all killed entities */
for (const auto& handle : this->_to_kill)
{
destroy_entity(handle);
}

this->_to_kill.clear();
}

inline void refresh_execute_deferred()
{
defer_proxy& proxy = *this;
for_systems_sequential([&proxy](auto& instance){
instance.for_states([&proxy](auto& state){
state.deferred_functions.execute_all(proxy);
});
});
}

public:
ecs_context_proxy_func(this_type, destroy_entity)
ecs_context_proxy_func(this_type, post_in_thread_pool)
ecs_context_proxy_func(this_type, for_systems_sequential)
ecs_context_proxy_func(this_type, for_systems_parallel)
ecs_context_proxy_func(this_type, for_systems_dispatch)

public:
inline context_t()


+ 1
- 0
include/ecs/context/defer_proxy.h View File

@@ -33,6 +33,7 @@ namespace context {
ecs_context_proxy_func(this_type, create_entity)
ecs_context_proxy_func(this_type, kill_entity)
ecs_context_proxy_func(this_type, is_alive)
ecs_context_proxy_func(this_type, entity_meta_data)

public: /* component */
ecs_context_proxy_func(this_type, add_component)


+ 7
- 0
include/ecs/core/entity/storage/base.h View File

@@ -105,6 +105,13 @@ namespace storage {
*/
inline auto& bitset();

/**
* get the bitset of components stored for this entity
*
* @return component bitset
*/
inline const auto& bitset() const;

/**
* get the current reusage counter of the entity
*


+ 8
- 1
include/ecs/core/entity/storage/base.inl View File

@@ -58,6 +58,13 @@ namespace storage {
return _bitset;
}

template <typename T_settings, typename T_storage_meta_data>
inline const auto& entity_meta_data<T_settings, T_storage_meta_data>
::bitset() const
{
return _bitset;
}

template <typename T_settings, typename T_storage_meta_data>
inline auto entity_meta_data<T_settings, T_storage_meta_data>
::counter() const
@@ -120,7 +127,7 @@ namespace storage {
grow(grow_size);
}
assert(!_free_ids.empty());
auto index = _free_ids.back();
auto index = _free_ids.front();
auto& item = _container.at(index);
_free_ids.pop();
return entity_handle(index, item.counter());


+ 2
- 0
include/ecs/core/system/data_proxy.h View File

@@ -1,7 +1,9 @@
#pragma once

#include "./data_proxy/base.h"
#include "./data_proxy/multi.h"
#include "./data_proxy/single.h"

#include "./data_proxy/base.inl"
#include "./data_proxy/multi.inl"
#include "./data_proxy/single.inl"

+ 112
- 0
include/ecs/core/system/data_proxy/multi.h View File

@@ -0,0 +1,112 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/data_proxy/base.h>

namespace ecs {
namespace core {
namespace system {
namespace data_proxy {

/**
* data proxy to execute system non-parallel
*
* @tparam T_context context type of the ECS environment
* @tparam T_instance system instance type
*/
template<typename T_context, typename T_instance>
struct multi
: public base<T_context, T_instance>
{
private:
using context_type = T_context;
using instance_type = T_instance;
using base_type = base<context_type, instance_type>;

public: // private:
size_t _index;
size_t _begin;
size_t _end;

public:
/**
* create the multi data proxy
*
* @param p_context context of the ECS environment
* @param p_instance system instance
* @param p_index index of this data proxy
* @param p_begin index of the first entity to process
* @param p_end index of the first entity not to process
*/
inline multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end);

/**
* execute the given function for all entities handled by this data proxy
*
* @tparam T_func function type to execute
*
* @param func function to execute
*/
template<typename T_func>
inline void for_entities(T_func&& func) const;

/**
* execute the given function for all entities not handled by this data proxy
*
* @tparam T_func function type to execute
*
* @param func function to execute
*/
template<typename T_func>
void for_other_entities(T_func&& func) const;

/**
* execute the given function for all entities
*
* @tparam T_func function type to execute
*
* @param func function to execute
*/
template<typename T_func>
void for_all_entities(T_func&& func) const;

/**
* get the number of entities handled by this data proxy
*
* @return number of entities handle by this data proxy
*/
inline size_t entity_count() const;

/**
* get the number of entities not handled by this data proxy
*
* @return number of entities not handle by this data proxy
*/
inline size_t other_entity_count() const;

/**
* get the number of all entities
*
* @return number of all entities
*/
inline size_t all_entity_count() const;
};

/**
* create a multi data proxy (data proxy that does not execute in parallel)
*
* @tparam T_context context type
* @tparam T_instance instance type
*
* @param context context
* @param instance instance
* @param index index of this data proxy
* @param begin index of the first entity to process
* @param end index of the first entity not to process
*
* @return multi data proxy
*/
template<typename T_context, typename T_instance>
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end);

} } } }

+ 96
- 0
include/ecs/core/system/data_proxy/multi.inl View File

@@ -0,0 +1,96 @@
#pragma once

#include <ecs/core/system/data_proxy/multi.h>

namespace ecs {
namespace core {
namespace system {
namespace data_proxy {

/* multi */

template<typename T_context, typename T_instance>
inline multi<T_context, T_instance>
::multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end)
: base_type (p_context, p_instance)
, _index (p_index)
, _begin (p_begin)
, _end (p_end)
{ }

template<typename T_context, typename T_instance>
template<typename T_func>
inline void multi<T_context, T_instance>
::for_entities(T_func&& func) const
{
for (size_t i = _begin; i < _end; ++i)
{
std::forward<T_func>(func)(this->instance.subscribed()[i]);
}
}

template<typename T_context, typename T_instance>
template<typename T_func>
inline void multi<T_context, T_instance>
::for_other_entities(T_func&& func) const
{
auto& subscribed = this->instance.subscribed();
auto total_count = this->instance.subscribed_count();
for (size_t i = 0; i < _begin; ++i)
{
std::forward<T_func>(func)(subscribed()[i]);
}
for (size_t i = _end; i < total_count; ++i)
{
std::forward<T_func>(func)(subscribed()[i]);
}
}

template<typename T_context, typename T_instance>
template<typename T_func>
inline void multi<T_context, T_instance>
::for_all_entities(T_func&& func) const
{
for (auto& e : this->instance.subscribed())
{
std::forward<T_func>(func)(e);
}
}

template<typename T_context, typename T_instance>
inline size_t multi<T_context, T_instance>
::entity_count() const
{
return _end - _begin;
}

template<typename T_context, typename T_instance>
inline size_t multi<T_context, T_instance>
::other_entity_count() const
{
return all_entity_count() - entity_count();
}

template<typename T_context, typename T_instance>
inline size_t multi<T_context, T_instance>
::all_entity_count() const
{
return this->instance.subscribed_count();
}

/* make_multi */

template<typename T_context, typename T_instance>
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end)
{
using context_type = mp::decay_t<T_context>;
using instance_type = mp::decay_t<T_instance>;
return multi<context_type, instance_type>(
std::forward<T_context>(context),
std::forward<T_instance>(instance),
index,
begin,
end);
}

} } } }

+ 3
- 3
include/ecs/core/system/data_proxy/single.h View File

@@ -34,7 +34,7 @@ namespace data_proxy {
* @param func function to execute
*/
template<typename T_func>
void for_entities(T_func&& func) const;
inline void for_entities(T_func&& func) const;

/**
* execute the given function for all entities not handled by this data proxy
@@ -44,7 +44,7 @@ namespace data_proxy {
* @param func function to execute
*/
template<typename T_func>
void for_other_entities(T_func&& func) const;
inline void for_other_entities(T_func&& func) const;

/**
* execute the given function for all entities
@@ -54,7 +54,7 @@ namespace data_proxy {
* @param func function to execute
*/
template<typename T_func>
void for_all_entities(T_func&& func) const;
inline void for_all_entities(T_func&& func) const;

/**
* get the number of entities handled by this data proxy


+ 4
- 4
include/ecs/core/system/data_proxy/single.inl View File

@@ -11,24 +11,24 @@ namespace data_proxy {

template<typename T_context, typename T_instance>
template<typename T_func>
void single<T_context, T_instance>
inline void single<T_context, T_instance>
::for_entities(T_func&& func) const
{
for (auto& e : this->instance.subscribed())
{
func(e);
std::forward<T_func>(func)(e);
}
}

template<typename T_context, typename T_instance>
template<typename T_func>
void single<T_context, T_instance>
inline void single<T_context, T_instance>
::for_other_entities(T_func&& func) const
{ }

template<typename T_context, typename T_instance>
template<typename T_func>
void single<T_context, T_instance>
inline void single<T_context, T_instance>
::for_all_entities(T_func&& func) const
{
for_entities(std::forward<T_func>(func));


+ 39
- 30
include/ecs/core/system/instance.h View File

@@ -2,6 +2,7 @@

#include <ecs/config.h>
#include <ecs/signature/system.h>
#include <ecs/context/defer_proxy.h>
#include <ecs/core/utils/bitset.h>
#include <ecs/core/utils/fixed_function.h>
#include <ecs/core/utils/ordered_vector.h>
@@ -14,7 +15,8 @@ namespace system {
struct deferred_function_vector
{
private:
using deferred_proxy_type = int;
using settings_type = T_settings;
using deferred_proxy_type = context::detail::defer_proxy_t<settings_type>;
using function_type = utils::fixed_function<void(deferred_proxy_type&)>;
using function_vector_type = std::vector<function_type>;

@@ -39,8 +41,20 @@ namespace system {
}
};

template<typename T_settings>
constexpr decltype(auto) get_scheduler_instance_meta_data_type(T_settings&&) noexcept
{
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using scheduler_type = decltype((settings_type { }).scheduler()(settings_type { }, std::declval<context_type&>()));
using instance_meta_data_type = typename scheduler_type::instance_meta_data_type;

return hana::type_c<instance_meta_data_type>;
}

template<typename T_settings, typename T_system_signature, typename T_entity_handle>
struct instance
: private mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>
{
static_assert(decltype(signature::system::is_valid(mp::unwrap_t<T_system_signature> { })) { });

@@ -51,6 +65,7 @@ namespace system {
using system_tag_type = mp::decay_t<decltype(core::mp::unwrap(system_signature_type { }).tag())>;
using system_type = mp::unwrap_t<system_tag_type>;
using output_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(system_signature_type { }).output())>>;
using scheduler_meta_data_type = mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>;

private:
using signature_list_type = decltype((settings_type { }).system_signatures());
@@ -92,7 +107,7 @@ namespace system {

public:
instance()
: _bitset ()
: _bitset (bitset_type::from_system_signature(mp::unwrap(system_signature_type { })))
, _system ()
, _executor (mp::unwrap(system_signature_type { }).parallelism()())
, _subscribed()
@@ -122,34 +137,17 @@ namespace system {
clear_and_prepare(n);
}

template<typename T_context, typename T_func>
inline void prepare_and_wait_subtasks(T_context& context, size_t n, T_func& func)
{
assert(n > 0);
clear_and_prepare(n);
utils::counter_blocker b(n-1);

auto run_in_seperate_thread = [this, &context, &b](auto& f) {
return [this, &b, &context, &f](auto&&... xs) {
context.post_in_thread_pool([&f, &b, xs...]() {
ecs_make_scope_guard([&b](){
b.decrement();
});
f(xs...);
});
};
};

b.execute_and_wait_until_zero([&func, &run_in_seperate_thread]{
func(run_in_seperate_thread);
});
}

template<typename T_context, typename T_func>
inline void execute(T_context& context, T_func&& func)
{ _executor(context, *this, std::forward<T_func>(func)); }

public: /* bitset */
public: /* misc */
constexpr decltype(auto) scheduler_meta_data() noexcept
{ return static_cast<scheduler_meta_data_type&>(*this); }

constexpr decltype(auto) signature() const noexcept
{ return mp::unwrap(system_signature_type { }); }

inline const auto& bitset() const noexcept
{ return _bitset; }

@@ -173,11 +171,22 @@ namespace system {
inline bool is_subscribed(const entity_handle_type& handle) const
{ return (_subscribed.find(handle) != _subscribed.end()); }

inline void subscribe(const entity_handle_type& handle)
{ _subscribed.insert(handle); }
inline bool subscribe(const entity_handle_type& handle)
{ return _subscribed.insert(handle).second; }

inline void unsubscribe(const entity_handle_type& handle)
{ _subscribed.erase(handle); }
inline bool unsubscribe(const entity_handle_type& handle)
{
auto it = _subscribed.find(handle);
if (it != _subscribed.end())
{
_subscribed.erase(it);
return true;
}
else
{
return false;
}
}

public: /* states */
template<typename T_func>


+ 1
- 1
include/ecs/core/system/parallelism/composer/fixed_threshold.h View File

@@ -20,7 +20,7 @@ namespace parallelism {
template<typename T_context, typename T_instance, typename T_func>
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const
{
bool threshold_reached = false; // TODO
bool threshold_reached = instance.subscribed_count() >= parameters_type::get_threshold();
if (!threshold_reached) _strategy_lower (instance, context, std::forward<T_func>(func));
else _strategy_greater(instance, context, std::forward<T_func>(func));
}


+ 28
- 2
include/ecs/core/system/parallelism/strategy/none.h View File

@@ -10,29 +10,55 @@ namespace parallelism {

struct none
{
public:
template<typename T_context, typename T_instance>
struct executor_proxy
{
public:
using context_type = T_context;
using instance_type = T_instance;

context_type& context;
instance_type& instance;

public:
template<typename T_func>
inline void for_subtasks(T_func&& func)
{
instance.prepare_states(1);
auto data = data_proxy::make_single(context, instance);
func(data);
std::forward<T_func>(func)(data);
}
};

private:
#ifndef NDEBUG
bool _bound { false };
mutable std::thread::id _thread_id { 0 };
#endif

public:
#ifndef NDEBUG
none(bool bound)
: _bound(bound)
{ }
#else
none()
{ }
#endif

template<typename T_context, typename T_instance, typename T_func>
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const
{
#ifndef NDEBUG
assert( !_bound
&& ( _thread_id == std::thread::id { 0 }
|| _thread_id == std::this_thread::get_id()));
_thread_id = std::this_thread::get_id();
#endif

executor_proxy<T_context, T_instance> ep { context, instance };
func(instance, ep);
std::forward<T_func>(func)(instance, ep);
}
};


+ 66
- 0
include/ecs/core/system/parallelism/strategy/split_base.h View File

@@ -0,0 +1,66 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/data_proxy.h>

namespace ecs {
namespace core {
namespace system {
namespace parallelism {

struct split_base
{
public:
template<typename T_context, typename T_instance>
struct executor_proxy
{
public:
using context_type = T_context;
using instance_type = T_instance;

context_type& context;
instance_type& instance;
size_t split_count;
size_t per_split;

public:
template<typename T_func>
inline void for_subtasks(T_func&& func)
{
auto total_count = instance.subscribed_count();

instance.prepare_states(split_count);
utils::counter_blocker counter(split_count - 1);

/* execute the splitted subtask */
auto execute = [this, &func](size_t index, size_t from, size_t to)
{
auto data = data_proxy::make_multi(context, instance, index, from, to);
func(data);
};

/* create subtasks */
for (size_t i = 0; i < split_count - 1; ++i)
{
context.post_in_thread_pool([this, &counter, &execute, i](auto){
ecs_make_scope_guard([&counter](){
counter.decrement();
});
auto beg = i * per_split;
auto end = (i + 1) * per_split;
execute(i, beg, end);
});
}

/* execute and wait */
counter.execute_and_wait([this, &execute, total_count]{
auto index = split_count - 1;
auto beg = index * per_split;
auto end = total_count;
execute(index, beg, end);
});
}
};
};

} } } }

+ 8
- 2
include/ecs/core/system/parallelism/strategy/split_evenly.h View File

@@ -1,6 +1,7 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/split_base.h>

namespace ecs {
namespace core {
@@ -9,14 +10,19 @@ namespace parallelism {

template<typename T_parameters>
struct split_evenly
: private split_base
{
public:
using parameters_type = T_parameters;

template<typename T_context, typename T_instance, typename T_func>
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const
{
// TODO
// auto split_count = parameters_type::get_split_count();
auto split_count = parameters_type::get_split_count();
auto total_count = instance.subscribed_count();
auto per_split = total_count / split_count;
executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split };
func(instance, ep);
}
};


+ 8
- 2
include/ecs/core/system/parallelism/strategy/split_every.h View File

@@ -1,6 +1,7 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/split_base.h>

namespace ecs {
namespace core {
@@ -9,14 +10,19 @@ namespace parallelism {

template<typename T_parameters>
struct split_every
: private split_base
{
using parameters_type = T_parameters;

template<typename T_context, typename T_instance, typename T_func>
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const
{
// TODO
// auto per_split = parameters_type::get_per_split_count();
auto per_split = parameters_type::get_per_split_count();
auto total_count = instance.subscribed_count();
auto split_count = (total_count + per_split - 1) / per_split;

executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split };
func(instance, ep);
}
};


+ 42
- 4
include/ecs/core/system/scheduler/atomic_counter.h View File

@@ -3,6 +3,7 @@
#include <ecs/config.h>
#include <ecs/tag/system.h>
#include <ecs/context/context.fwd.h>
#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {
@@ -11,6 +12,14 @@ namespace scheduler {

namespace detail
{
/**
* meta data to store for each system instance
*/
struct instance_meta_data_t
{
ssize_t thread_id = -1;
};

/**
* struct to wrap some system meta data
*/
@@ -113,6 +122,34 @@ namespace scheduler {
constexpr decltype(auto) operator()(T_dependency_items) const noexcept;
};

/**
* extention of the normal counter_blocker to handle tasks to be executed in the context of the main thread
*/
struct task_counter_blocker_t
: public utils::counter_blocker
{
private:
utils::task_queue _tasks; //!< tasks to execute in the main thread

public:
using utils::counter_blocker::counter_blocker;

/**
* enqueue a new task inside the task list
*
* @tparam T_task type of the task
*
* @param task task to execute
*/
template<typename T_task>
inline void post(T_task&& task);

/**
* execute all tasks stored in the queue
*/
inline void process();
};

/**
* defines a group of tasks to execute with
*
@@ -133,7 +170,7 @@ namespace scheduler {

private:
context_type& _context; //!< context to use for system execution
utils::counter_blocker& _counter; //!< counter to track running tasks
task_counter_blocker_t& _counter; //!< counter to track running tasks
tasks_tuple_type _tasks; //!< tuple of all tasks

public:
@@ -143,7 +180,7 @@ namespace scheduler {
* @param p_context context to use for system execution
* @param p_counter counter to track running tasks
*/
inline task_group_t(context_type& p_context, utils::counter_blocker& p_counter);
inline task_group_t(context_type& p_context, task_counter_blocker_t& p_counter);

/**
* start the given task
@@ -223,8 +260,9 @@ namespace scheduler {
struct atomic_counter
{
public:
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using instance_meta_data_type = detail::instance_meta_data_t;

private:
context_type& _context;


+ 111
- 16
include/ecs/core/system/scheduler/atomic_counter.inl View File

@@ -1,7 +1,11 @@
#pragma once

// #define ECS_DEBUG_ATOMIC_COUNTER

#include <ecs/core/system/scheduler/atomic_counter.h>

#include <ecs/core/utils/fixed_function.inl>

namespace ecs {
namespace core {
namespace system {
@@ -10,6 +14,25 @@ namespace scheduler {
namespace detail
{

/* misc */

struct make_is_bound_to_main
{
constexpr decltype(auto) operator()() const
{
return hana::is_valid(
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_main> { });
}
};

struct make_is_bound_to_thread
{
constexpr decltype(auto) operator()() const
{
return hana::is_valid(
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_thread> { });
}
};

/* task_t */

@@ -78,11 +101,53 @@ namespace scheduler {
});
}

/* task_counter_blocker_t */

template<typename T_task>
inline void task_counter_blocker_t
::post(T_task&& task)
{
std::lock_guard lock(_mutex);
_tasks.emplace(std::forward<T_task>(task));
_cond_var.notify_all();
}

inline void task_counter_blocker_t
::process()
{
while(true)
{
utils::task t;
{
std::lock_guard lock(_mutex);
if (_tasks.empty())
{
return;
}
t = std::move(_tasks.front());
_tasks.pop();

try
{
t(static_cast<size_t>(-1));
}
catch(const std::exception& ex)
{
std::cerr << "error in main thread: " << ex.what() << std::endl;
}
catch(...)
{
std::cerr << "error in main thread: unknown" << std::endl;
}
}
}
}

/* task_group_t */

template<typename T_context, typename T_dependency_items>
inline task_group_t<T_context, T_dependency_items>
::task_group_t(context_type& p_context, utils::counter_blocker& p_counter)
::task_group_t(context_type& p_context, task_counter_blocker_t& p_counter)
: _context(p_context)
, _counter(p_counter)
{ }
@@ -106,10 +171,36 @@ namespace scheduler {
inline void task_group_t<T_context, T_dependency_items>
::post_task_in_thread_pool(T_task_id, T_func&& func)
{
std::cout << "post_task_in_thread_pool (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl;
_context.post_in_thread_pool([this, &func]{
execute_task(T_task_id { }, func);
});
using task_type = mp::decay_t<decltype(hana::at(_tasks, T_task_id { }))>;
using instance_type = mp::decay_t<decltype(_context.instance_by_tag(std::declval<task_type>().dependency_item.tag))>;
using signature_type = typename instance_type::system_signature_type;
using executor_builder_type = mp::decay_t<decltype(mp::unwrap(signature_type { }).parallelism())>;

using is_bound_to_main_type = mp::decay_t<decltype(make_is_bound_to_main { } ())>;
using is_bound_to_thread_type = mp::decay_t<decltype(make_is_bound_to_thread { } ())>;

hana::eval_if(
is_bound_to_main_type{ }(hana::type_c<executor_builder_type>),
[this, &func](auto _){
_counter.post([this, &func](size_t thread_id){
execute_task(T_task_id { }, func);
});
},
[this, &func](auto _){
hana::eval_if(is_bound_to_thread_type{ }(hana::type_c<executor_builder_type>),
[this, &func](auto _){
auto& meta = _context.instance_by_tag(task_type { }.dependency_item.tag).scheduler_meta_data();
_context.post_in_thread_pool([this, &func, &meta](size_t thread_id){
meta.thread_id = static_cast<ssize_t>(thread_id);
execute_task(T_task_id { }, func);
}, meta.thread_id);
},
[this, &func](auto _){
_context.post_in_thread_pool([this, &func](size_t thread_id){
execute_task(T_task_id { }, func);
});
});
});
}

template<typename T_context, typename T_dependency_items>
@@ -117,7 +208,6 @@ namespace scheduler {
inline void task_group_t<T_context, T_dependency_items>
::execute_task(T_task_id, T_func&& func)
{
std::cout << "execute_task (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl;
auto& task = hana::at(_tasks, T_task_id { });
auto& instance = _context.instance_by_tag(task.dependency_item.tag);
instance.execute(_context, func);
@@ -129,7 +219,6 @@ namespace scheduler {
inline void task_group_t<T_context, T_dependency_items>
::check_and_start_dependencies(T_task_id, T_func&& func)
{
std::cout << "check_and_start_dependencies (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl;
auto& task = hana::at(_tasks, T_task_id { });
task.for_dependent_ids([this, &func](auto id){
auto& other = hana::at(_tasks, id);
@@ -232,6 +321,8 @@ namespace scheduler {
}
}

/* atomic_counter */

template<typename T_settings>
inline atomic_counter<T_settings>
::atomic_counter(context_type& p_context)
@@ -250,7 +341,7 @@ namespace scheduler {
using independent_item_ids_type = mp::decay_t<decltype(detail::get_independent_item_ids(
dependency_list_type { }))>;;

/* TODO debug beg! */
#ifdef ECS_DEBUG_ATOMIC_COUNTER
size_t i = 0;
std::cout << "dependency_list" << std::endl;
hana::for_each(dependency_list_type { }, [&i](auto item){
@@ -260,19 +351,23 @@ namespace scheduler {
std::cout << " " << hana::value(id) << std::endl;
});
});
/* TODO debug end! */
#endif

using task_group_type = detail::task_group_t<context_type, dependency_list_type>;

utils::counter_blocker counter (1);
task_group_type task_group (_context, counter);
detail::task_counter_blocker_t counter (1);
task_group_type task_group (_context, counter);

counter.execute_and_wait_until_zero([&task_group, &counter, &func]{
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){
task_group.start_task(id, func);
counter.execute_and_wait_tick(
[&task_group, &counter, &func]{
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){
task_group.start_task(id, func);
});
counter.decrement();
},
[&counter]{
counter.process();
});
counter.decrement();
});
}

} } } }

+ 2
- 2
include/ecs/core/system/storage/tuple.h View File

@@ -53,14 +53,14 @@ namespace storage {
make_tuple { }))>;

private:
storage_type _storage;
storage_type _storage;

public:
constexpr decltype(auto) size() const noexcept
{ return (ssl_type { }).size(); }

template<typename T_func>
constexpr void for_each(T_func&& f) const noexcept
constexpr void for_each(T_func&& f) noexcept
{ hana::for_each(_storage, std::forward<T_func>(f)); }

template<typename T_system>


+ 20
- 1
include/ecs/core/utils/bitset.h View File

@@ -52,7 +52,19 @@ namespace utils {
* @return integral constant with the ID of the requested component tag
*/
template<typename T_component_tag>
static constexpr decltype(auto) component_id(T_component_tag ct) noexcept;
static constexpr decltype(auto) component_id(T_component_tag&& ct) noexcept;

/**
* get the bitset from the given system signature
*
* @tparam T_system_signature system signature type to get bitmask for
*
* @param ssig system signature to get bitmask for
*
* @return bitmask for the passed system signature
*/
template<typename T_system_signature>
static constexpr decltype(auto) from_system_signature(T_system_signature&& ssig) noexcept;

/**
* internal bitset type that contains all components
@@ -63,6 +75,13 @@ namespace utils {
bitset_type _bitset;

public:
/**
* return a string representation of the bitset
*
* @return string representation of the bitset
*/
inline decltype(auto) to_string() const;

/**
* clear all bits of the bitset
*/


+ 22
- 3
include/ecs/core/utils/bitset.inl View File

@@ -9,9 +9,28 @@ namespace utils {
template<typename T_settings>
template<typename T_component_tag>
constexpr decltype(auto) bitset<T_settings>
::component_id(T_component_tag ct) noexcept
::component_id(T_component_tag&& ct) noexcept
{
return mp::list::index_of(all_components(), ct);
return mp::list::index_of(all_components(), std::forward<T_component_tag>(ct));
}

template<typename T_settings>
template<typename T_system_signature>
constexpr decltype(auto) bitset<T_settings>
::from_system_signature(T_system_signature&& ssig) noexcept
{
bitset ret;
hana::for_each(hana::concat(ssig.read(), ssig.write()), [&ret](auto ct){
ret.set_component(ct, true);
});
return ret;
}

template<typename T_settings>
inline decltype(auto) bitset<T_settings>
::to_string() const
{
return _bitset.to_string();
}

template<typename T_settings>
@@ -26,7 +45,7 @@ namespace utils {
inline bool bitset<T_settings>
::contains(const T_other& other) const noexcept
{
return (_bitset & other._bitset) == _bitset;
return (_bitset & other._bitset) == other._bitset;
}

template<typename T_settings>


+ 18
- 5
include/ecs/core/utils/counter_blocker.h View File

@@ -14,9 +14,9 @@ namespace utils {
*/
struct counter_blocker
{
private:
std::mutex _mutex; //!< mutex to preotect the counter
std::condition_variable _cond_var; //!< conditional variable to synchronize the counter
protected:
std::recursive_mutex _mutex; //!< mutex to preotect the counter
std::condition_variable_any _cond_var; //!< conditional variable to synchronize the counter
size_t _counter; //!< the actual counter value

public:
@@ -42,10 +42,23 @@ namespace utils {
*
* @tparam T_func type of the function to execute
*
* @param func function to execute
* @param func function to execute before waiting for the counter
*/
template<typename T_func>
inline void execute_and_wait_until_zero(T_func&& func) noexcept;
inline void execute_and_wait(T_func&& func);

/**
* Execute the given function and wait until the counter is equal to zero.
* Execute the given tick function at least once and repeatedly if the counter value has changed.
*
* @tparam T_func type of the function to execute
* @tparam T_tick type of tick function to execute
*
* @param func function to execute before waiting for the counter
* @param tick tick function to execute
*/
template<typename T_func, typename T_tick>
inline void execute_and_wait_tick(T_func&& func, T_tick&& tick);
};

} } }

+ 14
- 1
include/ecs/core/utils/counter_blocker.inl View File

@@ -1,5 +1,6 @@
#pragma once

#include <iostream> // TODO debug!
#include <ecs/core/utils/counter_blocker.h>

namespace ecs {
@@ -30,11 +31,23 @@ namespace utils {

template<typename T_func>
inline void counter_blocker
::execute_and_wait_until_zero(T_func&& func) noexcept
::execute_and_wait(T_func&& func)
{
func();
std::unique_lock lock(_mutex);
_cond_var.wait(lock, [this]{ return (_counter == 0); });
}

template<typename T_func, typename T_tick>
inline void counter_blocker
::execute_and_wait_tick(T_func&& func, T_tick&& tick)
{
func();
std::unique_lock lock(_mutex);
_cond_var.wait(lock, [this, t = std::forward<T_tick>(tick)]{
t();
return (_counter == 0);
});
}

} } }

+ 4
- 1
include/ecs/core/utils/ordered_vector.h View File

@@ -152,7 +152,7 @@ namespace utils {
inline decltype(auto) insert(const value_type& value)
{
auto it = std::lower_bound(begin(), end(), value);
auto is_new = _compare(value, *it);
auto is_new = (it != end() && _compare(value, *it));
if (it == end() || is_new)
_items.insert(it, value);
return std::make_pair(it, is_new);
@@ -181,6 +181,9 @@ namespace utils {
template<typename... T_args>
constexpr decltype(auto) erase(T_args&&... args)
{ return _items.erase(std::forward<T_args>(args)...); }

void clear()
{ _items.clear(); }
};

} } }

+ 2
- 1
include/ecs/core/utils/thread_pool.h View File

@@ -1,8 +1,9 @@
#pragma once

#include "./thread_pool/pool.h"
#include "./thread_pool/types.h"
#include "./thread_pool/task_queue.h"
#include "./thread_pool/worker.h"

#include "./thread_pool/pool.inl"
#include "./thread_pool/task_queue.inl"
#include "./thread_pool/worker.inl"

+ 8
- 6
include/ecs/core/utils/thread_pool/pool.h View File

@@ -4,8 +4,8 @@
#include <vector>

#include <ecs/config.h>
#include <ecs/core/utils/thread_pool/types.h>
#include <ecs/core/utils/thread_pool/worker.h>
#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {
@@ -17,13 +17,14 @@ namespace utils {
struct thread_pool
{
private:
using worker_vector = std::vector<thread_pool_worker>;
using worker_ptr_u = std::unique_ptr<thread_pool_worker>;
using worker_vector = std::vector<worker_ptr_u>;
using atomic_size_t = std::atomic<size_t>;

private:
task_queue _queue; //!< queue to store tasks to execute
worker_vector _workers; //!< vector of worker threads
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations
concurrent_task_queue _tasks; //!< queue to store tasks to execute
worker_vector _workers; //!< vector of worker threads
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations

/**
* check if all workers are finished or not
@@ -59,9 +60,10 @@ namespace utils {
* @tparam T_task type of the task
*
* @param task task to execute
* @param worker_id id of the worker to enqueue task at (-1 = any; n = worker tasks)
*/
template<typename T_task>
inline void post(T_task&& task);
inline void post(T_task&& task, ssize_t worker_id = -1);
};

} } }

+ 18
- 2
include/ecs/core/utils/thread_pool/pool.inl View File

@@ -8,9 +8,25 @@ namespace utils {

template<typename T_task>
inline void thread_pool
::post(T_task&& task)
::post(T_task&& task, ssize_t worker_id)
{
_queue.enqueue(std::forward<T_task>(task));
if (worker_id == -1)
{
_tasks.push(std::forward<T_task>(task));
for (auto& w : _workers)
{
w->signal();
}
}
else if (worker_id >= 1 && worker_id <= static_cast<ssize_t>(_workers.size()))
{
auto& w = _workers.at(static_cast<size_t>(worker_id - 1));
w->post(std::forward<T_task>(task));
}
else
{
throw std::invalid_argument(std::string("invalid worker_id: ") + std::to_string(worker_id));
}
}

} } }

+ 68
- 0
include/ecs/core/utils/thread_pool/task_queue.h View File

@@ -0,0 +1,68 @@
#pragma once

#include <sstream> // TODO debug!
#include <queue>
#include <ecs/config.h>
#include <moodycamel/concurrentqueue.h>

#include "../fixed_function.h"

namespace ecs {
namespace core {
namespace utils {

/**
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes!
*/
using task = fixed_function<void(size_t thread_id), 128>;

/**
* normal task queue
*/
using task_queue = std::queue<task>;

/**
* none blocking concurrent queue to store thread pool tasks in
*/
struct concurrent_task_queue
{
private:
using inner_queue_type = moodycamel::ConcurrentQueue<task>;

private:
std::atomic<ssize_t> _count;
inner_queue_type _inner_queue;

public:
/**
* constructor
*/
concurrent_task_queue();

/**
* check if the task queue is empty
*
* @retval TRUE if the queue is empty
* @retval FALSE if the queue contains at least one element
*/
inline bool empty() const;

/**
* push a new element to the queue
*
* @param t task to push to queue
*/
void push(task&& t);

/**
* try to get dequeue a task from the queue
*
* @param t dequeued task
*
* @retval TRUE if an task was dequeued
* @retval FALSE if the queue is empty
*/
bool pop(task& t);
};

} } }

+ 15
- 0
include/ecs/core/utils/thread_pool/task_queue.inl View File

@@ -0,0 +1,15 @@
#pragma once

#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {
namespace utils {

bool concurrent_task_queue
::empty() const
{
return (_count.load() <= 0);
}

} } }

+ 0
- 22
include/ecs/core/utils/thread_pool/types.h View File

@@ -1,22 +0,0 @@
#pragma once

#include <ecs/config.h>
#include <moodycamel/blockingconcurrentqueue.h>

#include "../fixed_function.h"

namespace ecs {
namespace core {
namespace utils {

/**
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes!
*/
using task = fixed_function<void(), 128>;

/**
* non blocking concurrent queue to store thread pool tasks in
*/
using task_queue = moodycamel::BlockingConcurrentQueue<task>;

} } }

+ 37
- 5
include/ecs/core/utils/thread_pool/worker.h View File

@@ -1,9 +1,11 @@
#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>
#include <ecs/config.h>

#include "./types.h"
#include "./task_queue.h"
#include "../movable_atomic.h"

namespace ecs {
@@ -29,9 +31,13 @@ namespace utils {

using atomic_state = movable_atomic<state>;

std::thread _thread; //!< actual thread object to use for execution
task_queue& _queue; //!< queue to poll new tasks from
atomic_state _state; //!< current state of the worker
std::thread _thread; //!< actual thread object to use for execution
std::mutex _mutex; //!< mutex to protext the conditional
std::condition_variable _conditional; //!< condition variable to inform about new tasks
concurrent_task_queue& _all_tasks; //!< queue to poll new tasks from
task_queue _own_tasks; //!< queue of tasks that are assigned to this specific worker
atomic_state _state; //!< current state of the worker
size_t _thread_id; //!< index of the thread inside the thread pool

private:
/**
@@ -39,13 +45,24 @@ namespace utils {
*/
void run();

/**
* pop the next task from either the own task list or the general task list
*
* @param t parameter to store dequeued task in
* @param timeout timeout to wait for new tasks
*
* @return TRUE if a task was dequeued, FALSE otherwise
*/
template<typename T_rep, typename T_period>
inline bool pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout);

public:
/**
* constructor
*
* @param queue task queue to poll tasks from
*/
inline thread_pool_worker(task_queue& queue) noexcept;
inline thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept;

inline thread_pool_worker(thread_pool_worker&&) = default;
inline thread_pool_worker(const thread_pool_worker&) = delete;
@@ -81,6 +98,21 @@ namespace utils {
*/
inline bool finished() const noexcept;

/**
* enqueue a new task to execute inside this worker thread
*
* @tparam T_task type of the task
*
* @param task task to execute
*/
template<typename T_task>
inline void post(T_task&& task);

/**
* inform the task about new tasks in the concurent task list
*/
inline void signal();

};

} } }

+ 43
- 3
include/ecs/core/utils/thread_pool/worker.inl View File

@@ -1,16 +1,42 @@
#pragma once

#include <ecs/core/utils/thread_pool/worker.h>
#include <ecs/core/utils/thread_pool/task_queue.inl>

namespace ecs {
namespace core {
namespace utils {

inline thread_pool_worker
::thread_pool_worker(task_queue& queue) noexcept
: _queue(queue)
::thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept
: _all_tasks(all_tasks)
, _thread_id(thread_id)
{ }

template<typename T_rep, typename T_period>
inline bool thread_pool_worker
::pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout)
{
std::unique_lock<std::mutex> lock(_mutex);
do
{
if (_state != state::running)
{
return false;
}
if (!_own_tasks.empty())
{
t = std::move(_own_tasks.front());
_own_tasks.pop();
return true;
}
if (!_all_tasks.empty() && _all_tasks.pop(t)) // TODO: this will sometimes return false, even if the queue is not yet empty
{
return true;
}
} while(_conditional.wait_for(lock, timeout) != std::cv_status::timeout);
return false;
}

template<typename T_counter>
inline void thread_pool_worker
@@ -33,7 +59,6 @@ namespace utils {
::join() noexcept
{
assert(_thread.joinable());
assert(_state == state::finished);
_thread.join();
}

@@ -43,4 +68,19 @@ namespace utils {
return _state == state::finished;
}

template<typename T_task>
inline void thread_pool_worker
::post(T_task&& task)
{
std::unique_lock<std::mutex> lock(_mutex);
_own_tasks.emplace(std::forward<T_task>(task));
signal();
}

inline void thread_pool_worker
::signal()
{
_conditional.notify_all();
}

} } }

+ 2
- 0
include/ecs/signature/system/parallelism/strategie.h View File

@@ -1,5 +1,7 @@
#pragma once

#include "./strategy/bound.h"
#include "./strategy/main.h"
#include "./strategy/none.h"
#include "./strategy/split_evenly_cores.h"
#include "./strategy/split_evenly.h"

+ 36
- 0
include/ecs/signature/system/parallelism/strategy/bound.h View File

@@ -0,0 +1,36 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/none.h>

namespace ecs {
namespace signature {
namespace system {
namespace parallelism {

namespace detail
{
struct bound_builder_t
{
using bind_to_thread = void;

inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct bound_t
{
constexpr decltype(auto) operator()() const noexcept
{ return bound_builder_t { }; }
};
}

constexpr decltype(auto) bound = detail::bound_t { };

} } } }

+ 36
- 0
include/ecs/signature/system/parallelism/strategy/main.h View File

@@ -0,0 +1,36 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/none.h>

namespace ecs {
namespace signature {
namespace system {
namespace parallelism {

namespace detail
{
struct main_builder_t
{
using bind_to_main = void;

inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct main_t
{
constexpr decltype(auto) operator()() const noexcept
{ return main_builder_t { }; }
};
}

constexpr decltype(auto) main = detail::main_t { };

} } } }

+ 8
- 2
include/ecs/signature/system/parallelism/strategy/none.h View File

@@ -12,8 +12,14 @@ namespace parallelism {
{
struct none_builder_t
{
constexpr decltype(auto) operator()() const noexcept
{ return ::ecs::core::system::parallelism::none { }; }
inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct none_t


+ 1
- 1
include/ecs/signature/system/parallelism/strategy/split_every.h View File

@@ -15,7 +15,7 @@ namespace parallelism {
{
struct parameters
{
static constexpr size_t get_split_count() noexcept
static constexpr size_t get_per_split_count() noexcept
{ return T_split::value; }
};



+ 1
- 1
include/ecs/signature/system/signature.h View File

@@ -121,7 +121,7 @@ namespace system {
public: /* setter */

/** set the parallelism options
* @param parallelism is a predicate the takes no parameters and returns a
* @param parallelism is a predicate that takes no parameters and returns a
* system executor interface */
template<typename T_parallelism>
constexpr decltype(auto) parallelism(T_parallelism parallelism) const noexcept;


+ 8
- 17
src/core/utils/thread_pool/pool.cpp View File

@@ -9,7 +9,7 @@ auto thread_pool
{
for (const auto& w : _workers)
{
if (!w.finished())
if (!w->finished())
{
return false;
}
@@ -23,19 +23,20 @@ void thread_pool
_workers.reserve(count);
for (size_t i = 0; i < count; ++i)
{
_workers.emplace_back(_queue);
_workers.emplace_back(std::make_unique<thread_pool_worker>(_tasks, i + 1));
}

_outstanding_inits = count;
for (auto& w : _workers)
{
w.start(_outstanding_inits);
w->start(_outstanding_inits);
}
}

thread_pool
::thread_pool(size_t count)
{
assert(count > 1);
initialize_workers(count);
}

@@ -48,21 +49,11 @@ thread_pool
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

// stop all workers
for (auto& w : _workers)
{
w.stop();
}

// post dummy tasks untill all workers has stopped
while (!all_workers_finished())
{
post([]{ });
}

// join the worker threads
for (auto& w : _workers)
{
w.join();
w->stop();
w->signal();
w->join();
}
}
}

+ 38
- 0
src/core/utils/thread_pool/task_queue.cpp View File

@@ -0,0 +1,38 @@
#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/task_queue.inl>

using namespace ::ecs::core::utils;

concurrent_task_queue
::concurrent_task_queue()
: _count (0)
, _inner_queue ()
{ }

void concurrent_task_queue
::push(task&& t)
{
if (!_inner_queue.enqueue(std::move(t)))
{
throw std::overflow_error("task queue is out of memory");
}
_count++;
}

bool concurrent_task_queue
::pop(task& t)
{
ssize_t old_count = _count.load(std::memory_order_relaxed);
while (old_count > 0)
{
if (_count.compare_exchange_weak(old_count, old_count - 1, std::memory_order_acquire, std::memory_order_relaxed))
{
while (!_inner_queue.try_dequeue(t))
{
continue;
}
return true;
}
}
return false;
}

+ 15
- 2
src/core/utils/thread_pool/worker.cpp View File

@@ -1,3 +1,5 @@
#include <iostream>

#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/worker.inl>

@@ -10,9 +12,20 @@ void thread_pool_worker
while (_state == state::running)
{
task t;
if (_queue.wait_dequeue_timed(t, std::chrono::milliseconds(500)))
if (pop(t, std::chrono::milliseconds(500)))
{
t();
try
{
t(_thread_id);
}
catch(const std::exception& ex)
{
std::cerr << "error in worker thread: " << ex.what() << std::endl;
}
catch(...)
{
std::cerr << "error in worker thread: unknown" << std::endl;
}
}
}
_state = state::finished;

+ 106
- 44
test/dummy.cpp View File

@@ -40,13 +40,9 @@ namespace test
double y;
};

struct test
{ };

MAKE_COMPONENT_TAG(position)
MAKE_COMPONENT_TAG(velocity)
MAKE_COMPONENT_TAG(acceleration)
MAKE_COMPONENT_TAG(test)
}

namespace system
@@ -60,18 +56,16 @@ namespace test
struct render
{ };

struct foo
{ };

MAKE_SYSTEM_TAG(accelerate)
MAKE_SYSTEM_TAG(move)
MAKE_SYSTEM_TAG(render)
MAKE_SYSTEM_TAG(foo)
}
}

using namespace test;

namespace hana = ::boost::hana;

namespace c = component;
namespace ct = c::tag;
namespace cs = ecs::signature::component;
@@ -96,71 +90,139 @@ constexpr decltype(auto) cs_acceleration =
cs::make(ct::acceleration)
.storage(cs::storage::dynamic<storage_size>);

constexpr decltype(auto) cs_test =
cs::make(ct::test)
.storage(cs::storage::dynamic<storage_size>);

constexpr decltype(auto) cs_list = csl::make(
cs_position,
cs_velocity,
cs_acceleration,
cs_test);

constexpr decltype(auto) ss_foo =
ss::make(st::foo)
.write(ct::test);
cs_acceleration);

constexpr decltype(auto) ss_accelerate =
ss::make(st::accelerate)
// .parallelism(ss::parallelism::split_evenly(hana::size_c<3>))
.read(ct::acceleration)
.write(ct::velocity);

constexpr decltype(auto) ss_move =
ss::make(st::move)
.parallelism(ss::parallelism::split_every(hana::size_c<3>))
.read(ct::velocity)
.write(ct::position);

constexpr decltype(auto) ss_render =
ss::make(st::render)
.read(ct::velocity, ct::position, ct::acceleration, ct::test);
.parallelism(ss::parallelism::main())
.read(ct::velocity, ct::position, ct::acceleration);

constexpr decltype(auto) ss_list = ssl::make(
ss_foo,
ss_accelerate,
ss_move,
ss_render);

constexpr decltype(auto) settings = ::ecs::settings::make()
.component_signatures(cs_list)
.system_signatures (ss_list);
.system_signatures (ss_list)
.refresh_parallelism (ecs::settings::refresh_parallelism::disable);

namespace sea = ::ecs::system_execution_adapter;

double frand(double min, double max)
{
double f = (double)rand() / RAND_MAX;
return min + f * (max - min);
}

inline void log_system_execution(const std::string& system)
{
std::ostringstream os;
os << system
<< " (thread=" << std::this_thread::get_id()
<< ")"
<< std::endl;
std::cout << os.str();
}

template<typename T_data_proxy>
inline void log_system_execution(const std::string& system, const T_data_proxy& data)
{
std::ostringstream os;
os << system
<< " (thread=" << std::this_thread::get_id()
<< ", index=" << data._index
<< ", begin=" << data._begin
<< ", end=" << data._end
<< ", count=" << data.entity_count()
<< ")"
<< std::endl;
std::cout << os.str();
}

TEST(DummyTest, fuu)
{
srand(static_cast<unsigned int>(time(nullptr)));

auto context = ::ecs::context::make(settings);
context.step([](auto& step_proxy){
step_proxy.execute_systems()(
sea::tags(st::foo)
.for_subtasks([](auto& s, auto& data){
std::cout << "foo" << std::endl
<< " " << type_helper<decltype(data)>::name() << std::endl;
}),
sea::tags(st::accelerate)
.for_subtasks([](auto& s, auto& data){
std::cout << "accelerate" << std::endl
<< " " << type_helper<decltype(data)>::name() << std::endl;
}),
sea::tags(st::move)
.for_subtasks([](auto& s, auto& data){
std::cout << "move" << std::endl
<< " " << type_helper<decltype(data)>::name() << std::endl;
}),
sea::tags(st::render)
.for_subtasks([](auto& s, auto& data){
std::cout << "render" << std::endl
<< " " << type_helper<decltype(data)>::name() << std::endl;
})
);

context.step([](auto& proxy){
for (auto i = 0; i < 10; ++i) {
auto handle = proxy.create_entity();

auto& position = proxy.add_component(handle, ct::position);
position.x = frand(-100, 100);
position.y = frand(-100, 100);

auto& velocity = proxy.add_component(handle, ct::velocity);
velocity.x = frand(-1, 1);
velocity.y = frand(-1, 1);

auto& acceleration = proxy.add_component(handle, ct::acceleration);
acceleration.x = frand(-1, 1);
acceleration.y = frand(-1, 1);
}
});

std::cout << "main " << std::this_thread::get_id() << std::endl;

for (int i = 0; i < 10; ++i) {
context.step([](auto& proxy){
proxy.execute_systems()(
sea::tags(st::accelerate)
.for_subtasks([](auto& s, auto& data){
log_system_execution("accelerate");
data.for_entities([&data](auto& handle) {
auto& velocity = data.get_component(ct::velocity, handle);
auto& acceleration = data.get_component(ct::acceleration, handle);

velocity.x += acceleration.x;
velocity.y += acceleration.y;
});
}),
sea::tags(st::move)
.for_subtasks([](auto& s, auto& data){
log_system_execution("move", data);
data.for_entities([&data](auto& handle) {
auto& position = data.get_component(ct::position, handle);
auto& velocity = data.get_component(ct::velocity, handle);

position.x += velocity.x;
position.y += velocity.y;
});
}),
sea::tags(st::render)
.for_subtasks([](auto& s, auto& data){
log_system_execution("render");
data.for_entities([&data](auto& handle) {
auto& position = data.get_component(ct::position, handle);
auto& velocity = data.get_component(ct::velocity, handle);
auto& acceleration = data.get_component(ct::acceleration, handle);

std::cout << std::fixed
<< " " << handle.index() << ": "
<< "pos(" << position.x << ";" << position.y << ") "
<< "vel(" << velocity.x << ";" << velocity.y << ") "
<< "acc(" << acceleration.x << ";" << acceleration.y << ")"
<< std::endl;
});
})
);
});
}
}

Loading…
Cancel
Save