diff --git a/include/asyncpp/core/task.h b/include/asyncpp/core/task.h index 3aa2cc5..e043bc7 100644 --- a/include/asyncpp/core/task.h +++ b/include/asyncpp/core/task.h @@ -1,9 +1,5 @@ #pragma once #include "task/task.h" -#include "task/task_tpl.h" -#include "task/current_task_lock.h" #include "task/task.inl" -#include "task/task_tpl.inl" -#include "task/current_task_lock.inl" diff --git a/include/asyncpp/core/task/current_task_lock.h b/include/asyncpp/core/task/current_task_lock.h deleted file mode 100644 index 481b9a5..0000000 --- a/include/asyncpp/core/task/current_task_lock.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include "task.h" -#include "current_task_lock.pre.h" - -namespace asyncpp -{ - - struct current_task_lock - { - public: - /** - * @brief Constructor. - */ - current_task_lock(const task_ptr_s& p_task); - - /** - * @brief Destructor. - */ - ~current_task_lock(); - }; - -} diff --git a/include/asyncpp/core/task/current_task_lock.inl b/include/asyncpp/core/task/current_task_lock.inl deleted file mode 100644 index 4367570..0000000 --- a/include/asyncpp/core/task/current_task_lock.inl +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include "current_task_lock.h" - -namespace asyncpp -{ - - /* current_task_lock */ - - current_task_lock::current_task_lock(const task_ptr_s& p_task) - { task::local_storage().current = p_task; } - - current_task_lock::~current_task_lock() - { task::local_storage().current.reset(); } - -} diff --git a/include/asyncpp/core/task/current_task_lock.pre.h b/include/asyncpp/core/task/current_task_lock.pre.h deleted file mode 100644 index 3ebedf2..0000000 --- a/include/asyncpp/core/task/current_task_lock.pre.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -namespace asyncpp -{ - - struct current_task_lock; - -} diff --git a/include/asyncpp/core/task/task.h b/include/asyncpp/core/task/task.h index 438ec3d..a7f16fc 100644 --- a/include/asyncpp/core/task/task.h +++ b/include/asyncpp/core/task/task.h @@ -1,10 +1,6 @@ #pragma once -#include -#include - -#include "task.pre.h" -#include "current_task_lock.pre.h" +#include namespace asyncpp { @@ -12,59 +8,67 @@ namespace asyncpp struct task { public: - friend current_task_lock; + struct handle + { + public: + /** + * @brief Destructor. + */ + virtual ~handle() = default; + + /** + * @brief Notify that a resource, the task is waiting for is ready. + */ + virtual void notify() = 0; + }; - using notify_handler = std::function; - using notify_handler_vector = std::vector; + using handle_ptr_s = std::shared_ptr; + using handle_ptr_w = std::weak_ptr; private: - bool _notified; - notify_handler_vector _notify_handlers; + handle_ptr_w _handle; public: /** - * @brief Destructor. - */ - virtual ~task() = default; - - /** - * @brief Check if the task is notified. + * @brief Default constructor. */ - inline bool notified() const; + inline task(); /** - * @brief Notify the task that a resource that the task is interested in, is ready to be polled. + * @brief Constructor. */ - inline void notify(); + inline task(const handle_ptr_w& p_handle); /** - * @brief Poll the future stored in the task. - * - * @return TRUE if the task is finished, FALSE otherwise. + * @brief Notify that a resource, the task is waiting for is ready. */ - inline bool poll(); - - /** - * @brief Add a callback to execute when the task is notified. - */ - inline void add_notify_handler(notify_handler p_handler); + inline void notify() const; public: /** - * @brief Get reference of the current task. + * @brief Get the current task. */ - static inline const task_ptr_w& current(); + static inline task current(); - private: - /** - * @brief Actual implementation of the poll function. - */ - virtual bool poll_impl() = 0; + public: + struct lock + { + public: + /** + * @brief Constructor. + */ + inline lock(task::handle_ptr_s p_handle); + + /** + * @brief Destructor. + */ + inline ~lock(); + }; private: struct storage { - task_ptr_w current; + handle_ptr_w current; }; /** diff --git a/include/asyncpp/core/task/task.inl b/include/asyncpp/core/task/task.inl index 0ce239a..3043d09 100644 --- a/include/asyncpp/core/task/task.inl +++ b/include/asyncpp/core/task/task.inl @@ -1,36 +1,41 @@ #pragma once +#include "task.h" + namespace asyncpp { - /* task */ + /* task::lock */ - bool task::poll() - { - _notified = false; - return poll_impl(); - } + task::lock::lock(task::handle_ptr_s p_handle) + { local_storage().current = p_handle; } - bool task::notified() const - { return _notified; } + task::lock::~lock() + { local_storage().current.reset(); } - void task::notify() - { - _notified = true; + /* task*/ - for (auto& h : _notify_handlers) - h(*this); - } + task::task() + : _handle() + { } + + task::task(const handle_ptr_w& p_handle) + : _handle(p_handle) + { } - void task::add_notify_handler(notify_handler p_handler) - { _notify_handlers.emplace_back(p_handler); } + void task::notify() const + { + auto s = _handle.lock(); + if (s) + s->notify(); + } - const task_ptr_w& task::current() - { return local_storage().current; } + task task::current() + { return task(local_storage().current); } task::storage& task::local_storage() { - thread_local storage value { }; + thread_local storage value; return value; } diff --git a/include/asyncpp/core/task/task.pre.h b/include/asyncpp/core/task/task.pre.h deleted file mode 100644 index 6780a6b..0000000 --- a/include/asyncpp/core/task/task.pre.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include - -namespace asyncpp -{ - - struct task; - - using task_ptr_w = std::weak_ptr; - using task_ptr_s = std::shared_ptr; - -} diff --git a/include/asyncpp/core/task/task_tpl.h b/include/asyncpp/core/task/task_tpl.h deleted file mode 100644 index f6663b7..0000000 --- a/include/asyncpp/core/task/task_tpl.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include "task.h" - -namespace asyncpp -{ - - template - struct task_tpl - : public task - { - public: - using future_type = T_future; - - private: - future_type _future; - - public: - /** - * @brief Constructor. - */ - template - inline task_tpl(X_future&& p_future); - - private: - /** - * @brief Actual implementation of the poll function. - */ - inline bool poll_impl() override; - }; - -} diff --git a/include/asyncpp/core/task/task_tpl.inl b/include/asyncpp/core/task/task_tpl.inl deleted file mode 100644 index 908c63e..0000000 --- a/include/asyncpp/core/task/task_tpl.inl +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include "task_tpl.h" - -namespace asyncpp -{ - - /* task_tpl */ - - template - template - task_tpl::task_tpl(X_future&& p_future) - : _future(std::forward(p_future)) - { } - - template - bool task_tpl::poll_impl() - { return _future.poll().is_ready(); } - - /* misc */ - - template - task_ptr_s make_task(X_future&& p_future) - { - using task_type = task_tpl; - return std::make_shared(std::forward(p_future)); - } - -} diff --git a/include/asyncpp/executor/current_thread.h b/include/asyncpp/executor/current_thread.h index dd5f898..1d41b6d 100644 --- a/include/asyncpp/executor/current_thread.h +++ b/include/asyncpp/executor/current_thread.h @@ -13,8 +13,8 @@ namespace executor { { public: using runtime_type = T_runtime; - using task_map_type = std::map; - using task_queue_type = std::queue; + using task_map_type = std::map; + using task_queue_type = std::queue; private: runtime_type _runtime; //!> Runtime to call when executor is idling @@ -54,18 +54,18 @@ namespace executor { /** * @brief Poll the passed task. */ - void poll_task(task_ptr_s p_task); + void poll_task(task_handle_ptr_s p_handle); /** * @brief Callback to inform the executor about notified tasks. */ - void task_notified(task& p_task); + void task_notified(task_handle& p_handle) override; private: /* executor */ /** * @brief Actual implementation of the spawn method. */ - void spawn_impl(task_ptr_s p_task) override; + void spawn_impl(task_handle_ptr_s p_handle) override; }; } } diff --git a/include/asyncpp/executor/current_thread.inl b/include/asyncpp/executor/current_thread.inl index d9e30ed..b8eeb15 100644 --- a/include/asyncpp/executor/current_thread.inl +++ b/include/asyncpp/executor/current_thread.inl @@ -2,32 +2,11 @@ #include "current_thread.h" +#include "executor.inl" + namespace asyncpp { namespace executor { - namespace __impl - { - - struct current_executor_lock - { - private: - executor::storage& _storage; - - public: - current_executor_lock( - executor::storage& p_storage, - executor& p_executor) - : _storage (p_storage) - { - _storage.current = &p_executor; - } - - ~current_executor_lock() - { _storage.current = nullptr; } - }; - - } - /* current_thread */ template @@ -41,7 +20,10 @@ namespace executor { void current_thread ::run() { - __impl::current_executor_lock l(executor::local_storage(), *this); + auto executor_lock = executor::lock(*this); + auto runtime_lock = _runtime.init_thread(); + + (void)runtime_lock; run_impl(); } @@ -51,7 +33,7 @@ namespace executor { void current_thread ::run(X_future&& p_future) { - auto executor_lock = __impl::current_executor_lock(executor::local_storage(), *this); + auto executor_lock = executor::lock(*this); auto runtime_lock = _runtime.init_thread(); (void)runtime_lock; @@ -83,26 +65,26 @@ namespace executor { template void current_thread - ::poll_task(task_ptr_s p_task) + ::poll_task(task_handle_ptr_s p_handle) { - current_task_lock l(p_task); + task::lock l(p_handle); // TODO execption handling - if (!p_task->poll()) + if (!p_handle->poll()) { - if (p_task->notified()) - _pending_tasks.emplace(p_task); + if (p_handle->notified()) + _pending_tasks.emplace(p_handle); else - _sleeping_tasks.emplace(p_task.get(), p_task); + _sleeping_tasks.emplace(p_handle.get(), p_handle); } } template void current_thread - ::task_notified(task& p_task) + ::task_notified(task_handle& p_handle) { - auto it = _sleeping_tasks.find(&p_task); + auto it = _sleeping_tasks.find(&p_handle); if (it != _sleeping_tasks.end()) { _pending_tasks.emplace(it->second); @@ -112,10 +94,7 @@ namespace executor { template void current_thread - ::spawn_impl(task_ptr_s p_task) - { - p_task->add_notify_handler(std::bind(¤t_thread::task_notified, this, std::placeholders::_1)); - _pending_tasks.push(p_task); - } + ::spawn_impl(task_handle_ptr_s p_task) + { _pending_tasks.push(p_task); } } } diff --git a/include/asyncpp/executor/executor.h b/include/asyncpp/executor/executor.h index 50a6f9d..f6c7fd3 100644 --- a/include/asyncpp/executor/executor.h +++ b/include/asyncpp/executor/executor.h @@ -1,6 +1,6 @@ #pragma once -#include +#include "task_handle.h" namespace asyncpp { namespace executor { @@ -8,10 +8,10 @@ namespace executor { struct executor { public: - struct storage - { - executor * current; - }; + friend __impl::task_handle; + + using task_handle = __impl::task_handle; + using task_handle_ptr_s = std::shared_ptr<__impl::task_handle>; public: /** @@ -21,12 +21,40 @@ namespace executor { static inline void spawn(X_future&& p_future); private: + /** + * @brief A task was notified that a resource it is waiting for is ready. + */ + virtual void task_notified(task_handle& p_handle) = 0; + /** * @brief Actual implementation of the spawn method. */ - virtual void spawn_impl(task_ptr_s p_task) = 0; + virtual void spawn_impl(task_handle_ptr_s p_handle) = 0; protected: + struct lock + { + public: + /** + * @brief Constructor. + */ + inline lock(executor& p_current); + + inline lock(lock &&) = delete; + inline lock(lock const &) = delete; + + /** + * @brief Destructor. + */ + inline ~lock(); + }; + + private: + struct storage + { + executor * current; + }; + /** * @brief Get the thread local storage. */ @@ -34,5 +62,3 @@ namespace executor { }; } } - -#include "executor.inl" diff --git a/include/asyncpp/executor/executor.inl b/include/asyncpp/executor/executor.inl index 385342b..d2ac892 100644 --- a/include/asyncpp/executor/executor.inl +++ b/include/asyncpp/executor/executor.inl @@ -2,6 +2,8 @@ #include "executor.h" +#include "task_handle.inl" + namespace asyncpp { namespace executor { @@ -10,10 +12,16 @@ namespace executor { template void executor::spawn(X_future&& p_future) { + using future_type = X_future; + using task_handle_type = __impl::task_handle_tpl; + auto exec = local_storage().current; if (!exec) throw std::runtime_error("Thread local executor instance is not assigned!"); - exec->spawn_impl(make_task(std::forward(p_future))); + + exec->spawn_impl(std::make_shared( + *exec, + std::forward(p_future))); } executor::storage& executor::local_storage() @@ -22,4 +30,12 @@ namespace executor { return value; } + /* lock */ + + executor::lock::lock(executor& p_current) + { executor::local_storage().current = &p_current; } + + executor::lock::~lock() + { executor::local_storage().current = nullptr; } + } } diff --git a/include/asyncpp/executor/task_handle.h b/include/asyncpp/executor/task_handle.h new file mode 100644 index 0000000..5e73fe1 --- /dev/null +++ b/include/asyncpp/executor/task_handle.h @@ -0,0 +1,81 @@ +#pragma once + +#include + +namespace asyncpp { +namespace executor { + + struct executor; + + namespace __impl + { + + struct task_handle + : public task::handle + { + private: + executor& _owner; + bool _notified; + + public: + /** + * @brief Constructor. + */ + inline task_handle(executor& p_owner); + + inline task_handle(task_handle &) = delete; + inline task_handle(task_handle const &) = delete; + + /** + * @brief Notify that a resource, the task is waiting for is ready. + */ + inline void notify() override; + + /** + * @brief Check if the task was notified. + */ + inline bool notified() const; + + /** + * @brief Poll the future stored in the task. + * + * @return TRUE if the task is finished, FALSE otherwise. + */ + inline bool poll(); + + private: + /** + * @brief Actual poll implementation. + */ + virtual bool poll_impl() = 0; + }; + + template + struct task_handle_tpl + : public task_handle + { + public: + using future_type = T_future; + + private: + future_type _future; + + public: + /** + * @brief Constructor. + */ + template + inline task_handle_tpl( + executor& p_owner, + X_future&& p_future); + + private: + /** + * @brief Actual poll implementation. + */ + inline bool poll_impl() override; + }; + + } + +} } diff --git a/include/asyncpp/executor/task_handle.inl b/include/asyncpp/executor/task_handle.inl new file mode 100644 index 0000000..83e3d82 --- /dev/null +++ b/include/asyncpp/executor/task_handle.inl @@ -0,0 +1,50 @@ +#pragma once + +#include "executor.h" +#include "task_handle.h" + +namespace asyncpp { +namespace executor { +namespace __impl { + + /* task_handle */ + + task_handle::task_handle( + executor& p_owner) + : _owner (p_owner) + , _notified (false) + { } + + void task_handle::notify() + { + _notified = true; + _owner.task_notified(*this); + } + + bool task_handle::notified() const + { return _notified; } + + bool task_handle::poll() + { + _notified = false; + return poll_impl(); + } + + /* task_handle_tpl */ + + template + template + task_handle_tpl + ::task_handle_tpl( + executor& p_owner, + X_future&& p_future) + : task_handle (p_owner) + , _future (std::forward(p_future)) + { } + + template + bool task_handle_tpl + ::poll_impl() + { return _future.poll().is_ready(); } + +} } } diff --git a/include/asyncpp/timing/impl/registration.h b/include/asyncpp/timing/impl/registration.h index 8acbffb..9c8155e 100644 --- a/include/asyncpp/timing/impl/registration.h +++ b/include/asyncpp/timing/impl/registration.h @@ -16,15 +16,15 @@ namespace __impl { public: timer_base& owner; const time_point deadline; - const task_ptr_w task; + const asyncpp::task task; /** * @brief Constructor. */ inline registration( - timer_base& p_owner, - const time_point& p_deadline, - const task_ptr_w& p_task); + timer_base& p_owner, + const time_point& p_deadline, + const asyncpp::task& p_task); }; using registration_ptr_w = std::weak_ptr; diff --git a/include/asyncpp/timing/impl/registration.inl b/include/asyncpp/timing/impl/registration.inl index 0963866..b19be9b 100644 --- a/include/asyncpp/timing/impl/registration.inl +++ b/include/asyncpp/timing/impl/registration.inl @@ -9,9 +9,9 @@ namespace __impl { /* registration */ registration::registration( - timer_base& p_owner, - const time_point& p_deadline, - const task_ptr_w& p_task) + timer_base& p_owner, + const time_point& p_deadline, + const asyncpp::task& p_task) : owner (p_owner) , deadline (p_deadline) , task (p_task) diff --git a/include/asyncpp/timing/timer.inl b/include/asyncpp/timing/timer.inl index a085ec9..0549928 100644 --- a/include/asyncpp/timing/timer.inl +++ b/include/asyncpp/timing/timer.inl @@ -28,10 +28,7 @@ namespace timing { auto it = r->begin(); while (it != r->end() && now >= (**it).deadline) { - auto t = (**it).task.lock(); - if (t) - t->notify(); - + (**it).task.notify(); ++it; } } diff --git a/test/asyncpp/core/task_tests.cpp b/test/asyncpp/core/task_tests.cpp index 90ae885..120fef6 100644 --- a/test/asyncpp/core/task_tests.cpp +++ b/test/asyncpp/core/task_tests.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -7,14 +8,23 @@ using namespace ::testing; using namespace ::asyncpp; -TEST(task_tests, poll) +struct task_handle + : public task::handle { - auto t = make_task(test_delay { 5, 0 }); - - ASSERT_FALSE(t->poll()); - ASSERT_FALSE(t->poll()); - ASSERT_FALSE(t->poll()); - ASSERT_FALSE(t->poll()); - ASSERT_FALSE(t->poll()); - ASSERT_TRUE (t->poll()); + MOCK_METHOD0(notify, void ()); +}; + +TEST(task_tests, lock_and_notify) +{ + auto handle = std::make_shared>(); + + EXPECT_CALL(*handle, notify); + + auto t = task::current(); + t.notify(); // nop + + auto lock = task::lock(handle); + + t = task::current(); + t.notify(); // actual notify } diff --git a/test/asyncpp/executor/current_thread_tests.cpp b/test/asyncpp/executor/current_thread_tests.cpp index c712517..9f2027b 100644 --- a/test/asyncpp/executor/current_thread_tests.cpp +++ b/test/asyncpp/executor/current_thread_tests.cpp @@ -20,8 +20,8 @@ public: using result_type = typename base_future_type::result_type; public: - bool done { false }; - task_ptr_w task; + bool done { false }; + asyncpp::task task; public: inline test() = default; @@ -54,15 +54,11 @@ TEST(current_thread_tests, run) EXPECT_CALL(m, idle(nullptr)) .InSequence(s) .WillOnce(Invoke([&t](auto){ - auto s = t.task.lock(); - ASSERT_TRUE(s); - s->notify(); + t.task.notify(); })) .WillOnce(Invoke([&t](auto){ t.done = true; - auto s = t.task.lock(); - ASSERT_TRUE(s); - s->notify(); + t.task.notify(); }));; e.run(t);