From 46b668f235f4b0c524a0405f1b731bda9c01918b Mon Sep 17 00:00:00 2001 From: bergmann Date: Mon, 25 Nov 2019 19:21:12 +0100 Subject: [PATCH] * refactored/simplified 'stream' class --- include/asyncpp/core/future.inl | 6 +- include/asyncpp/core/stream.h | 68 ++++---- include/asyncpp/core/stream.inl | 158 +++++++++--------- include/asyncpp/core/stream.pre.h | 39 +---- include/asyncpp/timing/interval.h | 29 ++-- include/asyncpp/timing/interval.inl | 30 ++-- include/asyncpp/timing/timeout.h | 88 +++++----- include/asyncpp/timing/timeout.inl | 108 ++++++------ test/asyncpp/core/stream_tests.cpp | 55 +++--- .../asyncpp/executor/current_thread_tests.cpp | 15 +- test/asyncpp/timing/interval_tests.cpp | 4 +- test/asyncpp/timing/timeout_tests.cpp | 57 ++++--- 12 files changed, 312 insertions(+), 345 deletions(-) diff --git a/include/asyncpp/core/future.inl b/include/asyncpp/core/future.inl index 274a3d9..6763be1 100644 --- a/include/asyncpp/core/future.inl +++ b/include/asyncpp/core/future.inl @@ -12,7 +12,7 @@ namespace asyncpp { - /* future::map */ + /* base_future::map */ template< typename T_value, @@ -68,7 +68,7 @@ namespace asyncpp std::forward(p_lambda)); } - /* future::and_then */ + /* base_future::and_then */ template< typename T_value, @@ -124,7 +124,7 @@ namespace asyncpp std::forward(p_lambda)); } - /* future::timeout */ + /* base_future::timeout */ #ifdef asyncpp_timing template< diff --git a/include/asyncpp/core/stream.h b/include/asyncpp/core/stream.h index 1752769..46a0d2a 100644 --- a/include/asyncpp/core/stream.h +++ b/include/asyncpp/core/stream.h @@ -11,32 +11,33 @@ namespace asyncpp { template< - typename T_object, - typename T_impl> - struct stream + typename T_value, + typename T_derived> + struct base_stream + : public tag_stream { - using object_type = T_object; - using clean_object_type = std::decay_t; - using trait_type = stream_trait; - using value_type = typename trait_type::value_type; - using result_type = stream_result; - using reference = clean_object_type&; - using pointer = clean_object_type*; - using const_reference = const clean_object_type&; - using const_pointer = const clean_object_type*; - - object_type ref; + public: + using value_type = T_value; + using result_type = stream_result; + using derived_type = T_derived; + using this_type = base_stream; + public: /** - * @brief Value constructor. + * @brief Execute the given lambda for each element in the stream. + * + * @return Returns a future that completes once the stream is finished. */ - template - inline stream(X_object&& p_ref); + template + inline auto for_each(X_lambda&& p_lambda) const &; /** - * @brief Function that will be called repeatedly to check if the stream has values. + * @brief Execute the given lambda for each element in the stream. + * + * @return Returns a future that completes once the stream is finished. */ - inline result_type poll(); + template + inline auto for_each(X_lambda&& p_lambda) &; /** * @brief Execute the given lambda for each element in the stream. @@ -44,23 +45,34 @@ namespace asyncpp * @return Returns a future that completes once the stream is finished. */ template - inline auto for_each(X_lambda&& p_lambda); + inline auto for_each(X_lambda&& p_lambda) &&; - #ifdef asyncpp_timing + #ifdef asyncpp_timing + public: /** * @brief Throw an execption if the timeout has passed. * * This method is only enabled if timer.h is included before. */ template - inline auto timeout(const duration& p_timeout); - #endif + inline auto timeout(const duration& p_timeout) const &; - public: - inline pointer operator->(); - inline reference operator*(); - inline const_pointer operator->() const; - inline const_reference operator*() const; + /** + * @brief Throw an execption if the timeout has passed. + * + * This method is only enabled if timer.h is included before. + */ + template + inline auto timeout(const duration& p_timeout) &; + + /** + * @brief Throw an execption if the timeout has passed. + * + * This method is only enabled if timer.h is included before. + */ + template + inline auto timeout(const duration& p_timeout) &&; + #endif }; } diff --git a/include/asyncpp/core/stream.inl b/include/asyncpp/core/stream.inl index 1fce400..bb62afe 100644 --- a/include/asyncpp/core/stream.inl +++ b/include/asyncpp/core/stream.inl @@ -11,120 +11,118 @@ namespace asyncpp { - /* stream_base */ + /* base_stream::for_each */ - template - template - auto stream_base - ::for_each(X_stream&& self, X_lambda&& p_lambda) + template< + typename T_value, + typename T_derived> + template< + typename X_lambda> + auto base_stream + ::for_each(X_lambda&& p_lambda) const & { - using stream_type = X_stream; using lambda_type = X_lambda; - using for_each_type = __stream::for_each_impl; + using for_each_type = __stream::for_each_impl; + + auto& self = static_cast(*this); return for_each_type( - std::forward(self), + std::forward(self), std::forward(p_lambda)); } - #ifdef asyncpp_timing - template - template - auto stream_base - ::timeout(X_stream&& self, const duration& p_timeout) - { - using stream_type = X_stream; - using timeout_type = timing::timeout; - - return as_stream(timeout_type( - std::forward(self), - p_timeout)); - } - #endif - - /* stream */ - template< typename T_value, - typename T_impl> + typename T_derived> template< - typename X_object> - stream - ::stream(X_object&& p_ref) - : ref(std::forward(p_ref)) - { } + typename X_lambda> + auto base_stream + ::for_each(X_lambda&& p_lambda) & + { + using lambda_type = X_lambda; + using for_each_type = __stream::for_each_impl; - template< - typename T_value, - typename T_impl> - typename stream::result_type - stream - ::poll() - { return trait_type::poll(*this); } + auto& self = static_cast(*this); + + return for_each_type( + std::forward(self), + std::forward(p_lambda)); + } template< typename T_value, - typename T_impl> + typename T_derived> template< typename X_lambda> - auto stream - ::for_each(X_lambda&& p_lambda) - { return trait_type::for_each(std::move(*this), std::forward(p_lambda)); } + auto base_stream + ::for_each(X_lambda&& p_lambda) && + { + using lambda_type = X_lambda; + using for_each_type = __stream::for_each_impl; + + auto& self = static_cast(*this); + + return for_each_type( + std::forward(self), + std::forward(p_lambda)); + } + + /* base_stream::timeout */ #ifdef asyncpp_timing template< typename T_value, - typename T_impl> + typename T_derived> template< typename X_base, typename X_ratio> - auto stream - ::timeout(const duration& p_timeout) - { return trait_type::timeout(std::move(*this), p_timeout); } - #endif + auto base_stream + ::timeout(const duration& p_timeout) const & + { + using timeout_type = timing::timeout; - template< - typename T_value, - typename T_impl> - typename stream::pointer - stream::operator->() - { return &ref; } + auto& self = static_cast(*this); - template< - typename T_value, - typename T_impl> - typename stream::reference - stream::operator*() - { return ref; } + return timeout_type( + std::forward(self), + p_timeout); + } template< typename T_value, - typename T_impl> - typename stream::const_pointer - stream::operator->() const - { return &ref; } - + typename T_derived> template< - typename T_value, - typename T_impl> - typename stream::const_reference - stream::operator*() const - { return ref; } + typename X_base, + typename X_ratio> + auto base_stream + ::timeout(const duration& p_timeout) & + { + using timeout_type = timing::timeout; + + auto& self = static_cast(*this); - /* misc */ + return timeout_type( + std::forward(self), + p_timeout); + } - template - constexpr stream as_stream(X_value&& value) + template< + typename T_value, + typename T_derived> + template< + typename X_base, + typename X_ratio> + auto base_stream + ::timeout(const duration& p_timeout) && { - using value_type = X_value; - using stream_type = stream; + using timeout_type = timing::timeout; - return stream_type(std::forward(value)); - } + auto& self = static_cast(*this); - template - struct is_stream, void> - : public std::true_type - { }; + return timeout_type( + std::forward(self), + p_timeout); + } + #endif } diff --git a/include/asyncpp/core/stream.pre.h b/include/asyncpp/core/stream.pre.h index 2a4910e..5ac1996 100644 --- a/include/asyncpp/core/stream.pre.h +++ b/include/asyncpp/core/stream.pre.h @@ -3,45 +3,20 @@ namespace asyncpp { - template - struct stream_base - { - public: - using impl_type = T_impl; - - public: - template - static inline auto poll(X_stream& self) = delete; - - template - static inline auto for_each(X_stream&& self, X_lambda&& p_lambda); - - #ifdef asyncpp_timing - template - static inline auto timeout(X_stream&& self, const duration& p_timeout); - #endif - }; - - template - struct stream_trait; + struct tag_stream + { }; template< - typename T_object, - typename T_impl = stream_trait>> - struct stream; + typename T_value, + typename T_derived> + struct base_stream; - template + template struct is_stream - : public std::false_type + : public std::is_base_of { }; template constexpr decltype(auto) is_stream_v = is_stream::value; - /** - * @brief Construct a stream from the given value. - */ - template - constexpr stream as_stream(X_value&& value); - } diff --git a/include/asyncpp/timing/interval.h b/include/asyncpp/timing/interval.h index c8ac674..8d2f06e 100644 --- a/include/asyncpp/timing/interval.h +++ b/include/asyncpp/timing/interval.h @@ -10,9 +10,12 @@ namespace asyncpp { namespace timing { struct interval final + : public base_stream { public: - friend struct stream_trait; + using value_type = void; + using base_stream_type = base_stream; + using result_type = typename base_stream_type::result_type; private: delay _delay; //!< Delay future @@ -41,24 +44,12 @@ namespace timing { * @brief Get the duration of the interval. */ inline const clock::duration& duration() const; - }; - -} } - -namespace asyncpp -{ - - /* stream_trait for timing::interval */ - template<> - struct stream_trait - : public stream_base> - { - using value_type = void; - using result_type = stream_result; - - template - static inline result_type poll(X_stream& self); + public: /* stream */ + /** + * @brief Poll the result from the stream. + */ + inline result_type poll(); }; -} +} } diff --git a/include/asyncpp/timing/interval.inl b/include/asyncpp/timing/interval.inl index 898ec88..d6fe04e 100644 --- a/include/asyncpp/timing/interval.inl +++ b/include/asyncpp/timing/interval.inl @@ -30,32 +30,24 @@ namespace timing { const clock::duration& interval::duration() const { return _duration; } -} } - -namespace asyncpp -{ - - /* stream_trait for timing::interval */ - - template - typename stream_trait::result_type - stream_trait - ::poll(X_stream& self) + typename interval::result_type + interval + ::poll() { - if ( self->_deadline.time_since_epoch().count() - && self->_delay.deadline() >= self->_deadline - && asyncpp::now() >= self->_delay.deadline()) + if ( _deadline.time_since_epoch().count() + && _delay.deadline() >= _deadline + && asyncpp::now() >= _delay.deadline()) return result_type::done(); - auto ret = self->_delay.poll(); + auto ret = _delay.poll(); if (ret.is_not_ready()) return result_type::not_ready(); - auto now = self->_delay.deadline(); - auto new_deadline = now + self->_duration; - self->_delay.reset(new_deadline); + auto now = _delay.deadline(); + auto new_deadline = now + _duration; + _delay.reset(new_deadline); return result_type::ready(); } -} +} } diff --git a/include/asyncpp/timing/timeout.h b/include/asyncpp/timing/timeout.h index 82cdcfd..f54c4a7 100644 --- a/include/asyncpp/timing/timeout.h +++ b/include/asyncpp/timing/timeout.h @@ -21,22 +21,62 @@ namespace timing { inline timeout_exception(); }; - struct tag_timeout - { }; + template + struct timeout; + + namespace __impl + { + + template< + typename T_derived, + typename = void> + struct timeout_impl; + + template< + typename T_inner> + struct timeout_impl< + timeout, + std::enable_if_t>>> + : public base_future< + typename std::decay_t::value_type, + timeout> + { + private: + using derived_type = timeout; + + public: + auto poll(); + }; + + template< + typename T_inner> + struct timeout_impl< + timeout, + std::enable_if_t>>> + : public base_stream< + typename std::decay_t::value_type, + timeout> + { + private: + using derived_type = timeout; + + public: + auto poll(); + }; + + } template struct timeout final - : public std::conditional_t< - is_future_v, - base_future::value_type, timeout>, - tag_timeout> + : public __impl::timeout_impl> { public: using inner_type = T_inner; using value_type = typename std::decay_t::value_type; + using this_type = timeout; + using impl_type = __impl::timeout_impl; - public: - friend struct stream_trait; + friend impl_type; private: inner_type _inner; //!< Inner future / stream. @@ -58,38 +98,6 @@ namespace timing { template inline void reset( const duration& p_timeout); - - public: /* future */ - /** - * @brief Poll the result from the future. - */ - template< - typename X = std::decay_t, - typename = std::enable_if_t>> - inline auto poll(); }; } } - -namespace asyncpp -{ - - /* stream_trait for timing::timeout */ - - template - struct stream_trait< - timing::timeout, - std::enable_if_t> - > - : public stream_base, void>> - { - using inner_type = T_inner; - using timeout_type = timing::timeout; - using value_type = typename inner_type::value_type; - using result_type = typename inner_type::result_type; - - template - static inline result_type poll(X_stream& self); - }; - -} diff --git a/include/asyncpp/timing/timeout.inl b/include/asyncpp/timing/timeout.inl index 14b6a4e..b98f4b3 100644 --- a/include/asyncpp/timing/timeout.inl +++ b/include/asyncpp/timing/timeout.inl @@ -7,6 +7,59 @@ namespace asyncpp { namespace timing { + namespace __impl + { + + template< + typename T_inner> + auto timeout_impl< + timeout, + std::enable_if_t>>> + ::poll() + { + auto& self = static_cast(*this); + auto r = self._inner.poll(); + + if ( r.is_not_ready() + && self._delay.poll()) + { + auto new_deadline = self._delay.deadline() + self._timeout; + self._delay.reset(new_deadline); + throw timing::timeout_exception(); + } + + return r; + } + + + template< + typename T_inner> + auto timeout_impl< + timeout, + std::enable_if_t>>> + ::poll() + { + auto& self = static_cast(*this); + auto r = self._inner.poll(); + + if (r.is_not_ready()) + { + if (self._delay.poll()) + { + self._delay.reset(self._timeout); + throw timing::timeout_exception(); + } + } + else + { + self._delay.reset(self._timeout); + } + + return r; + } + + } + /* timeout_exception */ timeout_exception::timeout_exception() @@ -42,59 +95,4 @@ namespace timing { _delay.reset(asyncpp::now() + p_duration); } - template< - typename T_inner> - template< - typename X, - typename> - auto timeout - ::poll() - { - auto r = _inner.poll(); - - if ( r.is_not_ready() - && _delay.poll()) - { - auto new_deadline = _delay.deadline() + _timeout; - _delay.reset(new_deadline); - throw timing::timeout_exception(); - } - - return r; - } - } } - -namespace asyncpp -{ - - /* stream_trait for timing::timeout */ - - template - template - typename T_inner::result_type - stream_trait< - timing::timeout, - std::enable_if_t> - > - ::poll(X_stream& self) - { - auto r = self->_inner.poll(); - - if (r.is_not_ready()) - { - if (self->_delay.poll()) - { - self->_delay.reset(self->_timeout); - throw timing::timeout_exception(); - } - } - else - { - self->_delay.reset(self->_timeout); - } - - return r; - } - -} diff --git a/test/asyncpp/core/stream_tests.cpp b/test/asyncpp/core/stream_tests.cpp index 1e64fc6..e489ca0 100644 --- a/test/asyncpp/core/stream_tests.cpp +++ b/test/asyncpp/core/stream_tests.cpp @@ -6,43 +6,36 @@ using namespace ::testing; using namespace ::asyncpp; struct delay + : public base_stream { - int const delay { 5 }; - int const threshold { 2 }; - int count { 0 }; -}; - -namespace asyncpp -{ - - template<> - struct stream_trait - : public stream_base> +private: + int const _delay { 5 }; + int const _threshold { 2 }; + int _count { 0 }; + +public: + inline delay(int p_delay, int p_threshold, int p_count) + : _delay (p_delay) + , _threshold(p_threshold) + , _count (p_count) + { } + + auto poll() { - using value_type = int; - - template - static inline auto poll(X_stream& self) - { - using result_type = stream_result; + if (_count >= _delay) + return result_type::done(); - if (self->count >= self->delay) - return result_type::done(); + ++_count; - ++self->count; - - return self->count <= self->threshold - ? result_type::not_ready() - : result_type::ready(self->count); - } - }; - -} + return _count <= _threshold + ? result_type::not_ready() + : result_type::ready(_count); + } +}; TEST(stream_tests, poll) { - delay d { 5, 2, 0 }; - auto s = as_stream(d); + delay s { 5, 2, 0 }; auto r0 = s.poll(); ASSERT_FALSE(r0); @@ -71,7 +64,7 @@ TEST(stream_tests, for_each) { int i = 0; delay d { 5, 0, 0 }; - auto f = as_stream(d) + auto f = d .for_each([&i](int x) { ++i; EXPECT_EQ(i, x); diff --git a/test/asyncpp/executor/current_thread_tests.cpp b/test/asyncpp/executor/current_thread_tests.cpp index c564d97..c712517 100644 --- a/test/asyncpp/executor/current_thread_tests.cpp +++ b/test/asyncpp/executor/current_thread_tests.cpp @@ -143,12 +143,11 @@ TEST(current_thread_tests, interval) .WillOnce(Return(time_point(std::chrono::seconds(50)))); e.run( - as_stream( - timing::interval( - time_point(), - std::chrono::seconds(10), - time_point() + std::chrono::seconds(50))) - .for_each([&m]{ - m.call(); - })); + timing::interval( + time_point(), + std::chrono::seconds(10), + time_point() + std::chrono::seconds(50)) + .for_each([&m]{ + m.call(); + })); } diff --git a/test/asyncpp/timing/interval_tests.cpp b/test/asyncpp/timing/interval_tests.cpp index 87c9d3f..e86feeb 100644 --- a/test/asyncpp/timing/interval_tests.cpp +++ b/test/asyncpp/timing/interval_tests.cpp @@ -14,7 +14,7 @@ TEST(interval_tests, poll) InSequence seq; StrictMock m; asyncpp::timing::timer t; - interval i( + interval s( time_point(std::chrono::seconds(10)), std::chrono::seconds(5), time_point(std::chrono::seconds(30))); @@ -25,8 +25,6 @@ TEST(interval_tests, poll) t.make_current(); - auto s = as_stream(i); - EXPECT_CALL(m, now) .WillOnce(Return(time_point(std::chrono::seconds(0)))); diff --git a/test/asyncpp/timing/timeout_tests.cpp b/test/asyncpp/timing/timeout_tests.cpp index 439db09..27d3449 100644 --- a/test/asyncpp/timing/timeout_tests.cpp +++ b/test/asyncpp/timing/timeout_tests.cpp @@ -9,18 +9,18 @@ using namespace ::testing; using namespace ::asyncpp; using namespace ::asyncpp::timing; -struct test - : public base_future +struct test_future + : public base_future { public: using value_type = int; - using this_type = test; + using this_type = test_future; using base_future_type = base_future; public: int i; - inline test(int p_i) + inline test_future(int p_i) : i(p_i) { } @@ -33,26 +33,29 @@ public: } }; -namespace asyncpp +struct test_stream + : public base_stream { +public: + using value_type = int; + using this_type = test_stream; + using base_future_type = base_future; - template<> - struct stream_trait - : public stream_base> - { - using value_type = int; - using result_type = stream_result; - - template - static inline result_type poll(X_future& self) - { - return self->i == 0 ? result_type::not_ready() : - self->i < 0 ? result_type::done() : - result_type::ready(self->i); - } - }; +public: + int i; -} + inline test_stream(int p_i) + : i(p_i) + { } + +public: + inline result_type poll() + { + return i == 0 ? result_type::not_ready() : + i < 0 ? result_type::done() : + result_type::ready(i); + } +}; TEST(timeout_tests, poll_future_no_timeout) { @@ -65,7 +68,7 @@ TEST(timeout_tests, poll_future_no_timeout) EXPECT_CALL(m, now()) .WillOnce(Return(time_point(std::chrono::seconds(0)))); - test t(0); + test_future t(0); auto f = t .timeout(std::chrono::seconds(5)); @@ -98,7 +101,7 @@ TEST(timeout_tests, poll_future_timeout) EXPECT_CALL(m, now()) .WillOnce(Return(time_point(std::chrono::seconds(0)))); - auto f = test(0) + auto f = test_future(0) .timeout(std::chrono::seconds(5)); EXPECT_CALL(m, now()) @@ -130,8 +133,8 @@ TEST(timeout_tests, poll_stream_no_timeout) EXPECT_CALL(m, now()) .WillOnce(Return(time_point(std::chrono::seconds(0)))); - auto t = test { 0 }; - auto f = as_stream(t) + auto t = test_stream { 0 }; + auto f = t .timeout(std::chrono::seconds(5)); EXPECT_CALL(m, now()) @@ -180,8 +183,8 @@ TEST(timeout_tests, poll_stream_timeout) EXPECT_CALL(m, now()) .WillOnce(Return(time_point(std::chrono::seconds(0)))); - auto t = test { 0 }; - auto f = as_stream(t) + auto t = test_stream { 0 }; + auto f = t .timeout(std::chrono::seconds(5)); EXPECT_CALL(m, now())