Ver a proveniência

* Refactored task and executor classes

master
bergmann há 4 anos
ascendente
cometimento
b0a59eb054
20 ficheiros alterados com 299 adições e 260 eliminações
  1. +0
    -4
      include/asyncpp/core/task.h
  2. +0
    -23
      include/asyncpp/core/task/current_task_lock.h
  3. +0
    -16
      include/asyncpp/core/task/current_task_lock.inl
  4. +0
    -8
      include/asyncpp/core/task/current_task_lock.pre.h
  5. +40
    -36
      include/asyncpp/core/task/task.h
  6. +24
    -19
      include/asyncpp/core/task/task.inl
  7. +0
    -13
      include/asyncpp/core/task/task.pre.h
  8. +0
    -32
      include/asyncpp/core/task/task_tpl.h
  9. +0
    -29
      include/asyncpp/core/task/task_tpl.inl
  10. +5
    -5
      include/asyncpp/executor/current_thread.h
  11. +17
    -38
      include/asyncpp/executor/current_thread.inl
  12. +34
    -8
      include/asyncpp/executor/executor.h
  13. +17
    -1
      include/asyncpp/executor/executor.inl
  14. +81
    -0
      include/asyncpp/executor/task_handle.h
  15. +50
    -0
      include/asyncpp/executor/task_handle.inl
  16. +4
    -4
      include/asyncpp/timing/impl/registration.h
  17. +3
    -3
      include/asyncpp/timing/impl/registration.inl
  18. +1
    -4
      include/asyncpp/timing/timer.inl
  19. +19
    -9
      test/asyncpp/core/task_tests.cpp
  20. +4
    -8
      test/asyncpp/executor/current_thread_tests.cpp

+ 0
- 4
include/asyncpp/core/task.h Ver ficheiro

@@ -1,9 +1,5 @@
#pragma once

#include "task/task.h"
#include "task/task_tpl.h"
#include "task/current_task_lock.h"

#include "task/task.inl"
#include "task/task_tpl.inl"
#include "task/current_task_lock.inl"

+ 0
- 23
include/asyncpp/core/task/current_task_lock.h Ver ficheiro

@@ -1,23 +0,0 @@
#pragma once

#include "task.h"
#include "current_task_lock.pre.h"

namespace asyncpp
{

struct current_task_lock
{
public:
/**
* @brief Constructor.
*/
current_task_lock(const task_ptr_s& p_task);

/**
* @brief Destructor.
*/
~current_task_lock();
};

}

+ 0
- 16
include/asyncpp/core/task/current_task_lock.inl Ver ficheiro

@@ -1,16 +0,0 @@
#pragma once

#include "current_task_lock.h"

namespace asyncpp
{

/* current_task_lock */

current_task_lock::current_task_lock(const task_ptr_s& p_task)
{ task::local_storage().current = p_task; }

current_task_lock::~current_task_lock()
{ task::local_storage().current.reset(); }

}

+ 0
- 8
include/asyncpp/core/task/current_task_lock.pre.h Ver ficheiro

@@ -1,8 +0,0 @@
#pragma once

namespace asyncpp
{

struct current_task_lock;

}

+ 40
- 36
include/asyncpp/core/task/task.h Ver ficheiro

@@ -1,10 +1,6 @@
#pragma once

#include <vector>
#include <functional>

#include "task.pre.h"
#include "current_task_lock.pre.h"
#include <memory>

namespace asyncpp
{
@@ -12,59 +8,67 @@ namespace asyncpp
struct task
{
public:
friend current_task_lock;
struct handle
{
public:
/**
* @brief Destructor.
*/
virtual ~handle() = default;

/**
* @brief Notify that a resource, the task is waiting for is ready.
*/
virtual void notify() = 0;
};

using notify_handler = std::function<void (task&)>;
using notify_handler_vector = std::vector<notify_handler>;
using handle_ptr_s = std::shared_ptr<handle>;
using handle_ptr_w = std::weak_ptr<handle>;

private:
bool _notified;
notify_handler_vector _notify_handlers;
handle_ptr_w _handle;

public:
/**
* @brief Destructor.
*/
virtual ~task() = default;

/**
* @brief Check if the task is notified.
* @brief Default constructor.
*/
inline bool notified() const;
inline task();

/**
* @brief Notify the task that a resource that the task is interested in, is ready to be polled.
* @brief Constructor.
*/
inline void notify();
inline task(const handle_ptr_w& p_handle);

/**
* @brief Poll the future stored in the task.
*
* @return TRUE if the task is finished, FALSE otherwise.
* @brief Notify that a resource, the task is waiting for is ready.
*/
inline bool poll();

/**
* @brief Add a callback to execute when the task is notified.
*/
inline void add_notify_handler(notify_handler p_handler);
inline void notify() const;

public:
/**
* @brief Get reference of the current task.
* @brief Get the current task.
*/
static inline const task_ptr_w& current();
static inline task current();

private:
/**
* @brief Actual implementation of the poll function.
*/
virtual bool poll_impl() = 0;
public:
struct lock
{
public:
/**
* @brief Constructor.
*/
inline lock(task::handle_ptr_s p_handle);

/**
* @brief Destructor.
*/
inline ~lock();
};

private:
struct storage
{
task_ptr_w current;
handle_ptr_w current;
};

/**


+ 24
- 19
include/asyncpp/core/task/task.inl Ver ficheiro

@@ -1,36 +1,41 @@
#pragma once

#include "task.h"

namespace asyncpp
{

/* task */
/* task::lock */

bool task::poll()
{
_notified = false;
return poll_impl();
}
task::lock::lock(task::handle_ptr_s p_handle)
{ local_storage().current = p_handle; }

bool task::notified() const
{ return _notified; }
task::lock::~lock()
{ local_storage().current.reset(); }

void task::notify()
{
_notified = true;
/* task*/

for (auto& h : _notify_handlers)
h(*this);
}
task::task()
: _handle()
{ }

task::task(const handle_ptr_w& p_handle)
: _handle(p_handle)
{ }

void task::add_notify_handler(notify_handler p_handler)
{ _notify_handlers.emplace_back(p_handler); }
void task::notify() const
{
auto s = _handle.lock();
if (s)
s->notify();
}

const task_ptr_w& task::current()
{ return local_storage().current; }
task task::current()
{ return task(local_storage().current); }

task::storage& task::local_storage()
{
thread_local storage value { };
thread_local storage value;
return value;
}



+ 0
- 13
include/asyncpp/core/task/task.pre.h Ver ficheiro

@@ -1,13 +0,0 @@
#pragma once

#include <memory>

namespace asyncpp
{

struct task;

using task_ptr_w = std::weak_ptr<task>;
using task_ptr_s = std::shared_ptr<task>;

}

+ 0
- 32
include/asyncpp/core/task/task_tpl.h Ver ficheiro

@@ -1,32 +0,0 @@
#pragma once

#include "task.h"

namespace asyncpp
{

template<typename T_future>
struct task_tpl
: public task
{
public:
using future_type = T_future;

private:
future_type _future;

public:
/**
* @brief Constructor.
*/
template<typename X_future>
inline task_tpl(X_future&& p_future);

private:
/**
* @brief Actual implementation of the poll function.
*/
inline bool poll_impl() override;
};

}

+ 0
- 29
include/asyncpp/core/task/task_tpl.inl Ver ficheiro

@@ -1,29 +0,0 @@
#pragma once

#include "task_tpl.h"

namespace asyncpp
{

/* task_tpl */

template<typename T_future>
template<typename X_future>
task_tpl<T_future>::task_tpl(X_future&& p_future)
: _future(std::forward<X_future>(p_future))
{ }

template<typename T_future>
bool task_tpl<T_future>::poll_impl()
{ return _future.poll().is_ready(); }

/* misc */

template<typename X_future>
task_ptr_s make_task(X_future&& p_future)
{
using task_type = task_tpl<X_future>;
return std::make_shared<task_type>(std::forward<X_future>(p_future));
}

}

+ 5
- 5
include/asyncpp/executor/current_thread.h Ver ficheiro

@@ -13,8 +13,8 @@ namespace executor {
{
public:
using runtime_type = T_runtime;
using task_map_type = std::map<task*, task_ptr_s>;
using task_queue_type = std::queue<task_ptr_s>;
using task_map_type = std::map<task_handle*, task_handle_ptr_s>;
using task_queue_type = std::queue<task_handle_ptr_s>;

private:
runtime_type _runtime; //!> Runtime to call when executor is idling
@@ -54,18 +54,18 @@ namespace executor {
/**
* @brief Poll the passed task.
*/
void poll_task(task_ptr_s p_task);
void poll_task(task_handle_ptr_s p_handle);

/**
* @brief Callback to inform the executor about notified tasks.
*/
void task_notified(task& p_task);
void task_notified(task_handle& p_handle) override;

private: /* executor */
/**
* @brief Actual implementation of the spawn method.
*/
void spawn_impl(task_ptr_s p_task) override;
void spawn_impl(task_handle_ptr_s p_handle) override;
};

} }


+ 17
- 38
include/asyncpp/executor/current_thread.inl Ver ficheiro

@@ -2,32 +2,11 @@

#include "current_thread.h"

#include "executor.inl"

namespace asyncpp {
namespace executor {

namespace __impl
{

struct current_executor_lock
{
private:
executor::storage& _storage;

public:
current_executor_lock(
executor::storage& p_storage,
executor& p_executor)
: _storage (p_storage)
{
_storage.current = &p_executor;
}

~current_executor_lock()
{ _storage.current = nullptr; }
};

}

/* current_thread */

template<typename T_runtime>
@@ -41,7 +20,10 @@ namespace executor {
void current_thread<T_runtime>
::run()
{
__impl::current_executor_lock l(executor::local_storage(), *this);
auto executor_lock = executor::lock(*this);
auto runtime_lock = _runtime.init_thread();

(void)runtime_lock;

run_impl();
}
@@ -51,7 +33,7 @@ namespace executor {
void current_thread<T_runtime>
::run(X_future&& p_future)
{
auto executor_lock = __impl::current_executor_lock(executor::local_storage(), *this);
auto executor_lock = executor::lock(*this);
auto runtime_lock = _runtime.init_thread();

(void)runtime_lock;
@@ -83,26 +65,26 @@ namespace executor {

template<typename T_runtime>
void current_thread<T_runtime>
::poll_task(task_ptr_s p_task)
::poll_task(task_handle_ptr_s p_handle)
{
current_task_lock l(p_task);
task::lock l(p_handle);

// TODO execption handling

if (!p_task->poll())
if (!p_handle->poll())
{
if (p_task->notified())
_pending_tasks.emplace(p_task);
if (p_handle->notified())
_pending_tasks.emplace(p_handle);
else
_sleeping_tasks.emplace(p_task.get(), p_task);
_sleeping_tasks.emplace(p_handle.get(), p_handle);
}
}

template<typename T_runtime>
void current_thread<T_runtime>
::task_notified(task& p_task)
::task_notified(task_handle& p_handle)
{
auto it = _sleeping_tasks.find(&p_task);
auto it = _sleeping_tasks.find(&p_handle);
if (it != _sleeping_tasks.end())
{
_pending_tasks.emplace(it->second);
@@ -112,10 +94,7 @@ namespace executor {

template<typename T_runtime>
void current_thread<T_runtime>
::spawn_impl(task_ptr_s p_task)
{
p_task->add_notify_handler(std::bind(&current_thread::task_notified, this, std::placeholders::_1));
_pending_tasks.push(p_task);
}
::spawn_impl(task_handle_ptr_s p_task)
{ _pending_tasks.push(p_task); }

} }

+ 34
- 8
include/asyncpp/executor/executor.h Ver ficheiro

@@ -1,6 +1,6 @@
#pragma once

#include <asyncpp/core/task.h>
#include "task_handle.h"

namespace asyncpp {
namespace executor {
@@ -8,10 +8,10 @@ namespace executor {
struct executor
{
public:
struct storage
{
executor * current;
};
friend __impl::task_handle;
using task_handle = __impl::task_handle;
using task_handle_ptr_s = std::shared_ptr<__impl::task_handle>;

public:
/**
@@ -21,12 +21,40 @@ namespace executor {
static inline void spawn(X_future&& p_future);

private:
/**
* @brief A task was notified that a resource it is waiting for is ready.
*/
virtual void task_notified(task_handle& p_handle) = 0;

/**
* @brief Actual implementation of the spawn method.
*/
virtual void spawn_impl(task_ptr_s p_task) = 0;
virtual void spawn_impl(task_handle_ptr_s p_handle) = 0;

protected:
struct lock
{
public:
/**
* @brief Constructor.
*/
inline lock(executor& p_current);

inline lock(lock &&) = delete;
inline lock(lock const &) = delete;

/**
* @brief Destructor.
*/
inline ~lock();
};

private:
struct storage
{
executor * current;
};

/**
* @brief Get the thread local storage.
*/
@@ -34,5 +62,3 @@ namespace executor {
};

} }

#include "executor.inl"

+ 17
- 1
include/asyncpp/executor/executor.inl Ver ficheiro

@@ -2,6 +2,8 @@

#include "executor.h"

#include "task_handle.inl"

namespace asyncpp {
namespace executor {

@@ -10,10 +12,16 @@ namespace executor {
template<typename X_future>
void executor::spawn(X_future&& p_future)
{
using future_type = X_future;
using task_handle_type = __impl::task_handle_tpl<future_type>;

auto exec = local_storage().current;
if (!exec)
throw std::runtime_error("Thread local executor instance is not assigned!");
exec->spawn_impl(make_task(std::forward<X_future>(p_future)));

exec->spawn_impl(std::make_shared<task_handle_type>(
*exec,
std::forward<X_future>(p_future)));
}

executor::storage& executor::local_storage()
@@ -22,4 +30,12 @@ namespace executor {
return value;
}

/* lock */

executor::lock::lock(executor& p_current)
{ executor::local_storage().current = &p_current; }

executor::lock::~lock()
{ executor::local_storage().current = nullptr; }

} }

+ 81
- 0
include/asyncpp/executor/task_handle.h Ver ficheiro

@@ -0,0 +1,81 @@
#pragma once

#include <asyncpp/core/task.h>

namespace asyncpp {
namespace executor {

struct executor;

namespace __impl
{

struct task_handle
: public task::handle
{
private:
executor& _owner;
bool _notified;

public:
/**
* @brief Constructor.
*/
inline task_handle(executor& p_owner);

inline task_handle(task_handle &) = delete;
inline task_handle(task_handle const &) = delete;

/**
* @brief Notify that a resource, the task is waiting for is ready.
*/
inline void notify() override;

/**
* @brief Check if the task was notified.
*/
inline bool notified() const;

/**
* @brief Poll the future stored in the task.
*
* @return TRUE if the task is finished, FALSE otherwise.
*/
inline bool poll();

private:
/**
* @brief Actual poll implementation.
*/
virtual bool poll_impl() = 0;
};

template<typename T_future>
struct task_handle_tpl
: public task_handle
{
public:
using future_type = T_future;

private:
future_type _future;

public:
/**
* @brief Constructor.
*/
template<typename X_future>
inline task_handle_tpl(
executor& p_owner,
X_future&& p_future);

private:
/**
* @brief Actual poll implementation.
*/
inline bool poll_impl() override;
};

}

} }

+ 50
- 0
include/asyncpp/executor/task_handle.inl Ver ficheiro

@@ -0,0 +1,50 @@
#pragma once

#include "executor.h"
#include "task_handle.h"

namespace asyncpp {
namespace executor {
namespace __impl {

/* task_handle */

task_handle::task_handle(
executor& p_owner)
: _owner (p_owner)
, _notified (false)
{ }

void task_handle::notify()
{
_notified = true;
_owner.task_notified(*this);
}

bool task_handle::notified() const
{ return _notified; }

bool task_handle::poll()
{
_notified = false;
return poll_impl();
}

/* task_handle_tpl */

template<typename T_future>
template<typename X_future>
task_handle_tpl<T_future>
::task_handle_tpl(
executor& p_owner,
X_future&& p_future)
: task_handle (p_owner)
, _future (std::forward<X_future>(p_future))
{ }

template<typename T_future>
bool task_handle_tpl<T_future>
::poll_impl()
{ return _future.poll().is_ready(); }

} } }

+ 4
- 4
include/asyncpp/timing/impl/registration.h Ver ficheiro

@@ -16,15 +16,15 @@ namespace __impl {
public:
timer_base& owner;
const time_point deadline;
const task_ptr_w task;
const asyncpp::task task;

/**
* @brief Constructor.
*/
inline registration(
timer_base& p_owner,
const time_point& p_deadline,
const task_ptr_w& p_task);
timer_base& p_owner,
const time_point& p_deadline,
const asyncpp::task& p_task);
};

using registration_ptr_w = std::weak_ptr<registration>;


+ 3
- 3
include/asyncpp/timing/impl/registration.inl Ver ficheiro

@@ -9,9 +9,9 @@ namespace __impl {
/* registration */

registration::registration(
timer_base& p_owner,
const time_point& p_deadline,
const task_ptr_w& p_task)
timer_base& p_owner,
const time_point& p_deadline,
const asyncpp::task& p_task)
: owner (p_owner)
, deadline (p_deadline)
, task (p_task)


+ 1
- 4
include/asyncpp/timing/timer.inl Ver ficheiro

@@ -28,10 +28,7 @@ namespace timing {
auto it = r->begin();
while (it != r->end() && now >= (**it).deadline)
{
auto t = (**it).task.lock();
if (t)
t->notify();

(**it).task.notify();
++it;
}
}


+ 19
- 9
test/asyncpp/core/task_tests.cpp Ver ficheiro

@@ -1,4 +1,5 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#include <asyncpp.h>

@@ -7,14 +8,23 @@
using namespace ::testing;
using namespace ::asyncpp;

TEST(task_tests, poll)
struct task_handle
: public task::handle
{
auto t = make_task(test_delay { 5, 0 });

ASSERT_FALSE(t->poll());
ASSERT_FALSE(t->poll());
ASSERT_FALSE(t->poll());
ASSERT_FALSE(t->poll());
ASSERT_FALSE(t->poll());
ASSERT_TRUE (t->poll());
MOCK_METHOD0(notify, void ());
};

TEST(task_tests, lock_and_notify)
{
auto handle = std::make_shared<StrictMock<task_handle>>();

EXPECT_CALL(*handle, notify);

auto t = task::current();
t.notify(); // nop

auto lock = task::lock(handle);

t = task::current();
t.notify(); // actual notify
}

+ 4
- 8
test/asyncpp/executor/current_thread_tests.cpp Ver ficheiro

@@ -20,8 +20,8 @@ public:
using result_type = typename base_future_type::result_type;

public:
bool done { false };
task_ptr_w task;
bool done { false };
asyncpp::task task;

public:
inline test() = default;
@@ -54,15 +54,11 @@ TEST(current_thread_tests, run)
EXPECT_CALL(m, idle(nullptr))
.InSequence(s)
.WillOnce(Invoke([&t](auto){
auto s = t.task.lock();
ASSERT_TRUE(s);
s->notify();
t.task.notify();
}))
.WillOnce(Invoke([&t](auto){
t.done = true;
auto s = t.task.lock();
ASSERT_TRUE(s);
s->notify();
t.task.notify();
}));;

e.run(t);


Carregando…
Cancelar
Guardar