Bladeren bron

* bind system execution to specific threads (also including main thread)

wip
bergmann 5 jaren geleden
bovenliggende
commit
c13d678d85
21 gewijzigde bestanden met toevoegingen van 450 en 67 verwijderingen
  1. +2
    -1
      include/ecs/context/base.h
  2. +3
    -3
      include/ecs/context/base.inl
  3. +17
    -1
      include/ecs/core/system/instance.h
  4. +26
    -0
      include/ecs/core/system/parallelism/strategy/none.h
  5. +42
    -4
      include/ecs/core/system/scheduler/atomic_counter.h
  6. +107
    -11
      include/ecs/core/system/scheduler/atomic_counter.inl
  7. +18
    -5
      include/ecs/core/utils/counter_blocker.h
  8. +13
    -1
      include/ecs/core/utils/counter_blocker.inl
  9. +7
    -5
      include/ecs/core/utils/thread_pool/pool.h
  10. +18
    -2
      include/ecs/core/utils/thread_pool/pool.inl
  11. +9
    -3
      include/ecs/core/utils/thread_pool/types.h
  12. +36
    -4
      include/ecs/core/utils/thread_pool/worker.h
  13. +38
    -3
      include/ecs/core/utils/thread_pool/worker.inl
  14. +2
    -0
      include/ecs/signature/system/parallelism/strategie.h
  15. +36
    -0
      include/ecs/signature/system/parallelism/strategy/bound.h
  16. +36
    -0
      include/ecs/signature/system/parallelism/strategy/main.h
  17. +8
    -2
      include/ecs/signature/system/parallelism/strategy/none.h
  18. +1
    -1
      include/ecs/signature/system/signature.h
  19. +10
    -18
      src/core/utils/thread_pool/pool.cpp
  20. +15
    -2
      src/core/utils/thread_pool/worker.cpp
  21. +6
    -1
      test/dummy.cpp

+ 2
- 1
include/ecs/context/base.h Bestand weergeven

@@ -77,9 +77,10 @@ namespace context {
* @tparam T_func type of the function to execute
*
* @param func function to execute
* @param thread_id ID of the thread to add task to
*/
template<typename T_func>
inline void _post_in_thread_pool(T_func&& func);
inline void _post_in_thread_pool(T_func&& func, ssize_t thread_id = -1);

protected: /* entity */



+ 3
- 3
include/ecs/context/base.inl Bestand weergeven

@@ -26,9 +26,9 @@ namespace detail {
template<typename T_settings>
template<typename T_func>
inline void base_t<T_settings>
::_post_in_thread_pool(T_func&& func)
::_post_in_thread_pool(T_func&& func, ssize_t thread_id)
{
_thread_pool.post(std::forward<T_func>(func));
_thread_pool.post(std::forward<T_func>(func), thread_id);
}

/* entity */
@@ -169,7 +169,7 @@ namespace detail {
::_for_systems_parallel(T_func&& func)
{
core::utils::counter_blocker counter(_systems.size());
counter.execute_and_wait_until_zero([this, &counter, func = std::forward<T_func>(func)]{
counter.execute_and_wait([this, &counter, func = std::forward<T_func>(func)]{
_systems.for_each([this, &counter, &func](auto& instance){
this->_post_in_thread_pool([&counter, &func, &instance]{
ecs_make_scope_guard([&counter](){


+ 17
- 1
include/ecs/core/system/instance.h Bestand weergeven

@@ -41,8 +41,20 @@ namespace system {
}
};

template<typename T_settings>
constexpr decltype(auto) get_scheduler_instance_meta_data_type(T_settings&&) noexcept
{
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using scheduler_type = decltype((settings_type { }).scheduler()(settings_type { }, std::declval<context_type&>()));
using instance_meta_data_type = typename scheduler_type::instance_meta_data_type;

return hana::type_c<instance_meta_data_type>;
}

template<typename T_settings, typename T_system_signature, typename T_entity_handle>
struct instance
: private mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>
{
static_assert(decltype(signature::system::is_valid(mp::unwrap_t<T_system_signature> { })) { });

@@ -53,6 +65,7 @@ namespace system {
using system_tag_type = mp::decay_t<decltype(core::mp::unwrap(system_signature_type { }).tag())>;
using system_type = mp::unwrap_t<system_tag_type>;
using output_type = mp::unwrap_t<mp::decay_t<decltype(mp::unwrap(system_signature_type { }).output())>>;
using scheduler_meta_data_type = mp::unwrap_t<mp::decay_t<decltype(get_scheduler_instance_meta_data_type(T_settings { }))>>;

private:
using signature_list_type = decltype((settings_type { }).system_signatures());
@@ -142,7 +155,7 @@ namespace system {
};
};

b.execute_and_wait_until_zero([&func, &run_in_seperate_thread]{
b.execute_and_wait([&func, &run_in_seperate_thread]{
func(run_in_seperate_thread);
});
}
@@ -152,6 +165,9 @@ namespace system {
{ _executor(context, *this, std::forward<T_func>(func)); }

public: /* misc */
constexpr decltype(auto) scheduler_meta_data() noexcept
{ return static_cast<scheduler_meta_data_type&>(*this); }

constexpr decltype(auto) signature() const noexcept
{ return mp::unwrap(system_signature_type { }); }



+ 26
- 0
include/ecs/core/system/parallelism/strategy/none.h Bestand weergeven

@@ -10,15 +10,18 @@ namespace parallelism {

struct none
{
public:
template<typename T_context, typename T_instance>
struct executor_proxy
{
public:
using context_type = T_context;
using instance_type = T_instance;

context_type& context;
instance_type& instance;

public:
template<typename T_func>
inline void for_subtasks(T_func&& func)
{
@@ -28,9 +31,32 @@ namespace parallelism {
}
};

private:
#ifndef NDEBUG
bool _bound { false };
mutable std::thread::id _thread_id { 0 };
#endif

public:
#ifndef NDEBUG
none(bool bound)
: _bound(bound)
{ }
#else
none()
{ }
#endif

template<typename T_context, typename T_instance, typename T_func>
inline void operator()(T_context& context, T_instance& instance, T_func&& func) const
{
#ifndef NDEBUG
assert( !_bound
&& ( _thread_id == std::thread::id { 0 }
|| _thread_id == std::this_thread::get_id()));
_thread_id = std::this_thread::get_id();
#endif

executor_proxy<T_context, T_instance> ep { context, instance };
func(instance, ep);
}


+ 42
- 4
include/ecs/core/system/scheduler/atomic_counter.h Bestand weergeven

@@ -3,6 +3,7 @@
#include <ecs/config.h>
#include <ecs/tag/system.h>
#include <ecs/context/context.fwd.h>
#include <ecs/core/utils/thread_pool/types.h>

namespace ecs {
namespace core {
@@ -11,6 +12,14 @@ namespace scheduler {

namespace detail
{
/**
* meta data to store for each system instance
*/
struct instance_meta_data_t
{
ssize_t thread_id = -1;
};

/**
* struct to wrap some system meta data
*/
@@ -113,6 +122,34 @@ namespace scheduler {
constexpr decltype(auto) operator()(T_dependency_items) const noexcept;
};

/**
* extention of the normal counter_blocker to handle tasks to be executed in the context of the main thread
*/
struct task_counter_blocker_t
: public utils::counter_blocker
{
private:
utils::task_queue _tasks; //!< tasks to execute in the main thread

public:
using utils::counter_blocker::counter_blocker;

/**
* enqueue a new task inside the task list
*
* @tparam T_task type of the task
*
* @param task task to execute
*/
template<typename T_task>
inline void post(T_task&& task);

/**
* execute all tasks stored in the queue
*/
inline void process();
};

/**
* defines a group of tasks to execute with
*
@@ -133,7 +170,7 @@ namespace scheduler {

private:
context_type& _context; //!< context to use for system execution
utils::counter_blocker& _counter; //!< counter to track running tasks
task_counter_blocker_t& _counter; //!< counter to track running tasks
tasks_tuple_type _tasks; //!< tuple of all tasks

public:
@@ -143,7 +180,7 @@ namespace scheduler {
* @param p_context context to use for system execution
* @param p_counter counter to track running tasks
*/
inline task_group_t(context_type& p_context, utils::counter_blocker& p_counter);
inline task_group_t(context_type& p_context, task_counter_blocker_t& p_counter);

/**
* start the given task
@@ -223,8 +260,9 @@ namespace scheduler {
struct atomic_counter
{
public:
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using settings_type = T_settings;
using context_type = ::ecs::context::type<settings_type>;
using instance_meta_data_type = detail::instance_meta_data_t;

private:
context_type& _context;


+ 107
- 11
include/ecs/core/system/scheduler/atomic_counter.inl Bestand weergeven

@@ -4,6 +4,8 @@

#include <ecs/core/system/scheduler/atomic_counter.h>

#include <ecs/core/utils/fixed_function.inl>

namespace ecs {
namespace core {
namespace system {
@@ -12,6 +14,25 @@ namespace scheduler {
namespace detail
{

/* misc */

struct make_is_bound_to_main
{
constexpr decltype(auto) operator()() const
{
return hana::is_valid(
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_main> { });
}
};

struct make_is_bound_to_thread
{
constexpr decltype(auto) operator()() const
{
return hana::is_valid(
[](auto t) -> hana::type<typename decltype(t)::type::bind_to_thread> { });
}
};

/* task_t */

@@ -80,11 +101,53 @@ namespace scheduler {
});
}

/* task_counter_blocker_t */

template<typename T_task>
inline void task_counter_blocker_t
::post(T_task&& task)
{
std::lock_guard lock(_mutex);
_tasks.emplace(std::forward<T_task>(task));
_cond_var.notify_all();
}

inline void task_counter_blocker_t
::process()
{
while(true)
{
utils::task t;
{
std::lock_guard lock(_mutex);
if (_tasks.empty())
{
return;
}
t = std::move(_tasks.front());
_tasks.pop();

try
{
t(static_cast<size_t>(-1));
}
catch(const std::exception& ex)
{
std::cerr << "error in main thread: " << ex.what() << std::endl;
}
catch(...)
{
std::cerr << "error in main thread: unknown" << std::endl;
}
}
}
}

/* task_group_t */

template<typename T_context, typename T_dependency_items>
inline task_group_t<T_context, T_dependency_items>
::task_group_t(context_type& p_context, utils::counter_blocker& p_counter)
::task_group_t(context_type& p_context, task_counter_blocker_t& p_counter)
: _context(p_context)
, _counter(p_counter)
{ }
@@ -108,9 +171,36 @@ namespace scheduler {
inline void task_group_t<T_context, T_dependency_items>
::post_task_in_thread_pool(T_task_id, T_func&& func)
{
_context.post_in_thread_pool([this, &func]{
execute_task(T_task_id { }, func);
});
using task_type = mp::decay_t<decltype(hana::at(_tasks, T_task_id { }))>;
using instance_type = mp::decay_t<decltype(_context.instance_by_tag(std::declval<task_type>().dependency_item.tag))>;
using signature_type = typename instance_type::system_signature_type;
using executor_builder_type = mp::decay_t<decltype(mp::unwrap(signature_type { }).parallelism())>;

using is_bound_to_main_type = mp::decay_t<decltype(make_is_bound_to_main { } ())>;
using is_bound_to_thread_type = mp::decay_t<decltype(make_is_bound_to_thread { } ())>;

hana::eval_if(
is_bound_to_main_type{ }(hana::type_c<executor_builder_type>),
[this, &func](auto _){
_counter.post([this, &func](size_t thread_id){
execute_task(T_task_id { }, func);
});
},
[this, &func](auto _){
hana::eval_if(is_bound_to_thread_type{ }(hana::type_c<executor_builder_type>),
[this, &func](auto _){
auto& meta = _context.instance_by_tag(task_type { }.dependency_item.tag).scheduler_meta_data();
_context.post_in_thread_pool([this, &func, &meta](size_t thread_id){
meta.thread_id = static_cast<ssize_t>(thread_id);
execute_task(T_task_id { }, func);
}, meta.thread_id);
},
[this, &func](auto _){
_context.post_in_thread_pool([this, &func](size_t thread_id){
execute_task(T_task_id { }, func);
});
});
});
}

template<typename T_context, typename T_dependency_items>
@@ -231,6 +321,8 @@ namespace scheduler {
}
}

/* atomic_counter */

template<typename T_settings>
inline atomic_counter<T_settings>
::atomic_counter(context_type& p_context)
@@ -263,15 +355,19 @@ namespace scheduler {

using task_group_type = detail::task_group_t<context_type, dependency_list_type>;

utils::counter_blocker counter (1);
task_group_type task_group (_context, counter);
detail::task_counter_blocker_t counter (1);
task_group_type task_group (_context, counter);

counter.execute_and_wait_until_zero([&task_group, &counter, &func]{
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){
task_group.start_task(id, func);
counter.execute_and_wait_tick(
[&task_group, &counter, &func]{
hana::for_each(independent_item_ids_type { }, [&task_group, &func](auto id){
task_group.start_task(id, func);
});
counter.decrement();
},
[&counter]{
counter.process();
});
counter.decrement();
});
}

} } } }

+ 18
- 5
include/ecs/core/utils/counter_blocker.h Bestand weergeven

@@ -14,9 +14,9 @@ namespace utils {
*/
struct counter_blocker
{
private:
std::mutex _mutex; //!< mutex to preotect the counter
std::condition_variable _cond_var; //!< conditional variable to synchronize the counter
protected:
std::recursive_mutex _mutex; //!< mutex to preotect the counter
std::condition_variable_any _cond_var; //!< conditional variable to synchronize the counter
size_t _counter; //!< the actual counter value

public:
@@ -42,10 +42,23 @@ namespace utils {
*
* @tparam T_func type of the function to execute
*
* @param func function to execute
* @param func function to execute before waiting for the counter
*/
template<typename T_func>
inline void execute_and_wait_until_zero(T_func&& func) noexcept;
inline void execute_and_wait(T_func&& func);

/**
* Execute the given function and wait until the counter is equal to zero.
* Execute the given tick function at least once and repeatedly if the counter value has changed.
*
* @tparam T_func type of the function to execute
* @tparam T_tick type of tick function to execute
*
* @param func function to execute before waiting for the counter
* @param tick tick function to execute
*/
template<typename T_func, typename T_tick>
inline void execute_and_wait_tick(T_func&& func, T_tick&& tick);
};

} } }

+ 13
- 1
include/ecs/core/utils/counter_blocker.inl Bestand weergeven

@@ -30,11 +30,23 @@ namespace utils {

template<typename T_func>
inline void counter_blocker
::execute_and_wait_until_zero(T_func&& func) noexcept
::execute_and_wait(T_func&& func)
{
func();
std::unique_lock lock(_mutex);
_cond_var.wait(lock, [this]{ return (_counter == 0); });
}

template<typename T_func, typename T_tick>
inline void counter_blocker
::execute_and_wait_tick(T_func&& func, T_tick&& tick)
{
func();
std::unique_lock lock(_mutex);
_cond_var.wait(lock, [this, t = std::forward<T_tick>(tick)]{
t();
return (_counter == 0);
});
}

} } }

+ 7
- 5
include/ecs/core/utils/thread_pool/pool.h Bestand weergeven

@@ -17,13 +17,14 @@ namespace utils {
struct thread_pool
{
private:
using worker_vector = std::vector<thread_pool_worker>;
using worker_ptr_u = std::unique_ptr<thread_pool_worker>;
using worker_vector = std::vector<worker_ptr_u>;
using atomic_size_t = std::atomic<size_t>;

private:
task_queue _queue; //!< queue to store tasks to execute
worker_vector _workers; //!< vector of worker threads
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations
concurrent_task_queue _tasks; //!< queue to store tasks to execute
worker_vector _workers; //!< vector of worker threads
atomic_size_t _outstanding_inits; //!< number of outstanding worker initializations

/**
* check if all workers are finished or not
@@ -59,9 +60,10 @@ namespace utils {
* @tparam T_task type of the task
*
* @param task task to execute
* @param worker_id id of the worker to enqueue task at (-1 = any; n = worker tasks)
*/
template<typename T_task>
inline void post(T_task&& task);
inline void post(T_task&& task, ssize_t worker_id = -1);
};

} } }

+ 18
- 2
include/ecs/core/utils/thread_pool/pool.inl Bestand weergeven

@@ -8,9 +8,25 @@ namespace utils {

template<typename T_task>
inline void thread_pool
::post(T_task&& task)
::post(T_task&& task, ssize_t worker_id)
{
_queue.enqueue(std::forward<T_task>(task));
if (worker_id == -1)
{
_tasks.enqueue(std::forward<T_task>(task));
for (auto& w : _workers)
{
w->signal();
}
}
else if (worker_id >= 1 && worker_id <= static_cast<ssize_t>(_workers.size()))
{
auto& w = _workers.at(static_cast<size_t>(worker_id - 1));
w->post(std::forward<T_task>(task));
}
else
{
throw std::invalid_argument(std::string("invalid worker_id: ") + std::to_string(worker_id));
}
}

} } }

+ 9
- 3
include/ecs/core/utils/thread_pool/types.h Bestand weergeven

@@ -1,5 +1,6 @@
#pragma once

#include <queue>
#include <ecs/config.h>
#include <moodycamel/blockingconcurrentqueue.h>

@@ -12,11 +13,16 @@ namespace utils {
/**
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes!
*/
using task = fixed_function<void(), 128>;
using task = fixed_function<void(size_t thread_id), 128>;

/**
* non blocking concurrent queue to store thread pool tasks in
* none blocking concurrent queue to store thread pool tasks in
*/
using task_queue = moodycamel::BlockingConcurrentQueue<task>;
using concurrent_task_queue = moodycamel::BlockingConcurrentQueue<task>;

/**
* normal task queue
*/
using task_queue = std::queue<task>;

} } }

+ 36
- 4
include/ecs/core/utils/thread_pool/worker.h Bestand weergeven

@@ -1,6 +1,8 @@
#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>
#include <ecs/config.h>

#include "./types.h"
@@ -29,9 +31,13 @@ namespace utils {

using atomic_state = movable_atomic<state>;

std::thread _thread; //!< actual thread object to use for execution
task_queue& _queue; //!< queue to poll new tasks from
atomic_state _state; //!< current state of the worker
std::thread _thread; //!< actual thread object to use for execution
std::mutex _mutex; //!< mutex to protext the conditional
std::condition_variable _conditional; //!< condition variable to inform about new tasks
concurrent_task_queue& _all_tasks; //!< queue to poll new tasks from
task_queue _own_tasks; //!< queue of tasks that are assigned to this specific worker
atomic_state _state; //!< current state of the worker
size_t _thread_id; //!< index of the thread inside the thread pool

private:
/**
@@ -39,13 +45,24 @@ namespace utils {
*/
void run();

/**
* pop the next task from either the own task list or the general task list
*
* @param t parameter to store dequeued task in
* @param timeout timeout to wait for new tasks
*
* @return TRUE if a task was dequeued, FALSE otherwise
*/
template<typename T_rep, typename T_period>
inline bool pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout);

public:
/**
* constructor
*
* @param queue task queue to poll tasks from
*/
inline thread_pool_worker(task_queue& queue) noexcept;
inline thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept;

inline thread_pool_worker(thread_pool_worker&&) = default;
inline thread_pool_worker(const thread_pool_worker&) = delete;
@@ -81,6 +98,21 @@ namespace utils {
*/
inline bool finished() const noexcept;

/**
* enqueue a new task to execute inside this worker thread
*
* @tparam T_task type of the task
*
* @param task task to execute
*/
template<typename T_task>
inline void post(T_task&& task);

/**
* inform the task about new tasks in the concurent task list
*/
inline void signal();

};

} } }

+ 38
- 3
include/ecs/core/utils/thread_pool/worker.inl Bestand weergeven

@@ -7,10 +7,31 @@ namespace core {
namespace utils {

inline thread_pool_worker
::thread_pool_worker(task_queue& queue) noexcept
: _queue(queue)
::thread_pool_worker(concurrent_task_queue& all_tasks, size_t thread_id) noexcept
: _all_tasks(all_tasks)
, _thread_id(thread_id)
{ }

template<typename T_rep, typename T_period>
inline bool thread_pool_worker
::pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_conditional.wait_for(lock, std::chrono::milliseconds(1000)) != std::cv_status::timeout)
{
if (!_own_tasks.empty())
{
t = std::move(_own_tasks.front());
_own_tasks.pop();
return true;
}
if (_all_tasks.wait_dequeue_timed(t, 0))
{
return true;
}
}
return false;
}

template<typename T_counter>
inline void thread_pool_worker
@@ -33,7 +54,6 @@ namespace utils {
::join() noexcept
{
assert(_thread.joinable());
assert(_state == state::finished);
_thread.join();
}

@@ -43,4 +63,19 @@ namespace utils {
return _state == state::finished;
}

template<typename T_task>
inline void thread_pool_worker
::post(T_task&& task)
{
std::unique_lock<std::mutex> lock(_mutex);
_own_tasks.emplace(std::forward<T_task>(task));
signal();
}

inline void thread_pool_worker
::signal()
{
_conditional.notify_all();
}

} } }

+ 2
- 0
include/ecs/signature/system/parallelism/strategie.h Bestand weergeven

@@ -1,5 +1,7 @@
#pragma once

#include "./strategy/bound.h"
#include "./strategy/main.h"
#include "./strategy/none.h"
#include "./strategy/split_evenly_cores.h"
#include "./strategy/split_evenly.h"

+ 36
- 0
include/ecs/signature/system/parallelism/strategy/bound.h Bestand weergeven

@@ -0,0 +1,36 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/none.h>

namespace ecs {
namespace signature {
namespace system {
namespace parallelism {

namespace detail
{
struct bound_builder_t
{
using bind_to_thread = void;

inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct bound_t
{
constexpr decltype(auto) operator()() const noexcept
{ return bound_builder_t { }; }
};
}

constexpr decltype(auto) bound = detail::bound_t { };

} } } }

+ 36
- 0
include/ecs/signature/system/parallelism/strategy/main.h Bestand weergeven

@@ -0,0 +1,36 @@
#pragma once

#include <ecs/config.h>
#include <ecs/core/system/parallelism/strategy/none.h>

namespace ecs {
namespace signature {
namespace system {
namespace parallelism {

namespace detail
{
struct main_builder_t
{
using bind_to_main = void;

inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct main_t
{
constexpr decltype(auto) operator()() const noexcept
{ return main_builder_t { }; }
};
}

constexpr decltype(auto) main = detail::main_t { };

} } } }

+ 8
- 2
include/ecs/signature/system/parallelism/strategy/none.h Bestand weergeven

@@ -12,8 +12,14 @@ namespace parallelism {
{
struct none_builder_t
{
constexpr decltype(auto) operator()() const noexcept
{ return ::ecs::core::system::parallelism::none { }; }
inline decltype(auto) operator()() const noexcept
{
#ifndef NDEBUG
return ::ecs::core::system::parallelism::none(false);
#else
return ::ecs::core::system::parallelism::none();
#endif
}
};

struct none_t


+ 1
- 1
include/ecs/signature/system/signature.h Bestand weergeven

@@ -121,7 +121,7 @@ namespace system {
public: /* setter */

/** set the parallelism options
* @param parallelism is a predicate the takes no parameters and returns a
* @param parallelism is a predicate that takes no parameters and returns a
* system executor interface */
template<typename T_parallelism>
constexpr decltype(auto) parallelism(T_parallelism parallelism) const noexcept;


+ 10
- 18
src/core/utils/thread_pool/pool.cpp Bestand weergeven

@@ -1,3 +1,5 @@
#include <iostream>

#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/pool.inl>
#include <ecs/core/utils/thread_pool/worker.inl>
@@ -9,7 +11,7 @@ auto thread_pool
{
for (const auto& w : _workers)
{
if (!w.finished())
if (!w->finished())
{
return false;
}
@@ -20,16 +22,16 @@ auto thread_pool
void thread_pool
::initialize_workers(size_t count)
{
_workers.reserve(count);
// _workers.reserve(count);
for (size_t i = 0; i < count; ++i)
{
_workers.emplace_back(_queue);
_workers.emplace_back(std::make_unique<thread_pool_worker>(_tasks, i + 1));
}

_outstanding_inits = count;
for (auto& w : _workers)
{
w.start(_outstanding_inits);
w->start(_outstanding_inits);
}
}

@@ -48,21 +50,11 @@ thread_pool
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

// stop all workers
for (auto& w : _workers)
{
w.stop();
}

// post dummy tasks untill all workers has stopped
while (!all_workers_finished())
{
post([]{ });
}

// join the worker threads
for (auto& w : _workers)
{
w.join();
w->stop();
w->signal();
w->join();
}
}
}

+ 15
- 2
src/core/utils/thread_pool/worker.cpp Bestand weergeven

@@ -1,3 +1,5 @@
#include <iostream>

#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/worker.inl>

@@ -10,9 +12,20 @@ void thread_pool_worker
while (_state == state::running)
{
task t;
if (_queue.wait_dequeue_timed(t, std::chrono::milliseconds(500)))
if (pop(t, std::chrono::milliseconds(500)))
{
t();
try
{
t(_thread_id);
}
catch(const std::exception& ex)
{
std::cerr << "error in worker thread: " << ex.what() << std::endl;
}
catch(...)
{
std::cerr << "error in worker thread: unknown" << std::endl;
}
}
}
_state = state::finished;

+ 6
- 1
test/dummy.cpp Bestand weergeven

@@ -105,6 +105,7 @@ constexpr decltype(auto) ss_move =

constexpr decltype(auto) ss_render =
ss::make(st::render)
.parallelism(ss::parallelism::main())
.read(ct::velocity, ct::position, ct::acceleration);

constexpr decltype(auto) ss_list = ssl::make(
@@ -149,11 +150,14 @@ TEST(DummyTest, fuu)
}
});

std::cout << "main " << std::this_thread::get_id() << std::endl;

for (int i = 0; i < 10; ++i) {
context.step([](auto& proxy){
proxy.execute_systems()(
sea::tags(st::accelerate)
.for_subtasks([](auto& s, auto& data){
std::cout << "accelerate " << std::this_thread::get_id() << std::endl;
data.for_entities([&data](auto& handle) {
auto& velocity = data.get_component(ct::velocity, handle);
auto& acceleration = data.get_component(ct::acceleration, handle);
@@ -164,6 +168,7 @@ TEST(DummyTest, fuu)
}),
sea::tags(st::move)
.for_subtasks([](auto& s, auto& data){
std::cout << "move " << std::this_thread::get_id() << std::endl;
data.for_entities([&data](auto& handle) {
auto& position = data.get_component(ct::position, handle);
auto& velocity = data.get_component(ct::velocity, handle);
@@ -174,7 +179,7 @@ TEST(DummyTest, fuu)
}),
sea::tags(st::render)
.for_subtasks([](auto& s, auto& data){
std::cout << "render" << std::endl;
std::cout << "render " << std::this_thread::get_id() << std::endl;
data.for_entities([&data](auto& handle) {
auto& position = data.get_component(ct::position, handle);
auto& velocity = data.get_component(ct::velocity, handle);


Laden…
Annuleren
Opslaan