Преглед на файлове

* Refactored stream classes

master
bergmann преди 4 години
родител
ревизия
f06ecdc64e
променени са 32 файла, в които са добавени 934 реда и са изтрити 1074 реда
  1. +0
    -11
      include/asyncpp/core.h
  2. +7
    -86
      include/asyncpp/core/future.h
  3. +0
    -56
      include/asyncpp/core/future/and_then.h
  4. +69
    -38
      include/asyncpp/core/future/and_then.inl
  5. +92
    -0
      include/asyncpp/core/future/future.h
  6. +0
    -2
      include/asyncpp/core/future/future.pre.h
  7. +0
    -67
      include/asyncpp/core/future/lazy.h
  8. +48
    -22
      include/asyncpp/core/future/lazy.inl
  9. +0
    -51
      include/asyncpp/core/future/map.h
  10. +57
    -30
      include/asyncpp/core/future/map.inl
  11. +13
    -5
      include/asyncpp/core/future/timeout.inl
  12. +2
    -136
      include/asyncpp/core/result.h
  13. +139
    -0
      include/asyncpp/core/result/result.h
  14. +0
    -0
      include/asyncpp/core/result/result.inl
  15. +0
    -0
      include/asyncpp/core/result/result.pre.h
  16. +7
    -110
      include/asyncpp/core/stream.h
  17. +0
    -230
      include/asyncpp/core/stream.inl
  18. +0
    -47
      include/asyncpp/core/stream/flatten.h
  19. +110
    -29
      include/asyncpp/core/stream/flatten.inl
  20. +0
    -47
      include/asyncpp/core/stream/for_each.h
  21. +104
    -25
      include/asyncpp/core/stream/for_each.inl
  22. +0
    -48
      include/asyncpp/core/stream/map.h
  23. +103
    -22
      include/asyncpp/core/stream/map.inl
  24. +104
    -0
      include/asyncpp/core/stream/stream.h
  25. +0
    -0
      include/asyncpp/core/stream/stream.pre.h
  26. +69
    -0
      include/asyncpp/core/stream/timeout.inl
  27. +1
    -2
      include/asyncpp/fs/directory.h
  28. +1
    -1
      include/asyncpp/timing/delay.h
  29. +2
    -3
      include/asyncpp/timing/delay.inl
  30. +2
    -2
      include/asyncpp/timing/interval.h
  31. +2
    -2
      include/asyncpp/timing/timeout.h
  32. +2
    -2
      test/asyncpp/timing/timeout_tests.cpp

+ 0
- 11
include/asyncpp/core.h Целия файл

@@ -5,14 +5,3 @@
#include "core/result.h"
#include "core/stream.h"
#include "core/task.h"

#include "core/result.inl"
#include "core/stream.inl"

#include "core/future/map.inl"
#include "core/future/lazy.inl"
#include "core/future/and_then.inl"

#ifdef asyncpp_timing
#include "core/future/timeout.inl"
#endif

+ 7
- 86
include/asyncpp/core/future.h Целия файл

@@ -1,90 +1,11 @@
#pragma once

#include <type_traits>
#include "future/future.h"

#include "misc.h"
#include "result.h"
#include "future.pre.h"
#include "future/map.inl"
#include "future/lazy.inl"
#include "future/and_then.inl"

#include "future/map.h"
#include "future/lazy.h"
#include "future/and_then.h"

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
struct base_future
: public tag_future
{
public:
using value_type = T_value;
using result_type = future_result<value_type>;
using derived_type = T_derived;
using this_type = base_future<value_type, derived_type>;

public:
/**
* @brief Transform the result of this future.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &;

/**
* @brief Transform the result of this future.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &&;

public:
/**
* @brief Execute the given lambda after the future is finished and
* wait for the future returned by the lambda.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto and_then(X_lambda&& p_lambda) &;

/**
* @brief Execute the given lambda after the future is finished and
* wait for the future returned by the lambda.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto and_then(X_lambda&& p_lambda) &&;

#ifdef asyncpp_timing
public:
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = copy,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = move,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &&;
#endif
};

}
#ifdef asyncpp_timing
#include "future/timeout.inl"
#endif

+ 0
- 56
include/asyncpp/core/future/and_then.h Целия файл

@@ -1,56 +0,0 @@
#pragma once

#include <memory>

#include "../future.pre.h"

namespace asyncpp {
namespace __future {

template<
typename T_future,
typename T_lambda>
struct and_then_future final :
public base_future<
decltype(std::declval<T_lambda>()(std::declval<typename std::decay_t<T_future>::value_type>())),
and_then_future<T_future, T_lambda>
>
{
public:
using lambda_type = T_lambda;
using first_future_type = T_future;
using first_value_type = typename std::decay_t<first_future_type>::value_type;
using second_future_type = decltype(std::declval<lambda_type>()(std::declval<first_value_type>()));
using second_future_type_ptr = std::unique_ptr<second_future_type>;
using value_type = typename second_future_type::value_type;
using this_type = and_then_future<value_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
lambda_type _lambda;
first_future_type _first;
second_future_type_ptr _second;

public:
/**
* @brief Constructor.
*/
template<
typename X_future,
typename X_lambda>
inline and_then_future(
X_future&& p_outer,
X_lambda&& p_lambda);

inline and_then_future(and_then_future &&) = default;
inline and_then_future(and_then_future const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll();
};

} }

+ 69
- 38
include/asyncpp/core/future/and_then.inl Целия файл

@@ -1,50 +1,77 @@
#pragma once

#include "and_then.h"
#include "../future.h"
#include <memory>

#include "future.h"

namespace asyncpp {
namespace __future {

/* and_then_future */

template<
typename T_future,
typename T_lambda>
template<
typename X_future,
typename X_lambda>
and_then_future<T_future, T_lambda>
::and_then_future(
struct and_then_future final :
public base_future<
decltype(std::declval<T_lambda>()(std::declval<typename std::decay_t<T_future>::value_type>())),
and_then_future<T_future, T_lambda>
>
{
public:
using lambda_type = T_lambda;
using first_future_type = T_future;
using first_value_type = typename std::decay_t<first_future_type>::value_type;
using second_future_type = decltype(std::declval<lambda_type>()(std::declval<first_value_type>()));
using second_future_type_ptr = std::unique_ptr<second_future_type>;
using value_type = typename second_future_type::value_type;
using this_type = and_then_future<value_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
lambda_type _lambda;
first_future_type _first;
second_future_type_ptr _second;

public:
/**
* @brief Constructor.
*/
template<
typename X_future,
typename X_lambda>
inline and_then_future(
X_future&& p_first,
X_lambda&& p_lambda)
: _first (std::forward<X_future>(p_first))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }

template<
typename T_future,
typename T_lambda>
typename and_then_future<T_future, T_lambda>::result_type
and_then_future<T_future, T_lambda>
::poll()
{
while (true)
: _first (std::forward<X_future>(p_first))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }
inline and_then_future(and_then_future &&) = default;
inline and_then_future(and_then_future const &) = default;
public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll()
{
if (_second)
{
return _second->poll();
}
else
while (true)
{
auto r = _first.poll();
if (!r)
return result_type::not_ready();

_second = std::make_unique<second_future_type>(r.call(_lambda));
if (_second)
{
return _second->poll();
}
else
{
auto r = _first.poll();
if (!r)
return result_type::not_ready();

_second = std::make_unique<second_future_type>(r.call(_lambda));
}
}
}
}
};

/* and_then_impl */

@@ -65,10 +92,6 @@ namespace __future {
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using and_then_future_type = __future::and_then_future<derived_storage_type, lambda_type>;

static_assert(
X_mode != ref || !std::is_rvalue_reference_v<self_type>,
"Can not store rvalue reference as lvalue reference!");

auto& self = static_cast<derived_ref_type>(p_self);

return and_then_future_type(
@@ -91,7 +114,9 @@ namespace asyncpp
typename X_lambda>
auto base_future<T_value, T_derived>
::and_then(X_lambda&& p_lambda) &
{ return __future::and_then_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda)); }
{
return __future::and_then_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
@@ -101,6 +126,12 @@ namespace asyncpp
typename X_lambda>
auto base_future<T_value, T_derived>
::and_then(X_lambda&& p_lambda) &&
{ return __future::and_then_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda)); }
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __future::and_then_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda));
}

}

+ 92
- 0
include/asyncpp/core/future/future.h Целия файл

@@ -0,0 +1,92 @@
#pragma once

#include <type_traits>

#include "future.pre.h"
#include "../misc.h"
#include "../result.h"

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
struct base_future
: public tag_future
{
public:
using value_type = T_value;
using result_type = future_result<value_type>;
using derived_type = T_derived;
using this_type = base_future<value_type, derived_type>;

public:
/**
* @brief Transform the result of this future.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &;

/**
* @brief Transform the result of this future.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &&;

public:
/**
* @brief Execute the given lambda after the future is finished and
* wait for the future returned by the lambda.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto and_then(X_lambda&& p_lambda) &;

/**
* @brief Execute the given lambda after the future is finished and
* wait for the future returned by the lambda.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto and_then(X_lambda&& p_lambda) &&;

#ifdef asyncpp_timing
public:
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = copy,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = move,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &&;
#endif
};

/**
* @brief Create a lazy evaluated future.
*/
template<typename X_lambda>
inline auto lazy(X_lambda&& p_lambda);

}

include/asyncpp/core/future.pre.h → include/asyncpp/core/future/future.pre.h Целия файл

@@ -1,7 +1,5 @@
#pragma once

#include "misc.h"

namespace asyncpp
{


+ 0
- 67
include/asyncpp/core/future/lazy.h Целия файл

@@ -1,67 +0,0 @@
#pragma once

#include "../future.pre.h"

namespace asyncpp
{

namespace __future
{

template<typename T_lambda>
struct lazy_future final :
public base_future<
decltype(std::declval<T_lambda>()()),
lazy_future<T_lambda>
>
{
public:
using lambda_type = T_lambda;
using value_type = decltype(std::declval<lambda_type>()());
using this_type = lazy_future<lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<typename X_lambda>
inline lazy_future(
X_lambda&& p_lambda);

inline lazy_future(lazy_future &&) = default;
inline lazy_future(lazy_future const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
template<typename X = value_type>
inline auto poll()
-> std::enable_if_t<
std::is_void_v<X>,
result_type>;

/**
* @brief Poll the result from the future.
*/
template<typename X = value_type>
inline auto poll()
-> std::enable_if_t<
!std::is_void_v<X>,
result_type>;
};

}

/**
* @brief Create a lazy evaluated future.
*/
template<typename X_lambda>
inline auto lazy(X_lambda&& p_lambda);

}

+ 48
- 22
include/asyncpp/core/future/lazy.inl Целия файл

@@ -1,45 +1,71 @@
#pragma once

#include "lazy.h"
#include "future.h"

namespace asyncpp
{
namespace asyncpp {
namespace __future {

namespace __future
template<typename T_lambda>
struct lazy_future final :
public base_future<
decltype(std::declval<T_lambda>()()),
lazy_future<T_lambda>
>
{
public:
using lambda_type = T_lambda;
using value_type = decltype(std::declval<lambda_type>()());
using this_type = lazy_future<lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

/* lazy_future */
private:
lambda_type _lambda;

template<typename T_lambda>
public:
/**
* @brief Constructor.
*/
template<typename X_lambda>
lazy_future<T_lambda>::lazy_future(
inline lazy_future(
X_lambda&& p_lambda)
: _lambda(std::forward<X_lambda>(p_lambda))
{ }

template<typename T_lambda>
template<typename X>
inline auto lazy_future<T_lambda>
::poll()
-> std::enable_if_t<
std::is_void_v<X>,
result_type>
inline lazy_future(lazy_future &&) = default;
inline lazy_future(lazy_future const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
template<typename X = value_type>
inline auto poll()
-> std::enable_if_t<
std::is_void_v<X>,
result_type>
{
_lambda();
return result_type::ready();
}

template<typename T_lambda>
template<typename X>
inline auto lazy_future<T_lambda>
::poll()
-> std::enable_if_t<
!std::is_void_v<X>,
result_type>
/**
* @brief Poll the result from the future.
*/
template<typename X = value_type>
inline auto poll()
-> std::enable_if_t<
!std::is_void_v<X>,
result_type>
{
return result_type::ready(_lambda());
}
}
};

} }

namespace asyncpp
{

template<typename X_lambda>
inline auto lazy(X_lambda&& p_lambda)


+ 0
- 51
include/asyncpp/core/future/map.h Целия файл

@@ -1,51 +0,0 @@
#pragma once

#include "../future.pre.h"

namespace asyncpp {
namespace __future {

template<
typename T_future,
typename T_lambda>
struct map_future final :
public base_future<
typename std::decay_t<T_future>::value_type,
map_future<T_future, T_lambda>
>
{
public:
using future_type = T_future;
using lambda_type = T_lambda;
using future_value_type = typename std::decay_t<future_type>::value_type;
using value_type = decltype(std::declval<lambda_type>()(std::declval<future_value_type>()));
using this_type = map_future<future_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
future_type _future;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_future,
typename X_lambda>
inline map_future(
X_future&& p_future,
X_lambda&& p_lambda);

inline map_future(map_future &&) = default;
inline map_future(map_future const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll();
};

} }

+ 57
- 30
include/asyncpp/core/future/map.inl Целия файл

@@ -1,38 +1,61 @@
#pragma once

#include "map.h"
#include "future.h"

namespace asyncpp {
namespace __future {

/* map_future */

template<
typename T_future,
typename T_lambda>
template<
typename X_future,
typename X_lambda>
map_future<T_future, T_lambda>
::map_future(
struct map_future final :
public base_future<
typename std::decay_t<T_future>::value_type,
map_future<T_future, T_lambda>
>
{
public:
using future_type = T_future;
using lambda_type = T_lambda;
using future_value_type = typename std::decay_t<future_type>::value_type;
using value_type = decltype(std::declval<lambda_type>()(std::declval<future_value_type>()));
using this_type = map_future<future_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
future_type _future;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_future,
typename X_lambda>
inline map_future(
X_future&& p_future,
X_lambda&& p_lambda)
: _future(std::forward<X_future>(p_future))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }

template<
typename T_future,
typename T_lambda>
typename map_future<T_future, T_lambda>::result_type
map_future<T_future, T_lambda>
::poll()
{
auto r = _future.poll();
return r
? result_type::ready(r.call(_lambda))
: result_type::not_ready();
}
: _future(std::forward<X_future>(p_future))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }

inline map_future(map_future &&) = default;
inline map_future(map_future const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll()
{
auto r = _future.poll();
return r
? result_type::ready(r.call(_lambda))
: result_type::not_ready();
}
};

/* map_impl */

@@ -53,10 +76,6 @@ namespace __future {
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using map_future_type = __future::map_future<derived_storage_type, lambda_type>;

static_assert(
X_mode != ref || !std::is_rvalue_reference_v<self_type>,
"Can not store rvalue reference as lvalue reference!");

auto& self = static_cast<derived_ref_type>(p_self);

return map_future_type(
@@ -79,7 +98,9 @@ namespace asyncpp
typename X_lambda>
auto base_future<T_value, T_derived>
::map(X_lambda&& p_lambda) &
{ return __future::map_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda)); }
{
return __future::map_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
@@ -89,6 +110,12 @@ namespace asyncpp
typename X_lambda>
auto base_future<T_value, T_derived>
::map(X_lambda&& p_lambda) &&
{ return __future::map_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda)); }
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __future::map_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda));
}

}

+ 13
- 5
include/asyncpp/core/future/timeout.inl Целия файл

@@ -1,6 +1,6 @@
#pragma once

#include "../future.h"
#include "future.h"

namespace asyncpp {
namespace __future {
@@ -10,7 +10,7 @@ namespace __future {
typename X_self,
typename X_base,
typename X_ratio>
auto helper_timeout(
auto timeout_impl(
X_self&& p_self,
const duration<X_base, X_ratio>& p_timeout)
{
@@ -22,7 +22,7 @@ namespace __future {
derived_type const &,
derived_type &>;
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using timeout_type = timing::timeout<derived_storage_type>;
using timeout_type = asyncpp::timing::timeout<derived_storage_type>;

auto& self = static_cast<derived_ref_type>(p_self);

@@ -47,7 +47,9 @@ namespace asyncpp
typename X_ratio>
auto base_future<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &
{ return __future::helper_timeout<X_mode>(*this, p_timeout); }
{
return __future::timeout_impl<X_mode>(*this, p_timeout);
}

template<
typename T_value,
@@ -58,6 +60,12 @@ namespace asyncpp
typename X_ratio>
auto base_future<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &&
{ return __future::helper_timeout<X_mode>(std::move(*this), p_timeout); }
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __future::timeout_impl<X_mode>(std::move(*this), p_timeout);
}

}

+ 2
- 136
include/asyncpp/core/result.h Целия файл

@@ -1,139 +1,5 @@
#pragma once

#include <variant>
#include "result/result.h"

#include "result.pre.h"

namespace asyncpp
{

namespace __impl
{

struct result_not_ready
{ };

struct result_done
{ };

template<typename T_value>
struct result_ready
{
using value_type = T_value;

value_type value;

template<typename... T_args>
inline result_ready(T_args&&... p_args);
};

template<bool for_stream, typename T_value>
struct result
{
public:
using value_type = T_value;
using not_ready_type = result_not_ready;
using ready_type = result_ready<value_type>;
using done_type = result_done;
using storage_type = std::conditional_t<
for_stream,
std::variant<not_ready_type, ready_type, done_type>,
std::variant<not_ready_type, ready_type>>;
using clean_value_type = std::remove_reference_t<value_type>;
using pointer_type = clean_value_type*;
using reference_type = clean_value_type&;
using const_pointer_type = clean_value_type const *;
using const_reference_type = clean_value_type const &;

private:
storage_type _storage; //!< Stores the actual result.

private:
/**
* @brief Constructor.
*/
inline result(storage_type&& p_storage);

public:
/**
* @brief returns a result that is not ready.
*/
static inline auto& not_ready();

/**
* @brief returns a result that is not ready.
*/
template<typename... X_args>
static inline auto ready(X_args&&... p_args);

/**
* @brief returns a result that is not ready.
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
static inline auto& done();

public:
/**
* @brief Get the status of the result.
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
inline result_status status() const;

/**
* @brief Check if the result is not ready (is pending).
*/
inline bool is_not_ready() const;

/**
* @brief Check if the result is ready (has a value).
*/
inline bool is_ready() const;

/**
* @brief Check if the result is done (stream is finished).
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
inline bool is_done() const;

/**
* @brief Get the value of the result.
*/
inline reference_type value();

/**
* @brief Get the value of the result.
*/
inline const_reference_type value() const;

/**
* @brief Call the passed closure with the value stored in the result.
*/
template<typename X_lambda>
inline auto call(const X_lambda& p_lambda);

/**
* @brief Call the passed closure with the value stored in the result.
*/
template<typename X_lambda>
inline auto call(const X_lambda& p_lambda) const;

public:
inline operator bool() const;
inline pointer_type operator-> ();
inline reference_type operator* ();
inline const_pointer_type operator-> () const;
inline const_reference_type operator* () const;
};

template<bool for_stream>
struct result<for_stream, void>;

}

}
#include "result/result.inl"

+ 139
- 0
include/asyncpp/core/result/result.h Целия файл

@@ -0,0 +1,139 @@
#pragma once

#include <variant>

#include "result.pre.h"

namespace asyncpp
{

namespace __impl
{

struct result_not_ready
{ };

struct result_done
{ };

template<typename T_value>
struct result_ready
{
using value_type = T_value;

value_type value;

template<typename... T_args>
inline result_ready(T_args&&... p_args);
};

template<bool for_stream, typename T_value>
struct result
{
public:
using value_type = T_value;
using not_ready_type = result_not_ready;
using ready_type = result_ready<value_type>;
using done_type = result_done;
using storage_type = std::conditional_t<
for_stream,
std::variant<not_ready_type, ready_type, done_type>,
std::variant<not_ready_type, ready_type>>;
using clean_value_type = std::remove_reference_t<value_type>;
using pointer_type = clean_value_type*;
using reference_type = clean_value_type&;
using const_pointer_type = clean_value_type const *;
using const_reference_type = clean_value_type const &;

private:
storage_type _storage; //!< Stores the actual result.

private:
/**
* @brief Constructor.
*/
inline result(storage_type&& p_storage);

public:
/**
* @brief returns a result that is not ready.
*/
static inline auto& not_ready();

/**
* @brief returns a result that is not ready.
*/
template<typename... X_args>
static inline auto ready(X_args&&... p_args);

/**
* @brief returns a result that is not ready.
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
static inline auto& done();

public:
/**
* @brief Get the status of the result.
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
inline result_status status() const;

/**
* @brief Check if the result is not ready (is pending).
*/
inline bool is_not_ready() const;

/**
* @brief Check if the result is ready (has a value).
*/
inline bool is_ready() const;

/**
* @brief Check if the result is done (stream is finished).
*/
template<
bool X = for_stream,
typename = std::enable_if_t<X>>
inline bool is_done() const;

/**
* @brief Get the value of the result.
*/
inline reference_type value();

/**
* @brief Get the value of the result.
*/
inline const_reference_type value() const;

/**
* @brief Call the passed closure with the value stored in the result.
*/
template<typename X_lambda>
inline auto call(const X_lambda& p_lambda);

/**
* @brief Call the passed closure with the value stored in the result.
*/
template<typename X_lambda>
inline auto call(const X_lambda& p_lambda) const;

public:
inline operator bool() const;
inline pointer_type operator-> ();
inline reference_type operator* ();
inline const_pointer_type operator-> () const;
inline const_reference_type operator* () const;
};

template<bool for_stream>
struct result<for_stream, void>;

}

}

include/asyncpp/core/result.inl → include/asyncpp/core/result/result.inl Целия файл


include/asyncpp/core/result.pre.h → include/asyncpp/core/result/result.pre.h Целия файл


+ 7
- 110
include/asyncpp/core/stream.h Целия файл

@@ -1,114 +1,11 @@
#pragma once

#include <type_traits>
#include "stream/stream.h"

#include "result.h"
#include "stream.pre.h"
#include "stream/map.inl"
#include "stream/flatten.inl"
#include "stream/for_each.inl"

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
struct base_stream
: public tag_stream
{
public:
using value_type = T_value;
using result_type = stream_result<value_type>;
using derived_type = T_derived;
using this_type = base_stream<value_type, derived_type>;

public:
/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) const &;

/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) &;

/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) &&;

public:
/**
* @brief Transforms the stream from one type to another type
* by executing the passed lambda for each value.
*/
template<typename X_lambda>
inline auto map(X_lambda&& p_lambda) const &;

/**
* @brief Transforms the stream from one type to another type
* by executing the passed lambda for each value.
*/
template<typename X_lambda>
inline auto map(X_lambda&& p_lambda) &;

/**
* @brief Transforms the stream from one type to another type
* by executing the passed lambda for each value.
*/
template<typename X_lambda>
inline auto map(X_lambda&& p_lambda) &&;

public:
/**
* @brief Flatten the stream of streams to a single stream.
*/
inline auto flatten() const &;

/**
* @brief Flatten the stream of streams to a single stream.
*/
inline auto flatten() &;

/**
* @brief Flatten the stream of streams to a single stream.
*/
inline auto flatten() &&;

#ifdef asyncpp_timing
public:
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) const &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<typename X_base, typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &&;
#endif
};

}
#ifdef asyncpp_timing
#include "stream/timeout.inl"
#endif

+ 0
- 230
include/asyncpp/core/stream.inl Целия файл

@@ -1,230 +0,0 @@
#pragma once

#include "stream.h"

#include "stream/map.inl"
#include "stream/flatten.inl"
#include "stream/for_each.inl"

#ifdef asyncpp_timing
#include <asyncpp/timing/timeout.inl>
#endif

namespace asyncpp
{

/* base_stream::for_each */

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) const &
{
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<derived_type, lambda_type>;

auto& self = static_cast<derived_type const &>(*this);

return for_each_type(
std::forward<derived_type const>(self),
std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &
{
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<derived_type&, lambda_type>;

auto& self = static_cast<derived_type &>(*this);

return for_each_type(
std::forward<derived_type &>(self),
std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &&
{
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<derived_type, lambda_type>;

auto& self = static_cast<derived_type &>(*this);

return for_each_type(
std::forward<derived_type &&>(self),
std::forward<X_lambda>(p_lambda));
}

/* base_stream::map */

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::map(X_lambda&& p_lambda) const &
{
using lambda_type = X_lambda;
using map_type = __stream::map_future<derived_type, lambda_type>;

auto& self = static_cast<derived_type const &>(*this);

return map_type(
std::forward<derived_type const &>(self),
std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::map(X_lambda&& p_lambda) &
{
using lambda_type = X_lambda;
using map_type = __stream::map_future<derived_type, lambda_type>;

auto& self = static_cast<derived_type &>(*this);

return map_type(
std::forward<derived_type &>(self),
std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_derived>
template<
typename X_lambda>
auto base_stream<T_value, T_derived>
::map(X_lambda&& p_lambda) &&
{
using lambda_type = X_lambda;
using map_type = __stream::map_future<derived_type, lambda_type>;

auto& self = static_cast<derived_type &>(*this);

return map_type(
std::forward<derived_type &&>(self),
std::forward<X_lambda>(p_lambda));
}

/* base_stream::flatten */

template<
typename T_value,
typename T_derived>
auto base_stream<T_value, T_derived>
::flatten() const &
{
using flatten_type = __stream::flatten_impl<derived_type>;

auto& self = static_cast<derived_type const &>(*this);

return flatten_type(
std::forward<derived_type const &>(self));
}

template<
typename T_value,
typename T_derived>
auto base_stream<T_value, T_derived>
::flatten() &
{
using flatten_type = __stream::flatten_impl<derived_type>;

auto& self = static_cast<derived_type &>(*this);

return flatten_type(
std::forward<derived_type &>(self));
}

template<
typename T_value,
typename T_derived>
auto base_stream<T_value, T_derived>
::flatten() &&
{
using flatten_type = __stream::flatten_impl<derived_type>;

auto& self = static_cast<derived_type &>(*this);

return flatten_type(
std::forward<derived_type &&>(self));
}

/* base_stream::timeout */

#ifdef asyncpp_timing
template<
typename T_value,
typename T_derived>
template<
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) const &
{
using timeout_type = timing::timeout<derived_type>;

auto& self = static_cast<derived_type const &>(*this);

return timeout_type(
std::forward<derived_type const>(self),
p_timeout);
}

template<
typename T_value,
typename T_derived>
template<
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &
{
using timeout_type = timing::timeout<derived_type&>;

auto& self = static_cast<derived_type &>(*this);

return timeout_type(
std::forward<derived_type &>(self),
p_timeout);
}

template<
typename T_value,
typename T_derived>
template<
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &&
{
using timeout_type = timing::timeout<derived_type>;

auto& self = static_cast<derived_type &>(*this);

return timeout_type(
std::forward<derived_type &&>(self),
p_timeout);
}
#endif

}

+ 0
- 47
include/asyncpp/core/stream/flatten.h Целия файл

@@ -1,47 +0,0 @@
#pragma once

#include <memory>

#include <asyncpp/core/stream.pre.h>

namespace asyncpp {
namespace __stream {

template<
typename T_stream>
struct flatten_impl final :
public base_stream<
typename T_stream::value_type::value_type,
flatten_impl<T_stream>
>
{
public:
using stream_type = T_stream;
using inner_stream_type = typename stream_type::value_type;
using inner_stream_ptr_u = std::unique_ptr<inner_stream_type>;
using value_type = typename inner_stream_type::value_type;
using this_type = flatten_impl<stream_type>;
using base_stream_type = base_stream<value_type, this_type>;
using result_type = typename base_stream_type::result_type;

private:
stream_type _stream;
inner_stream_ptr_u _inner;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream>
inline flatten_impl(
X_stream&& p_stream);

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll();
};

} }

+ 110
- 29
include/asyncpp/core/stream/flatten.inl Целия файл

@@ -1,49 +1,130 @@
#pragma once

#include "flatten.h"
#include <memory>

#include "stream.h"

namespace asyncpp {
namespace __stream {

/* flatten_impl */

template<
typename T_stream>
template<
typename X_stream>
flatten_impl<T_stream>::flatten_impl(
X_stream&& p_stream)
: _stream (std::forward<X_stream>(p_stream))
, _inner ()
{ }

template<
typename T_stream>
typename flatten_impl<T_stream>::result_type
flatten_impl<T_stream>
::poll()
struct flatten_stream final :
public base_stream<
typename T_stream::value_type::value_type,
flatten_stream<T_stream>
>
{
while (true)
public:
using stream_type = T_stream;
using inner_stream_type = typename stream_type::value_type;
using inner_stream_ptr_u = std::unique_ptr<inner_stream_type>;
using value_type = typename inner_stream_type::value_type;
using this_type = flatten_stream<stream_type>;
using base_stream_type = base_stream<value_type, this_type>;
using result_type = typename base_stream_type::result_type;

private:
stream_type _stream;
inner_stream_ptr_u _inner;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream>
inline flatten_stream(
X_stream&& p_stream)
: _stream (std::forward<X_stream>(p_stream))
, _inner ()
{ }

inline flatten_stream(flatten_stream &&) = default;
inline flatten_stream(flatten_stream const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll()
{
if (_inner)
while (true)
{
auto r = _inner->poll();
if (_inner)
{
auto r = _inner->poll();

if (!r.is_done())
return r;
if (!r.is_done())
return r;

_inner.reset();
}
_inner.reset();
}

auto r = _stream.poll();
if (r.is_done())
return result_type::done();
auto r = _stream.poll();
if (r.is_done())
return result_type::done();

if (r.is_not_ready())
return result_type::not_ready();
if (r.is_not_ready())
return result_type::not_ready();

_inner = std::make_unique<inner_stream_type>(std::move(*r));
_inner = std::make_unique<inner_stream_type>(std::move(*r));
}
}
};

/* flatten_impl */

template<
chaining_mode X_mode,
typename X_self>
auto map_impl(X_self&& p_self)
{
using self_type = X_self;
using derived_type = typename std::decay_t<self_type>::derived_type;
using derived_storage_type = storage_type_t<X_mode, derived_type>;
using derived_ref_type = std::conditional_t<
std::is_const_v<std::remove_reference_t<self_type>>,
derived_type const &,
derived_type &>;
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using flatten_stream_type = __stream::flatten_stream<derived_storage_type>;

auto& self = static_cast<derived_ref_type>(p_self);

return flatten_stream_type(
std::forward<derived_forward_type>(self));
}

} }

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode>
auto base_stream<T_value, T_derived>
::flatten() &
{
return __stream::map_impl<X_mode>(*this);
}

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode>
auto base_stream<T_value, T_derived>
::flatten() &&
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __stream::map_impl<X_mode>(std::move(*this));
}

}

+ 0
- 47
include/asyncpp/core/stream/for_each.h Целия файл

@@ -1,47 +0,0 @@
#pragma once

#include <asyncpp/core/future.pre.h>

namespace asyncpp {
namespace __stream {

template<
typename T_stream,
typename T_lambda>
struct for_each_impl final :
public base_future<
void,
for_each_impl<T_stream, T_lambda>
>
{
public:
using stream_type = T_stream;
using lambda_type = T_lambda;
using value_type = void;
using this_type = for_each_impl<stream_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
stream_type _stream;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream,
typename X_lambda>
inline for_each_impl(
X_stream&& p_stream,
X_lambda&& p_lambda);

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll();
};

} }

+ 104
- 25
include/asyncpp/core/stream/for_each.inl Целия файл

@@ -1,44 +1,123 @@
#pragma once

#include "for_each.h"
#include "stream.h"

namespace asyncpp {
namespace __stream {

/* for_each_impl */

template<
typename T_stream,
typename T_lambda>
template<
typename X_stream,
typename X_lambda>
for_each_impl<T_stream, T_lambda>
::for_each_impl(
struct for_each_future final :
public base_future<
void,
for_each_future<T_stream, T_lambda>
>
{
public:
using stream_type = T_stream;
using lambda_type = T_lambda;
using value_type = void;
using this_type = for_each_future<stream_type, lambda_type>;
using base_future_type = base_future<value_type, this_type>;
using result_type = typename base_future_type::result_type;

private:
stream_type _stream;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream,
typename X_lambda>
inline for_each_future(
X_stream&& p_stream,
X_lambda&& p_lambda)
: _stream(std::forward<X_stream>(p_stream))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }
: _stream(std::forward<X_stream>(p_stream))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }

template<
typename T_stream,
typename T_lambda>
typename for_each_impl<T_stream, T_lambda>::result_type
for_each_impl<T_stream, T_lambda>
::poll()
{
while (true)
inline for_each_future(for_each_future &&) = default;
inline for_each_future(for_each_future const &) = default;
public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll()
{
auto r = _stream.poll();
if (r.is_done())
return result_type::ready();
while (true)
{
auto r = _stream.poll();
if (r.is_done())
return result_type::ready();

if (r.is_not_ready())
return result_type::not_ready();
if (r.is_not_ready())
return result_type::not_ready();

r.call(_lambda);
r.call(_lambda);
}
}
};

template<
chaining_mode X_mode,
typename X_self,
typename X_lambda>
auto for_each_impl(X_self&& p_self, X_lambda&& p_lambda)
{
using self_type = X_self;
using lambda_type = X_lambda;
using derived_type = typename std::decay_t<self_type>::derived_type;
using derived_storage_type = storage_type_t<X_mode, derived_type>;
using derived_ref_type = std::conditional_t<
std::is_const_v<std::remove_reference_t<self_type>>,
derived_type const &,
derived_type &>;
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using for_each_future_type = __stream::for_each_future<derived_storage_type, lambda_type>;

auto& self = static_cast<derived_ref_type>(p_self);

return for_each_future_type(
std::forward<derived_forward_type>(self),
std::forward<X_lambda>(p_lambda));
}

} }

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &
{
return __stream::for_each_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda));
}

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_lambda>
auto base_stream<T_value, T_derived>
::for_each(X_lambda&& p_lambda) &&
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __stream::for_each_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda));
}

}

+ 0
- 48
include/asyncpp/core/stream/map.h Целия файл

@@ -1,48 +0,0 @@
#pragma once

#include <asyncpp/core/stream.pre.h>

namespace asyncpp {
namespace __stream {

template<
typename T_stream,
typename T_lambda>
struct map_future final :
public base_stream<
decltype(std::declval<T_lambda>()(std::declval<typename T_stream::value_type>())),
map_future<T_stream, T_lambda>
>
{
public:
using stream_type = T_stream;
using lambda_type = T_lambda;
using inner_value_type = typename stream_type::value_type;
using value_type = decltype(std::declval<lambda_type>()(std::declval<inner_value_type>()));
using this_type = map_future<stream_type, lambda_type>;
using base_stream_type = base_stream<value_type, this_type>;
using result_type = typename base_stream_type::result_type;

private:
stream_type _stream;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream,
typename X_lambda>
inline map_future(
X_stream&& p_stream,
X_lambda&& p_lambda);

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll();
};

} }

+ 103
- 22
include/asyncpp/core/stream/map.inl Целия файл

@@ -1,40 +1,121 @@
#pragma once

#include "map.h"
#include "stream.h"

namespace asyncpp {
namespace __stream {

/* map_future */

template<
typename T_stream,
typename T_lambda>
struct map_stream final :
public base_stream<
decltype(std::declval<T_lambda>()(std::declval<typename T_stream::value_type>())),
map_stream<T_stream, T_lambda>
>
{
public:
using stream_type = T_stream;
using lambda_type = T_lambda;
using inner_value_type = typename stream_type::value_type;
using value_type = decltype(std::declval<lambda_type>()(std::declval<inner_value_type>()));
using this_type = map_stream<stream_type, lambda_type>;
using base_stream_type = base_stream<value_type, this_type>;
using result_type = typename base_stream_type::result_type;

private:
stream_type _stream;
lambda_type _lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream,
typename X_lambda>
inline map_stream(
X_stream&& p_stream,
X_lambda&& p_lambda)
: _stream(std::forward<X_stream>(p_stream))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }

inline map_stream(map_stream &&) = default;
inline map_stream(map_stream const &) = default;

public: /* future */
/**
* @brief Poll the result from the future.
*/
inline result_type poll()
{
auto r = _stream.poll();
if (r.is_done())
return result_type::done();

if (r.is_ready())
return result_type::ready(r.call(_lambda));

return result_type::not_ready();
}
};

template<
typename X_stream,
chaining_mode X_mode,
typename X_self,
typename X_lambda>
map_future<T_stream, T_lambda>::map_future(
X_stream&& p_stream,
X_lambda&& p_lambda)
: _stream(std::forward<X_stream>(p_stream))
, _lambda(std::forward<X_lambda>(p_lambda))
{ }
auto map_impl(X_self&& p_self, X_lambda&& p_lambda)
{
using self_type = X_self;
using lambda_type = X_lambda;
using derived_type = typename std::decay_t<self_type>::derived_type;
using derived_storage_type = storage_type_t<X_mode, derived_type>;
using derived_ref_type = std::conditional_t<
std::is_const_v<std::remove_reference_t<self_type>>,
derived_type const &,
derived_type &>;
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using map_stream_type = __stream::map_stream<derived_storage_type, lambda_type>;

auto& self = static_cast<derived_ref_type>(p_self);

return map_stream_type(
std::forward<derived_forward_type>(self),
std::forward<X_lambda>(p_lambda));
}

} }

namespace asyncpp
{

template<
typename T_stream,
typename T_lambda>
typename map_future<T_stream, T_lambda>::result_type
map_future<T_stream, T_lambda>
::poll()
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_lambda>
auto base_stream<T_value, T_derived>
::map(X_lambda&& p_lambda) &
{
auto r = _stream.poll();
if (r.is_done())
return result_type::done();
return __stream::map_impl<X_mode>(*this, std::forward<X_lambda>(p_lambda));
}

if (r.is_ready())
return result_type::ready(r.call(_lambda));
template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_lambda>
auto base_stream<T_value, T_derived>
::map(X_lambda&& p_lambda) &&
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return result_type::not_ready();
return __stream::map_impl<X_mode>(std::move(*this), std::forward<X_lambda>(p_lambda));
}

} }
}

+ 104
- 0
include/asyncpp/core/stream/stream.h Целия файл

@@ -0,0 +1,104 @@
#pragma once

#include <type_traits>

#include "stream.pre.h"
#include "../result.h"

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
struct base_stream
: public tag_stream
{
public:
using value_type = T_value;
using result_type = stream_result<value_type>;
using derived_type = T_derived;
using this_type = base_stream<value_type, derived_type>;

public:
/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) &;

/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto for_each(X_lambda&& p_lambda) &&;

public:
/**
* @brief Transforms the stream from one type to another type
* by executing the passed lambda for each value.
*/
template<
chaining_mode X_mode = copy,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &;

/**
* @brief Transforms the stream from one type to another type
* by executing the passed lambda for each value.
*/
template<
chaining_mode X_mode = move,
typename X_lambda>
inline auto map(X_lambda&& p_lambda) &&;

public:
/**
* @brief Flatten the stream of streams to a single stream.
*/
template<
chaining_mode X_mode = copy>
inline auto flatten() &;

/**
* @brief Flatten the stream of streams to a single stream.
*/
template<
chaining_mode X_mode = move>
inline auto flatten() &&;

#ifdef asyncpp_timing
public:
/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = copy,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &;

/**
* @brief Throw an execption if the timeout has passed.
*
* This method is only enabled if timer.h is included before.
*/
template<
chaining_mode X_mode = move,
typename X_base,
typename X_ratio>
inline auto timeout(const duration<X_base, X_ratio>& p_timeout) &&;
#endif
};

}

include/asyncpp/core/stream.pre.h → include/asyncpp/core/stream/stream.pre.h Целия файл


+ 69
- 0
include/asyncpp/core/stream/timeout.inl Целия файл

@@ -0,0 +1,69 @@
#pragma once

#include "stream.h"

namespace asyncpp {
namespace __stream {

template<
chaining_mode X_mode,
typename X_self,
typename X_base,
typename X_ratio>
auto timeout_impl(
X_self&& p_self,
const duration<X_base, X_ratio>& p_timeout)
{
using self_type = X_self;
using derived_type = typename std::decay_t<self_type>::derived_type;
using derived_storage_type = storage_type_t<X_mode, derived_type>;
using derived_ref_type = std::conditional_t<
std::is_const_v<std::remove_reference_t<self_type>>,
derived_type const &,
derived_type &>;
using derived_forward_type = forward_type_t<X_mode, derived_ref_type>;
using timeout_type = timing::timeout<derived_storage_type>;

auto& self = static_cast<derived_ref_type>(p_self);

return timeout_type(
std::forward<derived_forward_type>(self),
p_timeout);
}

} }

namespace asyncpp
{

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &
{
return __stream::timeout_impl<X_mode>(std::move(*this), p_timeout);
}

template<
typename T_value,
typename T_derived>
template<
chaining_mode X_mode,
typename X_base,
typename X_ratio>
auto base_stream<T_value, T_derived>
::timeout(const duration<X_base, X_ratio>& p_timeout) &&
{
static_assert(
X_mode != ref,
"Can not store rvalue reference as lvalue reference!");

return __stream::timeout_impl<X_mode>(std::move(*this), p_timeout);
}

}

+ 1
- 2
include/asyncpp/fs/directory.h Целия файл

@@ -4,8 +4,7 @@

#include <cppfs/path.h>

#include <asyncpp/core/stream.pre.h>
#include <asyncpp/core/future/lazy.h>
#include <asyncpp/core/stream/stream.pre.h>

namespace asyncpp {
namespace fs {


+ 1
- 1
include/asyncpp/timing/delay.h Целия файл

@@ -1,7 +1,7 @@
#pragma once

#include <asyncpp/core/misc.h>
#include <asyncpp/core/future.h>
#include <asyncpp/core/future/future.h>

#include "timer.h"
#include "delay.pre.h"


+ 2
- 3
include/asyncpp/timing/delay.inl Целия файл

@@ -1,8 +1,7 @@
#pragma once

#include <asyncpp/core/future.pre.h>
#include <asyncpp/core/result.pre.h>
#include <asyncpp/core/result.inl>
#include <asyncpp/core/result.h>
#include <asyncpp/core/future/future.pre.h>

#include "delay.h"



+ 2
- 2
include/asyncpp/timing/interval.h Целия файл

@@ -1,8 +1,8 @@
#pragma once

#include <asyncpp/core/misc.h>
#include <asyncpp/core/future.h>
#include <asyncpp/core/stream.h>
#include <asyncpp/core/future/future.h>
#include <asyncpp/core/stream/stream.h>

#include "delay.h"



+ 2
- 2
include/asyncpp/timing/timeout.h Целия файл

@@ -3,8 +3,8 @@
#include <cppcore/misc/exception.h>

#include <asyncpp/core/misc.h>
#include <asyncpp/core/future.h>
#include <asyncpp/core/stream.h>
#include <asyncpp/core/future/future.h>
#include <asyncpp/core/stream/stream.h>

#include "delay.h"



+ 2
- 2
test/asyncpp/timing/timeout_tests.cpp Целия файл

@@ -141,7 +141,7 @@ TEST(timeout_tests, poll_stream_no_timeout)

auto t = test_stream { 0 };
auto f = t
.timeout(std::chrono::seconds(5));
.timeout<ref>(std::chrono::seconds(5));

EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));
@@ -191,7 +191,7 @@ TEST(timeout_tests, poll_stream_timeout)

auto t = test_stream { 0 };
auto f = t
.timeout(std::chrono::seconds(5));
.timeout<ref>(std::chrono::seconds(5));

EXPECT_CALL(m, now())
.WillOnce(Return(time_point(std::chrono::seconds(0))));


Зареждане…
Отказ
Запис