Bladeren bron

* Implemented 'interval' stream

* Implemented 'for_each' stream method
* More refactoring and unit tests
master
bergmann 4 jaren geleden
bovenliggende
commit
16427a5c89
45 gewijzigde bestanden met toevoegingen van 956 en 376 verwijderingen
  1. +0
    -1
      include/asyncpp/core.h
  2. +11
    -0
      include/asyncpp/core/future.h
  3. +28
    -0
      include/asyncpp/core/future.inl
  4. +10
    -6
      include/asyncpp/core/future.pre.h
  5. +31
    -7
      include/asyncpp/core/future/and_then.h
  6. +34
    -41
      include/asyncpp/core/future/and_then.inl
  7. +23
    -0
      include/asyncpp/core/future/map.h
  8. +21
    -29
      include/asyncpp/core/future/map.inl
  9. +8
    -0
      include/asyncpp/core/misc.h
  10. +24
    -0
      include/asyncpp/core/misc.inl
  11. +20
    -0
      include/asyncpp/core/stream.h
  12. +55
    -0
      include/asyncpp/core/stream.inl
  13. +9
    -2
      include/asyncpp/core/stream.pre.h
  14. +56
    -0
      include/asyncpp/core/stream/for_each.h
  15. +52
    -0
      include/asyncpp/core/stream/for_each.inl
  16. +6
    -85
      include/asyncpp/core/task.h
  17. +23
    -0
      include/asyncpp/core/task/current_task_lock.h
  18. +16
    -0
      include/asyncpp/core/task/current_task_lock.inl
  19. +8
    -0
      include/asyncpp/core/task/current_task_lock.pre.h
  20. +76
    -0
      include/asyncpp/core/task/task.h
  21. +37
    -0
      include/asyncpp/core/task/task.inl
  22. +13
    -0
      include/asyncpp/core/task/task.pre.h
  23. +32
    -0
      include/asyncpp/core/task/task_tpl.h
  24. +2
    -22
      include/asyncpp/core/task/task_tpl.inl
  25. +3
    -21
      include/asyncpp/executor/current_thread.inl
  26. +1
    -7
      include/asyncpp/executor/executor.h
  27. +1
    -4
      include/asyncpp/executor/executor.inl
  28. +2
    -0
      include/asyncpp/timer.h
  29. +25
    -7
      include/asyncpp/timer/delay.h
  30. +20
    -26
      include/asyncpp/timer/delay.inl
  31. +11
    -18
      include/asyncpp/timer/impl/registration.h
  32. +7
    -10
      include/asyncpp/timer/impl/registration.inl
  33. +0
    -9
      include/asyncpp/timer/impl/registration.pre.h
  34. +12
    -10
      include/asyncpp/timer/impl/timer_base.h
  35. +24
    -28
      include/asyncpp/timer/impl/timer_base.inl
  36. +67
    -0
      include/asyncpp/timer/interval.h
  37. +60
    -0
      include/asyncpp/timer/interval.inl
  38. +4
    -4
      include/asyncpp/timer/timer.h
  39. +22
    -23
      include/asyncpp/timer/timer.inl
  40. +3
    -3
      test/asyncpp/core/future_tests.cpp
  41. +21
    -7
      test/asyncpp/core/stream_tests.cpp
  42. +3
    -3
      test/asyncpp/core/task_tests.cpp
  43. +2
    -2
      test/asyncpp/executor/current_thread_tests.cpp
  44. +72
    -0
      test/asyncpp/timer/interval_tests.cpp
  45. +1
    -1
      test/helper/runtime_mock.h

+ 0
- 1
include/asyncpp/core.h Bestand weergeven

@@ -8,4 +8,3 @@
#include "core/future.inl"
#include "core/result.inl"
#include "core/stream.inl"
#include "core/task.inl"

+ 11
- 0
include/asyncpp/core/future.h Bestand weergeven

@@ -4,6 +4,7 @@

#include "result.h"
#include "future.pre.h"

#include "future/map.h"
#include "future/and_then.h"

@@ -20,6 +21,10 @@ namespace asyncpp
using trait_type = future_trait<clean_object_type>;
using value_type = typename trait_type::value_type;
using result_type = future_result<value_type>;
using reference = clean_object_type&;
using pointer = clean_object_type*;
using const_reference = const clean_object_type&;
using const_pointer = const clean_object_type*;

object_type ref;

@@ -46,6 +51,12 @@ namespace asyncpp
*/
template<typename X_lambda>
inline auto and_then(X_lambda&& p_lambda);

public:
inline pointer operator->();
inline reference operator*();
inline const_pointer operator->() const;
inline const_reference operator*() const;
};

}

+ 28
- 0
include/asyncpp/core/future.inl Bestand weergeven

@@ -76,6 +76,34 @@ namespace asyncpp
::and_then(X_lambda&& p_lambda)
{ return trait_type::and_then(std::move(*this), std::forward<X_lambda>(p_lambda)); }

template<
typename T_value,
typename T_impl>
typename future<T_value, T_impl>::pointer
future<T_value, T_impl>::operator->()
{ return &ref; }

template<
typename T_value,
typename T_impl>
typename future<T_value, T_impl>::reference
future<T_value, T_impl>::operator*()
{ return ref; }

template<
typename T_value,
typename T_impl>
typename future<T_value, T_impl>::const_pointer
future<T_value, T_impl>::operator->() const
{ return &ref; }

template<
typename T_value,
typename T_impl>
typename future<T_value, T_impl>::const_reference
future<T_value, T_impl>::operator*() const
{ return ref; }

/* misc */

template<typename X_value>


+ 10
- 6
include/asyncpp/core/future.pre.h Bestand weergeven

@@ -6,14 +6,18 @@ namespace asyncpp
template<typename T_impl>
struct future_base
{
template<typename T_future>
static inline auto poll(T_future& self) = delete;
public:
using impl_type = T_impl;

template<typename T_future, typename T_lambda>
static inline auto map(T_future&& self, T_lambda&& p_lambda);
public:
template<typename X_future>
static inline auto poll(X_future& self) = delete;

template<typename T_future, typename T_lambda>
static inline auto and_then(T_future&& self, T_lambda&& p_lambda);
template<typename X_future, typename X_lambda>
static inline auto map(X_future&& self, X_lambda&& p_lambda);

template<typename X_future, typename X_lambda>
static inline auto and_then(X_future&& self, X_lambda&& p_lambda);
};

template<typename T, typename = void>


+ 31
- 7
include/asyncpp/core/future/and_then.h Bestand weergeven

@@ -2,6 +2,8 @@

#include <memory>

#include <asyncpp/core/future.pre.h>

namespace asyncpp {
namespace __future {

@@ -11,16 +13,16 @@ namespace __future {
struct and_then_impl
{
public:
using lambda_type = T_lambda;
using outer_future_type = T_future;
using outer_value_type = typename outer_future_type::value_type;
using inner_future_type = decltype(as_future(std::declval<lambda_type>()(std::declval<outer_value_type>())));
using inner_future_type_ptr = std::unique_ptr<inner_future_type>;
using lambda_type = T_lambda;
using first_future_type = T_future;
using first_value_type = typename first_future_type::value_type;
using second_future_type = decltype(as_future(std::declval<lambda_type>()(std::declval<first_value_type>())));
using second_future_type_ptr = std::unique_ptr<second_future_type>;

public:
lambda_type lambda;
outer_future_type outer;
inner_future_type_ptr inner;
first_future_type first;
second_future_type_ptr second;

public:
/**
@@ -35,3 +37,25 @@ namespace __future {
};

} }

namespace asyncpp
{

/* future_trait for and_then_impl */

template<
typename T_future,
typename T_lambda>
struct future_trait<__future::and_then_impl<T_future, T_lambda>, void>
: public future_base<future<__future::and_then_impl<T_future, T_lambda>, void>>
{
using and_then_type = __future::and_then_impl<T_future, T_lambda>;
using second_future_type = typename and_then_type::second_future_type;
using value_type = typename second_future_type::value_type;
using result_type = typename second_future_type::result_type;

template<typename X_future>
static inline auto poll(X_future& self);
};

}

+ 34
- 41
include/asyncpp/core/future/and_then.inl Bestand weergeven

@@ -2,45 +2,6 @@

#include "map.h"

namespace asyncpp
{

/* future_trait for ant_then_impl */

template<
typename T_future,
typename T_lambda>
struct future_trait<__future::and_then_impl<T_future, T_lambda>, void>
: public future_base<future<__future::and_then_impl<T_future, T_lambda>, void>>
{
using and_then_type = __future::and_then_impl<T_future, T_lambda>;
using inner_future_type = typename and_then_type::inner_future_type;
using value_type = typename inner_future_type::value_type;
using result_type = typename inner_future_type::result_type;

template<typename X_future>
static inline auto poll(X_future& self)
{
while (true)
{
if (self.ref.inner)
{
return self.ref.inner->poll();
}
else
{
auto r = self.ref.outer.poll();
if (!r)
return result_type::not_ready();

self.ref.inner = std::make_unique<inner_future_type>(as_future(self.ref.lambda(*r)));
}
}
}
};

}

namespace asyncpp {
namespace __future {

@@ -54,10 +15,42 @@ namespace __future {
typename X_lambda>
and_then_impl<T_future, T_lambda>
::and_then_impl(
X_future&& p_outer,
X_future&& p_first,
X_lambda&& p_lambda)
: outer (std::forward<X_future>(p_outer))
: first (std::forward<X_future>(p_first))
, lambda(std::forward<X_lambda>(p_lambda))
{ }

} }

namespace asyncpp
{

/* future_trait for ant_then_impl */

template<
typename T_future,
typename T_lambda>
template<
typename X_future>
auto future_trait<__future::and_then_impl<T_future, T_lambda>, void>
::poll(X_future& self)
{
while (true)
{
if (self->second)
{
return self->second->poll();
}
else
{
auto r = self->first.poll();
if (!r)
return result_type::not_ready();

self->second = std::make_unique<second_future_type>(as_future(self->lambda(*r)));
}
}
}

}

+ 23
- 0
include/asyncpp/core/future/map.h Bestand weergeven

@@ -29,3 +29,26 @@ namespace __future {
};

} }

namespace asyncpp
{

/* future_trait for map_impl */

template<
typename T_future,
typename T_lambda>
struct future_trait<__future::map_impl<T_future, T_lambda>, void>
: public future_base<future<__future::map_impl<T_future, T_lambda>, void>>
{
using future_type = T_future;
using inner_value_type = typename future_type::value_type;
using lambda_type = T_lambda;
using value_type = decltype(std::declval<lambda_type>()(std::declval<inner_value_type>()));
using result_type = future_result<value_type>;

template<typename X_future>
static inline auto poll(X_future& self);
};

}

+ 21
- 29
include/asyncpp/core/future/map.inl Bestand weergeven

@@ -2,35 +2,6 @@

#include "map.h"

namespace asyncpp
{

/* future_trait for map_impl */

template<
typename T_future,
typename T_lambda>
struct future_trait<__future::map_impl<T_future, T_lambda>, void>
: public future_base<future<__future::map_impl<T_future, T_lambda>, void>>
{
using future_type = T_future;
using inner_value_type = typename future_type::value_type;
using lambda_type = T_lambda;
using value_type = decltype(std::declval<lambda_type>()(std::declval<inner_value_type>()));
using result_type = future_result<value_type>;

template<typename X_future>
static inline auto poll(X_future& self)
{
auto r = self.ref.future.poll();
return r
? result_type::ready(self.ref.lambda(*r))
: result_type::not_ready();
}
};

}

namespace asyncpp {
namespace __future {

@@ -51,3 +22,24 @@ namespace __future {
{ }

} }

namespace asyncpp
{

/* future_trait for map_impl */

template<
typename T_future,
typename T_lambda>
template<
typename X_future>
auto future_trait<__future::map_impl<T_future, T_lambda>, void>
::poll(X_future& self)
{
auto r = self->future.poll();
return r
? result_type::ready(self->lambda(*r))
: result_type::not_ready();
}

}

+ 8
- 0
include/asyncpp/core/misc.h Bestand weergeven

@@ -17,4 +17,12 @@ namespace asyncpp
*/
inline time_point now();

/**
* @brief Returns the lowest of the two passed deadlines (if not null)
* or nullptr if no deadline was passed.
*/
inline const time_point * merge_deadlines(
const time_point * p_deadline_0,
const time_point * p_deadline_1);

}

+ 24
- 0
include/asyncpp/core/misc.inl Bestand weergeven

@@ -10,4 +10,28 @@ namespace timer {
{ return clock::now(); }
#endif

const time_point * merge_deadlines(
const time_point * p_deadline_0,
const time_point * p_deadline_1)
{
if (p_deadline_0 && p_deadline_1)
{
return *p_deadline_0 < *p_deadline_1
? p_deadline_0
: p_deadline_1;
}
else if (p_deadline_0)
{
return p_deadline_0;
}
else if (p_deadline_1)
{
return p_deadline_1;
}
else
{
return nullptr;
}
}

} }

+ 20
- 0
include/asyncpp/core/stream.h Bestand weergeven

@@ -5,6 +5,8 @@
#include "result.h"
#include "stream.pre.h"

#include "stream/for_each.h"

namespace asyncpp
{

@@ -18,6 +20,10 @@ namespace asyncpp
using trait_type = stream_trait<clean_object_type>;
using value_type = typename trait_type::value_type;
using result_type = stream_result<value_type>;
using reference = clean_object_type&;
using pointer = clean_object_type*;
using const_reference = const clean_object_type&;
using const_pointer = const clean_object_type*;

object_type ref;

@@ -31,6 +37,20 @@ namespace asyncpp
* @brief Function that will be called repeatedly to check if the stream has values.
*/
inline result_type poll();

/**
* @brief Execute the given lambda for each element in the stream.
*
* @return Returns a future that completes once the stream is finished.
*/
template<typename X_lambda>
inline auto for_each(X_lambda&& p_lambda);

public:
inline pointer operator->();
inline reference operator*();
inline const_pointer operator->() const;
inline const_reference operator*() const;
};

}

+ 55
- 0
include/asyncpp/core/stream.inl Bestand weergeven

@@ -2,9 +2,27 @@

#include "stream.h"

#include "stream/for_each.inl"

namespace asyncpp
{

/* stream_base */

template<typename T_impl>
template<typename X_stream, typename X_lambda>
auto stream_base<T_impl>
::for_each(X_stream&& self, X_lambda&& p_lambda)
{
using stream_type = X_stream;
using lambda_type = X_lambda;
using for_each_type = __stream::for_each_impl<stream_type, lambda_type>;

return as_future(for_each_type(
std::forward<X_stream>(self),
std::forward<X_lambda>(p_lambda)));
}

/* stream */

template<
@@ -25,6 +43,43 @@ namespace asyncpp
::poll()
{ return trait_type::poll(*this); }

template<
typename T_value,
typename T_impl>
template<
typename X_lambda>
auto stream<T_value, T_impl>
::for_each(X_lambda&& p_lambda)
{ return trait_type::for_each(std::move(*this), std::forward<X_lambda>(p_lambda)); }

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::pointer
stream<T_value, T_impl>::operator->()
{ return &ref; }

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::reference
stream<T_value, T_impl>::operator*()
{ return ref; }

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::const_pointer
stream<T_value, T_impl>::operator->() const
{ return &ref; }

template<
typename T_value,
typename T_impl>
typename stream<T_value, T_impl>::const_reference
stream<T_value, T_impl>::operator*() const
{ return ref; }

/* misc */

template<typename X_value>


+ 9
- 2
include/asyncpp/core/stream.pre.h Bestand weergeven

@@ -6,8 +6,15 @@ namespace asyncpp
template<typename T_impl>
struct stream_base
{
template<typename T_stream>
static inline auto poll(T_stream& self) = delete;
public:
using impl_type = T_impl;

public:
template<typename X_stream>
static inline auto poll(X_stream& self) = delete;

template<typename X_stream, typename X_lambda>
static inline auto for_each(X_stream&& self, X_lambda&& p_lambda);
};

template<typename T, typename = void>


+ 56
- 0
include/asyncpp/core/stream/for_each.h Bestand weergeven

@@ -0,0 +1,56 @@
#pragma once

#include <asyncpp/core/future.pre.h>

namespace asyncpp {
namespace __stream {

template<
typename T_stream,
typename T_lambda>
struct for_each_impl
{
public:
using stream_type = T_stream;
using lambda_type = T_lambda;

public:
stream_type stream;
lambda_type lambda;

public:
/**
* @brief Constructor.
*/
template<
typename X_stream,
typename X_lambda>
inline for_each_impl(
X_stream&& p_stream,
X_lambda&& p_lambda);
};

} }

namespace asyncpp
{

/* future_trait for map_impl */

template<
typename T_stream,
typename T_lambda>
struct future_trait<__stream::for_each_impl<T_stream, T_lambda>, void>
: public future_base<future<__stream::for_each_impl<T_stream, T_lambda>, void>>
{
using stream_type = T_stream;
using inner_value_type = typename stream_type::value_type;
using lambda_type = T_lambda;
using value_type = decltype(std::declval<lambda_type>()(std::declval<inner_value_type>()));
using result_type = future_result<value_type>;

template<typename X_future>
static inline auto poll(X_future& self);
};

}

+ 52
- 0
include/asyncpp/core/stream/for_each.inl Bestand weergeven

@@ -0,0 +1,52 @@
#pragma once

#include "for_each.h"

namespace asyncpp {
namespace __stream {

/* for_each_impl */

template<
typename T_stream,
typename T_lambda>
template<
typename X_stream,
typename X_lambda>
for_each_impl<T_stream, T_lambda>
::for_each_impl(
X_stream&& p_stream,
X_lambda&& p_lambda)
: stream(std::forward<X_stream>(p_stream))
, lambda(std::forward<X_lambda>(p_lambda))
{ }

} }

namespace asyncpp
{

/* future_trait for for_each_impl */

template<
typename T_stream,
typename T_lambda>
template<
typename X_stream>
auto future_trait<__stream::for_each_impl<T_stream, T_lambda>, void>
::poll(X_stream& self)
{
while (true)
{
auto r = self->stream.poll();
if (r.is_done())
return result_type::ready();

if (r.is_not_ready())
return result_type::not_ready();

self->lambda(*r);
}
}

}

+ 6
- 85
include/asyncpp/core/task.h Bestand weergeven

@@ -1,88 +1,9 @@
#pragma once

#include <vector>
#include <memory>
#include <functional>
#include "task/task.h"
#include "task/task_tpl.h"
#include "task/current_task_lock.h"

namespace asyncpp
{

struct task
{
public:
using notify_handler = std::function<void (task&)>;
using notify_handler_vector = std::vector<notify_handler>;

private:
bool _notified;
notify_handler_vector _notify_handlers;

public:
/**
* @brief Destructor.
*/
virtual ~task() = default;

/**
* @brief Check if the task is notified.
*/
inline bool notified() const;

/**
* @brief Notify the task that a resource that the task is interested in, is ready to be polled.
*/
inline void notify();

/**
* @brief Poll the future stored in the task.
*
* @return TRUE if the task is finished, FALSE otherwise.
*/
inline bool poll();

/**
* @brief Add a callback to execute when the task is notified.
*/
inline void add_notify_handler(notify_handler p_handler);

private:
/**
* @brief Actual implementation of the poll function.
*/
virtual bool poll_impl() = 0;
};

template<typename T_future>
struct task_tpl
: public task
{
public:
using future_type = T_future;

private:
future_type _future;

public:
/**
* @brief Constructor.
*/
template<typename X_future>
inline task_tpl(X_future&& p_future);

private:
/**
* @brief Actual implementation of the poll function.
*/
inline bool poll_impl() override;
};

using task_ptr_w = std::weak_ptr<task>;
using task_ptr_s = std::shared_ptr<task>;

/**
* @brief Create a task from the passed future.
*/
template<typename X_future>
inline task_ptr_s make_task(X_future&& p_future);

}
#include "task/task.inl"
#include "task/task_tpl.inl"
#include "task/current_task_lock.inl"

+ 23
- 0
include/asyncpp/core/task/current_task_lock.h Bestand weergeven

@@ -0,0 +1,23 @@
#pragma once

#include "task.h"
#include "current_task_lock.pre.h"

namespace asyncpp
{

struct current_task_lock
{
public:
/**
* @brief Constructor.
*/
current_task_lock(const task_ptr_s& p_task);

/**
* @brief Destructor.
*/
~current_task_lock();
};

}

+ 16
- 0
include/asyncpp/core/task/current_task_lock.inl Bestand weergeven

@@ -0,0 +1,16 @@
#pragma once

#include "current_task_lock.h"

namespace asyncpp
{

/* current_task_lock */

current_task_lock::current_task_lock(const task_ptr_s& p_task)
{ task::local_storage().current = p_task; }

current_task_lock::~current_task_lock()
{ task::local_storage().current.reset(); }

}

+ 8
- 0
include/asyncpp/core/task/current_task_lock.pre.h Bestand weergeven

@@ -0,0 +1,8 @@
#pragma once

namespace asyncpp
{

struct current_task_lock;

}

+ 76
- 0
include/asyncpp/core/task/task.h Bestand weergeven

@@ -0,0 +1,76 @@
#pragma once

#include <vector>
#include <functional>

#include "task.pre.h"
#include "current_task_lock.pre.h"

namespace asyncpp
{

struct task
{
public:
friend current_task_lock;

using notify_handler = std::function<void (task&)>;
using notify_handler_vector = std::vector<notify_handler>;

private:
bool _notified;
notify_handler_vector _notify_handlers;

public:
/**
* @brief Destructor.
*/
virtual ~task() = default;

/**
* @brief Check if the task is notified.
*/
inline bool notified() const;

/**
* @brief Notify the task that a resource that the task is interested in, is ready to be polled.
*/
inline void notify();

/**
* @brief Poll the future stored in the task.
*
* @return TRUE if the task is finished, FALSE otherwise.
*/
inline bool poll();

/**
* @brief Add a callback to execute when the task is notified.
*/
inline void add_notify_handler(notify_handler p_handler);

public:
/**
* @brief Get reference of the current task.
*/
static inline const task_ptr_w& current();

private:
/**
* @brief Actual implementation of the poll function.
*/
virtual bool poll_impl() = 0;

private:
struct storage
{
task_ptr_w current;
};

/**
* @brief Get the thread local storage.
*/
static storage& local_storage();
};

}

+ 37
- 0
include/asyncpp/core/task/task.inl Bestand weergeven

@@ -0,0 +1,37 @@
#pragma once

namespace asyncpp
{

/* task */

bool task::poll()
{
_notified = false;
return poll_impl();
}

bool task::notified() const
{ return _notified; }

void task::notify()
{
_notified = true;

for (auto& h : _notify_handlers)
h(*this);
}

void task::add_notify_handler(notify_handler p_handler)
{ _notify_handlers.emplace_back(p_handler); }

const task_ptr_w& task::current()
{ return local_storage().current; }

task::storage& task::local_storage()
{
thread_local storage value { };
return value;
}

}

+ 13
- 0
include/asyncpp/core/task/task.pre.h Bestand weergeven

@@ -0,0 +1,13 @@
#pragma once

#include <memory>

namespace asyncpp
{

struct task;

using task_ptr_w = std::weak_ptr<task>;
using task_ptr_s = std::shared_ptr<task>;

}

+ 32
- 0
include/asyncpp/core/task/task_tpl.h Bestand weergeven

@@ -0,0 +1,32 @@
#pragma once

#include "task.h"

namespace asyncpp
{

template<typename T_future>
struct task_tpl
: public task
{
public:
using future_type = T_future;

private:
future_type _future;

public:
/**
* @brief Constructor.
*/
template<typename X_future>
inline task_tpl(X_future&& p_future);

private:
/**
* @brief Actual implementation of the poll function.
*/
inline bool poll_impl() override;
};

}

include/asyncpp/core/task.inl → include/asyncpp/core/task/task_tpl.inl Bestand weergeven

@@ -1,30 +1,10 @@
#pragma once

#include "task_tpl.h"

namespace asyncpp
{

/* task */

bool task::poll()
{
_notified = false;
return poll_impl();
}

bool task::notified() const
{ return _notified; }

void task::notify()
{
_notified = true;

for (auto& h : _notify_handlers)
h(*this);
}

void task::add_notify_handler(notify_handler p_handler)
{ _notify_handlers.emplace_back(p_handler); }

/* task_tpl */

template<typename T_future>

+ 3
- 21
include/asyncpp/executor/current_thread.inl Bestand weergeven

@@ -8,24 +8,6 @@ namespace executor {
namespace __impl
{

struct current_task_lock
{
private:
executor::storage& _storage;

public:
current_task_lock(
executor::storage& p_storage,
const task_ptr_s& p_task)
: _storage (p_storage)
{
_storage.current_task = p_task;
}

~current_task_lock()
{ _storage.current_task.reset(); }
};

struct current_executor_lock
{
private:
@@ -37,11 +19,11 @@ namespace executor {
executor& p_executor)
: _storage (p_storage)
{
_storage.current_executor = &p_executor;
_storage.current = &p_executor;
}

~current_executor_lock()
{ _storage.current_executor = nullptr; }
{ _storage.current = nullptr; }
};

}
@@ -100,7 +82,7 @@ namespace executor {
void current_thread<T_runtime>
::poll_task(task_ptr_s p_task)
{
__impl::current_task_lock l(executor::local_storage(), p_task);
current_task_lock l(p_task);

// TODO execption handling



+ 1
- 7
include/asyncpp/executor/executor.h Bestand weergeven

@@ -10,16 +10,10 @@ namespace executor {
public:
struct storage
{
task_ptr_w current_task;
executor * current_executor;
executor * current;
};

public:
/**
* @brief Get reference of the current task.
*/
static inline const task_ptr_w& current_task();

/**
* @brief Spawn a new task.
*/


+ 1
- 4
include/asyncpp/executor/executor.inl Bestand weergeven

@@ -7,13 +7,10 @@ namespace executor {

/* executor */

const task_ptr_w& executor::current_task()
{ return local_storage().current_task; }

template<typename X_future>
void executor::spawn(X_future&& p_future)
{
auto exec = local_storage().current_executor;
auto exec = local_storage().current;
if (!exec)
throw std::runtime_error("Thread local executor instance is not assigned!");
exec->spawn_impl(make_task(std::forward<X_future>(p_future)));


+ 2
- 0
include/asyncpp/timer.h Bestand weergeven

@@ -1,7 +1,9 @@
#pragma once

#include "timer/delay.h"
#include "timer/interval.h"
#include "timer/timer.h"

#include "timer/delay.inl"
#include "timer/interval.inl"
#include "timer/timer.inl"

+ 25
- 7
include/asyncpp/timer/delay.h Bestand weergeven

@@ -14,14 +14,14 @@ namespace timer {
friend struct future_trait<delay, void>;

private:
time_point _timeout;
__impl::registration _registration;
time_point _deadline;
__impl::registration_ptr_w _registration;

public:
/**
* @brief Constructor. Create a delay at the given timeout.
* @brief Constructor. Create a delay at the given deadline.
*/
inline delay(const time_point& p_timeout);
inline delay(const time_point& p_deadline);

/**
* @brief Constructor. Create a delay with the given duration.
@@ -35,14 +35,14 @@ namespace timer {
inline ~delay();

/**
* @brief Get the current timeout of the delay.
* @brief Get the current deadline of the delay.
*/
inline time_point timeout() const;
inline time_point deadline() const;

/**
* @brief Reset the delay to a new time point.
*/
inline void reset(const time_point& p_timeout);
inline void reset(const time_point& p_deadline);

/**
* @brief Reset the delay to the given duration.
@@ -52,3 +52,21 @@ namespace timer {
};

} }

namespace asyncpp
{

/* future_impl for timer::delay */

template<>
struct future_trait<timer::delay, void>
: public future_base<future<timer::delay, void>>
{
using value_type = void;
using result_type = future_result<value_type>;

template<typename X_future>
static inline auto poll(X_future& self);
};

}

+ 20
- 26
include/asyncpp/timer/delay.inl Bestand weergeven

@@ -12,34 +12,34 @@ namespace timer {

/* delay */

delay::delay(const time_point& p_timeout)
: _timeout (p_timeout)
, _registration (*this)
delay::delay(const time_point& p_deadline)
: _deadline (p_deadline)
, _registration ()
{ }

template<typename T_base, typename T_ratio>
delay::delay(const duration<T_base, T_ratio>& p_duration)
: _timeout (clock::now() + p_duration)
, _registration (*this)
: _deadline (clock::now() + p_duration)
, _registration ()
{ }

inline delay::~delay()
{ __impl::timer_base::unregister_resource(_registration); }

time_point delay::timeout() const
{ return _timeout; }
time_point delay::deadline() const
{ return _deadline; }

void delay::reset(const time_point& p_timeout)
void delay::reset(const time_point& p_deadline)
{
__impl::timer_base::unregister_resource(_registration);
_timeout = p_timeout;
_deadline = p_deadline;
}

template<typename T_base, typename T_ratio>
void delay::reset(const duration<T_base, T_ratio>& p_duration)
{
__impl::timer_base::unregister_resource(_registration);
_timeout = now() + p_duration;
_deadline = now() + p_duration;
}

} }
@@ -49,25 +49,19 @@ namespace asyncpp

/* future_impl for timer::delay */

template<>
struct future_trait<timer::delay, void>
: public future_base<future<timer::delay, void>>
template<typename X_future>
auto future_trait<timer::delay, void>
::poll(X_future& self)
{
using value_type = void;
using result_type = future_result<value_type>;
auto now = timer::now();

template<typename X_future>
static inline auto poll(X_future& self)
{
auto now = timer::now();
if (self->_deadline <= now)
return result_type::ready();

if (self.ref._timeout <= now)
return result_type::ready();
if (self->_registration.expired())
self->_registration = timer::__impl::timer_base::register_resource(self->_deadline);

timer::__impl::timer_base::register_resource(self.ref._registration);

return result_type::not_ready();
}
};
return result_type::not_ready();
}

}

+ 11
- 18
include/asyncpp/timer/impl/registration.h Bestand weergeven

@@ -3,9 +3,9 @@
#include <memory>

#include <asyncpp/core/misc.h>
#include <asyncpp/core/task.h>

#include "timer_base.pre.h"
#include "../delay.pre.h"

namespace asyncpp {
namespace timer {
@@ -14,27 +14,20 @@ namespace __impl {
struct registration
{
public:
struct inner
{
timer_base& owner;
time_point timeout;

/**
* @brief Constructor.
*/
inline inner(timer_base& p_owner, time_point p_timeout);
};

using inner_ptr_w = std::weak_ptr<inner>;

public:
delay& owner;
inner_ptr_w ptr;
timer_base& owner;
const time_point deadline;
const task_ptr_w task;

/**
* @brief Constructor.
*/
inline registration(delay& p_owner);
inline registration(
timer_base& p_owner,
const time_point& p_deadline,
const task_ptr_w& p_task);
};

using registration_ptr_w = std::weak_ptr<registration>;
using registration_ptr_s = std::shared_ptr<registration>;

} } }

+ 7
- 10
include/asyncpp/timer/impl/registration.inl Bestand weergeven

@@ -6,18 +6,15 @@ namespace asyncpp {
namespace timer {
namespace __impl {

/* registration::inner */

registration::inner::inner(timer_base& p_owner, time_point p_timeout)
: owner (p_owner)
, timeout (p_timeout)
{ }

/* registration */

registration::registration(delay& p_owner)
: owner(p_owner)
, ptr ()
registration::registration(
timer_base& p_owner,
const time_point& p_deadline,
const task_ptr_w& p_task)
: owner (p_owner)
, deadline (p_deadline)
, task (p_task)
{ }

} } }

+ 0
- 9
include/asyncpp/timer/impl/registration.pre.h Bestand weergeven

@@ -1,9 +0,0 @@
#pragma once

namespace asyncpp {
namespace timer {
namespace __impl {

struct registration;

} } }

+ 12
- 10
include/asyncpp/timer/impl/timer_base.h Bestand weergeven

@@ -17,15 +17,17 @@ namespace __impl {
struct timer_base
{
private:
using inner_ptr_s = std::shared_ptr<registration::inner>;

struct inner_less_compare
{ constexpr bool operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const; };
struct registration_less_compare
{
constexpr bool operator()(
const registration_ptr_s& lhs,
const registration_ptr_s& rhs) const;
};

using inner_set = std::set<inner_ptr_s, inner_less_compare>;
using registration_set = std::set<registration_ptr_s, registration_less_compare>;

protected:
cppcore::locked<inner_set> _registrations;
cppcore::locked<registration_set> _registrations;

public:
/**
@@ -58,18 +60,18 @@ namespace __impl {
/**
* @brief Register a new resource within this timer_base.
*/
static inline void register_resource(registration& p_value);
static inline registration_ptr_w register_resource(const time_point& p_deadline);

/**
* @brief Register a new resource within this timer_base.
*/
static inline void unregister_resource(registration& p_value);
static inline void unregister_resource(registration_ptr_w& p_value);

private:
/**
* @brief Add a new resource to the timer_base.
* @brief Create registration for a new resource.
*/
inline inner_ptr_s make_inner(registration& p_value);
registration_ptr_w create_registration(const time_point& p_deadline);

private:
struct storage


+ 24
- 28
include/asyncpp/timer/impl/timer_base.inl Bestand weergeven

@@ -8,13 +8,15 @@ namespace __impl {

/* timer_base::inner_less_compare */

constexpr bool timer_base::inner_less_compare::operator()(const inner_ptr_s& lhs, const inner_ptr_s& rhs) const
constexpr bool timer_base::registration_less_compare::operator()(
const registration_ptr_s& lhs,
const registration_ptr_s& rhs) const
{
return (lhs->timeout < rhs->timeout)
return (lhs->deadline < rhs->deadline)
? true
:
(lhs->timeout == rhs->timeout)
&& (lhs.get() < rhs.get())
(lhs->deadline == rhs->deadline)
&& (lhs.get() < rhs.get())
? true
: false;
}
@@ -50,42 +52,36 @@ namespace __impl {
timer_base* timer_base::current()
{ return local_storage().current; }

void timer_base::register_resource(registration& p_value)
registration_ptr_w timer_base::register_resource(const time_point& p_deadline)
{
auto s = p_value.ptr.lock();
auto t = current();

if (s && s->timeout != p_value.owner.timeout())
{
unregister_resource(p_value);
s.reset();
}
if (!t)
throw std::runtime_error("Thread local timer_base instance is not assigned!");

if (!s)
{
auto t = current();
return t->create_registration(p_deadline);
}

registration_ptr_w timer_base::create_registration(const time_point& p_deadline)
{
auto r = std::make_shared<registration>(
*this,
p_deadline,
task::current());

if (!t)
throw std::runtime_error("Thread local timer_base instance is not assigned!");
_registrations.lock()->insert(r);

p_value.ptr = t->make_inner(p_value);
}
return r;
}

void timer_base::unregister_resource(registration& p_value)
void timer_base::unregister_resource(registration_ptr_w& p_value)
{
auto s = p_value.ptr.lock();
p_value.ptr.reset();
auto s = p_value.lock();
p_value.reset();
if (s)
s->owner._registrations.lock()->erase(s);
}

inline timer_base::inner_ptr_s timer_base::make_inner(registration& p_value)
{
auto s = std::make_shared<registration::inner>(*this, p_value.owner.timeout());
_registrations.lock()->insert(s);
return s;
}

timer_base::storage& timer_base::local_storage()
{
thread_local storage value;


+ 67
- 0
include/asyncpp/timer/interval.h Bestand weergeven

@@ -0,0 +1,67 @@
#pragma once

#include <asyncpp/core/misc.h>
#include <asyncpp/core/future.h>
#include <asyncpp/core/stream.h>

#include "delay.h"

namespace asyncpp {
namespace timer {

struct interval
{
public:
using delay_future_type = future<delay>;

public:
friend struct stream_trait<interval, void>;

private:
delay_future_type _delay; //!< Delay future
clock::duration _duration; //!< Interval duration.
time_point _deadline; //!< Deadline after the interval should end.

public:
/**
* @brief Constructor.
*/
template<typename T_base, typename T_ratio>
inline interval(
const duration<T_base, T_ratio>& p_duration,
time_point p_deadline = time_point());

/**
* @brief Constructor.
*/
template<typename T_base, typename T_ratio>
inline interval(
const time_point& p_at,
const duration<T_base, T_ratio>& p_duration,
time_point p_deadline = time_point());

/**
* @brief Get the duration of the interval.
*/
inline const clock::duration& duration() const;
};

} }

namespace asyncpp
{

/* stream_trait for timer::interval */

template<>
struct stream_trait<timer::interval, void>
: public stream_base<stream<timer::interval, void>>
{
using value_type = void;
using result_type = stream_result<value_type>;

template<typename X_stream>
static inline result_type poll(X_stream& self);
};

}

+ 60
- 0
include/asyncpp/timer/interval.inl Bestand weergeven

@@ -0,0 +1,60 @@
#pragma once

#include <asyncpp/core/future.inl>

#include "delay.inl"
#include "interval.h"

namespace asyncpp {
namespace timer {

/* interval */

template<typename T_base, typename T_ratio>
interval::interval(
const asyncpp::duration<T_base, T_ratio>& p_duration,
time_point p_deadline)
: interval(clock::now() + p_duration, p_duration, p_deadline)
{ }

template<typename T_base, typename T_ratio>
interval::interval(
const time_point& p_at,
const asyncpp::duration<T_base, T_ratio>& p_duration,
time_point p_deadline)
: _delay (as_future(delay(p_at)))
, _duration(p_duration)
, _deadline(p_deadline)
{ }

const clock::duration& interval::duration() const
{ return _duration; }

} }

namespace asyncpp
{

/* stream_trait for timer::interval */

template<typename X_stream>
typename stream_trait<timer::interval, void>::result_type
stream_trait<timer::interval, void>
::poll(X_stream& self)
{
auto ret = self->_delay.poll();
if (ret.is_not_ready())
return result_type::not_ready();

auto now = self->_delay->deadline();
auto new_deadline = now + self->_duration;
if ( self->_deadline.time_since_epoch().count()
&& new_deadline >= self->_deadline)
return result_type::done();

self->_delay->reset(new_deadline);

return result_type::ready();
}

}

+ 4
- 4
include/asyncpp/timer/timer.h Bestand weergeven

@@ -49,23 +49,23 @@ namespace timer {
* @brief Handle idle of the runtime.
*
* This method is called as soon as the runtime has nothing to do.
* The passed timeout is the timepoint the method should return (or null if not set).
* The passed deadline is the timepoint the method should return (or null if not set).
*/
inline void idle(asyncpp::time_point* p_timeout);
inline void idle(const asyncpp::time_point * p_deadline);

private:
/**
* @brief Call the idle method of the inner runtime.
*/
template<typename X = inner_type>
inline auto inner_idle(asyncpp::time_point* p_timeout)
inline auto inner_idle(const asyncpp::time_point * p_deadline)
-> std::enable_if_t<std::is_same_v<X, void>>;

/**
* @brief Call the idle method of the inner runtime.
*/
template<typename X = inner_type>
inline auto inner_idle(asyncpp::time_point* p_timeout)
inline auto inner_idle(const asyncpp::time_point * p_deadline)
-> std::enable_if_t<!std::is_same_v<X, void>>;
};



+ 22
- 23
include/asyncpp/timer/timer.inl Bestand weergeven

@@ -1,5 +1,7 @@
#pragma once

#include <asyncpp/core/misc.inl>

#include "timer.h"
#include "delay.inl"

@@ -37,40 +39,29 @@ namespace timer {
{ }

template<typename T_inner>
void timer<T_inner>::idle(asyncpp::time_point* p_timeout)
void timer<T_inner>::idle(const asyncpp::time_point * p_deadline)
{
bool has_timeout = false;
asyncpp::time_point timeout;

if (p_timeout)
{
timeout = *p_timeout;
has_timeout = true;
}
auto now = clock::now();

{
auto r = _registrations.lock();
if (!r->empty())
auto it = r->begin();
while (it != r->end() && now >= (**it).deadline)
{
auto t = (*r->begin())->timeout;

if (!has_timeout)
timeout = t;
else if (t < timeout)
timeout = t;
auto t = (**it).task.lock();
if (t)
t->notify();

has_timeout = true;
++it;
}
}

inner_idle(has_timeout
? &timeout
: nullptr);
inner_idle(p_deadline);
}

template<typename T_inner>
template<typename X>
auto timer<T_inner>::inner_idle(asyncpp::time_point* p_timeout)
auto timer<T_inner>::inner_idle(const asyncpp::time_point * p_deadline)
-> std::enable_if_t<std::is_same_v<X, void>>
{
/* no-op */
@@ -78,10 +69,18 @@ namespace timer {

template<typename T_inner>
template<typename X>
auto timer<T_inner>::inner_idle(asyncpp::time_point* p_timeout)
auto timer<T_inner>::inner_idle(const asyncpp::time_point * p_deadline)
-> std::enable_if_t<!std::is_same_v<X, void>>
{
_storage.inner.idle(p_timeout);
{
auto r = _registrations.lock();
if (!r->empty())
{
p_deadline = merge_deadlines(p_deadline, &(*r->begin())->deadline);
}
}

_storage.inner.idle(p_deadline);
}

} }

+ 3
- 3
test/asyncpp/core/future_tests.cpp Bestand weergeven

@@ -25,10 +25,10 @@ namespace asyncpp
{
using result_type = future_result<value_type>;

if (self.ref.count >= self.ref.delay)
return result_type::ready(self.ref.count);
if (self->count >= self->delay)
return result_type::ready(self->count);

++self.ref.count;
++self->count;
return result_type::not_ready();
}
};


+ 21
- 7
test/asyncpp/core/stream_tests.cpp Bestand weergeven

@@ -17,23 +17,23 @@ namespace asyncpp

template<>
struct stream_trait<delay, void>
: public future_base<future<delay, void>>
: public stream_base<stream<delay, void>>
{
using value_type = int;

template<typename T_future>
static inline auto poll(T_future& self)
template<typename X_stream>
static inline auto poll(X_stream& self)
{
using result_type = stream_result<value_type>;

if (self.ref.count >= self.ref.delay)
if (self->count >= self->delay)
return result_type::done();

++self.ref.count;
++self->count;

return self.ref.count <= self.ref.threshold
return self->count <= self->threshold
? result_type::not_ready()
: result_type::ready(self.ref.count);
: result_type::ready(self->count);
}
};

@@ -66,3 +66,17 @@ TEST(stream_tests, poll)
ASSERT_FALSE(r);
ASSERT_TRUE (r.is_done());
}

TEST(stream_tests, for_each)
{
int i = 0;
delay d { 5, 0, 0 };
auto f = as_stream(d)
.for_each([&i](int x) {
++i;
EXPECT_EQ(i, x);
});

auto r = f.poll();
ASSERT_TRUE(r);
}

+ 3
- 3
test/asyncpp/core/task_tests.cpp Bestand weergeven

@@ -25,10 +25,10 @@ namespace asyncpp
{
using result_type = future_result<value_type>;

if (self.ref.count >= self.ref.delay)
return result_type::ready(self.ref.count);
if (self->count >= self->delay)
return result_type::ready(self->count);

++self.ref.count;
++self->count;
return result_type::not_ready();
}
};


+ 2
- 2
test/asyncpp/executor/current_thread_tests.cpp Bestand weergeven

@@ -27,8 +27,8 @@ namespace asyncpp
template<typename X_future>
static inline auto poll(X_future& self)
{
self.ref.task = executor::executor::current_task();
return self.ref.done
self->task = task::current();
return self->done
? result_type::ready()
: result_type::not_ready();
}


+ 72
- 0
test/asyncpp/timer/interval_tests.cpp Bestand weergeven

@@ -0,0 +1,72 @@
#include <gtest/gtest.h>

#include "../../helper/now_mock.h"

#include <asyncpp.h>
#include <asyncpp/timer.h>

using namespace ::testing;
using namespace ::asyncpp;
using namespace ::asyncpp::timer;

TEST(interval_tests, poll)
{
InSequence seq;
StrictMock<now_mock> m;
asyncpp::timer::timer t;
interval i(
time_point(std::chrono::seconds(10)),
std::chrono::seconds(5),
time_point(std::chrono::seconds(30)));

// 30 - 10 = 20
// 20 / 5 = 4
// 4 - 1 = 3
// 3 x Ready

t.make_current();

auto s = as_stream(i);

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(0))));

auto r0 = s.poll();
ASSERT_EQ(result_status::not_ready, r0.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(9))));

auto r1 = s.poll();
ASSERT_EQ(result_status::not_ready, r1.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(10))));

auto r2 = s.poll();
ASSERT_EQ(result_status::ready, r2.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(11))));

auto r3 = s.poll();
ASSERT_EQ(result_status::not_ready, r3.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(30))));

auto r4 = s.poll();
ASSERT_EQ(result_status::ready, r4.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(30))));

auto r5 = s.poll();
ASSERT_EQ(result_status::ready, r5.status());

EXPECT_CALL(m, now)
.WillOnce(Return(time_point(std::chrono::seconds(30))));

auto r6 = s.poll();
ASSERT_EQ(result_status::done, r6.status());
}

+ 1
- 1
test/helper/runtime_mock.h Bestand weergeven

@@ -8,5 +8,5 @@
struct runtime_mock
{
public:
MOCK_METHOD1(idle, void (asyncpp::time_point* p_timeout));
MOCK_METHOD1(idle, void (const asyncpp::time_point * p_deadline));
};

Laden…
Annuleren
Opslaan