作者 | SHA1 | 備註 | 提交日期 |
---|---|---|---|
bergmann | 9d365d8118 | * added ToDo list | 5 年之前 |
bergmann | 28f4d2508c | * fixed dead lock in thread pool | 5 年之前 |
bergmann | b109ee1170 | * implemented executor for split_every and split_evenly parallelism strategy | 5 年之前 |
bergmann | c13d678d85 | * bind system execution to specific threads (also including main thread) | 5 年之前 |
bergmann | a21d5daf59 |
* implemented entity creation and matching
* added dummy system processing |
5 年之前 |
@@ -0,0 +1,3 @@ | |||||
- check if the thread pool can deadlock if all threads are waiting for their counter_blocker | |||||
- check if we could implement some kind of progress_messages for the worker threads while waiting for counter_blocker | |||||
- check if the conditional variables are used correctly inside counter_blockers |
@@ -3,6 +3,7 @@ | |||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/core/utils/thread_pool.h> | #include <ecs/core/utils/thread_pool.h> | ||||
#include <ecs/core/utils/counter_blocker.h> | #include <ecs/core/utils/counter_blocker.h> | ||||
#include <ecs/core/utils/ordered_vector.h> | |||||
#include <ecs/core/component/manager.h> | #include <ecs/core/component/manager.h> | ||||
#include "./context.fwd.h" | #include "./context.fwd.h" | ||||
@@ -44,7 +45,7 @@ namespace context { | |||||
using handle_type = entity_handle_type; | using handle_type = entity_handle_type; | ||||
private: | private: | ||||
using handle_vector_type = std::vector<handle_type>; | |||||
using handle_set_type = core::utils::ordered_vector<handle_type>; | |||||
protected: | protected: | ||||
context_type& _context; //!< reference to its own child class | context_type& _context; //!< reference to its own child class | ||||
@@ -56,8 +57,8 @@ namespace context { | |||||
core::utils::thread_pool _thread_pool; //!< thread pool to execute tasks parallel | core::utils::thread_pool _thread_pool; //!< thread pool to execute tasks parallel | ||||
handle_vector_type _to_match; //!< handles of entities that were created in the last tick | |||||
handle_vector_type _to_kill; //!< handles of entities that were destroyed in the last tick | |||||
handle_set_type _to_match; //!< handles of entities that were created in the last tick | |||||
handle_set_type _to_kill; //!< handles of entities that were destroyed in the last tick | |||||
public: | public: | ||||
@@ -76,9 +77,10 @@ namespace context { | |||||
* @tparam T_func type of the function to execute | * @tparam T_func type of the function to execute | ||||
* | * | ||||
* @param func function to execute | * @param func function to execute | ||||
* @param thread_id ID of the thread to add task to | |||||
*/ | */ | ||||
template<typename T_func> | template<typename T_func> | ||||
inline void _post_in_thread_pool(T_func&& func); | |||||
inline void _post_in_thread_pool(T_func&& func, ssize_t thread_id = -1); | |||||
protected: /* entity */ | protected: /* entity */ | ||||
@@ -111,6 +113,22 @@ namespace context { | |||||
*/ | */ | ||||
inline bool _is_alive(const handle_type& handle) const; | inline bool _is_alive(const handle_type& handle) const; | ||||
/** | |||||
* destroy the passed entity | |||||
* | |||||
* @param handle handle of the entity to destroy | |||||
*/ | |||||
inline void _destroy_entity(const handle_type& handle); | |||||
/** | |||||
* get the meta data for the passed entity | |||||
* | |||||
* @param handle handle to get meta data for | |||||
* | |||||
* @return meta data of the passed entity | |||||
*/ | |||||
inline const auto& _entity_meta_data(const handle_type& handle); | |||||
protected: /* component */ | protected: /* component */ | ||||
/** | /** | ||||
@@ -26,9 +26,9 @@ namespace detail { | |||||
template<typename T_settings> | template<typename T_settings> | ||||
template<typename T_func> | template<typename T_func> | ||||
inline void base_t<T_settings> | inline void base_t<T_settings> | ||||
::_post_in_thread_pool(T_func&& func) | |||||
::_post_in_thread_pool(T_func&& func, ssize_t thread_id) | |||||
{ | { | ||||
_thread_pool.post(std::forward<T_func>(func)); | |||||
_thread_pool.post(std::forward<T_func>(func), thread_id); | |||||
} | } | ||||
/* entity */ | /* entity */ | ||||
@@ -38,15 +38,16 @@ namespace detail { | |||||
inline decltype(auto) base_t<T_settings> | inline decltype(auto) base_t<T_settings> | ||||
::_create_entity(T_args&&... args) | ::_create_entity(T_args&&... args) | ||||
{ | { | ||||
_to_match.emplace_back(_entities.claim(std::forward<T_args>(args)...)); | |||||
return _to_match.back(); | |||||
auto handle = _entities.claim(std::forward<T_args>(args)...); | |||||
_to_match.insert(handle); | |||||
return handle; | |||||
} | } | ||||
template<typename T_settings> | template<typename T_settings> | ||||
inline void base_t<T_settings> | inline void base_t<T_settings> | ||||
::_kill_entity(const handle_type& handle) | ::_kill_entity(const handle_type& handle) | ||||
{ | { | ||||
_to_kill.emplace_back(handle); | |||||
_to_kill.insert(handle); | |||||
} | } | ||||
template<typename T_settings> | template<typename T_settings> | ||||
@@ -56,6 +57,20 @@ namespace detail { | |||||
return _entities.is_valid(handle); | return _entities.is_valid(handle); | ||||
} | } | ||||
template<typename T_settings> | |||||
inline void base_t<T_settings> | |||||
::_destroy_entity(const handle_type& handle) | |||||
{ | |||||
_entities.reclaim(handle); | |||||
} | |||||
template<typename T_settings> | |||||
inline const auto& base_t<T_settings> | |||||
::_entity_meta_data(const handle_type& handle) | |||||
{ | |||||
return _entities.meta_data(handle); | |||||
} | |||||
/* components */ | /* components */ | ||||
template<typename T_settings> | template<typename T_settings> | ||||
@@ -63,7 +78,7 @@ namespace detail { | |||||
inline decltype(auto) base_t<T_settings> | inline decltype(auto) base_t<T_settings> | ||||
::_add_component(const handle_type& handle, T_component_tag ct) | ::_add_component(const handle_type& handle, T_component_tag ct) | ||||
{ | { | ||||
_to_match.emplace_back(handle); | |||||
_to_match.insert(handle); | |||||
auto& meta = _entities.meta_data(handle); | auto& meta = _entities.meta_data(handle); | ||||
auto& cmd = meta.storage_meta_data(); | auto& cmd = meta.storage_meta_data(); | ||||
auto& c = _components.add(ct, handle, cmd); | auto& c = _components.add(ct, handle, cmd); | ||||
@@ -91,7 +106,7 @@ namespace detail { | |||||
auto& bitset = meta.bitset(); | auto& bitset = meta.bitset(); | ||||
auto& cmd = meta.storage_meta_data(); | auto& cmd = meta.storage_meta_data(); | ||||
if (!bitset.has_component(ct)) | if (!bitset.has_component(ct)) | ||||
throw std::invalid_argument("entity does not contain the requested compnent"); | |||||
throw std::invalid_argument("entity does not contain the requested component"); | |||||
return _components.get(ct, handle, cmd); | return _components.get(ct, handle, cmd); | ||||
} | } | ||||
@@ -154,7 +169,7 @@ namespace detail { | |||||
::_for_systems_parallel(T_func&& func) | ::_for_systems_parallel(T_func&& func) | ||||
{ | { | ||||
core::utils::counter_blocker counter(_systems.size()); | core::utils::counter_blocker counter(_systems.size()); | ||||
counter.execute_and_wait_until_zero([this, &counter, func = std::forward<T_func>(func)]{ | |||||
counter.execute_and_wait([this, &counter, func = std::forward<T_func>(func)]{ | |||||
_systems.for_each([this, &counter, &func](auto& instance){ | _systems.for_each([this, &counter, &func](auto& instance){ | ||||
this->_post_in_thread_pool([&counter, &func, &instance]{ | this->_post_in_thread_pool([&counter, &func, &instance]{ | ||||
ecs_make_scope_guard([&counter](){ | ecs_make_scope_guard([&counter](){ | ||||
@@ -18,16 +18,91 @@ namespace context { | |||||
private: | private: | ||||
using settings_type = T_settings; | using settings_type = T_settings; | ||||
using this_type = context_t<settings_type>; | using this_type = context_t<settings_type>; | ||||
using base_type = step_proxy_t<settings_type>; | |||||
using step_proxy = step_proxy_t<settings_type>; | |||||
using defer_proxy = defer_proxy_t<settings_type>; | |||||
using base_type = step_proxy; | |||||
private: | private: | ||||
inline void refresh() | inline void refresh() | ||||
{ | { | ||||
refresh_execute_deferred(); | |||||
refresh_kill_entities(); | |||||
refresh_match_entities(); | |||||
} | |||||
inline void refresh_match_entities() | |||||
{ | |||||
/* match new/changed entities to system instances */ | |||||
for_systems_dispatch([this](auto& instance){ | |||||
auto& system_bitset = instance.bitset(); | |||||
for (const auto& handle : this->_to_match) | |||||
{ | |||||
auto& entity_bitset = this->entity_meta_data(handle).bitset(); | |||||
if (entity_bitset.contains(system_bitset)) | |||||
{ | |||||
if (instance.subscribe(handle)) | |||||
{ | |||||
// TODO send event | |||||
} | |||||
} | |||||
else if (instance.unsubscribe(handle)) | |||||
{ | |||||
// TODO send event | |||||
} | |||||
} | |||||
}); | |||||
this->_to_match.clear(); | |||||
} | |||||
inline void refresh_kill_entities() | |||||
{ | |||||
/* copy entities to kill to the main vector */ | |||||
for_systems_sequential([this](auto& instance){ | |||||
instance.for_states([this](auto& state){ | |||||
for (auto& handle : state.to_kill) | |||||
{ | |||||
this->_to_kill.insert(handle); | |||||
} | |||||
}); | |||||
}); | |||||
/* unsubscribe dead entities */ | |||||
for_systems_dispatch([this](auto& instance){ | |||||
for (const auto& handle : this->_to_kill) | |||||
{ | |||||
if (instance.unsubscribe(handle)) | |||||
{ | |||||
// TODO send event | |||||
} | |||||
} | |||||
}); | |||||
/* reclaim all killed entities */ | |||||
for (const auto& handle : this->_to_kill) | |||||
{ | |||||
destroy_entity(handle); | |||||
} | |||||
this->_to_kill.clear(); | |||||
} | |||||
inline void refresh_execute_deferred() | |||||
{ | |||||
defer_proxy& proxy = *this; | |||||
for_systems_sequential([&proxy](auto& instance){ | |||||
instance.for_states([&proxy](auto& state){ | |||||
state.deferred_functions.execute_all(proxy); | |||||
}); | |||||
}); | |||||
} | } | ||||
public: | public: | ||||
ecs_context_proxy_func(this_type, destroy_entity) | |||||
ecs_context_proxy_func(this_type, post_in_thread_pool) | ecs_context_proxy_func(this_type, post_in_thread_pool) | ||||
ecs_context_proxy_func(this_type, for_systems_sequential) | |||||
ecs_context_proxy_func(this_type, for_systems_parallel) | |||||
ecs_context_proxy_func(this_type, for_systems_dispatch) | |||||
public: | public: | ||||
inline context_t() | inline context_t() | ||||
@@ -33,6 +33,7 @@ namespace context { | |||||
ecs_context_proxy_func(this_type, create_entity) | ecs_context_proxy_func(this_type, create_entity) | ||||
ecs_context_proxy_func(this_type, kill_entity) | ecs_context_proxy_func(this_type, kill_entity) | ||||
ecs_context_proxy_func(this_type, is_alive) | ecs_context_proxy_func(this_type, is_alive) | ||||
ecs_context_proxy_func(this_type, entity_meta_data) | |||||
public: /* component */ | public: /* component */ | ||||
ecs_context_proxy_func(this_type, add_component) | ecs_context_proxy_func(this_type, add_component) | ||||
@@ -105,6 +105,13 @@ namespace storage { | |||||
*/ | */ | ||||
inline auto& bitset(); | inline auto& bitset(); | ||||
/** | |||||
* get the bitset of components stored for this entity | |||||
* | |||||
* @return component bitset | |||||
*/ | |||||
inline const auto& bitset() const; | |||||
/** | /** | ||||
* get the current reusage counter of the entity | * get the current reusage counter of the entity | ||||
* | * | ||||
@@ -58,6 +58,13 @@ namespace storage { | |||||
return _bitset; | return _bitset; | ||||
} | } | ||||
template <typename T_settings, typename T_storage_meta_data> | |||||
inline const auto& entity_meta_data<T_settings, T_storage_meta_data> | |||||
::bitset() const | |||||
{ | |||||
return _bitset; | |||||
} | |||||
template <typename T_settings, typename T_storage_meta_data> | template <typename T_settings, typename T_storage_meta_data> | ||||
inline auto entity_meta_data<T_settings, T_storage_meta_data> | inline auto entity_meta_data<T_settings, T_storage_meta_data> | ||||
::counter() const | ::counter() const | ||||
@@ -120,7 +127,7 @@ namespace storage { | |||||
grow(grow_size); | grow(grow_size); | ||||
} | } | ||||
assert(!_free_ids.empty()); | assert(!_free_ids.empty()); | ||||
auto index = _free_ids.back(); | |||||
auto index = _free_ids.front(); | |||||
auto& item = _container.at(index); | auto& item = _container.at(index); | ||||
_free_ids.pop(); | _free_ids.pop(); | ||||
return entity_handle(index, item.counter()); | return entity_handle(index, item.counter()); | ||||
@@ -1,7 +1,9 @@ | |||||
#pragma once | #pragma once | ||||
#include "./data_proxy/base.h" | #include "./data_proxy/base.h" | ||||
#include "./data_proxy/multi.h" | |||||
#include "./data_proxy/single.h" | #include "./data_proxy/single.h" | ||||
#include "./data_proxy/base.inl" | #include "./data_proxy/base.inl" | ||||
#include "./data_proxy/multi.inl" | |||||
#include "./data_proxy/single.inl" | #include "./data_proxy/single.inl" |
@@ -0,0 +1,112 @@ | |||||
#pragma once | |||||
#include <ecs/config.h> | |||||
#include <ecs/core/system/data_proxy/base.h> | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace system { | |||||
namespace data_proxy { | |||||
/** | |||||
* data proxy to execute system non-parallel | |||||
* | |||||
* @tparam T_context context type of the ECS environment | |||||
* @tparam T_instance system instance type | |||||
*/ | |||||
template<typename T_context, typename T_instance> | |||||
struct multi | |||||
: public base<T_context, T_instance> | |||||
{ | |||||
private: | |||||
using context_type = T_context; | |||||
using instance_type = T_instance; | |||||
using base_type = base<context_type, instance_type>; | |||||
public: // private: | |||||
size_t _index; | |||||
size_t _begin; | |||||
size_t _end; | |||||
public: | |||||
/** | |||||
* create the multi data proxy | |||||
* | |||||
* @param p_context context of the ECS environment | |||||
* @param p_instance system instance | |||||
* @param p_index index of this data proxy | |||||
* @param p_begin index of the first entity to process | |||||
* @param p_end index of the first entity not to process | |||||
*/ | |||||
inline multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end); | |||||
/** | |||||
* execute the given function for all entities handled by this data proxy | |||||
* | |||||
* @tparam T_func function type to execute | |||||
* | |||||
* @param func function to execute | |||||
*/ | |||||
template<typename T_func> | |||||
inline void for_entities(T_func&& func) const; | |||||
/** | |||||
* execute the given function for all entities not handled by this data proxy | |||||
* | |||||
* @tparam T_func function type to execute | |||||
* | |||||
* @param func function to execute | |||||
*/ | |||||
template<typename T_func> | |||||
void for_other_entities(T_func&& func) const; | |||||
/** | |||||
* execute the given function for all entities | |||||
* | |||||
* @tparam T_func function type to execute | |||||
* | |||||
* @param func function to execute | |||||
*/ | |||||
template<typename T_func> | |||||
void for_all_entities(T_func&& func) const; | |||||
/** | |||||
* get the number of entities handled by this data proxy | |||||
* | |||||
* @return number of entities handle by this data proxy | |||||
*/ | |||||
inline size_t entity_count() const; | |||||
/** | |||||
* get the number of entities not handled by this data proxy | |||||
* | |||||
* @return number of entities not handle by this data proxy | |||||
*/ | |||||
inline size_t other_entity_count() const; | |||||
/** | |||||
* get the number of all entities | |||||
* | |||||
* @return number of all entities | |||||
*/ | |||||
inline size_t all_entity_count() const; | |||||
}; | |||||
/** | |||||
* create a multi data proxy (data proxy that does not execute in parallel) | |||||
* | |||||
* @tparam T_context context type | |||||
* @tparam T_instance instance type | |||||
* | |||||
* @param context context | |||||
* @param instance instance | |||||
* @param index index of this data proxy | |||||
* @param begin index of the first entity to process | |||||
* @param end index of the first entity not to process | |||||
* | |||||
* @return multi data proxy | |||||
*/ | |||||
template<typename T_context, typename T_instance> | |||||
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end); | |||||
} } } } |
@@ -0,0 +1,96 @@ | |||||
#pragma once | |||||
#include <ecs/core/system/data_proxy/multi.h> | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace system { | |||||
namespace data_proxy { | |||||
/* multi */ | |||||
template<typename T_context, typename T_instance> | |||||
inline multi<T_context, T_instance> | |||||
::multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end) | |||||
: base_type (p_context, p_instance) | |||||
, _index (p_index) | |||||
, _begin (p_begin) | |||||
, _end (p_end) | |||||
{ } | |||||
template<typename T_context, typename T_instance> | |||||
template<typename T_func> | |||||
inline void multi<T_context, T_instance> | |||||
::for_entities(T_func&& func) const | |||||
{ | |||||
for (size_t i = _begin; i < _end; ++i) | |||||
{ | |||||
std::forward<T_func>(func)(this->instance.subscribed()[i]); | |||||
} | |||||
} | |||||
template<typename T_context, typename T_instance> | |||||
template<typename T_func> | |||||
inline void multi<T_context, T_instance> | |||||
::for_other_entities(T_func&& func) const | |||||
{ | |||||
auto& subscribed = this->instance.subscribed(); | |||||
auto total_count = this->instance.subscribed_count(); | |||||
for (size_t i = 0; i < _begin; ++i) | |||||
{ | |||||
std::forward<T_func>(func)(subscribed()[i]); | |||||
} | |||||
for (size_t i = _end; i < total_count; ++i) | |||||
{ | |||||
std::forward<T_func>(func)(subscribed()[i]); | |||||
} | |||||
} | |||||
template<typename T_context, typename T_instance> | |||||
template<typename T_func> | |||||
inline void multi<T_context, T_instance> | |||||
::for_all_entities(T_func&& func) const | |||||
{ | |||||
for (auto& e : this->instance.subscribed()) | |||||
{ | |||||
std::forward<T_func>(func)(e); | |||||
} | |||||
} | |||||
template<typename T_context, typename T_instance> | |||||
inline size_t multi<T_context, T_instance> | |||||
::entity_count() const | |||||
{ | |||||
return _end - _begin; | |||||
} | |||||
template<typename T_context, typename T_instance> | |||||
inline size_t multi<T_context, T_instance> | |||||
::other_entity_count() const | |||||
{ | |||||
return all_entity_count() - entity_count(); | |||||
} | |||||
template<typename T_context, typename T_instance> | |||||
inline size_t multi<T_context, T_instance> | |||||
::all_entity_count() const | |||||
{ | |||||
return this->instance.subscribed_count(); | |||||
} | |||||
/* make_multi */ | |||||
template<typename T_context, typename T_instance> | |||||
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end) | |||||
{ | |||||
using context_type = mp::decay_t<T_context>; | |||||
using instance_type = mp::decay_t<T_instance>; | |||||
return multi<context_type, instance_type>( | |||||
std::forward<T_context>(context), | |||||
std::forward<T_instance>(instance), | |||||
index, | |||||
begin, | |||||
end); | |||||
} | |||||
} } } } |
@@ -34,7 +34,7 @@ namespace data_proxy { | |||||
* @param func function to execute | * @param func function to execute | ||||
*/ | */ | ||||
template<typename T_func> | template<typename T_func> | ||||
void for_entities(T_func&& func) const; | |||||
inline void for_entities(T_func&& func) const; | |||||
/** | /** | ||||
* execute the given function for all entities not handled by this data proxy | * execute the given function for all entities not handled by this data proxy | ||||
@@ -44,7 +44,7 @@ namespace data_proxy { | |||||
* @param func function to execute | * @param func function to execute | ||||
*/ | */ | ||||
template<typename T_func> | template<typename T_func> | ||||
void for_other_entities(T_func&& func) const; | |||||
inline void for_other_entities(T_func&& func) const; | |||||
/** | /** | ||||
* execute the given function for all entities | * execute the given function for all entities | ||||
@@ -54,7 +54,7 @@ namespace data_proxy { | |||||
* @param func function to execute | * @param func function to execute | ||||
*/ | */ | ||||
template<typename T_func> | template<typename T_func> | ||||
void for_all_entities(T_func&& func) const; | |||||
inline void for_all_entities(T_func&& func) const; | |||||
/** | /** | ||||
* get the number of entities handled by this data proxy | * get the number of entities handled by this data proxy | ||||
@@ -11,24 +11,24 @@ namespace data_proxy { | |||||
template<typename T_context, typename T_instance> | template<typename T_context, typename T_instance> | ||||
template<typename T_func> | template<typename T_func> | ||||
void single<T_context, T_instance> | |||||
inline void single<T_context, T_instance> | |||||
::for_entities(T_func&& func) const | ::for_entities(T_func&& func) const | ||||
{ | { | ||||
for (auto& e : this->instance.subscribed()) | for (auto& e : this->instance.subscribed()) | ||||
{ | { | ||||
func(e); | |||||
std::forward<T_func>(func)(e); | |||||
} | } | ||||
} | } | ||||
template<typename T_context, typename T_instance> | template<typename T_context, typename T_instance> | ||||
template<typename T_func> | template<typename T_func> | ||||
void single<T_context, T_instance> | |||||
inline void single<T_context, T_instance> | |||||
::for_other_entities(T_func&& func) const | ::for_other_entities(T_func&& func) const | ||||
{ } | { } | ||||
template<typename T_context, typename T_instance> | template<typename T_context, typename T_instance> | ||||
template<typename T_func> | template<typename T_func> | ||||
void single<T_context, T_instance> | |||||
inline void single<T_context, T_instance> | |||||
::for_all_entities(T_func&& func) const | ::for_all_entities(T_func&& func) const | ||||
{ | { | ||||
for_entities(std::forward<T_func>(func)); | for_entities(std::forward<T_func>(func)); | ||||
@@ -2,6 +2,7 @@ | |||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/signature/system.h> | #include <ecs/signature/system.h> | ||||
#include <ecs/context/defer_proxy.h> | |||||
#include <ecs/core/utils/bitset.h> | #include <ecs/core/utils/bitset.h> | ||||
#include <ecs/core/utils/fixed_function.h> | #include <ecs/core/utils/fixed_function.h> | ||||
#include <ecs/core/utils/ordered_vector.h> | #include <ecs/core/utils/ordered_vector.h> | ||||
@@ -14,7 +15,8 @@ namespace system { | |||||
struct deferred_function_vector | struct deferred_function_vector | ||||
{ | { | ||||
private: | private: | ||||
using deferred_proxy_type = int; | |||||
using settings_type = T_settings; | |||||
using deferred_proxy_type = context::detail::defer_proxy_t<settings_type>; | |||||
using function_type = utils::fixed_function<void(deferred_proxy_type&)>; | using function_type = utils::fixed_function<void(deferred_proxy_type&)>; | ||||
using function_vector_type = std::vector<function_type>; | using function_vector_type = std::vector<function_type>; | ||||
@@ -39,8 +41,20 @@ namespace system { | |||||
} | } | ||||
}; | }; | ||||
template<typename T_settings> | |||||
constexpr decltype(auto) get_scheduler_instance_meta_data_type(T_settings&&) noexcept | |||||
{ | |||||
using settings_type = T_settings; | |||||
using context_type = ::ecs::context::type<settings_type>; | |||||
using scheduler_type = decltype((settings_type { }).scheduler()(settings_type { }, std::declval<context_type&>())); | |||||
using instance_meta_data_type = typename scheduler_type::instance_meta_data_type; | |||||
return hana::type_c<instance_meta_data_type>; | |||||
} | |||||
template<typename T_settings, typename T_system_signature, typename T_entity_handle> | template<typename T_settings, typename T_system_signature, typename T_entity_handle> | ||||
struct instance | struct instance | ||||
: private mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>> | |||||
{ | { | ||||
static_assert(decltype(signature::system::is_valid(mp::unwrap_t<T_system_signature> { })) { }); | static_assert(decltype(signature::system::is_valid(mp::unwrap_t<T_system_signature> { })) { }); | ||||
@@ -51,6 +65,7 @@ namespace system { | |||||
using system_tag_type = mp::decay_t<decltype(core::mp::unwrap(system_signature_type { }).tag())>; | using system_tag_type = mp::decay_t<decltype(core::mp::unwrap(system_signature_type { }).tag())>; | ||||
using system_type = mp::unwrap_t<system_tag_type>; | using system_type = mp::unwrap_t<system_tag_type>; | ||||
using output_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(system_signature_type { }).output())>>; | using output_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(system_signature_type { }).output())>>; | ||||
using scheduler_meta_data_type = mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>; | |||||
private: | private: | ||||
using signature_list_type = decltype((settings_type { }).system_signatures()); | using signature_list_type = decltype((settings_type { }).system_signatures()); | ||||
@@ -92,7 +107,7 @@ namespace system { | |||||
public: | public: | ||||
instance() | instance() | ||||
: _bitset () | |||||
: _bitset (bitset_type::from_system_signature(mp::unwrap(system_signature_type { }))) | |||||
, _system () | , _system () | ||||
, _executor (mp::unwrap(system_signature_type { }).parallelism()()) | , _executor (mp::unwrap(system_signature_type { }).parallelism()()) | ||||
, _subscribed() | , _subscribed() | ||||
@@ -122,34 +137,17 @@ namespace system { | |||||
clear_and_prepare(n); | clear_and_prepare(n); | ||||
} | } | ||||
template<typename T_context, typename T_func> | |||||
inline void prepare_and_wait_subtasks(T_context& context, size_t n, T_func& func) | |||||
{ | |||||
assert(n > 0); | |||||
clear_and_prepare(n); | |||||
utils::counter_blocker b(n-1); | |||||
auto run_in_seperate_thread = [this, &context, &b](auto& f) { | |||||
return [this, &b, &context, &f](auto&&... xs) { | |||||
context.post_in_thread_pool([&f, &b, xs...]() { | |||||
ecs_make_scope_guard([&b](){ | |||||
b.decrement(); | |||||
}); | |||||
f(xs...); | |||||
}); | |||||
}; | |||||
}; | |||||
b.execute_and_wait_until_zero([&func, &run_in_seperate_thread]{ | |||||
func(run_in_seperate_thread); | |||||
}); | |||||
} | |||||
template<typename T_context, typename T_func> | template<typename T_context, typename T_func> | ||||
inline void execute(T_context& context, T_func&& func) | inline void execute(T_context& context, T_func&& func) | ||||
{ _executor(context, *this, std::forward<T_func>(func)); } | { _executor(context, *this, std::forward<T_func>(func)); } | ||||
public: /* bitset */ | |||||
public: /* misc */ | |||||
constexpr decltype(auto) scheduler_meta_data() noexcept | |||||
{ return static_cast<scheduler_meta_data_type&>(*this); } | |||||
constexpr decltype(auto) signature() const noexcept | |||||
{ return mp::unwrap(system_signature_type { }); } | |||||
inline const auto& bitset() const noexcept | inline const auto& bitset() const noexcept | ||||
{ return _bitset; } | { return _bitset; } | ||||
@@ -173,11 +171,22 @@ namespace system { | |||||
inline bool is_subscribed(const entity_handle_type& handle) const | inline bool is_subscribed(const entity_handle_type& handle) const | ||||
{ return (_subscribed.find(handle) != _subscribed.end()); } | { return (_subscribed.find(handle) != _subscribed.end()); } | ||||
inline void subscribe(const entity_handle_type& handle) | |||||
{ _subscribed.insert(handle); } | |||||
inline bool subscribe(const entity_handle_type& handle) | |||||
{ return _subscribed.insert(handle).second; } | |||||
inline void unsubscribe(const entity_handle_type& handle) | |||||
{ _subscribed.erase(handle); } | |||||
inline bool unsubscribe(const entity_handle_type& handle) | |||||
{ | |||||
auto it = _subscribed.find(handle); | |||||
if (it != _subscribed.end()) | |||||
{ | |||||
_subscribed.erase(it); | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
return false; | |||||
} | |||||
} | |||||
public: /* states */ | public: /* states */ | ||||
template<typename T_func> | template<typename T_func> | ||||
@@ -20,7 +20,7 @@ namespace parallelism { | |||||
template<typename T_context, typename T_instance, typename T_func> | template<typename T_context, typename T_instance, typename T_func> | ||||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | ||||
{ | { | ||||
bool threshold_reached = false; // TODO | |||||
bool threshold_reached = instance.subscribed_count() >= parameters_type::get_threshold(); | |||||
if (!threshold_reached) _strategy_lower (instance, context, std::forward<T_func>(func)); | if (!threshold_reached) _strategy_lower (instance, context, std::forward<T_func>(func)); | ||||
else _strategy_greater(instance, context, std::forward<T_func>(func)); | else _strategy_greater(instance, context, std::forward<T_func>(func)); | ||||
} | } | ||||
@@ -10,29 +10,55 @@ namespace parallelism { | |||||
struct none | struct none | ||||
{ | { | ||||
public: | |||||
template<typename T_context, typename T_instance> | template<typename T_context, typename T_instance> | ||||
struct executor_proxy | struct executor_proxy | ||||
{ | { | ||||
public: | |||||
using context_type = T_context; | using context_type = T_context; | ||||
using instance_type = T_instance; | using instance_type = T_instance; | ||||
context_type& context; | context_type& context; | ||||
instance_type& instance; | instance_type& instance; | ||||
public: | |||||
template<typename T_func> | template<typename T_func> | ||||
inline void for_subtasks(T_func&& func) | inline void for_subtasks(T_func&& func) | ||||
{ | { | ||||
instance.prepare_states(1); | instance.prepare_states(1); | ||||
auto data = data_proxy::make_single(context, instance); | auto data = data_proxy::make_single(context, instance); | ||||
func(data); | |||||
std::forward<T_func>(func)(data); | |||||
} | } | ||||
}; | }; | ||||
private: | |||||
#ifndef NDEBUG | |||||
bool _bound { false }; | |||||
mutable std::thread::id _thread_id { 0 }; | |||||
#endif | |||||
public: | |||||
#ifndef NDEBUG | |||||
none(bool bound) | |||||
: _bound(bound) | |||||
{ } | |||||
#else | |||||
none() | |||||
{ } | |||||
#endif | |||||
template<typename T_context, typename T_instance, typename T_func> | template<typename T_context, typename T_instance, typename T_func> | ||||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | ||||
{ | { | ||||
#ifndef NDEBUG | |||||
assert( !_bound | |||||
&& ( _thread_id == std::thread::id { 0 } | |||||
|| _thread_id == std::this_thread::get_id())); | |||||
_thread_id = std::this_thread::get_id(); | |||||
#endif | |||||
executor_proxy<T_context, T_instance> ep { context, instance }; | executor_proxy<T_context, T_instance> ep { context, instance }; | ||||
func(instance, ep); | |||||
std::forward<T_func>(func)(instance, ep); | |||||
} | } | ||||
}; | }; | ||||
@@ -0,0 +1,66 @@ | |||||
#pragma once | |||||
#include <ecs/config.h> | |||||
#include <ecs/core/system/data_proxy.h> | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace system { | |||||
namespace parallelism { | |||||
struct split_base | |||||
{ | |||||
public: | |||||
template<typename T_context, typename T_instance> | |||||
struct executor_proxy | |||||
{ | |||||
public: | |||||
using context_type = T_context; | |||||
using instance_type = T_instance; | |||||
context_type& context; | |||||
instance_type& instance; | |||||
size_t split_count; | |||||
size_t per_split; | |||||
public: | |||||
template<typename T_func> | |||||
inline void for_subtasks(T_func&& func) | |||||
{ | |||||
auto total_count = instance.subscribed_count(); | |||||
instance.prepare_states(split_count); | |||||
utils::counter_blocker counter(split_count - 1); | |||||
/* execute the splitted subtask */ | |||||
auto execute = [this, &func](size_t index, size_t from, size_t to) | |||||
{ | |||||
auto data = data_proxy::make_multi(context, instance, index, from, to); | |||||
func(data); | |||||
}; | |||||
/* create subtasks */ | |||||
for (size_t i = 0; i < split_count - 1; ++i) | |||||
{ | |||||
context.post_in_thread_pool([this, &counter, &execute, i](auto){ | |||||
ecs_make_scope_guard([&counter](){ | |||||
counter.decrement(); | |||||
}); | |||||
auto beg = i * per_split; | |||||
auto end = (i + 1) * per_split; | |||||
execute(i, beg, end); | |||||
}); | |||||
} | |||||
/* execute and wait */ | |||||
counter.execute_and_wait([this, &execute, total_count]{ | |||||
auto index = split_count - 1; | |||||
auto beg = index * per_split; | |||||
auto end = total_count; | |||||
execute(index, beg, end); | |||||
}); | |||||
} | |||||
}; | |||||
}; | |||||
} } } } |
@@ -1,6 +1,7 @@ | |||||
#pragma once | #pragma once | ||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/core/system/parallelism/strategy/split_base.h> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
@@ -9,14 +10,19 @@ namespace parallelism { | |||||
template<typename T_parameters> | template<typename T_parameters> | ||||
struct split_evenly | struct split_evenly | ||||
: private split_base | |||||
{ | { | ||||
public: | |||||
using parameters_type = T_parameters; | using parameters_type = T_parameters; | ||||
template<typename T_context, typename T_instance, typename T_func> | template<typename T_context, typename T_instance, typename T_func> | ||||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | ||||
{ | { | ||||
// TODO | |||||
// auto split_count = parameters_type::get_split_count(); | |||||
auto split_count = parameters_type::get_split_count(); | |||||
auto total_count = instance.subscribed_count(); | |||||
auto per_split = total_count / split_count; | |||||
executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split }; | |||||
func(instance, ep); | |||||
} | } | ||||
}; | }; | ||||
@@ -1,6 +1,7 @@ | |||||
#pragma once | #pragma once | ||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/core/system/parallelism/strategy/split_base.h> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
@@ -9,14 +10,19 @@ namespace parallelism { | |||||
template<typename T_parameters> | template<typename T_parameters> | ||||
struct split_every | struct split_every | ||||
: private split_base | |||||
{ | { | ||||
using parameters_type = T_parameters; | using parameters_type = T_parameters; | ||||
template<typename T_context, typename T_instance, typename T_func> | template<typename T_context, typename T_instance, typename T_func> | ||||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | ||||
{ | { | ||||
// TODO | |||||
// auto per_split = parameters_type::get_per_split_count(); | |||||
auto per_split = parameters_type::get_per_split_count(); | |||||
auto total_count = instance.subscribed_count(); | |||||
auto split_count = (total_count + per_split - 1) / per_split; | |||||
executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split }; | |||||
func(instance, ep); | |||||
} | } | ||||
}; | }; | ||||
@@ -3,6 +3,7 @@ | |||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/tag/system.h> | #include <ecs/tag/system.h> | ||||
#include <ecs/context/context.fwd.h> | #include <ecs/context/context.fwd.h> | ||||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
@@ -11,6 +12,14 @@ namespace scheduler { | |||||
namespace detail | namespace detail | ||||
{ | { | ||||
/** | |||||
* meta data to store for each system instance | |||||
*/ | |||||
struct instance_meta_data_t | |||||
{ | |||||
ssize_t thread_id = -1; | |||||
}; | |||||
/** | /** | ||||
* struct to wrap some system meta data | * struct to wrap some system meta data | ||||
*/ | */ | ||||
@@ -113,6 +122,34 @@ namespace scheduler { | |||||
constexpr decltype(auto) operator()(T_dependency_items) const noexcept; | constexpr decltype(auto) operator()(T_dependency_items) const noexcept; | ||||
}; | }; | ||||
/** | |||||
* extention of the normal counter_blocker to handle tasks to be executed in the context of the main thread | |||||
*/ | |||||
struct task_counter_blocker_t | |||||
: public utils::counter_blocker | |||||
{ | |||||
private: | |||||
utils::task_queue _tasks; //!< tasks to execute in the main thread | |||||
public: | |||||
using utils::counter_blocker::counter_blocker; | |||||
/** | |||||
* enqueue a new task inside the task list | |||||
* | |||||
* @tparam T_task type of the task | |||||
* | |||||
* @param task task to execute | |||||
*/ | |||||
template<typename T_task> | |||||
inline void post(T_task&& task); | |||||
/** | |||||
* execute all tasks stored in the queue | |||||
*/ | |||||
inline void process(); | |||||
}; | |||||
/** | /** | ||||
* defines a group of tasks to execute with | * defines a group of tasks to execute with | ||||
* | * | ||||
@@ -133,7 +170,7 @@ namespace scheduler { | |||||
private: | private: | ||||
context_type& _context; //!< context to use for system execution | context_type& _context; //!< context to use for system execution | ||||
utils::counter_blocker& _counter; //!< counter to track running tasks | |||||
task_counter_blocker_t& _counter; //!< counter to track running tasks | |||||
tasks_tuple_type _tasks; //!< tuple of all tasks | tasks_tuple_type _tasks; //!< tuple of all tasks | ||||
public: | public: | ||||
@@ -143,7 +180,7 @@ namespace scheduler { | |||||
* @param p_context context to use for system execution | * @param p_context context to use for system execution | ||||
* @param p_counter counter to track running tasks | * @param p_counter counter to track running tasks | ||||
*/ | */ | ||||
inline task_group_t(context_type& p_context, utils::counter_blocker& p_counter); | |||||
inline task_group_t(context_type& p_context, task_counter_blocker_t& p_counter); | |||||
/** | /** | ||||
* start the given task | * start the given task | ||||
@@ -223,8 +260,9 @@ namespace scheduler { | |||||
struct atomic_counter | struct atomic_counter | ||||
{ | { | ||||
public: | public: | ||||
using settings_type = T_settings; | |||||
using context_type = ::ecs::context::type<settings_type>; | |||||
using settings_type = T_settings; | |||||
using context_type = ::ecs::context::type<settings_type>; | |||||
using instance_meta_data_type = detail::instance_meta_data_t; | |||||
private: | private: | ||||
context_type& _context; | context_type& _context; | ||||
@@ -1,7 +1,11 @@ | |||||
#pragma once | #pragma once | ||||
// #define ECS_DEBUG_ATOMIC_COUNTER | |||||
#include <ecs/core/system/scheduler/atomic_counter.h> | #include <ecs/core/system/scheduler/atomic_counter.h> | ||||
#include <ecs/core/utils/fixed_function.inl> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
namespace system { | namespace system { | ||||
@@ -10,6 +14,25 @@ namespace scheduler { | |||||
namespace detail | namespace detail | ||||
{ | { | ||||
/* misc */ | |||||
struct make_is_bound_to_main | |||||
{ | |||||
constexpr decltype(auto) operator()() const | |||||
{ | |||||
return hana::is_valid( | |||||
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_main> { }); | |||||
} | |||||
}; | |||||
struct make_is_bound_to_thread | |||||
{ | |||||
constexpr decltype(auto) operator()() const | |||||
{ | |||||
return hana::is_valid( | |||||
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_thread> { }); | |||||
} | |||||
}; | |||||
/* task_t */ | /* task_t */ | ||||
@@ -78,11 +101,53 @@ namespace scheduler { | |||||
}); | }); | ||||
} | } | ||||
/* task_counter_blocker_t */ | |||||
template<typename T_task> | |||||
inline void task_counter_blocker_t | |||||
::post(T_task&& task) | |||||
{ | |||||
std::lock_guard lock(_mutex); | |||||
_tasks.emplace(std::forward<T_task>(task)); | |||||
_cond_var.notify_all(); | |||||
} | |||||
inline void task_counter_blocker_t | |||||
::process() | |||||
{ | |||||
while(true) | |||||
{ | |||||
utils::task t; | |||||
{ | |||||
std::lock_guard lock(_mutex); | |||||
if (_tasks.empty()) | |||||
{ | |||||
return; | |||||
} | |||||
t = std::move(_tasks.front()); | |||||
_tasks.pop(); | |||||
try | |||||
{ | |||||
t(static_cast<size_t>(-1)); | |||||
} | |||||
catch(const std::exception& ex) | |||||
{ | |||||
std::cerr << "error in main thread: " << ex.what() << std::endl; | |||||
} | |||||
catch(...) | |||||
{ | |||||
std::cerr << "error in main thread: unknown" << std::endl; | |||||
} | |||||
} | |||||
} | |||||
} | |||||
/* task_group_t */ | /* task_group_t */ | ||||
template<typename T_context, typename T_dependency_items> | template<typename T_context, typename T_dependency_items> | ||||
inline task_group_t<T_context, T_dependency_items> | inline task_group_t<T_context, T_dependency_items> | ||||
::task_group_t(context_type& p_context, utils::counter_blocker& p_counter) | |||||
::task_group_t(context_type& p_context, task_counter_blocker_t& p_counter) | |||||
: _context(p_context) | : _context(p_context) | ||||
, _counter(p_counter) | , _counter(p_counter) | ||||
{ } | { } | ||||
@@ -106,10 +171,36 @@ namespace scheduler { | |||||
inline void task_group_t<T_context, T_dependency_items> | inline void task_group_t<T_context, T_dependency_items> | ||||
::post_task_in_thread_pool(T_task_id, T_func&& func) | ::post_task_in_thread_pool(T_task_id, T_func&& func) | ||||
{ | { | ||||
std::cout << "post_task_in_thread_pool (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||||
_context.post_in_thread_pool([this, &func]{ | |||||
execute_task(T_task_id { }, func); | |||||
}); | |||||
using task_type = mp::decay_t<decltype(hana::at(_tasks, T_task_id { }))>; | |||||
using instance_type = mp::decay_t<decltype(_context.instance_by_tag(std::declval<task_type>().dependency_item.tag))>; | |||||
using signature_type = typename instance_type::system_signature_type; | |||||
using executor_builder_type = mp::decay_t<decltype(mp::unwrap(signature_type { }).parallelism())>; | |||||
using is_bound_to_main_type = mp::decay_t<decltype(make_is_bound_to_main { } ())>; | |||||
using is_bound_to_thread_type = mp::decay_t<decltype(make_is_bound_to_thread { } ())>; | |||||
hana::eval_if( | |||||
is_bound_to_main_type{ }(hana::type_c<executor_builder_type>), | |||||
[this, &func](auto _){ | |||||
_counter.post([this, &func](size_t thread_id){ | |||||
execute_task(T_task_id { }, func); | |||||
}); | |||||
}, | |||||
[this, &func](auto _){ | |||||
hana::eval_if(is_bound_to_thread_type{ }(hana::type_c<executor_builder_type>), | |||||
[this, &func](auto _){ | |||||
auto& meta = _context.instance_by_tag(task_type { }.dependency_item.tag).scheduler_meta_data(); | |||||
_context.post_in_thread_pool([this, &func, &meta](size_t thread_id){ | |||||
meta.thread_id = static_cast<ssize_t>(thread_id); | |||||
execute_task(T_task_id { }, func); | |||||
}, meta.thread_id); | |||||
}, | |||||
[this, &func](auto _){ | |||||
_context.post_in_thread_pool([this, &func](size_t thread_id){ | |||||
execute_task(T_task_id { }, func); | |||||
}); | |||||
}); | |||||
}); | |||||
} | } | ||||
template<typename T_context, typename T_dependency_items> | template<typename T_context, typename T_dependency_items> | ||||
@@ -117,7 +208,6 @@ namespace scheduler { | |||||
inline void task_group_t<T_context, T_dependency_items> | inline void task_group_t<T_context, T_dependency_items> | ||||
::execute_task(T_task_id, T_func&& func) | ::execute_task(T_task_id, T_func&& func) | ||||
{ | { | ||||
std::cout << "execute_task (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||||
auto& task = hana::at(_tasks, T_task_id { }); | auto& task = hana::at(_tasks, T_task_id { }); | ||||
auto& instance = _context.instance_by_tag(task.dependency_item.tag); | auto& instance = _context.instance_by_tag(task.dependency_item.tag); | ||||
instance.execute(_context, func); | instance.execute(_context, func); | ||||
@@ -129,7 +219,6 @@ namespace scheduler { | |||||
inline void task_group_t<T_context, T_dependency_items> | inline void task_group_t<T_context, T_dependency_items> | ||||
::check_and_start_dependencies(T_task_id, T_func&& func) | ::check_and_start_dependencies(T_task_id, T_func&& func) | ||||
{ | { | ||||
std::cout << "check_and_start_dependencies (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||||
auto& task = hana::at(_tasks, T_task_id { }); | auto& task = hana::at(_tasks, T_task_id { }); | ||||
task.for_dependent_ids([this, &func](auto id){ | task.for_dependent_ids([this, &func](auto id){ | ||||
auto& other = hana::at(_tasks, id); | auto& other = hana::at(_tasks, id); | ||||
@@ -232,6 +321,8 @@ namespace scheduler { | |||||
} | } | ||||
} | } | ||||
/* atomic_counter */ | |||||
template<typename T_settings> | template<typename T_settings> | ||||
inline atomic_counter<T_settings> | inline atomic_counter<T_settings> | ||||
::atomic_counter(context_type& p_context) | ::atomic_counter(context_type& p_context) | ||||
@@ -250,7 +341,7 @@ namespace scheduler { | |||||
using independent_item_ids_type = mp::decay_t<decltype(detail::get_independent_item_ids( | using independent_item_ids_type = mp::decay_t<decltype(detail::get_independent_item_ids( | ||||
dependency_list_type { }))>;; | dependency_list_type { }))>;; | ||||
/* TODO debug beg! */ | |||||
#ifdef ECS_DEBUG_ATOMIC_COUNTER | |||||
size_t i = 0; | size_t i = 0; | ||||
std::cout << "dependency_list" << std::endl; | std::cout << "dependency_list" << std::endl; | ||||
hana::for_each(dependency_list_type { }, [&i](auto item){ | hana::for_each(dependency_list_type { }, [&i](auto item){ | ||||
@@ -260,19 +351,23 @@ namespace scheduler { | |||||
std::cout << " " << hana::value(id) << std::endl; | std::cout << " " << hana::value(id) << std::endl; | ||||
}); | }); | ||||
}); | }); | ||||
/* TODO debug end! */ | |||||
#endif | |||||
using task_group_type = detail::task_group_t<context_type, dependency_list_type>; | using task_group_type = detail::task_group_t<context_type, dependency_list_type>; | ||||
utils::counter_blocker counter (1); | |||||
task_group_type task_group (_context, counter); | |||||
detail::task_counter_blocker_t counter (1); | |||||
task_group_type task_group (_context, counter); | |||||
counter.execute_and_wait_until_zero([&task_group, &counter, &func]{ | |||||
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){ | |||||
task_group.start_task(id, func); | |||||
counter.execute_and_wait_tick( | |||||
[&task_group, &counter, &func]{ | |||||
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){ | |||||
task_group.start_task(id, func); | |||||
}); | |||||
counter.decrement(); | |||||
}, | |||||
[&counter]{ | |||||
counter.process(); | |||||
}); | }); | ||||
counter.decrement(); | |||||
}); | |||||
} | } | ||||
} } } } | } } } } |
@@ -53,14 +53,14 @@ namespace storage { | |||||
make_tuple { }))>; | make_tuple { }))>; | ||||
private: | private: | ||||
storage_type _storage; | |||||
storage_type _storage; | |||||
public: | public: | ||||
constexpr decltype(auto) size() const noexcept | constexpr decltype(auto) size() const noexcept | ||||
{ return (ssl_type { }).size(); } | { return (ssl_type { }).size(); } | ||||
template<typename T_func> | template<typename T_func> | ||||
constexpr void for_each(T_func&& f) const noexcept | |||||
constexpr void for_each(T_func&& f) noexcept | |||||
{ hana::for_each(_storage, std::forward<T_func>(f)); } | { hana::for_each(_storage, std::forward<T_func>(f)); } | ||||
template<typename T_system> | template<typename T_system> | ||||
@@ -52,7 +52,19 @@ namespace utils { | |||||
* @return integral constant with the ID of the requested component tag | * @return integral constant with the ID of the requested component tag | ||||
*/ | */ | ||||
template<typename T_component_tag> | template<typename T_component_tag> | ||||
static constexpr decltype(auto) component_id(T_component_tag ct) noexcept; | |||||
static constexpr decltype(auto) component_id(T_component_tag&& ct) noexcept; | |||||
/** | |||||
* get the bitset from the given system signature | |||||
* | |||||
* @tparam T_system_signature system signature type to get bitmask for | |||||
* | |||||
* @param ssig system signature to get bitmask for | |||||
* | |||||
* @return bitmask for the passed system signature | |||||
*/ | |||||
template<typename T_system_signature> | |||||
static constexpr decltype(auto) from_system_signature(T_system_signature&& ssig) noexcept; | |||||
/** | /** | ||||
* internal bitset type that contains all components | * internal bitset type that contains all components | ||||
@@ -63,6 +75,13 @@ namespace utils { | |||||
bitset_type _bitset; | bitset_type _bitset; | ||||
public: | public: | ||||
/** | |||||
* return a string representation of the bitset | |||||
* | |||||
* @return string representation of the bitset | |||||
*/ | |||||
inline decltype(auto) to_string() const; | |||||
/** | /** | ||||
* clear all bits of the bitset | * clear all bits of the bitset | ||||
*/ | */ | ||||
@@ -9,9 +9,28 @@ namespace utils { | |||||
template<typename T_settings> | template<typename T_settings> | ||||
template<typename T_component_tag> | template<typename T_component_tag> | ||||
constexpr decltype(auto) bitset<T_settings> | constexpr decltype(auto) bitset<T_settings> | ||||
::component_id(T_component_tag ct) noexcept | |||||
::component_id(T_component_tag&& ct) noexcept | |||||
{ | { | ||||
return mp::list::index_of(all_components(), ct); | |||||
return mp::list::index_of(all_components(), std::forward<T_component_tag>(ct)); | |||||
} | |||||
template<typename T_settings> | |||||
template<typename T_system_signature> | |||||
constexpr decltype(auto) bitset<T_settings> | |||||
::from_system_signature(T_system_signature&& ssig) noexcept | |||||
{ | |||||
bitset ret; | |||||
hana::for_each(hana::concat(ssig.read(), ssig.write()), [&ret](auto ct){ | |||||
ret.set_component(ct, true); | |||||
}); | |||||
return ret; | |||||
} | |||||
template<typename T_settings> | |||||
inline decltype(auto) bitset<T_settings> | |||||
::to_string() const | |||||
{ | |||||
return _bitset.to_string(); | |||||
} | } | ||||
template<typename T_settings> | template<typename T_settings> | ||||
@@ -26,7 +45,7 @@ namespace utils { | |||||
inline bool bitset<T_settings> | inline bool bitset<T_settings> | ||||
::contains(const T_other& other) const noexcept | ::contains(const T_other& other) const noexcept | ||||
{ | { | ||||
return (_bitset & other._bitset) == _bitset; | |||||
return (_bitset & other._bitset) == other._bitset; | |||||
} | } | ||||
template<typename T_settings> | template<typename T_settings> | ||||
@@ -14,9 +14,9 @@ namespace utils { | |||||
*/ | */ | ||||
struct counter_blocker | struct counter_blocker | ||||
{ | { | ||||
private: | |||||
std::mutex _mutex; //!< mutex to preotect the counter | |||||
std::condition_variable _cond_var; //!< conditional variable to synchronize the counter | |||||
protected: | |||||
std::recursive_mutex _mutex; //!< mutex to preotect the counter | |||||
std::condition_variable_any _cond_var; //!< conditional variable to synchronize the counter | |||||
size_t _counter; //!< the actual counter value | size_t _counter; //!< the actual counter value | ||||
public: | public: | ||||
@@ -42,10 +42,23 @@ namespace utils { | |||||
* | * | ||||
* @tparam T_func type of the function to execute | * @tparam T_func type of the function to execute | ||||
* | * | ||||
* @param func function to execute | |||||
* @param func function to execute before waiting for the counter | |||||
*/ | */ | ||||
template<typename T_func> | template<typename T_func> | ||||
inline void execute_and_wait_until_zero(T_func&& func) noexcept; | |||||
inline void execute_and_wait(T_func&& func); | |||||
/** | |||||
* Execute the given function and wait until the counter is equal to zero. | |||||
* Execute the given tick function at least once and repeatedly if the counter value has changed. | |||||
* | |||||
* @tparam T_func type of the function to execute | |||||
* @tparam T_tick type of tick function to execute | |||||
* | |||||
* @param func function to execute before waiting for the counter | |||||
* @param tick tick function to execute | |||||
*/ | |||||
template<typename T_func, typename T_tick> | |||||
inline void execute_and_wait_tick(T_func&& func, T_tick&& tick); | |||||
}; | }; | ||||
} } } | } } } |
@@ -1,5 +1,6 @@ | |||||
#pragma once | #pragma once | ||||
#include <iostream> // TODO debug! | |||||
#include <ecs/core/utils/counter_blocker.h> | #include <ecs/core/utils/counter_blocker.h> | ||||
namespace ecs { | namespace ecs { | ||||
@@ -30,11 +31,23 @@ namespace utils { | |||||
template<typename T_func> | template<typename T_func> | ||||
inline void counter_blocker | inline void counter_blocker | ||||
::execute_and_wait_until_zero(T_func&& func) noexcept | |||||
::execute_and_wait(T_func&& func) | |||||
{ | { | ||||
func(); | func(); | ||||
std::unique_lock lock(_mutex); | std::unique_lock lock(_mutex); | ||||
_cond_var.wait(lock, [this]{ return (_counter == 0); }); | _cond_var.wait(lock, [this]{ return (_counter == 0); }); | ||||
} | } | ||||
template<typename T_func, typename T_tick> | |||||
inline void counter_blocker | |||||
::execute_and_wait_tick(T_func&& func, T_tick&& tick) | |||||
{ | |||||
func(); | |||||
std::unique_lock lock(_mutex); | |||||
_cond_var.wait(lock, [this, t = std::forward<T_tick>(tick)]{ | |||||
t(); | |||||
return (_counter == 0); | |||||
}); | |||||
} | |||||
} } } | } } } |
@@ -152,7 +152,7 @@ namespace utils { | |||||
inline decltype(auto) insert(const value_type& value) | inline decltype(auto) insert(const value_type& value) | ||||
{ | { | ||||
auto it = std::lower_bound(begin(), end(), value); | auto it = std::lower_bound(begin(), end(), value); | ||||
auto is_new = _compare(value, *it); | |||||
auto is_new = (it != end() && _compare(value, *it)); | |||||
if (it == end() || is_new) | if (it == end() || is_new) | ||||
_items.insert(it, value); | _items.insert(it, value); | ||||
return std::make_pair(it, is_new); | return std::make_pair(it, is_new); | ||||
@@ -181,6 +181,9 @@ namespace utils { | |||||
template<typename... T_args> | template<typename... T_args> | ||||
constexpr decltype(auto) erase(T_args&&... args) | constexpr decltype(auto) erase(T_args&&... args) | ||||
{ return _items.erase(std::forward<T_args>(args)...); } | { return _items.erase(std::forward<T_args>(args)...); } | ||||
void clear() | |||||
{ _items.clear(); } | |||||
}; | }; | ||||
} } } | } } } |
@@ -1,8 +1,9 @@ | |||||
#pragma once | #pragma once | ||||
#include "./thread_pool/pool.h" | #include "./thread_pool/pool.h" | ||||
#include "./thread_pool/types.h" | |||||
#include "./thread_pool/task_queue.h" | |||||
#include "./thread_pool/worker.h" | #include "./thread_pool/worker.h" | ||||
#include "./thread_pool/pool.inl" | #include "./thread_pool/pool.inl" | ||||
#include "./thread_pool/task_queue.inl" | |||||
#include "./thread_pool/worker.inl" | #include "./thread_pool/worker.inl" |
@@ -4,8 +4,8 @@ | |||||
#include <vector> | #include <vector> | ||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include <ecs/core/utils/thread_pool/types.h> | |||||
#include <ecs/core/utils/thread_pool/worker.h> | #include <ecs/core/utils/thread_pool/worker.h> | ||||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
@@ -17,13 +17,14 @@ namespace utils { | |||||
struct thread_pool | struct thread_pool | ||||
{ | { | ||||
private: | private: | ||||
using worker_vector = std::vector<thread_pool_worker>; | |||||
using worker_ptr_u = std::unique_ptr<thread_pool_worker>; | |||||
using worker_vector = std::vector<worker_ptr_u>; | |||||
using atomic_size_t = std::atomic<size_t>; | using atomic_size_t = std::atomic<size_t>; | ||||
private: | private: | ||||
task_queue _queue; //!< queue to store tasks to execute | |||||
worker_vector _workers; //!< vector of worker threads | |||||
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations | |||||
concurrent_task_queue _tasks; //!< queue to store tasks to execute | |||||
worker_vector _workers; //!< vector of worker threads | |||||
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations | |||||
/** | /** | ||||
* check if all workers are finished or not | * check if all workers are finished or not | ||||
@@ -59,9 +60,10 @@ namespace utils { | |||||
* @tparam T_task type of the task | * @tparam T_task type of the task | ||||
* | * | ||||
* @param task task to execute | * @param task task to execute | ||||
* @param worker_id id of the worker to enqueue task at (-1 = any; n = worker tasks) | |||||
*/ | */ | ||||
template<typename T_task> | template<typename T_task> | ||||
inline void post(T_task&& task); | |||||
inline void post(T_task&& task, ssize_t worker_id = -1); | |||||
}; | }; | ||||
} } } | } } } |
@@ -8,9 +8,25 @@ namespace utils { | |||||
template<typename T_task> | template<typename T_task> | ||||
inline void thread_pool | inline void thread_pool | ||||
::post(T_task&& task) | |||||
::post(T_task&& task, ssize_t worker_id) | |||||
{ | { | ||||
_queue.enqueue(std::forward<T_task>(task)); | |||||
if (worker_id == -1) | |||||
{ | |||||
_tasks.push(std::forward<T_task>(task)); | |||||
for (auto& w : _workers) | |||||
{ | |||||
w->signal(); | |||||
} | |||||
} | |||||
else if (worker_id >= 1 && worker_id <= static_cast<ssize_t>(_workers.size())) | |||||
{ | |||||
auto& w = _workers.at(static_cast<size_t>(worker_id - 1)); | |||||
w->post(std::forward<T_task>(task)); | |||||
} | |||||
else | |||||
{ | |||||
throw std::invalid_argument(std::string("invalid worker_id: ") + std::to_string(worker_id)); | |||||
} | |||||
} | } | ||||
} } } | } } } |
@@ -0,0 +1,68 @@ | |||||
#pragma once | |||||
#include <sstream> // TODO debug! | |||||
#include <queue> | |||||
#include <ecs/config.h> | |||||
#include <moodycamel/concurrentqueue.h> | |||||
#include "../fixed_function.h" | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace utils { | |||||
/** | |||||
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes! | |||||
*/ | |||||
using task = fixed_function<void(size_t thread_id), 128>; | |||||
/** | |||||
* normal task queue | |||||
*/ | |||||
using task_queue = std::queue<task>; | |||||
/** | |||||
* none blocking concurrent queue to store thread pool tasks in | |||||
*/ | |||||
struct concurrent_task_queue | |||||
{ | |||||
private: | |||||
using inner_queue_type = moodycamel::ConcurrentQueue<task>; | |||||
private: | |||||
std::atomic<ssize_t> _count; | |||||
inner_queue_type _inner_queue; | |||||
public: | |||||
/** | |||||
* constructor | |||||
*/ | |||||
concurrent_task_queue(); | |||||
/** | |||||
* check if the task queue is empty | |||||
* | |||||
* @retval TRUE if the queue is empty | |||||
* @retval FALSE if the queue contains at least one element | |||||
*/ | |||||
inline bool empty() const; | |||||
/** | |||||
* push a new element to the queue | |||||
* | |||||
* @param t task to push to queue | |||||
*/ | |||||
void push(task&& t); | |||||
/** | |||||
* try to get dequeue a task from the queue | |||||
* | |||||
* @param t dequeued task | |||||
* | |||||
* @retval TRUE if an task was dequeued | |||||
* @retval FALSE if the queue is empty | |||||
*/ | |||||
bool pop(task& t); | |||||
}; | |||||
} } } |
@@ -0,0 +1,15 @@ | |||||
#pragma once | |||||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace utils { | |||||
bool concurrent_task_queue | |||||
::empty() const | |||||
{ | |||||
return (_count.load() <= 0); | |||||
} | |||||
} } } |
@@ -1,22 +0,0 @@ | |||||
#pragma once | |||||
#include <ecs/config.h> | |||||
#include <moodycamel/blockingconcurrentqueue.h> | |||||
#include "../fixed_function.h" | |||||
namespace ecs { | |||||
namespace core { | |||||
namespace utils { | |||||
/** | |||||
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes! | |||||
*/ | |||||
using task = fixed_function<void(), 128>; | |||||
/** | |||||
* non blocking concurrent queue to store thread pool tasks in | |||||
*/ | |||||
using task_queue = moodycamel::BlockingConcurrentQueue<task>; | |||||
} } } |
@@ -1,9 +1,11 @@ | |||||
#pragma once | #pragma once | ||||
#include <mutex> | |||||
#include <thread> | #include <thread> | ||||
#include <condition_variable> | |||||
#include <ecs/config.h> | #include <ecs/config.h> | ||||
#include "./types.h" | |||||
#include "./task_queue.h" | |||||
#include "../movable_atomic.h" | #include "../movable_atomic.h" | ||||
namespace ecs { | namespace ecs { | ||||
@@ -29,9 +31,13 @@ namespace utils { | |||||
using atomic_state = movable_atomic<state>; | using atomic_state = movable_atomic<state>; | ||||
std::thread _thread; //!< actual thread object to use for execution | |||||
task_queue& _queue; //!< queue to poll new tasks from | |||||
atomic_state _state; //!< current state of the worker | |||||
std::thread _thread; //!< actual thread object to use for execution | |||||
std::mutex _mutex; //!< mutex to protext the conditional | |||||
std::condition_variable _conditional; //!< condition variable to inform about new tasks | |||||
concurrent_task_queue& _all_tasks; //!< queue to poll new tasks from | |||||
task_queue _own_tasks; //!< queue of tasks that are assigned to this specific worker | |||||
atomic_state _state; //!< current state of the worker | |||||
size_t _thread_id; //!< index of the thread inside the thread pool | |||||
private: | private: | ||||
/** | /** | ||||
@@ -39,13 +45,24 @@ namespace utils { | |||||
*/ | */ | ||||
void run(); | void run(); | ||||
/** | |||||
* pop the next task from either the own task list or the general task list | |||||
* | |||||
* @param t parameter to store dequeued task in | |||||
* @param timeout timeout to wait for new tasks | |||||
* | |||||
* @return TRUE if a task was dequeued, FALSE otherwise | |||||
*/ | |||||
template<typename T_rep, typename T_period> | |||||
inline bool pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout); | |||||
public: | public: | ||||
/** | /** | ||||
* constructor | * constructor | ||||
* | * | ||||
* @param queue task queue to poll tasks from | * @param queue task queue to poll tasks from | ||||
*/ | */ | ||||
inline thread_pool_worker(task_queue& queue) noexcept; | |||||
inline thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept; | |||||
inline thread_pool_worker(thread_pool_worker&&) = default; | inline thread_pool_worker(thread_pool_worker&&) = default; | ||||
inline thread_pool_worker(const thread_pool_worker&) = delete; | inline thread_pool_worker(const thread_pool_worker&) = delete; | ||||
@@ -81,6 +98,21 @@ namespace utils { | |||||
*/ | */ | ||||
inline bool finished() const noexcept; | inline bool finished() const noexcept; | ||||
/** | |||||
* enqueue a new task to execute inside this worker thread | |||||
* | |||||
* @tparam T_task type of the task | |||||
* | |||||
* @param task task to execute | |||||
*/ | |||||
template<typename T_task> | |||||
inline void post(T_task&& task); | |||||
/** | |||||
* inform the task about new tasks in the concurent task list | |||||
*/ | |||||
inline void signal(); | |||||
}; | }; | ||||
} } } | } } } |
@@ -1,16 +1,42 @@ | |||||
#pragma once | #pragma once | ||||
#include <ecs/core/utils/thread_pool/worker.h> | #include <ecs/core/utils/thread_pool/worker.h> | ||||
#include <ecs/core/utils/thread_pool/task_queue.inl> | |||||
namespace ecs { | namespace ecs { | ||||
namespace core { | namespace core { | ||||
namespace utils { | namespace utils { | ||||
inline thread_pool_worker | inline thread_pool_worker | ||||
::thread_pool_worker(task_queue& queue) noexcept | |||||
: _queue(queue) | |||||
::thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept | |||||
: _all_tasks(all_tasks) | |||||
, _thread_id(thread_id) | |||||
{ } | { } | ||||
template<typename T_rep, typename T_period> | |||||
inline bool thread_pool_worker | |||||
::pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout) | |||||
{ | |||||
std::unique_lock<std::mutex> lock(_mutex); | |||||
do | |||||
{ | |||||
if (_state != state::running) | |||||
{ | |||||
return false; | |||||
} | |||||
if (!_own_tasks.empty()) | |||||
{ | |||||
t = std::move(_own_tasks.front()); | |||||
_own_tasks.pop(); | |||||
return true; | |||||
} | |||||
if (!_all_tasks.empty() && _all_tasks.pop(t)) // TODO: this will sometimes return false, even if the queue is not yet empty | |||||
{ | |||||
return true; | |||||
} | |||||
} while(_conditional.wait_for(lock, timeout) != std::cv_status::timeout); | |||||
return false; | |||||
} | |||||
template<typename T_counter> | template<typename T_counter> | ||||
inline void thread_pool_worker | inline void thread_pool_worker | ||||
@@ -33,7 +59,6 @@ namespace utils { | |||||
::join() noexcept | ::join() noexcept | ||||
{ | { | ||||
assert(_thread.joinable()); | assert(_thread.joinable()); | ||||
assert(_state == state::finished); | |||||
_thread.join(); | _thread.join(); | ||||
} | } | ||||
@@ -43,4 +68,19 @@ namespace utils { | |||||
return _state == state::finished; | return _state == state::finished; | ||||
} | } | ||||
template<typename T_task> | |||||
inline void thread_pool_worker | |||||
::post(T_task&& task) | |||||
{ | |||||
std::unique_lock<std::mutex> lock(_mutex); | |||||
_own_tasks.emplace(std::forward<T_task>(task)); | |||||
signal(); | |||||
} | |||||
inline void thread_pool_worker | |||||
::signal() | |||||
{ | |||||
_conditional.notify_all(); | |||||
} | |||||
} } } | } } } |
@@ -1,5 +1,7 @@ | |||||
#pragma once | #pragma once | ||||
#include "./strategy/bound.h" | |||||
#include "./strategy/main.h" | |||||
#include "./strategy/none.h" | #include "./strategy/none.h" | ||||
#include "./strategy/split_evenly_cores.h" | #include "./strategy/split_evenly_cores.h" | ||||
#include "./strategy/split_evenly.h" | #include "./strategy/split_evenly.h" |
@@ -0,0 +1,36 @@ | |||||
#pragma once | |||||
#include <ecs/config.h> | |||||
#include <ecs/core/system/parallelism/strategy/none.h> | |||||
namespace ecs { | |||||
namespace signature { | |||||
namespace system { | |||||
namespace parallelism { | |||||
namespace detail | |||||
{ | |||||
struct bound_builder_t | |||||
{ | |||||
using bind_to_thread = void; | |||||
inline decltype(auto) operator()() const noexcept | |||||
{ | |||||
#ifndef NDEBUG | |||||
return ::ecs::core::system::parallelism::none(false); | |||||
#else | |||||
return ::ecs::core::system::parallelism::none(); | |||||
#endif | |||||
} | |||||
}; | |||||
struct bound_t | |||||
{ | |||||
constexpr decltype(auto) operator()() const noexcept | |||||
{ return bound_builder_t { }; } | |||||
}; | |||||
} | |||||
constexpr decltype(auto) bound = detail::bound_t { }; | |||||
} } } } |
@@ -0,0 +1,36 @@ | |||||
#pragma once | |||||
#include <ecs/config.h> | |||||
#include <ecs/core/system/parallelism/strategy/none.h> | |||||
namespace ecs { | |||||
namespace signature { | |||||
namespace system { | |||||
namespace parallelism { | |||||
namespace detail | |||||
{ | |||||
struct main_builder_t | |||||
{ | |||||
using bind_to_main = void; | |||||
inline decltype(auto) operator()() const noexcept | |||||
{ | |||||
#ifndef NDEBUG | |||||
return ::ecs::core::system::parallelism::none(false); | |||||
#else | |||||
return ::ecs::core::system::parallelism::none(); | |||||
#endif | |||||
} | |||||
}; | |||||
struct main_t | |||||
{ | |||||
constexpr decltype(auto) operator()() const noexcept | |||||
{ return main_builder_t { }; } | |||||
}; | |||||
} | |||||
constexpr decltype(auto) main = detail::main_t { }; | |||||
} } } } |
@@ -12,8 +12,14 @@ namespace parallelism { | |||||
{ | { | ||||
struct none_builder_t | struct none_builder_t | ||||
{ | { | ||||
constexpr decltype(auto) operator()() const noexcept | |||||
{ return ::ecs::core::system::parallelism::none { }; } | |||||
inline decltype(auto) operator()() const noexcept | |||||
{ | |||||
#ifndef NDEBUG | |||||
return ::ecs::core::system::parallelism::none(false); | |||||
#else | |||||
return ::ecs::core::system::parallelism::none(); | |||||
#endif | |||||
} | |||||
}; | }; | ||||
struct none_t | struct none_t | ||||
@@ -15,7 +15,7 @@ namespace parallelism { | |||||
{ | { | ||||
struct parameters | struct parameters | ||||
{ | { | ||||
static constexpr size_t get_split_count() noexcept | |||||
static constexpr size_t get_per_split_count() noexcept | |||||
{ return T_split::value; } | { return T_split::value; } | ||||
}; | }; | ||||
@@ -121,7 +121,7 @@ namespace system { | |||||
public: /* setter */ | public: /* setter */ | ||||
/** set the parallelism options | /** set the parallelism options | ||||
* @param parallelism is a predicate the takes no parameters and returns a | |||||
* @param parallelism is a predicate that takes no parameters and returns a | |||||
* system executor interface */ | * system executor interface */ | ||||
template<typename T_parallelism> | template<typename T_parallelism> | ||||
constexpr decltype(auto) parallelism(T_parallelism parallelism) const noexcept; | constexpr decltype(auto) parallelism(T_parallelism parallelism) const noexcept; | ||||
@@ -9,7 +9,7 @@ auto thread_pool | |||||
{ | { | ||||
for (const auto& w : _workers) | for (const auto& w : _workers) | ||||
{ | { | ||||
if (!w.finished()) | |||||
if (!w->finished()) | |||||
{ | { | ||||
return false; | return false; | ||||
} | } | ||||
@@ -23,19 +23,20 @@ void thread_pool | |||||
_workers.reserve(count); | _workers.reserve(count); | ||||
for (size_t i = 0; i < count; ++i) | for (size_t i = 0; i < count; ++i) | ||||
{ | { | ||||
_workers.emplace_back(_queue); | |||||
_workers.emplace_back(std::make_unique<thread_pool_worker>(_tasks, i + 1)); | |||||
} | } | ||||
_outstanding_inits = count; | _outstanding_inits = count; | ||||
for (auto& w : _workers) | for (auto& w : _workers) | ||||
{ | { | ||||
w.start(_outstanding_inits); | |||||
w->start(_outstanding_inits); | |||||
} | } | ||||
} | } | ||||
thread_pool | thread_pool | ||||
::thread_pool(size_t count) | ::thread_pool(size_t count) | ||||
{ | { | ||||
assert(count > 1); | |||||
initialize_workers(count); | initialize_workers(count); | ||||
} | } | ||||
@@ -48,21 +49,11 @@ thread_pool | |||||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | std::this_thread::sleep_for(std::chrono::milliseconds(50)); | ||||
} | } | ||||
// stop all workers | |||||
for (auto& w : _workers) | |||||
{ | |||||
w.stop(); | |||||
} | |||||
// post dummy tasks untill all workers has stopped | |||||
while (!all_workers_finished()) | |||||
{ | |||||
post([]{ }); | |||||
} | |||||
// join the worker threads | // join the worker threads | ||||
for (auto& w : _workers) | for (auto& w : _workers) | ||||
{ | { | ||||
w.join(); | |||||
w->stop(); | |||||
w->signal(); | |||||
w->join(); | |||||
} | } | ||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
#include <ecs/core/utils/fixed_function.inl> | |||||
#include <ecs/core/utils/thread_pool/task_queue.inl> | |||||
using namespace ::ecs::core::utils; | |||||
concurrent_task_queue | |||||
::concurrent_task_queue() | |||||
: _count (0) | |||||
, _inner_queue () | |||||
{ } | |||||
void concurrent_task_queue | |||||
::push(task&& t) | |||||
{ | |||||
if (!_inner_queue.enqueue(std::move(t))) | |||||
{ | |||||
throw std::overflow_error("task queue is out of memory"); | |||||
} | |||||
_count++; | |||||
} | |||||
bool concurrent_task_queue | |||||
::pop(task& t) | |||||
{ | |||||
ssize_t old_count = _count.load(std::memory_order_relaxed); | |||||
while (old_count > 0) | |||||
{ | |||||
if (_count.compare_exchange_weak(old_count, old_count - 1, std::memory_order_acquire, std::memory_order_relaxed)) | |||||
{ | |||||
while (!_inner_queue.try_dequeue(t)) | |||||
{ | |||||
continue; | |||||
} | |||||
return true; | |||||
} | |||||
} | |||||
return false; | |||||
} |
@@ -1,3 +1,5 @@ | |||||
#include <iostream> | |||||
#include <ecs/core/utils/fixed_function.inl> | #include <ecs/core/utils/fixed_function.inl> | ||||
#include <ecs/core/utils/thread_pool/worker.inl> | #include <ecs/core/utils/thread_pool/worker.inl> | ||||
@@ -10,9 +12,20 @@ void thread_pool_worker | |||||
while (_state == state::running) | while (_state == state::running) | ||||
{ | { | ||||
task t; | task t; | ||||
if (_queue.wait_dequeue_timed(t, std::chrono::milliseconds(500))) | |||||
if (pop(t, std::chrono::milliseconds(500))) | |||||
{ | { | ||||
t(); | |||||
try | |||||
{ | |||||
t(_thread_id); | |||||
} | |||||
catch(const std::exception& ex) | |||||
{ | |||||
std::cerr << "error in worker thread: " << ex.what() << std::endl; | |||||
} | |||||
catch(...) | |||||
{ | |||||
std::cerr << "error in worker thread: unknown" << std::endl; | |||||
} | |||||
} | } | ||||
} | } | ||||
_state = state::finished; | _state = state::finished; |
@@ -40,13 +40,9 @@ namespace test | |||||
double y; | double y; | ||||
}; | }; | ||||
struct test | |||||
{ }; | |||||
MAKE_COMPONENT_TAG(position) | MAKE_COMPONENT_TAG(position) | ||||
MAKE_COMPONENT_TAG(velocity) | MAKE_COMPONENT_TAG(velocity) | ||||
MAKE_COMPONENT_TAG(acceleration) | MAKE_COMPONENT_TAG(acceleration) | ||||
MAKE_COMPONENT_TAG(test) | |||||
} | } | ||||
namespace system | namespace system | ||||
@@ -60,18 +56,16 @@ namespace test | |||||
struct render | struct render | ||||
{ }; | { }; | ||||
struct foo | |||||
{ }; | |||||
MAKE_SYSTEM_TAG(accelerate) | MAKE_SYSTEM_TAG(accelerate) | ||||
MAKE_SYSTEM_TAG(move) | MAKE_SYSTEM_TAG(move) | ||||
MAKE_SYSTEM_TAG(render) | MAKE_SYSTEM_TAG(render) | ||||
MAKE_SYSTEM_TAG(foo) | |||||
} | } | ||||
} | } | ||||
using namespace test; | using namespace test; | ||||
namespace hana = ::boost::hana; | |||||
namespace c = component; | namespace c = component; | ||||
namespace ct = c::tag; | namespace ct = c::tag; | ||||
namespace cs = ecs::signature::component; | namespace cs = ecs::signature::component; | ||||
@@ -96,71 +90,139 @@ constexpr decltype(auto) cs_acceleration = | |||||
cs::make(ct::acceleration) | cs::make(ct::acceleration) | ||||
.storage(cs::storage::dynamic<storage_size>); | .storage(cs::storage::dynamic<storage_size>); | ||||
constexpr decltype(auto) cs_test = | |||||
cs::make(ct::test) | |||||
.storage(cs::storage::dynamic<storage_size>); | |||||
constexpr decltype(auto) cs_list = csl::make( | constexpr decltype(auto) cs_list = csl::make( | ||||
cs_position, | cs_position, | ||||
cs_velocity, | cs_velocity, | ||||
cs_acceleration, | |||||
cs_test); | |||||
constexpr decltype(auto) ss_foo = | |||||
ss::make(st::foo) | |||||
.write(ct::test); | |||||
cs_acceleration); | |||||
constexpr decltype(auto) ss_accelerate = | constexpr decltype(auto) ss_accelerate = | ||||
ss::make(st::accelerate) | ss::make(st::accelerate) | ||||
// .parallelism(ss::parallelism::split_evenly(hana::size_c<3>)) | |||||
.read(ct::acceleration) | .read(ct::acceleration) | ||||
.write(ct::velocity); | .write(ct::velocity); | ||||
constexpr decltype(auto) ss_move = | constexpr decltype(auto) ss_move = | ||||
ss::make(st::move) | ss::make(st::move) | ||||
.parallelism(ss::parallelism::split_every(hana::size_c<3>)) | |||||
.read(ct::velocity) | .read(ct::velocity) | ||||
.write(ct::position); | .write(ct::position); | ||||
constexpr decltype(auto) ss_render = | constexpr decltype(auto) ss_render = | ||||
ss::make(st::render) | ss::make(st::render) | ||||
.read(ct::velocity, ct::position, ct::acceleration, ct::test); | |||||
.parallelism(ss::parallelism::main()) | |||||
.read(ct::velocity, ct::position, ct::acceleration); | |||||
constexpr decltype(auto) ss_list = ssl::make( | constexpr decltype(auto) ss_list = ssl::make( | ||||
ss_foo, | |||||
ss_accelerate, | ss_accelerate, | ||||
ss_move, | ss_move, | ||||
ss_render); | ss_render); | ||||
constexpr decltype(auto) settings = ::ecs::settings::make() | constexpr decltype(auto) settings = ::ecs::settings::make() | ||||
.component_signatures(cs_list) | .component_signatures(cs_list) | ||||
.system_signatures (ss_list); | |||||
.system_signatures (ss_list) | |||||
.refresh_parallelism (ecs::settings::refresh_parallelism::disable); | |||||
namespace sea = ::ecs::system_execution_adapter; | namespace sea = ::ecs::system_execution_adapter; | ||||
double frand(double min, double max) | |||||
{ | |||||
double f = (double)rand() / RAND_MAX; | |||||
return min + f * (max - min); | |||||
} | |||||
inline void log_system_execution(const std::string& system) | |||||
{ | |||||
std::ostringstream os; | |||||
os << system | |||||
<< " (thread=" << std::this_thread::get_id() | |||||
<< ")" | |||||
<< std::endl; | |||||
std::cout << os.str(); | |||||
} | |||||
template<typename T_data_proxy> | |||||
inline void log_system_execution(const std::string& system, const T_data_proxy& data) | |||||
{ | |||||
std::ostringstream os; | |||||
os << system | |||||
<< " (thread=" << std::this_thread::get_id() | |||||
<< ", index=" << data._index | |||||
<< ", begin=" << data._begin | |||||
<< ", end=" << data._end | |||||
<< ", count=" << data.entity_count() | |||||
<< ")" | |||||
<< std::endl; | |||||
std::cout << os.str(); | |||||
} | |||||
TEST(DummyTest, fuu) | TEST(DummyTest, fuu) | ||||
{ | { | ||||
srand(static_cast<unsigned int>(time(nullptr))); | |||||
auto context = ::ecs::context::make(settings); | auto context = ::ecs::context::make(settings); | ||||
context.step([](auto& step_proxy){ | |||||
step_proxy.execute_systems()( | |||||
sea::tags(st::foo) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
std::cout << "foo" << std::endl | |||||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||||
}), | |||||
sea::tags(st::accelerate) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
std::cout << "accelerate" << std::endl | |||||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||||
}), | |||||
sea::tags(st::move) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
std::cout << "move" << std::endl | |||||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||||
}), | |||||
sea::tags(st::render) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
std::cout << "render" << std::endl | |||||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||||
}) | |||||
); | |||||
context.step([](auto& proxy){ | |||||
for (auto i = 0; i < 10; ++i) { | |||||
auto handle = proxy.create_entity(); | |||||
auto& position = proxy.add_component(handle, ct::position); | |||||
position.x = frand(-100, 100); | |||||
position.y = frand(-100, 100); | |||||
auto& velocity = proxy.add_component(handle, ct::velocity); | |||||
velocity.x = frand(-1, 1); | |||||
velocity.y = frand(-1, 1); | |||||
auto& acceleration = proxy.add_component(handle, ct::acceleration); | |||||
acceleration.x = frand(-1, 1); | |||||
acceleration.y = frand(-1, 1); | |||||
} | |||||
}); | }); | ||||
std::cout << "main " << std::this_thread::get_id() << std::endl; | |||||
for (int i = 0; i < 10; ++i) { | |||||
context.step([](auto& proxy){ | |||||
proxy.execute_systems()( | |||||
sea::tags(st::accelerate) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
log_system_execution("accelerate"); | |||||
data.for_entities([&data](auto& handle) { | |||||
auto& velocity = data.get_component(ct::velocity, handle); | |||||
auto& acceleration = data.get_component(ct::acceleration, handle); | |||||
velocity.x += acceleration.x; | |||||
velocity.y += acceleration.y; | |||||
}); | |||||
}), | |||||
sea::tags(st::move) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
log_system_execution("move", data); | |||||
data.for_entities([&data](auto& handle) { | |||||
auto& position = data.get_component(ct::position, handle); | |||||
auto& velocity = data.get_component(ct::velocity, handle); | |||||
position.x += velocity.x; | |||||
position.y += velocity.y; | |||||
}); | |||||
}), | |||||
sea::tags(st::render) | |||||
.for_subtasks([](auto& s, auto& data){ | |||||
log_system_execution("render"); | |||||
data.for_entities([&data](auto& handle) { | |||||
auto& position = data.get_component(ct::position, handle); | |||||
auto& velocity = data.get_component(ct::velocity, handle); | |||||
auto& acceleration = data.get_component(ct::acceleration, handle); | |||||
std::cout << std::fixed | |||||
<< " " << handle.index() << ": " | |||||
<< "pos(" << position.x << ";" << position.y << ") " | |||||
<< "vel(" << velocity.x << ";" << velocity.y << ") " | |||||
<< "acc(" << acceleration.x << ";" << acceleration.y << ")" | |||||
<< std::endl; | |||||
}); | |||||
}) | |||||
); | |||||
}); | |||||
} | |||||
} | } |