From 222348dd4c9ea51cf4fdbb3f3dfd622407d5675b Mon Sep 17 00:00:00 2001 From: bergmann Date: Tue, 19 Nov 2019 22:22:44 +0100 Subject: [PATCH] * Implemented current thread executor (and executors in general) * Refactored timer implementation --- include/asyncpp/{timer => core}/misc.h | 6 +- include/asyncpp/{timer => core}/misc.inl | 0 include/asyncpp/core/task.h | 37 +++-- include/asyncpp/core/task.inl | 41 ++---- include/asyncpp/executor/current_thread.h | 73 ++++++++++ include/asyncpp/executor/current_thread.inl | 136 ++++++++++++++++++ include/asyncpp/executor/executor.h | 44 ++++++ include/asyncpp/executor/executor.inl | 28 ++++ include/asyncpp/timer.h | 4 +- include/asyncpp/timer/delay.h | 8 +- include/asyncpp/timer/delay.inl | 8 +- include/asyncpp/timer/delay.pre.h | 8 ++ include/asyncpp/timer/impl/registration.h | 40 ++++++ include/asyncpp/timer/impl/registration.inl | 23 +++ include/asyncpp/timer/impl/registration.pre.h | 9 ++ include/asyncpp/timer/impl/timer_base.h | 86 +++++++++++ include/asyncpp/timer/impl/timer_base.inl | 95 ++++++++++++ include/asyncpp/timer/impl/timer_base.pre.h | 9 ++ include/asyncpp/timer/timer.h | 105 +++++--------- include/asyncpp/timer/timer.inl | 124 +++++++--------- .../asyncpp/executor/current_thread_tests.cpp | 63 ++++++++ test/asyncpp/timer/timer_tests.cpp | 54 ++++++- test/helper/now_mock.h | 4 +- test/helper/runtime_mock.h | 12 ++ 24 files changed, 812 insertions(+), 205 deletions(-) rename include/asyncpp/{timer => core}/misc.h (88%) rename include/asyncpp/{timer => core}/misc.inl (100%) create mode 100644 include/asyncpp/executor/current_thread.h create mode 100644 include/asyncpp/executor/current_thread.inl create mode 100644 include/asyncpp/executor/executor.h create mode 100644 include/asyncpp/executor/executor.inl create mode 100644 include/asyncpp/timer/delay.pre.h create mode 100644 include/asyncpp/timer/impl/registration.h create mode 100644 include/asyncpp/timer/impl/registration.inl create mode 100644 include/asyncpp/timer/impl/registration.pre.h create mode 100644 include/asyncpp/timer/impl/timer_base.h create mode 100644 include/asyncpp/timer/impl/timer_base.inl create mode 100644 include/asyncpp/timer/impl/timer_base.pre.h create mode 100644 test/asyncpp/executor/current_thread_tests.cpp create mode 100644 test/helper/runtime_mock.h diff --git a/include/asyncpp/timer/misc.h b/include/asyncpp/core/misc.h similarity index 88% rename from include/asyncpp/timer/misc.h rename to include/asyncpp/core/misc.h index 1f7fa75..6f5acd3 100644 --- a/include/asyncpp/timer/misc.h +++ b/include/asyncpp/core/misc.h @@ -2,8 +2,8 @@ #include -namespace asyncpp { -namespace timer { +namespace asyncpp +{ using clock = std::chrono::steady_clock; @@ -17,4 +17,4 @@ namespace timer { */ inline time_point now(); -} } +} diff --git a/include/asyncpp/timer/misc.inl b/include/asyncpp/core/misc.inl similarity index 100% rename from include/asyncpp/timer/misc.inl rename to include/asyncpp/core/misc.inl diff --git a/include/asyncpp/core/task.h b/include/asyncpp/core/task.h index 02444af..3aa4ba2 100644 --- a/include/asyncpp/core/task.h +++ b/include/asyncpp/core/task.h @@ -1,41 +1,55 @@ #pragma once +#include #include +#include namespace asyncpp { struct task { + public: + using notify_handler = std::function; + using notify_handler_vector = std::vector; + + private: + bool _notified; + notify_handler_vector _notify_handlers; + public: /** * @brief Destructor. */ virtual ~task() = default; + /** + * @brief Check if the task is notified. + */ + inline bool notified() const; + + /** + * @brief Notify the task that a resource that the task is interested in, is ready to be polled. + */ + inline void notify(); + /** * @brief Poll the future stored in the task. * * @return TRUE if the task is finished, FALSE otherwise. */ - bool poll(); + inline bool poll(); - private: /** - * @brief Actual implementation of the poll function. + * @brief Add a callback to execute when the task is notified. */ - virtual bool poll_impl() = 0; + inline void add_notify_handler(notify_handler p_handler); private: - struct storage - { - task* current { nullptr }; - }; - /** - * @brief Get the thread local storage. + * @brief Actual implementation of the poll function. */ - static inline storage& local_storage(); + virtual bool poll_impl() = 0; }; template @@ -62,6 +76,7 @@ namespace asyncpp inline bool poll_impl() override; }; + using task_ptr_w = std::weak_ptr; using task_ptr_s = std::shared_ptr; /** diff --git a/include/asyncpp/core/task.inl b/include/asyncpp/core/task.inl index 9b34143..97c7ab4 100644 --- a/include/asyncpp/core/task.inl +++ b/include/asyncpp/core/task.inl @@ -3,45 +3,28 @@ namespace asyncpp { - namespace __impl - { - - struct current_lock - { - task* owner; - task*& storage; - - current_lock( - task* p_owner, - task*& p_storage) - : owner (p_owner) - , storage (p_storage) - { - if (storage) - throw std::runtime_error("Thread local task instance is already assigned!"); - storage = owner; - } - - ~current_lock() - { storage = nullptr; } - }; - - } - /* task */ bool task::poll() { - __impl::current_lock l(this, local_storage().current); + _notified = false; return poll_impl(); } - task::storage& task::local_storage() + bool task::notified() const + { return _notified; } + + void task::notify() { - thread_local storage value; - return value; + _notified = true; + + for (auto& h : _notify_handlers) + h(*this); } + void task::add_notify_handler(notify_handler p_handler) + { _notify_handlers.emplace_back(p_handler); } + /* task_tpl */ template diff --git a/include/asyncpp/executor/current_thread.h b/include/asyncpp/executor/current_thread.h new file mode 100644 index 0000000..dd5f898 --- /dev/null +++ b/include/asyncpp/executor/current_thread.h @@ -0,0 +1,73 @@ +#pragma once + +#include + +#include "executor.h" + +namespace asyncpp { +namespace executor { + + template + struct current_thread + : public executor + { + public: + using runtime_type = T_runtime; + using task_map_type = std::map; + using task_queue_type = std::queue; + + private: + runtime_type _runtime; //!> Runtime to call when executor is idling + task_queue_type _pending_tasks; //!< Tasks ready for execution. + task_map_type _sleeping_tasks; //!< Tasks waiting for resource. + + public: + /** + * @brief Constructor. + */ + template + inline current_thread(X_runtime&& p_runtime); + + /** + * @brief Run the executor. + * + * It will return if all tasks are finished. + */ + void run(); + + /** + * @brief Spawn the passed future and run the executor. + * + * It will return if all tasks are finished. + */ + template + void run(X_future&& p_future); + + private: + /** + * @brief Run the executor. + * + * It will return if all tasks are finished. + */ + void run_impl(); + + /** + * @brief Poll the passed task. + */ + void poll_task(task_ptr_s p_task); + + /** + * @brief Callback to inform the executor about notified tasks. + */ + void task_notified(task& p_task); + + private: /* executor */ + /** + * @brief Actual implementation of the spawn method. + */ + void spawn_impl(task_ptr_s p_task) override; + }; + +} } + +#include "current_thread.inl" diff --git a/include/asyncpp/executor/current_thread.inl b/include/asyncpp/executor/current_thread.inl new file mode 100644 index 0000000..8a57dfb --- /dev/null +++ b/include/asyncpp/executor/current_thread.inl @@ -0,0 +1,136 @@ +#pragma once + +#include "current_thread.h" + +namespace asyncpp { +namespace executor { + + namespace __impl + { + + struct current_task_lock + { + private: + executor::storage& _storage; + + public: + current_task_lock( + executor::storage& p_storage, + const task_ptr_s& p_task) + : _storage (p_storage) + { + _storage.current_task = p_task; + } + + ~current_task_lock() + { _storage.current_task.reset(); } + }; + + struct current_executor_lock + { + private: + executor::storage& _storage; + + public: + current_executor_lock( + executor::storage& p_storage, + executor& p_executor) + : _storage (p_storage) + { + _storage.current_executor = &p_executor; + } + + ~current_executor_lock() + { _storage.current_executor = nullptr; } + }; + + } + + /* current_thread */ + + template + template + current_thread + ::current_thread(X_runtime&& p_runtime) + : _runtime(std::forward(p_runtime)) + { } + + template + void current_thread + ::run() + { + __impl::current_executor_lock l(executor::local_storage(), *this); + + run_impl(); + } + + template + template + void current_thread + ::run(X_future&& p_future) + { + __impl::current_executor_lock l(executor::local_storage(), *this); + + spawn(std::forward(p_future)); + + run_impl(); + } + + template + void current_thread + ::run_impl() + { + while (true) + { + while (!_pending_tasks.empty()) + { + auto t = _pending_tasks.front(); + _pending_tasks.pop(); + poll_task(t); + } + + if (_sleeping_tasks.empty()) + return; + + _runtime.idle(nullptr); + } + } + + template + void current_thread + ::poll_task(task_ptr_s p_task) + { + __impl::current_task_lock l(executor::local_storage(), p_task); + + // TODO execption handling + + if (!p_task->poll()) + { + if (p_task->notified()) + _pending_tasks.emplace(p_task); + else + _sleeping_tasks.emplace(p_task.get(), p_task); + } + } + + template + void current_thread + ::task_notified(task& p_task) + { + auto it = _sleeping_tasks.find(&p_task); + if (it != _sleeping_tasks.end()) + { + _pending_tasks.emplace(it->second); + _sleeping_tasks.erase(it); + } + } + + template + void current_thread + ::spawn_impl(task_ptr_s p_task) + { + p_task->add_notify_handler(std::bind(¤t_thread::task_notified, this, std::placeholders::_1)); + _pending_tasks.push(p_task); + } + +} } diff --git a/include/asyncpp/executor/executor.h b/include/asyncpp/executor/executor.h new file mode 100644 index 0000000..33789fd --- /dev/null +++ b/include/asyncpp/executor/executor.h @@ -0,0 +1,44 @@ +#pragma once + +#include + +namespace asyncpp { +namespace executor { + + struct executor + { + public: + struct storage + { + task_ptr_w current_task; + executor * current_executor; + }; + + public: + /** + * @brief Get reference of the current task. + */ + static inline const task_ptr_w& current_task(); + + /** + * @brief Spawn a new task. + */ + template + static inline void spawn(X_future&& p_future); + + private: + /** + * @brief Actual implementation of the spawn method. + */ + virtual void spawn_impl(task_ptr_s p_task) = 0; + + protected: + /** + * @brief Get the thread local storage. + */ + static inline storage& local_storage(); + }; + +} } + +#include "executor.inl" diff --git a/include/asyncpp/executor/executor.inl b/include/asyncpp/executor/executor.inl new file mode 100644 index 0000000..ed6c7b8 --- /dev/null +++ b/include/asyncpp/executor/executor.inl @@ -0,0 +1,28 @@ +#pragma once + +#include "executor.h" + +namespace asyncpp { +namespace executor { + + /* executor */ + + const task_ptr_w& executor::current_task() + { return local_storage().current_task; } + + template + void executor::spawn(X_future&& p_future) + { + auto exec = local_storage().current_executor; + if (!exec) + throw std::runtime_error("Thread local executor instance is not assigned!"); + exec->spawn_impl(make_task(std::forward(p_future))); + } + + executor::storage& executor::local_storage() + { + thread_local storage value; + return value; + } + +} } diff --git a/include/asyncpp/timer.h b/include/asyncpp/timer.h index 938e9f0..e90e6c3 100644 --- a/include/asyncpp/timer.h +++ b/include/asyncpp/timer.h @@ -1,7 +1,7 @@ #pragma once #include "timer/delay.h" -#include "timer/misc.h" +#include "timer/timer.h" #include "timer/delay.inl" -#include "timer/misc.inl" +#include "timer/timer.inl" diff --git a/include/asyncpp/timer/delay.h b/include/asyncpp/timer/delay.h index 3ede2c0..c90b7f4 100644 --- a/include/asyncpp/timer/delay.h +++ b/include/asyncpp/timer/delay.h @@ -1,7 +1,9 @@ #pragma once -#include "misc.h" +#include + #include "timer.h" +#include "delay.pre.h" namespace asyncpp { namespace timer { @@ -12,8 +14,8 @@ namespace timer { friend struct future_trait; private: - time_point _timeout; - registration _registration; + time_point _timeout; + __impl::registration _registration; public: /** diff --git a/include/asyncpp/timer/delay.inl b/include/asyncpp/timer/delay.inl index e5c2741..e127a9e 100644 --- a/include/asyncpp/timer/delay.inl +++ b/include/asyncpp/timer/delay.inl @@ -24,21 +24,21 @@ namespace timer { { } inline delay::~delay() - { timer::unregister_resource(_registration); } + { __impl::timer_base::unregister_resource(_registration); } time_point delay::timeout() const { return _timeout; } void delay::reset(const time_point& p_timeout) { - timer::unregister_resource(_registration); + __impl::timer_base::unregister_resource(_registration); _timeout = p_timeout; } template void delay::reset(const duration& p_duration) { - timer::unregister_resource(_registration); + __impl::timer_base::unregister_resource(_registration); _timeout = now() + p_duration; } @@ -64,7 +64,7 @@ namespace asyncpp if (self.ref._timeout <= now) return result_type::ready(); - timer::timer::register_resource(self.ref._registration); + timer::__impl::timer_base::register_resource(self.ref._registration); return result_type::not_ready(); } diff --git a/include/asyncpp/timer/delay.pre.h b/include/asyncpp/timer/delay.pre.h new file mode 100644 index 0000000..52daa6a --- /dev/null +++ b/include/asyncpp/timer/delay.pre.h @@ -0,0 +1,8 @@ +#pragma once + +namespace asyncpp { +namespace timer { + + struct delay; + +} } diff --git a/include/asyncpp/timer/impl/registration.h b/include/asyncpp/timer/impl/registration.h new file mode 100644 index 0000000..87fa320 --- /dev/null +++ b/include/asyncpp/timer/impl/registration.h @@ -0,0 +1,40 @@ +#pragma once + +#include + +#include + +#include "timer_base.pre.h" +#include "../delay.pre.h" + +namespace asyncpp { +namespace timer { +namespace __impl { + + struct registration + { + public: + struct inner + { + timer_base& owner; + time_point timeout; + + /** + * @brief Constructor. + */ + inline inner(timer_base& p_owner, time_point p_timeout); + }; + + using inner_ptr_w = std::weak_ptr; + + public: + delay& owner; + inner_ptr_w ptr; + + /** + * @brief Constructor. + */ + inline registration(delay& p_owner); + }; + +} } } diff --git a/include/asyncpp/timer/impl/registration.inl b/include/asyncpp/timer/impl/registration.inl new file mode 100644 index 0000000..b0516d1 --- /dev/null +++ b/include/asyncpp/timer/impl/registration.inl @@ -0,0 +1,23 @@ +#pragma once + +#include "registration.h" + +namespace asyncpp { +namespace timer { +namespace __impl { + + /* registration::inner */ + + registration::inner::inner(timer_base& p_owner, time_point p_timeout) + : owner (p_owner) + , timeout (p_timeout) + { } + + /* registration */ + + registration::registration(delay& p_owner) + : owner(p_owner) + , ptr () + { } + +} } } diff --git a/include/asyncpp/timer/impl/registration.pre.h b/include/asyncpp/timer/impl/registration.pre.h new file mode 100644 index 0000000..f4274b3 --- /dev/null +++ b/include/asyncpp/timer/impl/registration.pre.h @@ -0,0 +1,9 @@ +#pragma once + +namespace asyncpp { +namespace timer { +namespace __impl { + + struct registration; + +} } } diff --git a/include/asyncpp/timer/impl/timer_base.h b/include/asyncpp/timer/impl/timer_base.h new file mode 100644 index 0000000..00921c2 --- /dev/null +++ b/include/asyncpp/timer/impl/timer_base.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include + +#include + +#include + +#include "timer_base.pre.h" +#include "registration.h" + +namespace asyncpp { +namespace timer { +namespace __impl { + + struct timer_base + { + private: + using inner_ptr_s = std::shared_ptr; + + struct inner_less_compare + { constexpr bool operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const; }; + + using inner_set = std::set; + + protected: + cppcore::locked _registrations; + + public: + /** + * @brief Constructor. + */ + inline ~timer_base(); + + public: + /** + * @brief Get the number of resources assigned to this timer_base. + */ + inline size_t resource_count() const; + + /** + * @brief Set the thread local current timer_base. + */ + inline void make_current(); + + /** + * @brief Set the thread local current timer_base. + */ + inline void clear_current(); + + public: + /** + * @brief Get the thread local timer_base instance. + */ + static inline timer_base* current(); + + /** + * @brief Register a new resource within this timer_base. + */ + static inline void register_resource(registration& p_value); + + /** + * @brief Register a new resource within this timer_base. + */ + static inline void unregister_resource(registration& p_value); + + private: + /** + * @brief Add a new resource to the timer_base. + */ + inline inner_ptr_s make_inner(registration& p_value); + + private: + struct storage + { + timer_base* current { nullptr }; + }; + + /** + * @brief Get the thread local storage. + */ + static inline storage& local_storage(); + }; + +} } } diff --git a/include/asyncpp/timer/impl/timer_base.inl b/include/asyncpp/timer/impl/timer_base.inl new file mode 100644 index 0000000..bb80eac --- /dev/null +++ b/include/asyncpp/timer/impl/timer_base.inl @@ -0,0 +1,95 @@ +#pragma once + +#include "timer_base.h" + +namespace asyncpp { +namespace timer { +namespace __impl { + + /* timer_base::inner_less_compare */ + + constexpr bool timer_base::inner_less_compare::operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const + { + return (lhs->timeout < rhs->timeout) + ? true + : + (lhs->timeout == rhs->timeout) + && (lhs.get() < rhs.get()) + ? true + : false; + } + + /* timer_base */ + + timer_base::~timer_base() + { + auto& s = local_storage(); + if (s.current == this) + s.current = nullptr; + } + + size_t timer_base::resource_count() const + { return _registrations.lock()->size(); } + + void timer_base::make_current() + { + auto& s = local_storage(); + if (s.current) + throw std::runtime_error("Thread local timer_base instance is already assigned!"); + s.current = this; + } + + void timer_base::clear_current() + { + auto& s = local_storage(); + if (s.current && s.current != this) + throw std::runtime_error("Thread local timer_base instance is not assigned to this instance!"); + s.current = nullptr; + } + + timer_base* timer_base::current() + { return local_storage().current; } + + void timer_base::register_resource(registration& p_value) + { + auto s = p_value.ptr.lock(); + + if (s && s->timeout != p_value.owner.timeout()) + { + unregister_resource(p_value); + s.reset(); + } + + if (!s) + { + auto t = current(); + + if (!t) + throw std::runtime_error("Thread local timer_base instance is not assigned!"); + + p_value.ptr = t->make_inner(p_value); + } + } + + void timer_base::unregister_resource(registration& p_value) + { + auto s = p_value.ptr.lock(); + p_value.ptr.reset(); + if (s) + s->owner._registrations.lock()->erase(s); + } + + inline timer_base::inner_ptr_s timer_base::make_inner(registration& p_value) + { + auto s = std::make_shared(*this, p_value.owner.timeout()); + _registrations.lock()->insert(s); + return s; + } + + timer_base::storage& timer_base::local_storage() + { + thread_local storage value; + return value; + } + +} } } diff --git a/include/asyncpp/timer/impl/timer_base.pre.h b/include/asyncpp/timer/impl/timer_base.pre.h new file mode 100644 index 0000000..4f11170 --- /dev/null +++ b/include/asyncpp/timer/impl/timer_base.pre.h @@ -0,0 +1,9 @@ +#pragma once + +namespace asyncpp { +namespace timer { +namespace __impl { + + struct timer_base; + +} } } diff --git a/include/asyncpp/timer/timer.h b/include/asyncpp/timer/timer.h index 77c1e4a..490b305 100644 --- a/include/asyncpp/timer/timer.h +++ b/include/asyncpp/timer/timer.h @@ -5,107 +5,68 @@ #include -#include "misc.h" +#include "impl/timer_base.h" +#include "impl/registration.h" namespace asyncpp { namespace timer { - struct delay; - struct timer; - - struct registration + namespace __impl { - public: - struct inner + + template + struct storage { - timer& owner; - time_point timeout; + using inner_type = T_inner; - /** - * @brief Constructor. - */ - inline inner(timer& p_owner, time_point p_timeout); - }; + inner_type inner; - using inner_ptr_w = std::weak_ptr; + template + inline storage(X_args&&... p_args); + }; - public: - delay& owner; - inner_ptr_w ptr; - - /** - * @brief Constructor. - */ - inline registration(delay& p_owner); - }; + } + template struct timer + : public __impl::timer_base { - private: - using inner_ptr_s = std::shared_ptr; - - struct inner_less_compare - { constexpr bool operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const; }; - - using inner_set = std::set; + public: + using inner_type = T_inner; + using storage_type = __impl::storage; private: - cppcore::locked _registrations; + storage_type _storage; public: /** * @brief Constructor. */ - inline ~timer(); - - public: - /** - * @brief Get the number of resources assigned to this timer. - */ - inline size_t resource_count() const; - - /** - * @brief Set the thread local current timer. - */ - inline void make_current(); - - /** - * @brief Set the thread local current timer. - */ - inline void clear_current(); + template + inline timer(X_args&&... p_args); - public: /** - * @brief Get the thread local timer instance. + * @brief Handle idle of the runtime. + * + * This method is called as soon as the runtime has nothing to do. + * The passed timeout is the timepoint the method should return (or null if not set). */ - static inline timer* current(); - - /** - * @brief Register a new resource within this timer. - */ - static inline void register_resource(registration& p_value); - - /** - * @brief Register a new resource within this timer. - */ - static inline void unregister_resource(registration& p_value); + inline void idle(asyncpp::time_point* p_timeout); private: /** - * @brief Add a new resource to the timer. + * @brief Call the idle method of the inner runtime. */ - inline inner_ptr_s make_inner(registration& p_value); - - private: - struct storage - { - timer* current { nullptr }; - }; + template + inline auto inner_idle(asyncpp::time_point* p_timeout) + -> std::enable_if_t>; /** - * @brief Get the thread local storage. + * @brief Call the idle method of the inner runtime. */ - static inline storage& local_storage(); + template + inline auto inner_idle(asyncpp::time_point* p_timeout) + -> std::enable_if_t>; }; } } diff --git a/include/asyncpp/timer/timer.inl b/include/asyncpp/timer/timer.inl index 65def8d..a5f424b 100644 --- a/include/asyncpp/timer/timer.inl +++ b/include/asyncpp/timer/timer.inl @@ -3,107 +3,85 @@ #include "timer.h" #include "delay.inl" +#include "impl/timer_base.inl" +#include "impl/registration.inl" + namespace asyncpp { namespace timer { - /* registration::inner */ - - registration::inner::inner(timer& p_owner, time_point p_timeout) - : owner (p_owner) - , timeout (p_timeout) - { } - - /* registration */ - - registration::registration(delay& p_owner) - : owner(p_owner) - , ptr () - { } - - /* timer::inner_less_compare */ - - constexpr bool timer::inner_less_compare::operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const + namespace __impl { - return (lhs->timeout < rhs->timeout) - ? true - : - (lhs->timeout == rhs->timeout) - && (lhs.get() < rhs.get()) - ? true - : false; - } - /* timer */ + template + template + storage::storage(X_args&&... p_args) + : inner(std::forward(p_args)...) + { } - timer::~timer() - { - auto& s = local_storage(); - if (s.current == this) - s.current = nullptr; - } + template<> + struct storage + { + using inner_type = void; - size_t timer::resource_count() const - { return _registrations.lock()->size(); } + inline storage() = default; + }; - void timer::make_current() - { - auto& s = local_storage(); - if (s.current) - throw std::runtime_error("Thread local timer instance is already assigned!"); - s.current = this; } - void timer::clear_current() - { - auto& s = local_storage(); - if (s.current && s.current != this) - throw std::runtime_error("Thread local timer instance is not assigned to this instance!"); - s.current = nullptr; - } + /* timer */ - timer* timer::current() - { return local_storage().current; } + template + template + timer::timer(X_args&&... p_args) + : _storage(std::forward(p_args)...) + { } - void timer::register_resource(registration& p_value) + template + void timer::idle(asyncpp::time_point* p_timeout) { - auto s = p_value.ptr.lock(); + bool has_timeout = false; + asyncpp::time_point timeout; - if (s && s->timeout != p_value.owner.timeout()) + if (p_timeout) { - unregister_resource(p_value); - s.reset(); + timeout = *p_timeout; + has_timeout = true; } - if (!s) { - auto t = current(); + auto r = _registrations.lock(); + if (!r->empty()) + { + auto t = (*r->begin())->timeout; - if (!t) - throw std::runtime_error("Thread local timer instance is not assigned!"); + if (!has_timeout) + timeout = t; + else if (t < timeout) + timeout = t; - p_value.ptr = t->make_inner(p_value); + has_timeout = true; + } } - } - void timer::unregister_resource(registration& p_value) - { - auto s = p_value.ptr.lock(); - p_value.ptr.reset(); - if (s) - s->owner._registrations.lock()->erase(s); + inner_idle(has_timeout + ? &timeout + : nullptr); } - inline timer::inner_ptr_s timer::make_inner(registration& p_value) + template + template + auto timer::inner_idle(asyncpp::time_point* p_timeout) + -> std::enable_if_t> { - auto s = std::make_shared(*this, p_value.owner.timeout()); - _registrations.lock()->insert(s); - return s; + /* no-op */ } - timer::storage& timer::local_storage() + template + template + auto timer::inner_idle(asyncpp::time_point* p_timeout) + -> std::enable_if_t> { - thread_local storage value; - return value; + _storage.inner.idle(p_timeout); } } } diff --git a/test/asyncpp/executor/current_thread_tests.cpp b/test/asyncpp/executor/current_thread_tests.cpp new file mode 100644 index 0000000..4e567c3 --- /dev/null +++ b/test/asyncpp/executor/current_thread_tests.cpp @@ -0,0 +1,63 @@ +#include + +#include "../../helper/runtime_mock.h" + +#include +#include + +using namespace ::testing; +using namespace ::asyncpp; + +struct test +{ + bool done { false }; + task_ptr_w task; +}; + +namespace asyncpp +{ + + template<> + struct future_trait + : public future_base> + { + using value_type = void; + using result_type = future_result; + + template + static inline auto poll(X_future& self) + { + self.ref.task = executor::executor::current_task(); + return self.ref.done + ? result_type::ready() + : result_type::not_ready(); + } + }; + +} + +TEST(current_thread_tests, run) +{ + Sequence s; + StrictMock m; + executor::current_thread&> e(m); + + test t; + auto f = as_future(t); + + EXPECT_CALL(m, idle(nullptr)) + .InSequence(s) + .WillOnce(Invoke([&t](auto){ + auto s = t.task.lock(); + ASSERT_TRUE(s); + s->notify(); + })) + .WillOnce(Invoke([&t](auto){ + t.done = true; + auto s = t.task.lock(); + ASSERT_TRUE(s); + s->notify(); + }));; + + e.run(f); +} diff --git a/test/asyncpp/timer/timer_tests.cpp b/test/asyncpp/timer/timer_tests.cpp index 1c3279d..5c1321b 100644 --- a/test/asyncpp/timer/timer_tests.cpp +++ b/test/asyncpp/timer/timer_tests.cpp @@ -1,6 +1,7 @@ #include #include "../../helper/now_mock.h" +#include "../../helper/runtime_mock.h" #include #include @@ -13,29 +14,29 @@ TEST(timer_tests, current) { timer::timer t; - EXPECT_EQ(nullptr, timer::timer::current()); + EXPECT_EQ(nullptr, timer::__impl::timer_base::current()); t.make_current(); - EXPECT_EQ(&t, timer::timer::current()); + EXPECT_EQ(&t, timer::__impl::timer_base::current()); t.clear_current(); - EXPECT_EQ(nullptr, timer::timer::current()); + EXPECT_EQ(nullptr, timer::__impl::timer_base::current()); t.make_current(); - EXPECT_EQ(&t, timer::timer::current()); + EXPECT_EQ(&t, timer::__impl::timer_base::current()); } - EXPECT_EQ(nullptr, timer::timer::current()); + EXPECT_EQ(nullptr, timer::__impl::timer_base::current()); } TEST(timer_tests, resource_registration) { StrictMock m; EXPECT_CALL(m, now) - .WillRepeatedly(Return(timer::time_point(std::chrono::seconds(0)))); + .WillRepeatedly(Return(time_point(std::chrono::seconds(0)))); using delay_future_type = decltype(as_future(timer::delay(std::chrono::seconds(10)))); @@ -76,3 +77,44 @@ TEST(timer_tests, resource_registration) EXPECT_EQ(0, t.resource_count()); } + +TEST(timer_tests, idle) +{ + using delay_future_type = decltype(as_future(std::declval())); + + time_point x; + InSequence seq; + StrictMock nm; + StrictMock rm; + timer::timer&> t(rm); + t.make_current(); + + EXPECT_CALL(rm, idle(nullptr)); + + t.idle(nullptr); + + EXPECT_CALL(rm, idle(Pointee(time_point(std::chrono::seconds(5))))); + + x = time_point(std::chrono::seconds(5)); + t.idle(&x); + + EXPECT_CALL(nm, now) + .WillOnce(Return(time_point(std::chrono::seconds(0)))); + + auto f = std::make_unique(timer::delay(time_point(std::chrono::seconds(10)))); + f->poll(); + + EXPECT_CALL(rm, idle(Pointee(time_point(std::chrono::seconds(10))))); + + t.idle(nullptr); + + EXPECT_CALL(rm, idle(Pointee(time_point(std::chrono::seconds(5))))); + + x = time_point(std::chrono::seconds(5)); + t.idle(&x); + + EXPECT_CALL(rm, idle(Pointee(time_point(std::chrono::seconds(10))))); + + x = time_point(std::chrono::seconds(15)); + t.idle(&x); +} diff --git a/test/helper/now_mock.h b/test/helper/now_mock.h index 693355a..7c8b42f 100644 --- a/test/helper/now_mock.h +++ b/test/helper/now_mock.h @@ -5,7 +5,7 @@ #define __asyncpp_has_impl_timer_now -#include +#include struct now_mock; @@ -24,7 +24,7 @@ public: } public: - MOCK_METHOD0(now, asyncpp::timer::time_point()); + MOCK_METHOD0(now, asyncpp::time_point()); }; namespace asyncpp { diff --git a/test/helper/runtime_mock.h b/test/helper/runtime_mock.h new file mode 100644 index 0000000..0851f20 --- /dev/null +++ b/test/helper/runtime_mock.h @@ -0,0 +1,12 @@ +#pragma once + +#include +#include + +#include + +struct runtime_mock +{ +public: + MOCK_METHOD1(idle, void (asyncpp::time_point* p_timeout)); +};