|
- #pragma once
-
- // #define ECS_DEBUG_ATOMIC_COUNTER
-
- #include <ecs/core/system/scheduler/atomic_counter.h>
-
- #include <ecs/core/utils/fixed_function.inl>
-
- namespace ecs {
- namespace core {
- namespace system {
- namespace scheduler {
-
- namespace detail
- {
-
- /* misc */
-
- struct make_is_bound_to_main
- {
- constexpr decltype(auto) operator()() const
- {
- return hana::is_valid(
- [](auto t) -> hana::type<typename decltype(t)::type::bind_to_main> { });
- }
- };
-
- struct make_is_bound_to_thread
- {
- constexpr decltype(auto) operator()() const
- {
- return hana::is_valid(
- [](auto t) -> hana::type<typename decltype(t)::type::bind_to_thread> { });
- }
- };
-
- /* task_t */
-
- template<
- typename T_dependency_item,
- typename T_dependent_ids>
- inline task_t<T_dependency_item, T_dependent_ids>
- ::task_t()
- : _dependency_count(hana::value(hana::size(mp::unwrap(dependency_item_type { }).dependency_ids)))
- { }
-
- template<
- typename T_dependency_item,
- typename T_dependent_ids>
- inline size_t task_t<T_dependency_item, T_dependent_ids>
- ::dependency_count() const
- {
- return _dependency_count;
- }
-
- template<
- typename T_dependency_item,
- typename T_dependent_ids>
- inline bool task_t<T_dependency_item, T_dependent_ids>
- ::decrement_and_check()
- {
- return (--_dependency_count == 0);
- }
-
- template<
- typename T_dependency_item,
- typename T_dependent_ids>
- template<typename T_func>
- constexpr decltype(auto) task_t<T_dependency_item, T_dependent_ids>
- ::for_dependent_ids(T_func&& func) const noexcept
- {
- hana::for_each(dependent_ids_type { }, std::forward<T_func>(func));
- }
-
- /* dependent_ids_t */
-
- template<typename T_dependency_items, typename T_dependency_item>
- constexpr decltype(auto) dependent_ids_t
- ::operator()(T_dependency_items, T_dependency_item) const noexcept
- {
- auto item_id = mp::list::index_of(T_dependency_items { }, T_dependency_item { });
- return hana::fold_right(T_dependency_items { }, mp::list::empty, [=](auto other, auto acc){
- return hana::if_(
- hana::contains(mp::unwrap(other).dependency_ids, item_id),
- hana::append(acc, mp::list::index_of(T_dependency_items { }, other)),
- acc);
- });
- }
-
- /* make_tasks_t */
-
- template<typename T_dependency_items>
- constexpr decltype(auto) make_tasks_t
- ::operator()(T_dependency_items) const noexcept
- {
- return hana::transform(T_dependency_items { }, [](auto item){
- using item_type = mp::decay_t<decltype(item)>;
- using dependent_ids_type = mp::decay_t<decltype((dependent_ids_t { })(T_dependency_items { }, item_type { }))>;
- using task_type = task_t<item_type, dependent_ids_type>;
- return hana::type_c<task_type>;
- });
- }
-
- /* task_counter_blocker_t */
-
- template<typename T_task>
- inline void task_counter_blocker_t
- ::post(T_task&& task)
- {
- std::lock_guard lock(_mutex);
- _tasks.emplace(std::forward<T_task>(task));
- _cond_var.notify_all();
- }
-
- inline void task_counter_blocker_t
- ::process()
- {
- while(true)
- {
- utils::task t;
- {
- std::lock_guard lock(_mutex);
- if (_tasks.empty())
- {
- return;
- }
- t = std::move(_tasks.front());
- _tasks.pop();
-
- try
- {
- t(static_cast<size_t>(-1));
- }
- catch(const std::exception& ex)
- {
- std::cerr << "error in main thread: " << ex.what() << std::endl;
- }
- catch(...)
- {
- std::cerr << "error in main thread: unknown" << std::endl;
- }
- }
- }
- }
-
- /* task_group_t */
-
- template<typename T_context, typename T_dependency_items>
- inline task_group_t<T_context, T_dependency_items>
- ::task_group_t(context_type& p_context, task_counter_blocker_t& p_counter)
- : _context(p_context)
- , _counter(p_counter)
- { }
-
- template<typename T_context, typename T_dependency_items>
- template<typename T_task_id, typename T_func>
- inline void task_group_t<T_context, T_dependency_items>
- ::start_task(T_task_id id, T_func&& func)
- {
- using task_type = mp::decay_t<decltype(hana::at(_tasks, id))>;
- using dependency_item_type = typename task_type::dependency_item_type;
- _counter.increment();
- hana::eval_if(
- mp::unwrap(dependency_item_type { }).execute,
- [this, &func](auto _) { _(this)->post_task_in_thread_pool (T_task_id { }, func); },
- [this, &func](auto _) { _(this)->check_and_start_dependencies(T_task_id { }, func); });
- }
-
- template<typename T_context, typename T_dependency_items>
- template<typename T_task_id, typename T_func>
- inline void task_group_t<T_context, T_dependency_items>
- ::post_task_in_thread_pool(T_task_id, T_func&& func)
- {
- using task_type = mp::decay_t<decltype(hana::at(_tasks, T_task_id { }))>;
- using instance_type = mp::decay_t<decltype(_context.instance_by_tag(std::declval<task_type>().dependency_item.tag))>;
- using signature_type = typename instance_type::system_signature_type;
- using executor_builder_type = mp::decay_t<decltype(mp::unwrap(signature_type { }).parallelism())>;
-
- using is_bound_to_main_type = mp::decay_t<decltype(make_is_bound_to_main { } ())>;
- using is_bound_to_thread_type = mp::decay_t<decltype(make_is_bound_to_thread { } ())>;
-
- hana::eval_if(
- is_bound_to_main_type{ }(hana::type_c<executor_builder_type>),
- [this, &func](auto _){
- _counter.post([this, &func](size_t thread_id){
- execute_task(T_task_id { }, func);
- });
- },
- [this, &func](auto _){
- hana::eval_if(is_bound_to_thread_type{ }(hana::type_c<executor_builder_type>),
- [this, &func](auto _){
- auto& meta = _context.instance_by_tag(task_type { }.dependency_item.tag).scheduler_meta_data();
- _context.post_in_thread_pool([this, &func, &meta](size_t thread_id){
- meta.thread_id = static_cast<ssize_t>(thread_id);
- execute_task(T_task_id { }, func);
- }, meta.thread_id);
- },
- [this, &func](auto _){
- _context.post_in_thread_pool([this, &func](size_t thread_id){
- execute_task(T_task_id { }, func);
- });
- });
- });
- }
-
- template<typename T_context, typename T_dependency_items>
- template<typename T_task_id, typename T_func>
- inline void task_group_t<T_context, T_dependency_items>
- ::execute_task(T_task_id, T_func&& func)
- {
- auto& task = hana::at(_tasks, T_task_id { });
- auto& instance = _context.instance_by_tag(task.dependency_item.tag);
- instance.execute(_context, func);
- check_and_start_dependencies(T_task_id { }, func);
- }
-
- template<typename T_context, typename T_dependency_items>
- template<typename T_task_id, typename T_func>
- inline void task_group_t<T_context, T_dependency_items>
- ::check_and_start_dependencies(T_task_id, T_func&& func)
- {
- auto& task = hana::at(_tasks, T_task_id { });
- task.for_dependent_ids([this, &func](auto id){
- auto& other = hana::at(_tasks, id);
- if (other.decrement_and_check()) {
- start_task(id, func);
- }
- });
- _counter.decrement();
- }
-
- /* misc */
-
- template<typename T_system_signature_list, typename T_system_tag_list>
- constexpr decltype(auto) build_dependency_list(T_system_signature_list, T_system_tag_list) noexcept
- {
- using system_signature_list_type = T_system_signature_list;
- using system_signature_tuple_type = mp::decay_t<decltype((system_signature_list_type { }).items)>;
- using system_tag_list_type = T_system_tag_list;
-
- /* filter system signature list by the passed system tags */
- auto filtered = (system_signature_list_type { })
- .filter([](auto ssig){
- return hana::contains(system_tag_list_type { }, mp::unwrap(ssig).tag());
- });
- using filtered_ssig_list_type = mp::decay_t<decltype(filtered)>;
-
- /* check if any systems that does not depend on each other, wants to get write access to a component */
- (filtered_ssig_list_type { }).for_each([](auto ssig){
- using ssig_type = mp::decay_t<decltype(ssig)>;
- hana::for_each(mp::unwrap(ssig).write(), [](auto tag){
- using tag_type = mp::decay_t<decltype(tag)>;
- (filtered_ssig_list_type { }).for_each([](auto other){
- using check_dependencies_type = decltype(
- hana::or_(
- hana::equal(ssig_type { }, other),
- (T_system_signature_list { }).depends_on(ssig_type { }, mp::unwrap(other).tag()),
- (T_system_signature_list { }).depends_on(other, mp::unwrap(ssig_type { }).tag()),
- hana::not_(hana::contains(mp::unwrap(other).write(), tag_type { }))));
- static_assert(
- check_dependencies_type { },
- "independent systems can not share write access to component");
- });
- });
- });
-
- /* build the actual dependency item list */
- return hana::transform(system_signature_tuple_type { }, [](auto ssig){
- using ssig_type = mp::decay_t<decltype(ssig)>;
-
- auto tag = mp::unwrap(ssig).tag();
- auto deps = mp::unwrap(ssig).dependencies();
- auto execute = hana::contains(system_tag_list_type { }, mp::unwrap(ssig).tag());
-
- auto read_deps =
- hana::transform(mp::unwrap(ssig).read(), [](auto read_tag){
- using read_tag_type = mp::decay_t<decltype(read_tag)>;
- return hana::transform(
- hana::filter((filtered_ssig_list_type { }).items, [](auto other){
- return hana::and_(
- hana::not_equal(ssig_type { }, other),
- hana::contains(mp::unwrap(other).write(), read_tag_type { }));
- }),
- [](auto other){
- return mp::unwrap(other).tag();
- });
- });
- auto final_deps =
- hana::transform(
- hana::fold(
- hana::if_(
- hana::size(read_deps) == hana::size_c<0>,
- hana::make_basic_tuple(hana::make_basic_tuple()),
- read_deps),
- deps,
- hana::concat),
- [](auto tmp_tag){
- return (system_signature_list_type { }).id_by_tag(tmp_tag);
- });
-
- using tag_type = mp::decay_t<decltype(tag)>;
- using dep_type = mp::decay_t<decltype(final_deps)>;
- using execute_type = mp::decay_t<decltype(execute)>;
- using item_type = dependency_item_t<tag_type, execute_type, dep_type>;
-
- return hana::type_c<item_type>;
- });
- }
-
- template<typename T_dependency_list>
- constexpr decltype(auto) get_independent_item_ids(T_dependency_list) noexcept
- {
- return
- hana::transform(
- hana::filter(T_dependency_list { }, [](auto item){
- return hana::size(mp::unwrap(item).dependency_ids) == hana::size_c<0>;
- }),
- [](auto item){
- return mp::list::index_of(T_dependency_list { }, item);
- });
- }
- }
-
- /* atomic_counter */
-
- template<typename T_settings>
- inline atomic_counter<T_settings>
- ::atomic_counter(context_type& p_context)
- : _context(p_context)
- { }
-
- template<typename T_settings>
- template<typename T_system_tag_list, typename T_func>
- inline void atomic_counter<T_settings>
- ::execute(T_system_tag_list stl, T_func&& func)
- {
- static_assert(tag::system::is_list(T_system_tag_list { }));
- using dependency_list_type = mp::decay_t<decltype(detail::build_dependency_list(
- (settings_type { }).system_signatures(),
- T_system_tag_list { }))>;
- using independent_item_ids_type = mp::decay_t<decltype(detail::get_independent_item_ids(
- dependency_list_type { }))>;;
-
- #ifdef ECS_DEBUG_ATOMIC_COUNTER
- size_t i = 0;
- std::cout << "dependency_list" << std::endl;
- hana::for_each(dependency_list_type { }, [&i](auto item){
- using system_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(item).tag)>>;
- std::cout << " " << (i++) << ": " << type_helper<system_type>::name() << " (" << hana::value(mp::unwrap(item).execute) << ")" << std::endl;
- hana::for_each(mp::unwrap(item).dependency_ids, [](auto id){
- std::cout << " " << hana::value(id) << std::endl;
- });
- });
- #endif
-
- using task_group_type = detail::task_group_t<context_type, dependency_list_type>;
-
- detail::task_counter_blocker_t counter (1);
- task_group_type task_group (_context, counter);
-
- counter.execute_and_wait_tick(
- [&task_group, &counter, &func]{
- hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){
- task_group.start_task(id, func);
- });
- counter.decrement();
- },
- [&counter]{
- counter.process();
- });
- }
-
- } } } }
|