Author | SHA1 | Message | Date |
---|---|---|---|
bergmann | 9d365d8118 | * added ToDo list | 5 years ago |
bergmann | 28f4d2508c | * fixed dead lock in thread pool | 5 years ago |
bergmann | b109ee1170 | * implemented executor for split_every and split_evenly parallelism strategy | 5 years ago |
bergmann | c13d678d85 | * bind system execution to specific threads (also including main thread) | 5 years ago |
bergmann | a21d5daf59 |
* implemented entity creation and matching
* added dummy system processing |
5 years ago |
@@ -0,0 +1,3 @@ | |||
- check if the thread pool can deadlock if all threads are waiting for their counter_blocker | |||
- check if we could implement some kind of progress_messages for the worker threads while waiting for counter_blocker | |||
- check if the conditional variables are used correctly inside counter_blockers |
@@ -3,6 +3,7 @@ | |||
#include <ecs/config.h> | |||
#include <ecs/core/utils/thread_pool.h> | |||
#include <ecs/core/utils/counter_blocker.h> | |||
#include <ecs/core/utils/ordered_vector.h> | |||
#include <ecs/core/component/manager.h> | |||
#include "./context.fwd.h" | |||
@@ -44,7 +45,7 @@ namespace context { | |||
using handle_type = entity_handle_type; | |||
private: | |||
using handle_vector_type = std::vector<handle_type>; | |||
using handle_set_type = core::utils::ordered_vector<handle_type>; | |||
protected: | |||
context_type& _context; //!< reference to its own child class | |||
@@ -56,8 +57,8 @@ namespace context { | |||
core::utils::thread_pool _thread_pool; //!< thread pool to execute tasks parallel | |||
handle_vector_type _to_match; //!< handles of entities that were created in the last tick | |||
handle_vector_type _to_kill; //!< handles of entities that were destroyed in the last tick | |||
handle_set_type _to_match; //!< handles of entities that were created in the last tick | |||
handle_set_type _to_kill; //!< handles of entities that were destroyed in the last tick | |||
public: | |||
@@ -76,9 +77,10 @@ namespace context { | |||
* @tparam T_func type of the function to execute | |||
* | |||
* @param func function to execute | |||
* @param thread_id ID of the thread to add task to | |||
*/ | |||
template<typename T_func> | |||
inline void _post_in_thread_pool(T_func&& func); | |||
inline void _post_in_thread_pool(T_func&& func, ssize_t thread_id = -1); | |||
protected: /* entity */ | |||
@@ -111,6 +113,22 @@ namespace context { | |||
*/ | |||
inline bool _is_alive(const handle_type& handle) const; | |||
/** | |||
* destroy the passed entity | |||
* | |||
* @param handle handle of the entity to destroy | |||
*/ | |||
inline void _destroy_entity(const handle_type& handle); | |||
/** | |||
* get the meta data for the passed entity | |||
* | |||
* @param handle handle to get meta data for | |||
* | |||
* @return meta data of the passed entity | |||
*/ | |||
inline const auto& _entity_meta_data(const handle_type& handle); | |||
protected: /* component */ | |||
/** | |||
@@ -26,9 +26,9 @@ namespace detail { | |||
template<typename T_settings> | |||
template<typename T_func> | |||
inline void base_t<T_settings> | |||
::_post_in_thread_pool(T_func&& func) | |||
::_post_in_thread_pool(T_func&& func, ssize_t thread_id) | |||
{ | |||
_thread_pool.post(std::forward<T_func>(func)); | |||
_thread_pool.post(std::forward<T_func>(func), thread_id); | |||
} | |||
/* entity */ | |||
@@ -38,15 +38,16 @@ namespace detail { | |||
inline decltype(auto) base_t<T_settings> | |||
::_create_entity(T_args&&... args) | |||
{ | |||
_to_match.emplace_back(_entities.claim(std::forward<T_args>(args)...)); | |||
return _to_match.back(); | |||
auto handle = _entities.claim(std::forward<T_args>(args)...); | |||
_to_match.insert(handle); | |||
return handle; | |||
} | |||
template<typename T_settings> | |||
inline void base_t<T_settings> | |||
::_kill_entity(const handle_type& handle) | |||
{ | |||
_to_kill.emplace_back(handle); | |||
_to_kill.insert(handle); | |||
} | |||
template<typename T_settings> | |||
@@ -56,6 +57,20 @@ namespace detail { | |||
return _entities.is_valid(handle); | |||
} | |||
template<typename T_settings> | |||
inline void base_t<T_settings> | |||
::_destroy_entity(const handle_type& handle) | |||
{ | |||
_entities.reclaim(handle); | |||
} | |||
template<typename T_settings> | |||
inline const auto& base_t<T_settings> | |||
::_entity_meta_data(const handle_type& handle) | |||
{ | |||
return _entities.meta_data(handle); | |||
} | |||
/* components */ | |||
template<typename T_settings> | |||
@@ -63,7 +78,7 @@ namespace detail { | |||
inline decltype(auto) base_t<T_settings> | |||
::_add_component(const handle_type& handle, T_component_tag ct) | |||
{ | |||
_to_match.emplace_back(handle); | |||
_to_match.insert(handle); | |||
auto& meta = _entities.meta_data(handle); | |||
auto& cmd = meta.storage_meta_data(); | |||
auto& c = _components.add(ct, handle, cmd); | |||
@@ -91,7 +106,7 @@ namespace detail { | |||
auto& bitset = meta.bitset(); | |||
auto& cmd = meta.storage_meta_data(); | |||
if (!bitset.has_component(ct)) | |||
throw std::invalid_argument("entity does not contain the requested compnent"); | |||
throw std::invalid_argument("entity does not contain the requested component"); | |||
return _components.get(ct, handle, cmd); | |||
} | |||
@@ -154,7 +169,7 @@ namespace detail { | |||
::_for_systems_parallel(T_func&& func) | |||
{ | |||
core::utils::counter_blocker counter(_systems.size()); | |||
counter.execute_and_wait_until_zero([this, &counter, func = std::forward<T_func>(func)]{ | |||
counter.execute_and_wait([this, &counter, func = std::forward<T_func>(func)]{ | |||
_systems.for_each([this, &counter, &func](auto& instance){ | |||
this->_post_in_thread_pool([&counter, &func, &instance]{ | |||
ecs_make_scope_guard([&counter](){ | |||
@@ -18,16 +18,91 @@ namespace context { | |||
private: | |||
using settings_type = T_settings; | |||
using this_type = context_t<settings_type>; | |||
using base_type = step_proxy_t<settings_type>; | |||
using step_proxy = step_proxy_t<settings_type>; | |||
using defer_proxy = defer_proxy_t<settings_type>; | |||
using base_type = step_proxy; | |||
private: | |||
inline void refresh() | |||
{ | |||
refresh_execute_deferred(); | |||
refresh_kill_entities(); | |||
refresh_match_entities(); | |||
} | |||
inline void refresh_match_entities() | |||
{ | |||
/* match new/changed entities to system instances */ | |||
for_systems_dispatch([this](auto& instance){ | |||
auto& system_bitset = instance.bitset(); | |||
for (const auto& handle : this->_to_match) | |||
{ | |||
auto& entity_bitset = this->entity_meta_data(handle).bitset(); | |||
if (entity_bitset.contains(system_bitset)) | |||
{ | |||
if (instance.subscribe(handle)) | |||
{ | |||
// TODO send event | |||
} | |||
} | |||
else if (instance.unsubscribe(handle)) | |||
{ | |||
// TODO send event | |||
} | |||
} | |||
}); | |||
this->_to_match.clear(); | |||
} | |||
inline void refresh_kill_entities() | |||
{ | |||
/* copy entities to kill to the main vector */ | |||
for_systems_sequential([this](auto& instance){ | |||
instance.for_states([this](auto& state){ | |||
for (auto& handle : state.to_kill) | |||
{ | |||
this->_to_kill.insert(handle); | |||
} | |||
}); | |||
}); | |||
/* unsubscribe dead entities */ | |||
for_systems_dispatch([this](auto& instance){ | |||
for (const auto& handle : this->_to_kill) | |||
{ | |||
if (instance.unsubscribe(handle)) | |||
{ | |||
// TODO send event | |||
} | |||
} | |||
}); | |||
/* reclaim all killed entities */ | |||
for (const auto& handle : this->_to_kill) | |||
{ | |||
destroy_entity(handle); | |||
} | |||
this->_to_kill.clear(); | |||
} | |||
inline void refresh_execute_deferred() | |||
{ | |||
defer_proxy& proxy = *this; | |||
for_systems_sequential([&proxy](auto& instance){ | |||
instance.for_states([&proxy](auto& state){ | |||
state.deferred_functions.execute_all(proxy); | |||
}); | |||
}); | |||
} | |||
public: | |||
ecs_context_proxy_func(this_type, destroy_entity) | |||
ecs_context_proxy_func(this_type, post_in_thread_pool) | |||
ecs_context_proxy_func(this_type, for_systems_sequential) | |||
ecs_context_proxy_func(this_type, for_systems_parallel) | |||
ecs_context_proxy_func(this_type, for_systems_dispatch) | |||
public: | |||
inline context_t() | |||
@@ -33,6 +33,7 @@ namespace context { | |||
ecs_context_proxy_func(this_type, create_entity) | |||
ecs_context_proxy_func(this_type, kill_entity) | |||
ecs_context_proxy_func(this_type, is_alive) | |||
ecs_context_proxy_func(this_type, entity_meta_data) | |||
public: /* component */ | |||
ecs_context_proxy_func(this_type, add_component) | |||
@@ -105,6 +105,13 @@ namespace storage { | |||
*/ | |||
inline auto& bitset(); | |||
/** | |||
* get the bitset of components stored for this entity | |||
* | |||
* @return component bitset | |||
*/ | |||
inline const auto& bitset() const; | |||
/** | |||
* get the current reusage counter of the entity | |||
* | |||
@@ -58,6 +58,13 @@ namespace storage { | |||
return _bitset; | |||
} | |||
template <typename T_settings, typename T_storage_meta_data> | |||
inline const auto& entity_meta_data<T_settings, T_storage_meta_data> | |||
::bitset() const | |||
{ | |||
return _bitset; | |||
} | |||
template <typename T_settings, typename T_storage_meta_data> | |||
inline auto entity_meta_data<T_settings, T_storage_meta_data> | |||
::counter() const | |||
@@ -120,7 +127,7 @@ namespace storage { | |||
grow(grow_size); | |||
} | |||
assert(!_free_ids.empty()); | |||
auto index = _free_ids.back(); | |||
auto index = _free_ids.front(); | |||
auto& item = _container.at(index); | |||
_free_ids.pop(); | |||
return entity_handle(index, item.counter()); | |||
@@ -1,7 +1,9 @@ | |||
#pragma once | |||
#include "./data_proxy/base.h" | |||
#include "./data_proxy/multi.h" | |||
#include "./data_proxy/single.h" | |||
#include "./data_proxy/base.inl" | |||
#include "./data_proxy/multi.inl" | |||
#include "./data_proxy/single.inl" |
@@ -0,0 +1,112 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/data_proxy/base.h> | |||
namespace ecs { | |||
namespace core { | |||
namespace system { | |||
namespace data_proxy { | |||
/** | |||
* data proxy to execute system non-parallel | |||
* | |||
* @tparam T_context context type of the ECS environment | |||
* @tparam T_instance system instance type | |||
*/ | |||
template<typename T_context, typename T_instance> | |||
struct multi | |||
: public base<T_context, T_instance> | |||
{ | |||
private: | |||
using context_type = T_context; | |||
using instance_type = T_instance; | |||
using base_type = base<context_type, instance_type>; | |||
public: // private: | |||
size_t _index; | |||
size_t _begin; | |||
size_t _end; | |||
public: | |||
/** | |||
* create the multi data proxy | |||
* | |||
* @param p_context context of the ECS environment | |||
* @param p_instance system instance | |||
* @param p_index index of this data proxy | |||
* @param p_begin index of the first entity to process | |||
* @param p_end index of the first entity not to process | |||
*/ | |||
inline multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end); | |||
/** | |||
* execute the given function for all entities handled by this data proxy | |||
* | |||
* @tparam T_func function type to execute | |||
* | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
inline void for_entities(T_func&& func) const; | |||
/** | |||
* execute the given function for all entities not handled by this data proxy | |||
* | |||
* @tparam T_func function type to execute | |||
* | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
void for_other_entities(T_func&& func) const; | |||
/** | |||
* execute the given function for all entities | |||
* | |||
* @tparam T_func function type to execute | |||
* | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
void for_all_entities(T_func&& func) const; | |||
/** | |||
* get the number of entities handled by this data proxy | |||
* | |||
* @return number of entities handle by this data proxy | |||
*/ | |||
inline size_t entity_count() const; | |||
/** | |||
* get the number of entities not handled by this data proxy | |||
* | |||
* @return number of entities not handle by this data proxy | |||
*/ | |||
inline size_t other_entity_count() const; | |||
/** | |||
* get the number of all entities | |||
* | |||
* @return number of all entities | |||
*/ | |||
inline size_t all_entity_count() const; | |||
}; | |||
/** | |||
* create a multi data proxy (data proxy that does not execute in parallel) | |||
* | |||
* @tparam T_context context type | |||
* @tparam T_instance instance type | |||
* | |||
* @param context context | |||
* @param instance instance | |||
* @param index index of this data proxy | |||
* @param begin index of the first entity to process | |||
* @param end index of the first entity not to process | |||
* | |||
* @return multi data proxy | |||
*/ | |||
template<typename T_context, typename T_instance> | |||
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end); | |||
} } } } |
@@ -0,0 +1,96 @@ | |||
#pragma once | |||
#include <ecs/core/system/data_proxy/multi.h> | |||
namespace ecs { | |||
namespace core { | |||
namespace system { | |||
namespace data_proxy { | |||
/* multi */ | |||
template<typename T_context, typename T_instance> | |||
inline multi<T_context, T_instance> | |||
::multi(context_type& p_context, instance_type& p_instance, size_t p_index, size_t p_begin, size_t p_end) | |||
: base_type (p_context, p_instance) | |||
, _index (p_index) | |||
, _begin (p_begin) | |||
, _end (p_end) | |||
{ } | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
inline void multi<T_context, T_instance> | |||
::for_entities(T_func&& func) const | |||
{ | |||
for (size_t i = _begin; i < _end; ++i) | |||
{ | |||
std::forward<T_func>(func)(this->instance.subscribed()[i]); | |||
} | |||
} | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
inline void multi<T_context, T_instance> | |||
::for_other_entities(T_func&& func) const | |||
{ | |||
auto& subscribed = this->instance.subscribed(); | |||
auto total_count = this->instance.subscribed_count(); | |||
for (size_t i = 0; i < _begin; ++i) | |||
{ | |||
std::forward<T_func>(func)(subscribed()[i]); | |||
} | |||
for (size_t i = _end; i < total_count; ++i) | |||
{ | |||
std::forward<T_func>(func)(subscribed()[i]); | |||
} | |||
} | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
inline void multi<T_context, T_instance> | |||
::for_all_entities(T_func&& func) const | |||
{ | |||
for (auto& e : this->instance.subscribed()) | |||
{ | |||
std::forward<T_func>(func)(e); | |||
} | |||
} | |||
template<typename T_context, typename T_instance> | |||
inline size_t multi<T_context, T_instance> | |||
::entity_count() const | |||
{ | |||
return _end - _begin; | |||
} | |||
template<typename T_context, typename T_instance> | |||
inline size_t multi<T_context, T_instance> | |||
::other_entity_count() const | |||
{ | |||
return all_entity_count() - entity_count(); | |||
} | |||
template<typename T_context, typename T_instance> | |||
inline size_t multi<T_context, T_instance> | |||
::all_entity_count() const | |||
{ | |||
return this->instance.subscribed_count(); | |||
} | |||
/* make_multi */ | |||
template<typename T_context, typename T_instance> | |||
constexpr decltype(auto) make_multi(T_context&& context, T_instance&& instance, size_t index, size_t begin, size_t end) | |||
{ | |||
using context_type = mp::decay_t<T_context>; | |||
using instance_type = mp::decay_t<T_instance>; | |||
return multi<context_type, instance_type>( | |||
std::forward<T_context>(context), | |||
std::forward<T_instance>(instance), | |||
index, | |||
begin, | |||
end); | |||
} | |||
} } } } |
@@ -34,7 +34,7 @@ namespace data_proxy { | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
void for_entities(T_func&& func) const; | |||
inline void for_entities(T_func&& func) const; | |||
/** | |||
* execute the given function for all entities not handled by this data proxy | |||
@@ -44,7 +44,7 @@ namespace data_proxy { | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
void for_other_entities(T_func&& func) const; | |||
inline void for_other_entities(T_func&& func) const; | |||
/** | |||
* execute the given function for all entities | |||
@@ -54,7 +54,7 @@ namespace data_proxy { | |||
* @param func function to execute | |||
*/ | |||
template<typename T_func> | |||
void for_all_entities(T_func&& func) const; | |||
inline void for_all_entities(T_func&& func) const; | |||
/** | |||
* get the number of entities handled by this data proxy | |||
@@ -11,24 +11,24 @@ namespace data_proxy { | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
void single<T_context, T_instance> | |||
inline void single<T_context, T_instance> | |||
::for_entities(T_func&& func) const | |||
{ | |||
for (auto& e : this->instance.subscribed()) | |||
{ | |||
func(e); | |||
std::forward<T_func>(func)(e); | |||
} | |||
} | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
void single<T_context, T_instance> | |||
inline void single<T_context, T_instance> | |||
::for_other_entities(T_func&& func) const | |||
{ } | |||
template<typename T_context, typename T_instance> | |||
template<typename T_func> | |||
void single<T_context, T_instance> | |||
inline void single<T_context, T_instance> | |||
::for_all_entities(T_func&& func) const | |||
{ | |||
for_entities(std::forward<T_func>(func)); | |||
@@ -2,6 +2,7 @@ | |||
#include <ecs/config.h> | |||
#include <ecs/signature/system.h> | |||
#include <ecs/context/defer_proxy.h> | |||
#include <ecs/core/utils/bitset.h> | |||
#include <ecs/core/utils/fixed_function.h> | |||
#include <ecs/core/utils/ordered_vector.h> | |||
@@ -14,7 +15,8 @@ namespace system { | |||
struct deferred_function_vector | |||
{ | |||
private: | |||
using deferred_proxy_type = int; | |||
using settings_type = T_settings; | |||
using deferred_proxy_type = context::detail::defer_proxy_t<settings_type>; | |||
using function_type = utils::fixed_function<void(deferred_proxy_type&)>; | |||
using function_vector_type = std::vector<function_type>; | |||
@@ -39,8 +41,20 @@ namespace system { | |||
} | |||
}; | |||
template<typename T_settings> | |||
constexpr decltype(auto) get_scheduler_instance_meta_data_type(T_settings&&) noexcept | |||
{ | |||
using settings_type = T_settings; | |||
using context_type = ::ecs::context::type<settings_type>; | |||
using scheduler_type = decltype((settings_type { }).scheduler()(settings_type { }, std::declval<context_type&>())); | |||
using instance_meta_data_type = typename scheduler_type::instance_meta_data_type; | |||
return hana::type_c<instance_meta_data_type>; | |||
} | |||
template<typename T_settings, typename T_system_signature, typename T_entity_handle> | |||
struct instance | |||
: private mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>> | |||
{ | |||
static_assert(decltype(signature::system::is_valid(mp::unwrap_t<T_system_signature> { })) { }); | |||
@@ -51,6 +65,7 @@ namespace system { | |||
using system_tag_type = mp::decay_t<decltype(core::mp::unwrap(system_signature_type { }).tag())>; | |||
using system_type = mp::unwrap_t<system_tag_type>; | |||
using output_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(system_signature_type { }).output())>>; | |||
using scheduler_meta_data_type = mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>; | |||
private: | |||
using signature_list_type = decltype((settings_type { }).system_signatures()); | |||
@@ -92,7 +107,7 @@ namespace system { | |||
public: | |||
instance() | |||
: _bitset () | |||
: _bitset (bitset_type::from_system_signature(mp::unwrap(system_signature_type { }))) | |||
, _system () | |||
, _executor (mp::unwrap(system_signature_type { }).parallelism()()) | |||
, _subscribed() | |||
@@ -122,34 +137,17 @@ namespace system { | |||
clear_and_prepare(n); | |||
} | |||
template<typename T_context, typename T_func> | |||
inline void prepare_and_wait_subtasks(T_context& context, size_t n, T_func& func) | |||
{ | |||
assert(n > 0); | |||
clear_and_prepare(n); | |||
utils::counter_blocker b(n-1); | |||
auto run_in_seperate_thread = [this, &context, &b](auto& f) { | |||
return [this, &b, &context, &f](auto&&... xs) { | |||
context.post_in_thread_pool([&f, &b, xs...]() { | |||
ecs_make_scope_guard([&b](){ | |||
b.decrement(); | |||
}); | |||
f(xs...); | |||
}); | |||
}; | |||
}; | |||
b.execute_and_wait_until_zero([&func, &run_in_seperate_thread]{ | |||
func(run_in_seperate_thread); | |||
}); | |||
} | |||
template<typename T_context, typename T_func> | |||
inline void execute(T_context& context, T_func&& func) | |||
{ _executor(context, *this, std::forward<T_func>(func)); } | |||
public: /* bitset */ | |||
public: /* misc */ | |||
constexpr decltype(auto) scheduler_meta_data() noexcept | |||
{ return static_cast<scheduler_meta_data_type&>(*this); } | |||
constexpr decltype(auto) signature() const noexcept | |||
{ return mp::unwrap(system_signature_type { }); } | |||
inline const auto& bitset() const noexcept | |||
{ return _bitset; } | |||
@@ -173,11 +171,22 @@ namespace system { | |||
inline bool is_subscribed(const entity_handle_type& handle) const | |||
{ return (_subscribed.find(handle) != _subscribed.end()); } | |||
inline void subscribe(const entity_handle_type& handle) | |||
{ _subscribed.insert(handle); } | |||
inline bool subscribe(const entity_handle_type& handle) | |||
{ return _subscribed.insert(handle).second; } | |||
inline void unsubscribe(const entity_handle_type& handle) | |||
{ _subscribed.erase(handle); } | |||
inline bool unsubscribe(const entity_handle_type& handle) | |||
{ | |||
auto it = _subscribed.find(handle); | |||
if (it != _subscribed.end()) | |||
{ | |||
_subscribed.erase(it); | |||
return true; | |||
} | |||
else | |||
{ | |||
return false; | |||
} | |||
} | |||
public: /* states */ | |||
template<typename T_func> | |||
@@ -20,7 +20,7 @@ namespace parallelism { | |||
template<typename T_context, typename T_instance, typename T_func> | |||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | |||
{ | |||
bool threshold_reached = false; // TODO | |||
bool threshold_reached = instance.subscribed_count() >= parameters_type::get_threshold(); | |||
if (!threshold_reached) _strategy_lower (instance, context, std::forward<T_func>(func)); | |||
else _strategy_greater(instance, context, std::forward<T_func>(func)); | |||
} | |||
@@ -10,29 +10,55 @@ namespace parallelism { | |||
struct none | |||
{ | |||
public: | |||
template<typename T_context, typename T_instance> | |||
struct executor_proxy | |||
{ | |||
public: | |||
using context_type = T_context; | |||
using instance_type = T_instance; | |||
context_type& context; | |||
instance_type& instance; | |||
public: | |||
template<typename T_func> | |||
inline void for_subtasks(T_func&& func) | |||
{ | |||
instance.prepare_states(1); | |||
auto data = data_proxy::make_single(context, instance); | |||
func(data); | |||
std::forward<T_func>(func)(data); | |||
} | |||
}; | |||
private: | |||
#ifndef NDEBUG | |||
bool _bound { false }; | |||
mutable std::thread::id _thread_id { 0 }; | |||
#endif | |||
public: | |||
#ifndef NDEBUG | |||
none(bool bound) | |||
: _bound(bound) | |||
{ } | |||
#else | |||
none() | |||
{ } | |||
#endif | |||
template<typename T_context, typename T_instance, typename T_func> | |||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | |||
{ | |||
#ifndef NDEBUG | |||
assert( !_bound | |||
&& ( _thread_id == std::thread::id { 0 } | |||
|| _thread_id == std::this_thread::get_id())); | |||
_thread_id = std::this_thread::get_id(); | |||
#endif | |||
executor_proxy<T_context, T_instance> ep { context, instance }; | |||
func(instance, ep); | |||
std::forward<T_func>(func)(instance, ep); | |||
} | |||
}; | |||
@@ -0,0 +1,66 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/data_proxy.h> | |||
namespace ecs { | |||
namespace core { | |||
namespace system { | |||
namespace parallelism { | |||
struct split_base | |||
{ | |||
public: | |||
template<typename T_context, typename T_instance> | |||
struct executor_proxy | |||
{ | |||
public: | |||
using context_type = T_context; | |||
using instance_type = T_instance; | |||
context_type& context; | |||
instance_type& instance; | |||
size_t split_count; | |||
size_t per_split; | |||
public: | |||
template<typename T_func> | |||
inline void for_subtasks(T_func&& func) | |||
{ | |||
auto total_count = instance.subscribed_count(); | |||
instance.prepare_states(split_count); | |||
utils::counter_blocker counter(split_count - 1); | |||
/* execute the splitted subtask */ | |||
auto execute = [this, &func](size_t index, size_t from, size_t to) | |||
{ | |||
auto data = data_proxy::make_multi(context, instance, index, from, to); | |||
func(data); | |||
}; | |||
/* create subtasks */ | |||
for (size_t i = 0; i < split_count - 1; ++i) | |||
{ | |||
context.post_in_thread_pool([this, &counter, &execute, i](auto){ | |||
ecs_make_scope_guard([&counter](){ | |||
counter.decrement(); | |||
}); | |||
auto beg = i * per_split; | |||
auto end = (i + 1) * per_split; | |||
execute(i, beg, end); | |||
}); | |||
} | |||
/* execute and wait */ | |||
counter.execute_and_wait([this, &execute, total_count]{ | |||
auto index = split_count - 1; | |||
auto beg = index * per_split; | |||
auto end = total_count; | |||
execute(index, beg, end); | |||
}); | |||
} | |||
}; | |||
}; | |||
} } } } |
@@ -1,6 +1,7 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/parallelism/strategy/split_base.h> | |||
namespace ecs { | |||
namespace core { | |||
@@ -9,14 +10,19 @@ namespace parallelism { | |||
template<typename T_parameters> | |||
struct split_evenly | |||
: private split_base | |||
{ | |||
public: | |||
using parameters_type = T_parameters; | |||
template<typename T_context, typename T_instance, typename T_func> | |||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | |||
{ | |||
// TODO | |||
// auto split_count = parameters_type::get_split_count(); | |||
auto split_count = parameters_type::get_split_count(); | |||
auto total_count = instance.subscribed_count(); | |||
auto per_split = total_count / split_count; | |||
executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split }; | |||
func(instance, ep); | |||
} | |||
}; | |||
@@ -1,6 +1,7 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/parallelism/strategy/split_base.h> | |||
namespace ecs { | |||
namespace core { | |||
@@ -9,14 +10,19 @@ namespace parallelism { | |||
template<typename T_parameters> | |||
struct split_every | |||
: private split_base | |||
{ | |||
using parameters_type = T_parameters; | |||
template<typename T_context, typename T_instance, typename T_func> | |||
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const | |||
{ | |||
// TODO | |||
// auto per_split = parameters_type::get_per_split_count(); | |||
auto per_split = parameters_type::get_per_split_count(); | |||
auto total_count = instance.subscribed_count(); | |||
auto split_count = (total_count + per_split - 1) / per_split; | |||
executor_proxy<T_context, T_instance> ep { context, instance, split_count, per_split }; | |||
func(instance, ep); | |||
} | |||
}; | |||
@@ -3,6 +3,7 @@ | |||
#include <ecs/config.h> | |||
#include <ecs/tag/system.h> | |||
#include <ecs/context/context.fwd.h> | |||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||
namespace ecs { | |||
namespace core { | |||
@@ -11,6 +12,14 @@ namespace scheduler { | |||
namespace detail | |||
{ | |||
/** | |||
* meta data to store for each system instance | |||
*/ | |||
struct instance_meta_data_t | |||
{ | |||
ssize_t thread_id = -1; | |||
}; | |||
/** | |||
* struct to wrap some system meta data | |||
*/ | |||
@@ -113,6 +122,34 @@ namespace scheduler { | |||
constexpr decltype(auto) operator()(T_dependency_items) const noexcept; | |||
}; | |||
/** | |||
* extention of the normal counter_blocker to handle tasks to be executed in the context of the main thread | |||
*/ | |||
struct task_counter_blocker_t | |||
: public utils::counter_blocker | |||
{ | |||
private: | |||
utils::task_queue _tasks; //!< tasks to execute in the main thread | |||
public: | |||
using utils::counter_blocker::counter_blocker; | |||
/** | |||
* enqueue a new task inside the task list | |||
* | |||
* @tparam T_task type of the task | |||
* | |||
* @param task task to execute | |||
*/ | |||
template<typename T_task> | |||
inline void post(T_task&& task); | |||
/** | |||
* execute all tasks stored in the queue | |||
*/ | |||
inline void process(); | |||
}; | |||
/** | |||
* defines a group of tasks to execute with | |||
* | |||
@@ -133,7 +170,7 @@ namespace scheduler { | |||
private: | |||
context_type& _context; //!< context to use for system execution | |||
utils::counter_blocker& _counter; //!< counter to track running tasks | |||
task_counter_blocker_t& _counter; //!< counter to track running tasks | |||
tasks_tuple_type _tasks; //!< tuple of all tasks | |||
public: | |||
@@ -143,7 +180,7 @@ namespace scheduler { | |||
* @param p_context context to use for system execution | |||
* @param p_counter counter to track running tasks | |||
*/ | |||
inline task_group_t(context_type& p_context, utils::counter_blocker& p_counter); | |||
inline task_group_t(context_type& p_context, task_counter_blocker_t& p_counter); | |||
/** | |||
* start the given task | |||
@@ -223,8 +260,9 @@ namespace scheduler { | |||
struct atomic_counter | |||
{ | |||
public: | |||
using settings_type = T_settings; | |||
using context_type = ::ecs::context::type<settings_type>; | |||
using settings_type = T_settings; | |||
using context_type = ::ecs::context::type<settings_type>; | |||
using instance_meta_data_type = detail::instance_meta_data_t; | |||
private: | |||
context_type& _context; | |||
@@ -1,7 +1,11 @@ | |||
#pragma once | |||
// #define ECS_DEBUG_ATOMIC_COUNTER | |||
#include <ecs/core/system/scheduler/atomic_counter.h> | |||
#include <ecs/core/utils/fixed_function.inl> | |||
namespace ecs { | |||
namespace core { | |||
namespace system { | |||
@@ -10,6 +14,25 @@ namespace scheduler { | |||
namespace detail | |||
{ | |||
/* misc */ | |||
struct make_is_bound_to_main | |||
{ | |||
constexpr decltype(auto) operator()() const | |||
{ | |||
return hana::is_valid( | |||
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_main> { }); | |||
} | |||
}; | |||
struct make_is_bound_to_thread | |||
{ | |||
constexpr decltype(auto) operator()() const | |||
{ | |||
return hana::is_valid( | |||
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_thread> { }); | |||
} | |||
}; | |||
/* task_t */ | |||
@@ -78,11 +101,53 @@ namespace scheduler { | |||
}); | |||
} | |||
/* task_counter_blocker_t */ | |||
template<typename T_task> | |||
inline void task_counter_blocker_t | |||
::post(T_task&& task) | |||
{ | |||
std::lock_guard lock(_mutex); | |||
_tasks.emplace(std::forward<T_task>(task)); | |||
_cond_var.notify_all(); | |||
} | |||
inline void task_counter_blocker_t | |||
::process() | |||
{ | |||
while(true) | |||
{ | |||
utils::task t; | |||
{ | |||
std::lock_guard lock(_mutex); | |||
if (_tasks.empty()) | |||
{ | |||
return; | |||
} | |||
t = std::move(_tasks.front()); | |||
_tasks.pop(); | |||
try | |||
{ | |||
t(static_cast<size_t>(-1)); | |||
} | |||
catch(const std::exception& ex) | |||
{ | |||
std::cerr << "error in main thread: " << ex.what() << std::endl; | |||
} | |||
catch(...) | |||
{ | |||
std::cerr << "error in main thread: unknown" << std::endl; | |||
} | |||
} | |||
} | |||
} | |||
/* task_group_t */ | |||
template<typename T_context, typename T_dependency_items> | |||
inline task_group_t<T_context, T_dependency_items> | |||
::task_group_t(context_type& p_context, utils::counter_blocker& p_counter) | |||
::task_group_t(context_type& p_context, task_counter_blocker_t& p_counter) | |||
: _context(p_context) | |||
, _counter(p_counter) | |||
{ } | |||
@@ -106,10 +171,36 @@ namespace scheduler { | |||
inline void task_group_t<T_context, T_dependency_items> | |||
::post_task_in_thread_pool(T_task_id, T_func&& func) | |||
{ | |||
std::cout << "post_task_in_thread_pool (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||
_context.post_in_thread_pool([this, &func]{ | |||
execute_task(T_task_id { }, func); | |||
}); | |||
using task_type = mp::decay_t<decltype(hana::at(_tasks, T_task_id { }))>; | |||
using instance_type = mp::decay_t<decltype(_context.instance_by_tag(std::declval<task_type>().dependency_item.tag))>; | |||
using signature_type = typename instance_type::system_signature_type; | |||
using executor_builder_type = mp::decay_t<decltype(mp::unwrap(signature_type { }).parallelism())>; | |||
using is_bound_to_main_type = mp::decay_t<decltype(make_is_bound_to_main { } ())>; | |||
using is_bound_to_thread_type = mp::decay_t<decltype(make_is_bound_to_thread { } ())>; | |||
hana::eval_if( | |||
is_bound_to_main_type{ }(hana::type_c<executor_builder_type>), | |||
[this, &func](auto _){ | |||
_counter.post([this, &func](size_t thread_id){ | |||
execute_task(T_task_id { }, func); | |||
}); | |||
}, | |||
[this, &func](auto _){ | |||
hana::eval_if(is_bound_to_thread_type{ }(hana::type_c<executor_builder_type>), | |||
[this, &func](auto _){ | |||
auto& meta = _context.instance_by_tag(task_type { }.dependency_item.tag).scheduler_meta_data(); | |||
_context.post_in_thread_pool([this, &func, &meta](size_t thread_id){ | |||
meta.thread_id = static_cast<ssize_t>(thread_id); | |||
execute_task(T_task_id { }, func); | |||
}, meta.thread_id); | |||
}, | |||
[this, &func](auto _){ | |||
_context.post_in_thread_pool([this, &func](size_t thread_id){ | |||
execute_task(T_task_id { }, func); | |||
}); | |||
}); | |||
}); | |||
} | |||
template<typename T_context, typename T_dependency_items> | |||
@@ -117,7 +208,6 @@ namespace scheduler { | |||
inline void task_group_t<T_context, T_dependency_items> | |||
::execute_task(T_task_id, T_func&& func) | |||
{ | |||
std::cout << "execute_task (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||
auto& task = hana::at(_tasks, T_task_id { }); | |||
auto& instance = _context.instance_by_tag(task.dependency_item.tag); | |||
instance.execute(_context, func); | |||
@@ -129,7 +219,6 @@ namespace scheduler { | |||
inline void task_group_t<T_context, T_dependency_items> | |||
::check_and_start_dependencies(T_task_id, T_func&& func) | |||
{ | |||
std::cout << "check_and_start_dependencies (current=" << std::this_thread::get_id() << ", task=" << T_task_id::value << ")" << std::endl; | |||
auto& task = hana::at(_tasks, T_task_id { }); | |||
task.for_dependent_ids([this, &func](auto id){ | |||
auto& other = hana::at(_tasks, id); | |||
@@ -232,6 +321,8 @@ namespace scheduler { | |||
} | |||
} | |||
/* atomic_counter */ | |||
template<typename T_settings> | |||
inline atomic_counter<T_settings> | |||
::atomic_counter(context_type& p_context) | |||
@@ -250,7 +341,7 @@ namespace scheduler { | |||
using independent_item_ids_type = mp::decay_t<decltype(detail::get_independent_item_ids( | |||
dependency_list_type { }))>;; | |||
/* TODO debug beg! */ | |||
#ifdef ECS_DEBUG_ATOMIC_COUNTER | |||
size_t i = 0; | |||
std::cout << "dependency_list" << std::endl; | |||
hana::for_each(dependency_list_type { }, [&i](auto item){ | |||
@@ -260,19 +351,23 @@ namespace scheduler { | |||
std::cout << " " << hana::value(id) << std::endl; | |||
}); | |||
}); | |||
/* TODO debug end! */ | |||
#endif | |||
using task_group_type = detail::task_group_t<context_type, dependency_list_type>; | |||
utils::counter_blocker counter (1); | |||
task_group_type task_group (_context, counter); | |||
detail::task_counter_blocker_t counter (1); | |||
task_group_type task_group (_context, counter); | |||
counter.execute_and_wait_until_zero([&task_group, &counter, &func]{ | |||
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){ | |||
task_group.start_task(id, func); | |||
counter.execute_and_wait_tick( | |||
[&task_group, &counter, &func]{ | |||
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){ | |||
task_group.start_task(id, func); | |||
}); | |||
counter.decrement(); | |||
}, | |||
[&counter]{ | |||
counter.process(); | |||
}); | |||
counter.decrement(); | |||
}); | |||
} | |||
} } } } |
@@ -53,14 +53,14 @@ namespace storage { | |||
make_tuple { }))>; | |||
private: | |||
storage_type _storage; | |||
storage_type _storage; | |||
public: | |||
constexpr decltype(auto) size() const noexcept | |||
{ return (ssl_type { }).size(); } | |||
template<typename T_func> | |||
constexpr void for_each(T_func&& f) const noexcept | |||
constexpr void for_each(T_func&& f) noexcept | |||
{ hana::for_each(_storage, std::forward<T_func>(f)); } | |||
template<typename T_system> | |||
@@ -52,7 +52,19 @@ namespace utils { | |||
* @return integral constant with the ID of the requested component tag | |||
*/ | |||
template<typename T_component_tag> | |||
static constexpr decltype(auto) component_id(T_component_tag ct) noexcept; | |||
static constexpr decltype(auto) component_id(T_component_tag&& ct) noexcept; | |||
/** | |||
* get the bitset from the given system signature | |||
* | |||
* @tparam T_system_signature system signature type to get bitmask for | |||
* | |||
* @param ssig system signature to get bitmask for | |||
* | |||
* @return bitmask for the passed system signature | |||
*/ | |||
template<typename T_system_signature> | |||
static constexpr decltype(auto) from_system_signature(T_system_signature&& ssig) noexcept; | |||
/** | |||
* internal bitset type that contains all components | |||
@@ -63,6 +75,13 @@ namespace utils { | |||
bitset_type _bitset; | |||
public: | |||
/** | |||
* return a string representation of the bitset | |||
* | |||
* @return string representation of the bitset | |||
*/ | |||
inline decltype(auto) to_string() const; | |||
/** | |||
* clear all bits of the bitset | |||
*/ | |||
@@ -9,9 +9,28 @@ namespace utils { | |||
template<typename T_settings> | |||
template<typename T_component_tag> | |||
constexpr decltype(auto) bitset<T_settings> | |||
::component_id(T_component_tag ct) noexcept | |||
::component_id(T_component_tag&& ct) noexcept | |||
{ | |||
return mp::list::index_of(all_components(), ct); | |||
return mp::list::index_of(all_components(), std::forward<T_component_tag>(ct)); | |||
} | |||
template<typename T_settings> | |||
template<typename T_system_signature> | |||
constexpr decltype(auto) bitset<T_settings> | |||
::from_system_signature(T_system_signature&& ssig) noexcept | |||
{ | |||
bitset ret; | |||
hana::for_each(hana::concat(ssig.read(), ssig.write()), [&ret](auto ct){ | |||
ret.set_component(ct, true); | |||
}); | |||
return ret; | |||
} | |||
template<typename T_settings> | |||
inline decltype(auto) bitset<T_settings> | |||
::to_string() const | |||
{ | |||
return _bitset.to_string(); | |||
} | |||
template<typename T_settings> | |||
@@ -26,7 +45,7 @@ namespace utils { | |||
inline bool bitset<T_settings> | |||
::contains(const T_other& other) const noexcept | |||
{ | |||
return (_bitset & other._bitset) == _bitset; | |||
return (_bitset & other._bitset) == other._bitset; | |||
} | |||
template<typename T_settings> | |||
@@ -14,9 +14,9 @@ namespace utils { | |||
*/ | |||
struct counter_blocker | |||
{ | |||
private: | |||
std::mutex _mutex; //!< mutex to preotect the counter | |||
std::condition_variable _cond_var; //!< conditional variable to synchronize the counter | |||
protected: | |||
std::recursive_mutex _mutex; //!< mutex to preotect the counter | |||
std::condition_variable_any _cond_var; //!< conditional variable to synchronize the counter | |||
size_t _counter; //!< the actual counter value | |||
public: | |||
@@ -42,10 +42,23 @@ namespace utils { | |||
* | |||
* @tparam T_func type of the function to execute | |||
* | |||
* @param func function to execute | |||
* @param func function to execute before waiting for the counter | |||
*/ | |||
template<typename T_func> | |||
inline void execute_and_wait_until_zero(T_func&& func) noexcept; | |||
inline void execute_and_wait(T_func&& func); | |||
/** | |||
* Execute the given function and wait until the counter is equal to zero. | |||
* Execute the given tick function at least once and repeatedly if the counter value has changed. | |||
* | |||
* @tparam T_func type of the function to execute | |||
* @tparam T_tick type of tick function to execute | |||
* | |||
* @param func function to execute before waiting for the counter | |||
* @param tick tick function to execute | |||
*/ | |||
template<typename T_func, typename T_tick> | |||
inline void execute_and_wait_tick(T_func&& func, T_tick&& tick); | |||
}; | |||
} } } |
@@ -1,5 +1,6 @@ | |||
#pragma once | |||
#include <iostream> // TODO debug! | |||
#include <ecs/core/utils/counter_blocker.h> | |||
namespace ecs { | |||
@@ -30,11 +31,23 @@ namespace utils { | |||
template<typename T_func> | |||
inline void counter_blocker | |||
::execute_and_wait_until_zero(T_func&& func) noexcept | |||
::execute_and_wait(T_func&& func) | |||
{ | |||
func(); | |||
std::unique_lock lock(_mutex); | |||
_cond_var.wait(lock, [this]{ return (_counter == 0); }); | |||
} | |||
template<typename T_func, typename T_tick> | |||
inline void counter_blocker | |||
::execute_and_wait_tick(T_func&& func, T_tick&& tick) | |||
{ | |||
func(); | |||
std::unique_lock lock(_mutex); | |||
_cond_var.wait(lock, [this, t = std::forward<T_tick>(tick)]{ | |||
t(); | |||
return (_counter == 0); | |||
}); | |||
} | |||
} } } |
@@ -152,7 +152,7 @@ namespace utils { | |||
inline decltype(auto) insert(const value_type& value) | |||
{ | |||
auto it = std::lower_bound(begin(), end(), value); | |||
auto is_new = _compare(value, *it); | |||
auto is_new = (it != end() && _compare(value, *it)); | |||
if (it == end() || is_new) | |||
_items.insert(it, value); | |||
return std::make_pair(it, is_new); | |||
@@ -181,6 +181,9 @@ namespace utils { | |||
template<typename... T_args> | |||
constexpr decltype(auto) erase(T_args&&... args) | |||
{ return _items.erase(std::forward<T_args>(args)...); } | |||
void clear() | |||
{ _items.clear(); } | |||
}; | |||
} } } |
@@ -1,8 +1,9 @@ | |||
#pragma once | |||
#include "./thread_pool/pool.h" | |||
#include "./thread_pool/types.h" | |||
#include "./thread_pool/task_queue.h" | |||
#include "./thread_pool/worker.h" | |||
#include "./thread_pool/pool.inl" | |||
#include "./thread_pool/task_queue.inl" | |||
#include "./thread_pool/worker.inl" |
@@ -4,8 +4,8 @@ | |||
#include <vector> | |||
#include <ecs/config.h> | |||
#include <ecs/core/utils/thread_pool/types.h> | |||
#include <ecs/core/utils/thread_pool/worker.h> | |||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||
namespace ecs { | |||
namespace core { | |||
@@ -17,13 +17,14 @@ namespace utils { | |||
struct thread_pool | |||
{ | |||
private: | |||
using worker_vector = std::vector<thread_pool_worker>; | |||
using worker_ptr_u = std::unique_ptr<thread_pool_worker>; | |||
using worker_vector = std::vector<worker_ptr_u>; | |||
using atomic_size_t = std::atomic<size_t>; | |||
private: | |||
task_queue _queue; //!< queue to store tasks to execute | |||
worker_vector _workers; //!< vector of worker threads | |||
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations | |||
concurrent_task_queue _tasks; //!< queue to store tasks to execute | |||
worker_vector _workers; //!< vector of worker threads | |||
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations | |||
/** | |||
* check if all workers are finished or not | |||
@@ -59,9 +60,10 @@ namespace utils { | |||
* @tparam T_task type of the task | |||
* | |||
* @param task task to execute | |||
* @param worker_id id of the worker to enqueue task at (-1 = any; n = worker tasks) | |||
*/ | |||
template<typename T_task> | |||
inline void post(T_task&& task); | |||
inline void post(T_task&& task, ssize_t worker_id = -1); | |||
}; | |||
} } } |
@@ -8,9 +8,25 @@ namespace utils { | |||
template<typename T_task> | |||
inline void thread_pool | |||
::post(T_task&& task) | |||
::post(T_task&& task, ssize_t worker_id) | |||
{ | |||
_queue.enqueue(std::forward<T_task>(task)); | |||
if (worker_id == -1) | |||
{ | |||
_tasks.push(std::forward<T_task>(task)); | |||
for (auto& w : _workers) | |||
{ | |||
w->signal(); | |||
} | |||
} | |||
else if (worker_id >= 1 && worker_id <= static_cast<ssize_t>(_workers.size())) | |||
{ | |||
auto& w = _workers.at(static_cast<size_t>(worker_id - 1)); | |||
w->post(std::forward<T_task>(task)); | |||
} | |||
else | |||
{ | |||
throw std::invalid_argument(std::string("invalid worker_id: ") + std::to_string(worker_id)); | |||
} | |||
} | |||
} } } |
@@ -0,0 +1,68 @@ | |||
#pragma once | |||
#include <sstream> // TODO debug! | |||
#include <queue> | |||
#include <ecs/config.h> | |||
#include <moodycamel/concurrentqueue.h> | |||
#include "../fixed_function.h" | |||
namespace ecs { | |||
namespace core { | |||
namespace utils { | |||
/** | |||
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes! | |||
*/ | |||
using task = fixed_function<void(size_t thread_id), 128>; | |||
/** | |||
* normal task queue | |||
*/ | |||
using task_queue = std::queue<task>; | |||
/** | |||
* none blocking concurrent queue to store thread pool tasks in | |||
*/ | |||
struct concurrent_task_queue | |||
{ | |||
private: | |||
using inner_queue_type = moodycamel::ConcurrentQueue<task>; | |||
private: | |||
std::atomic<ssize_t> _count; | |||
inner_queue_type _inner_queue; | |||
public: | |||
/** | |||
* constructor | |||
*/ | |||
concurrent_task_queue(); | |||
/** | |||
* check if the task queue is empty | |||
* | |||
* @retval TRUE if the queue is empty | |||
* @retval FALSE if the queue contains at least one element | |||
*/ | |||
inline bool empty() const; | |||
/** | |||
* push a new element to the queue | |||
* | |||
* @param t task to push to queue | |||
*/ | |||
void push(task&& t); | |||
/** | |||
* try to get dequeue a task from the queue | |||
* | |||
* @param t dequeued task | |||
* | |||
* @retval TRUE if an task was dequeued | |||
* @retval FALSE if the queue is empty | |||
*/ | |||
bool pop(task& t); | |||
}; | |||
} } } |
@@ -0,0 +1,15 @@ | |||
#pragma once | |||
#include <ecs/core/utils/thread_pool/task_queue.h> | |||
namespace ecs { | |||
namespace core { | |||
namespace utils { | |||
bool concurrent_task_queue | |||
::empty() const | |||
{ | |||
return (_count.load() <= 0); | |||
} | |||
} } } |
@@ -1,22 +0,0 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <moodycamel/blockingconcurrentqueue.h> | |||
#include "../fixed_function.h" | |||
namespace ecs { | |||
namespace core { | |||
namespace utils { | |||
/** | |||
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes! | |||
*/ | |||
using task = fixed_function<void(), 128>; | |||
/** | |||
* non blocking concurrent queue to store thread pool tasks in | |||
*/ | |||
using task_queue = moodycamel::BlockingConcurrentQueue<task>; | |||
} } } |
@@ -1,9 +1,11 @@ | |||
#pragma once | |||
#include <mutex> | |||
#include <thread> | |||
#include <condition_variable> | |||
#include <ecs/config.h> | |||
#include "./types.h" | |||
#include "./task_queue.h" | |||
#include "../movable_atomic.h" | |||
namespace ecs { | |||
@@ -29,9 +31,13 @@ namespace utils { | |||
using atomic_state = movable_atomic<state>; | |||
std::thread _thread; //!< actual thread object to use for execution | |||
task_queue& _queue; //!< queue to poll new tasks from | |||
atomic_state _state; //!< current state of the worker | |||
std::thread _thread; //!< actual thread object to use for execution | |||
std::mutex _mutex; //!< mutex to protext the conditional | |||
std::condition_variable _conditional; //!< condition variable to inform about new tasks | |||
concurrent_task_queue& _all_tasks; //!< queue to poll new tasks from | |||
task_queue _own_tasks; //!< queue of tasks that are assigned to this specific worker | |||
atomic_state _state; //!< current state of the worker | |||
size_t _thread_id; //!< index of the thread inside the thread pool | |||
private: | |||
/** | |||
@@ -39,13 +45,24 @@ namespace utils { | |||
*/ | |||
void run(); | |||
/** | |||
* pop the next task from either the own task list or the general task list | |||
* | |||
* @param t parameter to store dequeued task in | |||
* @param timeout timeout to wait for new tasks | |||
* | |||
* @return TRUE if a task was dequeued, FALSE otherwise | |||
*/ | |||
template<typename T_rep, typename T_period> | |||
inline bool pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout); | |||
public: | |||
/** | |||
* constructor | |||
* | |||
* @param queue task queue to poll tasks from | |||
*/ | |||
inline thread_pool_worker(task_queue& queue) noexcept; | |||
inline thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept; | |||
inline thread_pool_worker(thread_pool_worker&&) = default; | |||
inline thread_pool_worker(const thread_pool_worker&) = delete; | |||
@@ -81,6 +98,21 @@ namespace utils { | |||
*/ | |||
inline bool finished() const noexcept; | |||
/** | |||
* enqueue a new task to execute inside this worker thread | |||
* | |||
* @tparam T_task type of the task | |||
* | |||
* @param task task to execute | |||
*/ | |||
template<typename T_task> | |||
inline void post(T_task&& task); | |||
/** | |||
* inform the task about new tasks in the concurent task list | |||
*/ | |||
inline void signal(); | |||
}; | |||
} } } |
@@ -1,16 +1,42 @@ | |||
#pragma once | |||
#include <ecs/core/utils/thread_pool/worker.h> | |||
#include <ecs/core/utils/thread_pool/task_queue.inl> | |||
namespace ecs { | |||
namespace core { | |||
namespace utils { | |||
inline thread_pool_worker | |||
::thread_pool_worker(task_queue& queue) noexcept | |||
: _queue(queue) | |||
::thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept | |||
: _all_tasks(all_tasks) | |||
, _thread_id(thread_id) | |||
{ } | |||
template<typename T_rep, typename T_period> | |||
inline bool thread_pool_worker | |||
::pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout) | |||
{ | |||
std::unique_lock<std::mutex> lock(_mutex); | |||
do | |||
{ | |||
if (_state != state::running) | |||
{ | |||
return false; | |||
} | |||
if (!_own_tasks.empty()) | |||
{ | |||
t = std::move(_own_tasks.front()); | |||
_own_tasks.pop(); | |||
return true; | |||
} | |||
if (!_all_tasks.empty() && _all_tasks.pop(t)) // TODO: this will sometimes return false, even if the queue is not yet empty | |||
{ | |||
return true; | |||
} | |||
} while(_conditional.wait_for(lock, timeout) != std::cv_status::timeout); | |||
return false; | |||
} | |||
template<typename T_counter> | |||
inline void thread_pool_worker | |||
@@ -33,7 +59,6 @@ namespace utils { | |||
::join() noexcept | |||
{ | |||
assert(_thread.joinable()); | |||
assert(_state == state::finished); | |||
_thread.join(); | |||
} | |||
@@ -43,4 +68,19 @@ namespace utils { | |||
return _state == state::finished; | |||
} | |||
template<typename T_task> | |||
inline void thread_pool_worker | |||
::post(T_task&& task) | |||
{ | |||
std::unique_lock<std::mutex> lock(_mutex); | |||
_own_tasks.emplace(std::forward<T_task>(task)); | |||
signal(); | |||
} | |||
inline void thread_pool_worker | |||
::signal() | |||
{ | |||
_conditional.notify_all(); | |||
} | |||
} } } |
@@ -1,5 +1,7 @@ | |||
#pragma once | |||
#include "./strategy/bound.h" | |||
#include "./strategy/main.h" | |||
#include "./strategy/none.h" | |||
#include "./strategy/split_evenly_cores.h" | |||
#include "./strategy/split_evenly.h" |
@@ -0,0 +1,36 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/parallelism/strategy/none.h> | |||
namespace ecs { | |||
namespace signature { | |||
namespace system { | |||
namespace parallelism { | |||
namespace detail | |||
{ | |||
struct bound_builder_t | |||
{ | |||
using bind_to_thread = void; | |||
inline decltype(auto) operator()() const noexcept | |||
{ | |||
#ifndef NDEBUG | |||
return ::ecs::core::system::parallelism::none(false); | |||
#else | |||
return ::ecs::core::system::parallelism::none(); | |||
#endif | |||
} | |||
}; | |||
struct bound_t | |||
{ | |||
constexpr decltype(auto) operator()() const noexcept | |||
{ return bound_builder_t { }; } | |||
}; | |||
} | |||
constexpr decltype(auto) bound = detail::bound_t { }; | |||
} } } } |
@@ -0,0 +1,36 @@ | |||
#pragma once | |||
#include <ecs/config.h> | |||
#include <ecs/core/system/parallelism/strategy/none.h> | |||
namespace ecs { | |||
namespace signature { | |||
namespace system { | |||
namespace parallelism { | |||
namespace detail | |||
{ | |||
struct main_builder_t | |||
{ | |||
using bind_to_main = void; | |||
inline decltype(auto) operator()() const noexcept | |||
{ | |||
#ifndef NDEBUG | |||
return ::ecs::core::system::parallelism::none(false); | |||
#else | |||
return ::ecs::core::system::parallelism::none(); | |||
#endif | |||
} | |||
}; | |||
struct main_t | |||
{ | |||
constexpr decltype(auto) operator()() const noexcept | |||
{ return main_builder_t { }; } | |||
}; | |||
} | |||
constexpr decltype(auto) main = detail::main_t { }; | |||
} } } } |
@@ -12,8 +12,14 @@ namespace parallelism { | |||
{ | |||
struct none_builder_t | |||
{ | |||
constexpr decltype(auto) operator()() const noexcept | |||
{ return ::ecs::core::system::parallelism::none { }; } | |||
inline decltype(auto) operator()() const noexcept | |||
{ | |||
#ifndef NDEBUG | |||
return ::ecs::core::system::parallelism::none(false); | |||
#else | |||
return ::ecs::core::system::parallelism::none(); | |||
#endif | |||
} | |||
}; | |||
struct none_t | |||
@@ -15,7 +15,7 @@ namespace parallelism { | |||
{ | |||
struct parameters | |||
{ | |||
static constexpr size_t get_split_count() noexcept | |||
static constexpr size_t get_per_split_count() noexcept | |||
{ return T_split::value; } | |||
}; | |||
@@ -121,7 +121,7 @@ namespace system { | |||
public: /* setter */ | |||
/** set the parallelism options | |||
* @param parallelism is a predicate the takes no parameters and returns a | |||
* @param parallelism is a predicate that takes no parameters and returns a | |||
* system executor interface */ | |||
template<typename T_parallelism> | |||
constexpr decltype(auto) parallelism(T_parallelism parallelism) const noexcept; | |||
@@ -9,7 +9,7 @@ auto thread_pool | |||
{ | |||
for (const auto& w : _workers) | |||
{ | |||
if (!w.finished()) | |||
if (!w->finished()) | |||
{ | |||
return false; | |||
} | |||
@@ -23,19 +23,20 @@ void thread_pool | |||
_workers.reserve(count); | |||
for (size_t i = 0; i < count; ++i) | |||
{ | |||
_workers.emplace_back(_queue); | |||
_workers.emplace_back(std::make_unique<thread_pool_worker>(_tasks, i + 1)); | |||
} | |||
_outstanding_inits = count; | |||
for (auto& w : _workers) | |||
{ | |||
w.start(_outstanding_inits); | |||
w->start(_outstanding_inits); | |||
} | |||
} | |||
thread_pool | |||
::thread_pool(size_t count) | |||
{ | |||
assert(count > 1); | |||
initialize_workers(count); | |||
} | |||
@@ -48,21 +49,11 @@ thread_pool | |||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | |||
} | |||
// stop all workers | |||
for (auto& w : _workers) | |||
{ | |||
w.stop(); | |||
} | |||
// post dummy tasks untill all workers has stopped | |||
while (!all_workers_finished()) | |||
{ | |||
post([]{ }); | |||
} | |||
// join the worker threads | |||
for (auto& w : _workers) | |||
{ | |||
w.join(); | |||
w->stop(); | |||
w->signal(); | |||
w->join(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,38 @@ | |||
#include <ecs/core/utils/fixed_function.inl> | |||
#include <ecs/core/utils/thread_pool/task_queue.inl> | |||
using namespace ::ecs::core::utils; | |||
concurrent_task_queue | |||
::concurrent_task_queue() | |||
: _count (0) | |||
, _inner_queue () | |||
{ } | |||
void concurrent_task_queue | |||
::push(task&& t) | |||
{ | |||
if (!_inner_queue.enqueue(std::move(t))) | |||
{ | |||
throw std::overflow_error("task queue is out of memory"); | |||
} | |||
_count++; | |||
} | |||
bool concurrent_task_queue | |||
::pop(task& t) | |||
{ | |||
ssize_t old_count = _count.load(std::memory_order_relaxed); | |||
while (old_count > 0) | |||
{ | |||
if (_count.compare_exchange_weak(old_count, old_count - 1, std::memory_order_acquire, std::memory_order_relaxed)) | |||
{ | |||
while (!_inner_queue.try_dequeue(t)) | |||
{ | |||
continue; | |||
} | |||
return true; | |||
} | |||
} | |||
return false; | |||
} |
@@ -1,3 +1,5 @@ | |||
#include <iostream> | |||
#include <ecs/core/utils/fixed_function.inl> | |||
#include <ecs/core/utils/thread_pool/worker.inl> | |||
@@ -10,9 +12,20 @@ void thread_pool_worker | |||
while (_state == state::running) | |||
{ | |||
task t; | |||
if (_queue.wait_dequeue_timed(t, std::chrono::milliseconds(500))) | |||
if (pop(t, std::chrono::milliseconds(500))) | |||
{ | |||
t(); | |||
try | |||
{ | |||
t(_thread_id); | |||
} | |||
catch(const std::exception& ex) | |||
{ | |||
std::cerr << "error in worker thread: " << ex.what() << std::endl; | |||
} | |||
catch(...) | |||
{ | |||
std::cerr << "error in worker thread: unknown" << std::endl; | |||
} | |||
} | |||
} | |||
_state = state::finished; |
@@ -40,13 +40,9 @@ namespace test | |||
double y; | |||
}; | |||
struct test | |||
{ }; | |||
MAKE_COMPONENT_TAG(position) | |||
MAKE_COMPONENT_TAG(velocity) | |||
MAKE_COMPONENT_TAG(acceleration) | |||
MAKE_COMPONENT_TAG(test) | |||
} | |||
namespace system | |||
@@ -60,18 +56,16 @@ namespace test | |||
struct render | |||
{ }; | |||
struct foo | |||
{ }; | |||
MAKE_SYSTEM_TAG(accelerate) | |||
MAKE_SYSTEM_TAG(move) | |||
MAKE_SYSTEM_TAG(render) | |||
MAKE_SYSTEM_TAG(foo) | |||
} | |||
} | |||
using namespace test; | |||
namespace hana = ::boost::hana; | |||
namespace c = component; | |||
namespace ct = c::tag; | |||
namespace cs = ecs::signature::component; | |||
@@ -96,71 +90,139 @@ constexpr decltype(auto) cs_acceleration = | |||
cs::make(ct::acceleration) | |||
.storage(cs::storage::dynamic<storage_size>); | |||
constexpr decltype(auto) cs_test = | |||
cs::make(ct::test) | |||
.storage(cs::storage::dynamic<storage_size>); | |||
constexpr decltype(auto) cs_list = csl::make( | |||
cs_position, | |||
cs_velocity, | |||
cs_acceleration, | |||
cs_test); | |||
constexpr decltype(auto) ss_foo = | |||
ss::make(st::foo) | |||
.write(ct::test); | |||
cs_acceleration); | |||
constexpr decltype(auto) ss_accelerate = | |||
ss::make(st::accelerate) | |||
// .parallelism(ss::parallelism::split_evenly(hana::size_c<3>)) | |||
.read(ct::acceleration) | |||
.write(ct::velocity); | |||
constexpr decltype(auto) ss_move = | |||
ss::make(st::move) | |||
.parallelism(ss::parallelism::split_every(hana::size_c<3>)) | |||
.read(ct::velocity) | |||
.write(ct::position); | |||
constexpr decltype(auto) ss_render = | |||
ss::make(st::render) | |||
.read(ct::velocity, ct::position, ct::acceleration, ct::test); | |||
.parallelism(ss::parallelism::main()) | |||
.read(ct::velocity, ct::position, ct::acceleration); | |||
constexpr decltype(auto) ss_list = ssl::make( | |||
ss_foo, | |||
ss_accelerate, | |||
ss_move, | |||
ss_render); | |||
constexpr decltype(auto) settings = ::ecs::settings::make() | |||
.component_signatures(cs_list) | |||
.system_signatures (ss_list); | |||
.system_signatures (ss_list) | |||
.refresh_parallelism (ecs::settings::refresh_parallelism::disable); | |||
namespace sea = ::ecs::system_execution_adapter; | |||
double frand(double min, double max) | |||
{ | |||
double f = (double)rand() / RAND_MAX; | |||
return min + f * (max - min); | |||
} | |||
inline void log_system_execution(const std::string& system) | |||
{ | |||
std::ostringstream os; | |||
os << system | |||
<< " (thread=" << std::this_thread::get_id() | |||
<< ")" | |||
<< std::endl; | |||
std::cout << os.str(); | |||
} | |||
template<typename T_data_proxy> | |||
inline void log_system_execution(const std::string& system, const T_data_proxy& data) | |||
{ | |||
std::ostringstream os; | |||
os << system | |||
<< " (thread=" << std::this_thread::get_id() | |||
<< ", index=" << data._index | |||
<< ", begin=" << data._begin | |||
<< ", end=" << data._end | |||
<< ", count=" << data.entity_count() | |||
<< ")" | |||
<< std::endl; | |||
std::cout << os.str(); | |||
} | |||
TEST(DummyTest, fuu) | |||
{ | |||
srand(static_cast<unsigned int>(time(nullptr))); | |||
auto context = ::ecs::context::make(settings); | |||
context.step([](auto& step_proxy){ | |||
step_proxy.execute_systems()( | |||
sea::tags(st::foo) | |||
.for_subtasks([](auto& s, auto& data){ | |||
std::cout << "foo" << std::endl | |||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||
}), | |||
sea::tags(st::accelerate) | |||
.for_subtasks([](auto& s, auto& data){ | |||
std::cout << "accelerate" << std::endl | |||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||
}), | |||
sea::tags(st::move) | |||
.for_subtasks([](auto& s, auto& data){ | |||
std::cout << "move" << std::endl | |||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||
}), | |||
sea::tags(st::render) | |||
.for_subtasks([](auto& s, auto& data){ | |||
std::cout << "render" << std::endl | |||
<< " " << type_helper<decltype(data)>::name() << std::endl; | |||
}) | |||
); | |||
context.step([](auto& proxy){ | |||
for (auto i = 0; i < 10; ++i) { | |||
auto handle = proxy.create_entity(); | |||
auto& position = proxy.add_component(handle, ct::position); | |||
position.x = frand(-100, 100); | |||
position.y = frand(-100, 100); | |||
auto& velocity = proxy.add_component(handle, ct::velocity); | |||
velocity.x = frand(-1, 1); | |||
velocity.y = frand(-1, 1); | |||
auto& acceleration = proxy.add_component(handle, ct::acceleration); | |||
acceleration.x = frand(-1, 1); | |||
acceleration.y = frand(-1, 1); | |||
} | |||
}); | |||
std::cout << "main " << std::this_thread::get_id() << std::endl; | |||
for (int i = 0; i < 10; ++i) { | |||
context.step([](auto& proxy){ | |||
proxy.execute_systems()( | |||
sea::tags(st::accelerate) | |||
.for_subtasks([](auto& s, auto& data){ | |||
log_system_execution("accelerate"); | |||
data.for_entities([&data](auto& handle) { | |||
auto& velocity = data.get_component(ct::velocity, handle); | |||
auto& acceleration = data.get_component(ct::acceleration, handle); | |||
velocity.x += acceleration.x; | |||
velocity.y += acceleration.y; | |||
}); | |||
}), | |||
sea::tags(st::move) | |||
.for_subtasks([](auto& s, auto& data){ | |||
log_system_execution("move", data); | |||
data.for_entities([&data](auto& handle) { | |||
auto& position = data.get_component(ct::position, handle); | |||
auto& velocity = data.get_component(ct::velocity, handle); | |||
position.x += velocity.x; | |||
position.y += velocity.y; | |||
}); | |||
}), | |||
sea::tags(st::render) | |||
.for_subtasks([](auto& s, auto& data){ | |||
log_system_execution("render"); | |||
data.for_entities([&data](auto& handle) { | |||
auto& position = data.get_component(ct::position, handle); | |||
auto& velocity = data.get_component(ct::velocity, handle); | |||
auto& acceleration = data.get_component(ct::acceleration, handle); | |||
std::cout << std::fixed | |||
<< " " << handle.index() << ": " | |||
<< "pos(" << position.x << ";" << position.y << ") " | |||
<< "vel(" << velocity.x << ";" << velocity.y << ") " | |||
<< "acc(" << acceleration.x << ";" << acceleration.y << ")" | |||
<< std::endl; | |||
}); | |||
}) | |||
); | |||
}); | |||
} | |||
} |