Bläddra i källkod

* refactored/simplified 'stream' class

master
bergmann 4 år sedan
förälder
incheckning
46b668f235
12 ändrade filer med 312 tillägg och 345 borttagningar
  1. +3
    -3
      include/asyncpp/core/future.inl
  2. +40
    -28
      include/asyncpp/core/stream.h
  3. +78
    -80
      include/asyncpp/core/stream.inl
  4. +7
    -32
      include/asyncpp/core/stream.pre.h
  5. +10
    -19
      include/asyncpp/timing/interval.h
  6. +11
    -19
      include/asyncpp/timing/interval.inl
  7. +48
    -40
      include/asyncpp/timing/timeout.h
  8. +53
    -55
      include/asyncpp/timing/timeout.inl
  9. +24
    -31
      test/asyncpp/core/stream_tests.cpp
  10. +7
    -8
      test/asyncpp/executor/current_thread_tests.cpp
  11. +1
    -3
      test/asyncpp/timing/interval_tests.cpp
  12. +30
    -27
      test/asyncpp/timing/timeout_tests.cpp

+ 3
- 3
include/asyncpp/core/future.inl Visa fil

@@ -12,7 +12,7 @@
namespace asyncpp
{

/* future::map */
/* base_future::map */

template<
typename T_value,
@@ -68,7 +68,7 @@ namespace asyncpp
std::forward<X_lambda>(p_lambda));
}

/* future::and_then */
/* base_future::and_then */

template<
typename T_value,
@@ -124,7 +124,7 @@ namespace asyncpp
std::forward<X_lambda>(p_lambda));
}

/* future::timeout */
/* base_future::timeout */

#ifdef asyncpp_timing
template<


+ 40
- 28
include/asyncpp/core/stream.h Visa fil

@@ -11,32 +11,33 @@ namespace asyncpp
{

template<
typename T_object,
typename T_impl>
struct stream
typename T_value,
typename T_derived>
struct base_stream
: public tag_stream
{
using object_type = T_object;
using clean_object_type = std::decay_t<object_type>;
using trait_type = stream_trait<clean_object_type>;
using value_type = typename trait_type::value_type;
using result_type = stream_result<value_type>;
using reference = clean_object_type&;
using pointer = clean_object_type*;
using const_reference = const clean_object_type&;
using const_pointer = const clean_object_type*;

object_type ref;
public:
using value_type = T_value;
using result_type = stream_result<value_type>;
using derived_type = T_derived;
using this_type = base_stream<value_type, derived_type>;

public:
/**
* @brief Value constructor.
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_object>
inline stream(X_object&& p_ref);
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) const &;

/**
* @brief Function that will be called repeatedly to check if the stream has values.
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
inline result_type poll();
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) &;

/**
* @brief Execute the given lambda for each element in the stream.
@@ -44,23 +45,34 @@ namespace asyncpp
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda);
inline auto for_each(X_lambda&& p_lambda) &&;

#ifdef asyncpp_timing
#ifdef asyncpp_timing
public:
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout);
#endif
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) const &;

public:
inline pointer operator->();
inline reference operator*();
inline const_pointer operator->() const;
inline const_reference operator*() const;
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &&;
#endif
};

}

+ 78
- 80
include/asyncpp/core/stream.inl Visa fil

@@ -11,120 +11,118 @@
namespace asyncpp
{

/* stream_base */
/* base_stream::for_each */

template<typename T_impl>
template<typename X_stream, typename X_lambda>
auto stream_base<T_impl>
::for_each(X_stream&& self, X_lambda&& p_lambda)
template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) const &
{
using stream_type = X_stream;
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<stream_type, lambda_type>;
using for_each_type = __stream::for_each_impl<derived_type, lambda_type>;

auto& self = static_cast<derived_type const &>(*this);

return for_each_type(
std::forward<X_stream>(self),
std::forward<derived_type const>(self),
std::forward<X_lambda>(p_lambda));
}

#ifdef asyncpp_timing
template<typename T_impl>
template<typename X_stream, typename X_base, typename X_ratio>
auto stream_base<T_impl>
::timeout(X_stream&& self, const duration<X_base, X_ratio>& p_timeout)
{
using stream_type = X_stream;
using timeout_type = timing::timeout<stream_type>;

return as_stream(timeout_type(
std::forward<X_stream>(self),
p_timeout));
}
#endif

/* stream */

template<
typename T_value,
typename T_impl>
typename T_derived>
template<
typename X_object>
stream<T_value, T_impl>
::stream(X_object&& p_ref)
: ref(std::forward<X_object>(p_ref))
{ }
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &
{
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<derived_type&, lambda_type>;

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::result_type
stream<T_value, T_impl>
::poll()
{ return trait_type::poll(*this); }
auto& self = static_cast<derived_type &>(*this);

return for_each_type(
std::forward<derived_type &>(self),
std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_impl>
typename T_derived>
template<
typename X_lambda>
auto stream<T_value, T_impl>
::for_each(X_lambda&& p_lambda)
{ return trait_type::for_each(std::move(*this), std::forward<X_lambda>(p_lambda)); }
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &&
{
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<derived_type, lambda_type>;

auto& self = static_cast<derived_type &>(*this);

return for_each_type(
std::forward<derived_type &&>(self),
std::forward<X_lambda>(p_lambda));
}

/* base_stream::timeout */

#ifdef asyncpp_timing
template<
typename T_value,
typename T_impl>
typename T_derived>
template<
typename X_base,
typename X_ratio>
auto stream<T_value, T_impl>
::timeout(const duration<X_base, X_ratio>& p_timeout)
{ return trait_type::timeout(std::move(*this), p_timeout); }
#endif
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) const &
{
using timeout_type = timing::timeout<derived_type>;

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::pointer
stream<T_value, T_impl>::operator->()
{ return &ref; }
auto& self = static_cast<derived_type const &>(*this);

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::reference
stream<T_value, T_impl>::operator*()
{ return ref; }
return timeout_type(
std::forward<derived_type const>(self),
p_timeout);
}

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::const_pointer
stream<T_value, T_impl>::operator->() const
{ return &ref; }

typename T_derived>
template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::const_reference
stream<T_value, T_impl>::operator*() const
{ return ref; }
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &
{
using timeout_type = timing::timeout<derived_type&>;

auto& self = static_cast<derived_type &>(*this);

/* misc */
return timeout_type(
std::forward<derived_type &>(self),
p_timeout);
}

template<typename X_value>
constexpr stream<X_value> as_stream(X_value&& value)
template<
typename T_value,
typename T_derived>
template<
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &&
{
using value_type = X_value;
using stream_type = stream<value_type>;
using timeout_type = timing::timeout<derived_type>;

return stream_type(std::forward<X_value>(value));
}
auto& self = static_cast<derived_type &>(*this);

template<typename T>
struct is_stream<stream<T>, void>
: public std::true_type
{ };
return timeout_type(
std::forward<derived_type &&>(self),
p_timeout);
}
#endif

}

+ 7
- 32
include/asyncpp/core/stream.pre.h Visa fil

@@ -3,45 +3,20 @@
namespace asyncpp
{

template<typename T_impl>
struct stream_base
{
public:
using impl_type = T_impl;

public:
template<typename X_stream>
static inline auto poll(X_stream& self) = delete;

template<typename X_stream, typename X_lambda>
static inline auto for_each(X_stream&& self, X_lambda&& p_lambda);

#ifdef asyncpp_timing
template<typename X_stream, typename X_base, typename X_ratio>
static inline auto timeout(X_stream&& self, const duration<X_base, X_ratio>& p_timeout);
#endif
};

template<typename T, typename = void>
struct stream_trait;
struct tag_stream
{ };

template<
typename T_object,
typename T_impl = stream_trait<std::decay_t<T_object>>>
struct stream;
typename T_value,
typename T_derived>
struct base_stream;

template<typename T, typename = void>
template<typename T>
struct is_stream
: public std::false_type
: public std::is_base_of<tag_stream, T>
{ };

template<typename T>
constexpr decltype(auto) is_stream_v = is_stream<T>::value;

/**
* @brief Construct a stream from the given value.
*/
template<typename X_value>
constexpr stream<X_value> as_stream(X_value&& value);

}

+ 10
- 19
include/asyncpp/timing/interval.h Visa fil

@@ -10,9 +10,12 @@ namespace asyncpp {
namespace timing {

struct interval final
: public base_stream<void, interval>
{
public:
friend struct stream_trait<interval, void>;
using value_type = void;
using base_stream_type = base_stream<void, delay>;
using result_type = typename base_stream_type::result_type;

private:
delay _delay; //!< Delay future
@@ -41,24 +44,12 @@ namespace timing {
* @brief Get the duration of the interval.
*/
inline const clock::duration& duration() const;
};

} }

namespace asyncpp
{

/* stream_trait for timing::interval */

template<>
struct stream_trait<timing::interval, void>
: public stream_base<stream<timing::interval, void>>
{
using value_type = void;
using result_type = stream_result<value_type>;

template<typename X_stream>
static inline result_type poll(X_stream& self);
public: /* stream */
/**
* @brief Poll the result from the stream.
*/
inline result_type poll();
};

}
} }

+ 11
- 19
include/asyncpp/timing/interval.inl Visa fil

@@ -30,32 +30,24 @@ namespace timing {
const clock::duration& interval::duration() const
{ return _duration; }

} }

namespace asyncpp
{

/* stream_trait for timing::interval */

template<typename X_stream>
typename stream_trait<timing::interval, void>::result_type
stream_trait<timing::interval, void>
::poll(X_stream& self)
typename interval::result_type
interval
::poll()
{
if ( self->_deadline.time_since_epoch().count()
&& self->_delay.deadline() >= self->_deadline
&& asyncpp::now() >= self->_delay.deadline())
if ( _deadline.time_since_epoch().count()
&& _delay.deadline() >= _deadline
&& asyncpp::now() >= _delay.deadline())
return result_type::done();

auto ret = self->_delay.poll();
auto ret = _delay.poll();
if (ret.is_not_ready())
return result_type::not_ready();

auto now = self->_delay.deadline();
auto new_deadline = now + self->_duration;
self->_delay.reset(new_deadline);
auto now = _delay.deadline();
auto new_deadline = now + _duration;
_delay.reset(new_deadline);

return result_type::ready();
}

}
} }

+ 48
- 40
include/asyncpp/timing/timeout.h Visa fil

@@ -21,22 +21,62 @@ namespace timing {
inline timeout_exception();
};

struct tag_timeout
{ };
template<typename T_inner>
struct timeout;

namespace __impl
{

template<
typename T_derived,
typename = void>
struct timeout_impl;

template<
typename T_inner>
struct timeout_impl<
timeout<T_inner>,
std::enable_if_t<is_future_v<std::decay_t<T_inner>>>>
: public base_future<
typename std::decay_t<T_inner>::value_type,
timeout<T_inner>>
{
private:
using derived_type = timeout<T_inner>;

public:
auto poll();
};

template<
typename T_inner>
struct timeout_impl<
timeout<T_inner>,
std::enable_if_t<is_stream_v<std::decay_t<T_inner>>>>
: public base_stream<
typename std::decay_t<T_inner>::value_type,
timeout<T_inner>>
{
private:
using derived_type = timeout<T_inner>;

public:
auto poll();
};

}

template<typename T_inner>
struct timeout final
: public std::conditional_t<
is_future_v<T_inner>,
base_future<typename std::decay_t<T_inner>::value_type, timeout<T_inner>>,
tag_timeout>
: public __impl::timeout_impl<timeout<T_inner>>
{
public:
using inner_type = T_inner;
using value_type = typename std::decay_t<inner_type>::value_type;
using this_type = timeout<inner_type>;
using impl_type = __impl::timeout_impl<this_type>;

public:
friend struct stream_trait<timeout, void>;
friend impl_type;

private:
inner_type _inner; //!< Inner future / stream.
@@ -58,38 +98,6 @@ namespace timing {
template<typename X_base, typename X_ratio>
inline void reset(
const duration<X_base, X_ratio>& p_timeout);

public: /* future */
/**
* @brief Poll the result from the future.
*/
template<
typename X = std::decay_t<inner_type>,
typename = std::enable_if_t<is_future_v<X>>>
inline auto poll();
};

} }

namespace asyncpp
{

/* stream_trait for timing::timeout */

template<typename T_inner>
struct stream_trait<
timing::timeout<T_inner>,
std::enable_if_t<is_stream_v<T_inner>>
>
: public stream_base<stream<timing::timeout<T_inner>, void>>
{
using inner_type = T_inner;
using timeout_type = timing::timeout<inner_type>;
using value_type = typename inner_type::value_type;
using result_type = typename inner_type::result_type;

template<typename X_stream>
static inline result_type poll(X_stream& self);
};

}

+ 53
- 55
include/asyncpp/timing/timeout.inl Visa fil

@@ -7,6 +7,59 @@
namespace asyncpp {
namespace timing {

namespace __impl
{

template<
typename T_inner>
auto timeout_impl<
timeout<T_inner>,
std::enable_if_t<is_future_v<std::decay_t<T_inner>>>>
::poll()
{
auto& self = static_cast<derived_type&>(*this);
auto r = self._inner.poll();

if ( r.is_not_ready()
&& self._delay.poll())
{
auto new_deadline = self._delay.deadline() + self._timeout;
self._delay.reset(new_deadline);
throw timing::timeout_exception();
}

return r;
}


template<
typename T_inner>
auto timeout_impl<
timeout<T_inner>,
std::enable_if_t<is_stream_v<std::decay_t<T_inner>>>>
::poll()
{
auto& self = static_cast<derived_type&>(*this);
auto r = self._inner.poll();

if (r.is_not_ready())
{
if (self._delay.poll())
{
self._delay.reset(self._timeout);
throw timing::timeout_exception();
}
}
else
{
self._delay.reset(self._timeout);
}

return r;
}

}

/* timeout_exception */

timeout_exception::timeout_exception()
@@ -42,59 +95,4 @@ namespace timing {
_delay.reset(asyncpp::now() + p_duration);
}

template<
typename T_inner>
template<
typename X,
typename>
auto timeout<T_inner>
::poll()
{
auto r = _inner.poll();

if ( r.is_not_ready()
&& _delay.poll())
{
auto new_deadline = _delay.deadline() + _timeout;
_delay.reset(new_deadline);
throw timing::timeout_exception();
}

return r;
}

} }

namespace asyncpp
{

/* stream_trait for timing::timeout */

template<typename T_inner>
template<typename X_stream>
typename T_inner::result_type
stream_trait<
timing::timeout<T_inner>,
std::enable_if_t<is_stream_v<T_inner>>
>
::poll(X_stream& self)
{
auto r = self->_inner.poll();

if (r.is_not_ready())
{
if (self->_delay.poll())
{
self->_delay.reset(self->_timeout);
throw timing::timeout_exception();
}
}
else
{
self->_delay.reset(self->_timeout);
}

return r;
}

}

+ 24
- 31
test/asyncpp/core/stream_tests.cpp Visa fil

@@ -6,43 +6,36 @@ using namespace ::testing;
using namespace ::asyncpp;

struct delay
: public base_stream<int, delay>
{
int const delay { 5 };
int const threshold { 2 };
int count { 0 };
};

namespace asyncpp
{

template<>
struct stream_trait<delay, void>
: public stream_base<stream<delay, void>>
private:
int const _delay { 5 };
int const _threshold { 2 };
int _count { 0 };

public:
inline delay(int p_delay, int p_threshold, int p_count)
: _delay (p_delay)
, _threshold(p_threshold)
, _count (p_count)
{ }

auto poll()
{
using value_type = int;

template<typename X_stream>
static inline auto poll(X_stream& self)
{
using result_type = stream_result<value_type>;
if (_count >= _delay)
return result_type::done();

if (self->count >= self->delay)
return result_type::done();
++_count;

++self->count;

return self->count <= self->threshold
? result_type::not_ready()
: result_type::ready(self->count);
}
};

}
return _count <= _threshold
? result_type::not_ready()
: result_type::ready(_count);
}
};

TEST(stream_tests, poll)
{
delay d { 5, 2, 0 };
auto s = as_stream(d);
delay s { 5, 2, 0 };

auto r0 = s.poll();
ASSERT_FALSE(r0);
@@ -71,7 +64,7 @@ TEST(stream_tests, for_each)
{
int i = 0;
delay d { 5, 0, 0 };
auto f = as_stream(d)
auto f = d
.for_each([&i](int x) {
++i;
EXPECT_EQ(i, x);


+ 7
- 8
test/asyncpp/executor/current_thread_tests.cpp Visa fil

@@ -143,12 +143,11 @@ TEST(current_thread_tests, interval)
.WillOnce(Return(time_point(std::chrono::seconds(50))));

e.run(
as_stream(
timing::interval(
time_point(),
std::chrono::seconds(10),
time_point() + std::chrono::seconds(50)))
.for_each([&m]{
m.call();
}));
timing::interval(
time_point(),
std::chrono::seconds(10),
time_point() + std::chrono::seconds(50))
.for_each([&m]{
m.call();
}));
}

+ 1
- 3
test/asyncpp/timing/interval_tests.cpp Visa fil

@@ -14,7 +14,7 @@ TEST(interval_tests, poll)
InSequence seq;
StrictMock<now_mock> m;
asyncpp::timing::timer t;
interval i(
interval s(
time_point(std::chrono::seconds(10)),
std::chrono::seconds(5),
time_point(std::chrono::seconds(30)));
@@ -25,8 +25,6 @@ TEST(interval_tests, poll)

t.make_current();

auto s = as_stream(i);

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(0))));



+ 30
- 27
test/asyncpp/timing/timeout_tests.cpp Visa fil

@@ -9,18 +9,18 @@ using namespace ::testing;
using namespace ::asyncpp;
using namespace ::asyncpp::timing;

struct test
: public base_future<int, test>
struct test_future
: public base_future<int, test_future>
{
public:
using value_type = int;
using this_type = test;
using this_type = test_future;
using base_future_type = base_future<value_type, this_type>;

public:
int i;

inline test(int p_i)
inline test_future(int p_i)
: i(p_i)
{ }

@@ -33,26 +33,29 @@ public:
}
};

namespace asyncpp
struct test_stream
: public base_stream<int, test_stream>
{
public:
using value_type = int;
using this_type = test_stream;
using base_future_type = base_future<value_type, this_type>;

template<>
struct stream_trait<test, void>
: public stream_base<stream<test, void>>
{
using value_type = int;
using result_type = stream_result<value_type>;

template<typename X_future>
static inline result_type poll(X_future& self)
{
return self->i == 0 ? result_type::not_ready() :
self->i < 0 ? result_type::done() :
result_type::ready(self->i);
}
};
public:
int i;

}
inline test_stream(int p_i)
: i(p_i)
{ }

public:
inline result_type poll()
{
return i == 0 ? result_type::not_ready() :
i < 0 ? result_type::done() :
result_type::ready(i);
}
};

TEST(timeout_tests, poll_future_no_timeout)
{
@@ -65,7 +68,7 @@ TEST(timeout_tests, poll_future_no_timeout)
EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));

test t(0);
test_future t(0);
auto f = t
.timeout(std::chrono::seconds(5));

@@ -98,7 +101,7 @@ TEST(timeout_tests, poll_future_timeout)
EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));

auto f = test(0)
auto f = test_future(0)
.timeout(std::chrono::seconds(5));

EXPECT_CALL(m, now())
@@ -130,8 +133,8 @@ TEST(timeout_tests, poll_stream_no_timeout)
EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));

auto t = test { 0 };
auto f = as_stream(t)
auto t = test_stream { 0 };
auto f = t
.timeout(std::chrono::seconds(5));

EXPECT_CALL(m, now())
@@ -180,8 +183,8 @@ TEST(timeout_tests, poll_stream_timeout)
EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));

auto t = test { 0 };
auto f = as_stream(t)
auto t = test_stream { 0 };
auto f = t
.timeout(std::chrono::seconds(5));

EXPECT_CALL(m, now())


Laddar…
Avbryt
Spara