浏览代码

* fixed dead lock in thread pool

wip
bergmann 5 年前
父节点
当前提交
28f4d2508c
共有 11 个文件被更改,包括 137 次插入39 次删除
  1. +1
    -1
      include/ecs/core/system/scheduler/atomic_counter.h
  2. +2
    -1
      include/ecs/core/utils/thread_pool.h
  3. +1
    -1
      include/ecs/core/utils/thread_pool/pool.h
  4. +1
    -1
      include/ecs/core/utils/thread_pool/pool.inl
  5. +68
    -0
      include/ecs/core/utils/thread_pool/task_queue.h
  6. +15
    -0
      include/ecs/core/utils/thread_pool/task_queue.inl
  7. +0
    -28
      include/ecs/core/utils/thread_pool/types.h
  8. +1
    -1
      include/ecs/core/utils/thread_pool/worker.h
  9. +8
    -3
      include/ecs/core/utils/thread_pool/worker.inl
  10. +2
    -3
      src/core/utils/thread_pool/pool.cpp
  11. +38
    -0
      src/core/utils/thread_pool/task_queue.cpp

+ 1
- 1
include/ecs/core/system/scheduler/atomic_counter.h 查看文件

@@ -3,7 +3,7 @@
#include <ecs/config.h>
#include <ecs/tag/system.h>
#include <ecs/context/context.fwd.h>
#include <ecs/core/utils/thread_pool/types.h>
#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {


+ 2
- 1
include/ecs/core/utils/thread_pool.h 查看文件

@@ -1,8 +1,9 @@
#pragma once

#include "./thread_pool/pool.h"
#include "./thread_pool/types.h"
#include "./thread_pool/task_queue.h"
#include "./thread_pool/worker.h"

#include "./thread_pool/pool.inl"
#include "./thread_pool/task_queue.inl"
#include "./thread_pool/worker.inl"

+ 1
- 1
include/ecs/core/utils/thread_pool/pool.h 查看文件

@@ -4,8 +4,8 @@
#include <vector>

#include <ecs/config.h>
#include <ecs/core/utils/thread_pool/types.h>
#include <ecs/core/utils/thread_pool/worker.h>
#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {


+ 1
- 1
include/ecs/core/utils/thread_pool/pool.inl 查看文件

@@ -12,7 +12,7 @@ namespace utils {
{
if (worker_id == -1)
{
_tasks.enqueue(std::forward<T_task>(task));
_tasks.push(std::forward<T_task>(task));
for (auto& w : _workers)
{
w->signal();


+ 68
- 0
include/ecs/core/utils/thread_pool/task_queue.h 查看文件

@@ -0,0 +1,68 @@
#pragma once

#include <sstream> // TODO debug!
#include <queue>
#include <ecs/config.h>
#include <moodycamel/concurrentqueue.h>

#include "../fixed_function.h"

namespace ecs {
namespace core {
namespace utils {

/**
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes!
*/
using task = fixed_function<void(size_t thread_id), 128>;

/**
* normal task queue
*/
using task_queue = std::queue<task>;

/**
* none blocking concurrent queue to store thread pool tasks in
*/
struct concurrent_task_queue
{
private:
using inner_queue_type = moodycamel::ConcurrentQueue<task>;

private:
std::atomic<ssize_t> _count;
inner_queue_type _inner_queue;

public:
/**
* constructor
*/
concurrent_task_queue();

/**
* check if the task queue is empty
*
* @retval TRUE if the queue is empty
* @retval FALSE if the queue contains at least one element
*/
inline bool empty() const;

/**
* push a new element to the queue
*
* @param t task to push to queue
*/
void push(task&& t);

/**
* try to get dequeue a task from the queue
*
* @param t dequeued task
*
* @retval TRUE if an task was dequeued
* @retval FALSE if the queue is empty
*/
bool pop(task& t);
};

} } }

+ 15
- 0
include/ecs/core/utils/thread_pool/task_queue.inl 查看文件

@@ -0,0 +1,15 @@
#pragma once

#include <ecs/core/utils/thread_pool/task_queue.h>

namespace ecs {
namespace core {
namespace utils {

bool concurrent_task_queue
::empty() const
{
return (_count.load() <= 0);
}

} } }

+ 0
- 28
include/ecs/core/utils/thread_pool/types.h 查看文件

@@ -1,28 +0,0 @@
#pragma once

#include <queue>
#include <ecs/config.h>
#include <moodycamel/blockingconcurrentqueue.h>

#include "../fixed_function.h"

namespace ecs {
namespace core {
namespace utils {

/**
* Functor to execute as task inside the thread pool. The functor can have a max size of 128 bytes!
*/
using task = fixed_function<void(size_t thread_id), 128>;

/**
* none blocking concurrent queue to store thread pool tasks in
*/
using concurrent_task_queue = moodycamel::BlockingConcurrentQueue<task>;

/**
* normal task queue
*/
using task_queue = std::queue<task>;

} } }

+ 1
- 1
include/ecs/core/utils/thread_pool/worker.h 查看文件

@@ -5,7 +5,7 @@
#include <condition_variable>
#include <ecs/config.h>

#include "./types.h"
#include "./task_queue.h"
#include "../movable_atomic.h"

namespace ecs {


+ 8
- 3
include/ecs/core/utils/thread_pool/worker.inl 查看文件

@@ -1,6 +1,7 @@
#pragma once

#include <ecs/core/utils/thread_pool/worker.h>
#include <ecs/core/utils/thread_pool/task_queue.inl>

namespace ecs {
namespace core {
@@ -17,19 +18,23 @@ namespace utils {
::pop(task& t, const std::chrono::duration<T_rep, T_period>& timeout)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_conditional.wait_for(lock, std::chrono::milliseconds(1000)) != std::cv_status::timeout)
do
{
if (_state != state::running)
{
return false;
}
if (!_own_tasks.empty())
{
t = std::move(_own_tasks.front());
_own_tasks.pop();
return true;
}
if (_all_tasks.wait_dequeue_timed(t, 0)) // TODO: this will sometimes return false, even if the queue is not yet empty
if (!_all_tasks.empty() && _all_tasks.pop(t)) // TODO: this will sometimes return false, even if the queue is not yet empty
{
return true;
}
}
} while(_conditional.wait_for(lock, timeout) != std::cv_status::timeout);
return false;
}



+ 2
- 3
src/core/utils/thread_pool/pool.cpp 查看文件

@@ -1,5 +1,3 @@
#include <iostream>

#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/pool.inl>
#include <ecs/core/utils/thread_pool/worker.inl>
@@ -22,7 +20,7 @@ auto thread_pool
void thread_pool
::initialize_workers(size_t count)
{
// _workers.reserve(count);
_workers.reserve(count);
for (size_t i = 0; i < count; ++i)
{
_workers.emplace_back(std::make_unique<thread_pool_worker>(_tasks, i + 1));
@@ -38,6 +36,7 @@ void thread_pool
thread_pool
::thread_pool(size_t count)
{
assert(count > 1);
initialize_workers(count);
}



+ 38
- 0
src/core/utils/thread_pool/task_queue.cpp 查看文件

@@ -0,0 +1,38 @@
#include <ecs/core/utils/fixed_function.inl>
#include <ecs/core/utils/thread_pool/task_queue.inl>

using namespace ::ecs::core::utils;

concurrent_task_queue
::concurrent_task_queue()
: _count (0)
, _inner_queue ()
{ }

void concurrent_task_queue
::push(task&& t)
{
if (!_inner_queue.enqueue(std::move(t)))
{
throw std::overflow_error("task queue is out of memory");
}
_count++;
}

bool concurrent_task_queue
::pop(task& t)
{
ssize_t old_count = _count.load(std::memory_order_relaxed);
while (old_count > 0)
{
if (_count.compare_exchange_weak(old_count, old_count - 1, std::memory_order_acquire, std::memory_order_relaxed))
{
while (!_inner_queue.try_dequeue(t))
{
continue;
}
return true;
}
}
return false;
}

正在加载...
取消
保存