From 9f2806076399121bf4c534f91bd39c221b27a884 Mon Sep 17 00:00:00 2001 From: bergmann Date: Sat, 15 Sep 2018 16:03:59 +0200 Subject: [PATCH] * moved cppamqp to extra submodule repository --- CMakeLists.txt | 13 + include/cppamqp.h | 19 + include/cppamqp/channel.h | 50 ++ include/cppamqp/channel.inl | 48 ++ include/cppamqp/config.h | 4 + include/cppamqp/connection.fwd.h | 8 + include/cppamqp/connection.h | 62 +++ include/cppamqp/connection.inl | 52 ++ include/cppamqp/consume_result.h | 38 ++ include/cppamqp/consume_result.inl | 50 ++ include/cppamqp/enums.h | 48 ++ include/cppamqp/exception.h | 21 + include/cppamqp/exception.inl | 22 + include/cppamqp/helper.h | 17 + include/cppamqp/helper.inl | 22 + include/cppamqp/message.h | 20 + include/cppamqp/message.inl | 17 + include/cppamqp/publish_options.h | 39 ++ include/cppamqp/publish_options.inl | 29 ++ include/cppamqp/types.h | 17 + src/CMakeLists.txt | 36 ++ src/channel.cpp | 126 +++++ src/connection.cpp | 164 +++++++ src/helper.cpp | 57 +++ src/publish_options.cpp | 58 +++ test/CMakeLists.txt | 29 ++ test/amqp.cpp | 722 ++++++++++++++++++++++++++++ test/mock.cpp | 120 +++++ test/mock.h | 47 ++ 29 files changed, 1955 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 include/cppamqp.h create mode 100644 include/cppamqp/channel.h create mode 100644 include/cppamqp/channel.inl create mode 100644 include/cppamqp/config.h create mode 100644 include/cppamqp/connection.fwd.h create mode 100644 include/cppamqp/connection.h create mode 100644 include/cppamqp/connection.inl create mode 100644 include/cppamqp/consume_result.h create mode 100644 include/cppamqp/consume_result.inl create mode 100644 include/cppamqp/enums.h create mode 100644 include/cppamqp/exception.h create mode 100644 include/cppamqp/exception.inl create mode 100644 include/cppamqp/helper.h create mode 100644 include/cppamqp/helper.inl create mode 100644 include/cppamqp/message.h create mode 100644 include/cppamqp/message.inl create mode 100644 include/cppamqp/publish_options.h create mode 100644 include/cppamqp/publish_options.inl create mode 100644 include/cppamqp/types.h create mode 100644 src/CMakeLists.txt create mode 100644 src/channel.cpp create mode 100644 src/connection.cpp create mode 100644 src/helper.cpp create mode 100644 src/publish_options.cpp create mode 100644 test/CMakeLists.txt create mode 100644 test/amqp.cpp create mode 100644 test/mock.cpp create mode 100644 test/mock.h diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..fee0f4c --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,13 @@ +# Initialize CMake ################################################################################ + +CMake_Minimum_Required ( VERSION 3.5.1 FATAL_ERROR ) + +If ( NOT CMAKE_BUILD_TYPE ) + Set ( CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build!" FORCE ) +EndIf ( NOT CMAKE_BUILD_TYPE ) +Set ( CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/../../../inc/cmake/") + +# Projects ######################################################################################## + +Add_SubDirectory ( ${CMAKE_CURRENT_SOURCE_DIR}/src ) +Add_SubDirectory ( ${CMAKE_CURRENT_SOURCE_DIR}/test ) diff --git a/include/cppamqp.h b/include/cppamqp.h new file mode 100644 index 0000000..55f3266 --- /dev/null +++ b/include/cppamqp.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include \ No newline at end of file diff --git a/include/cppamqp/channel.h b/include/cppamqp/channel.h new file mode 100644 index 0000000..9da0637 --- /dev/null +++ b/include/cppamqp/channel.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace cppamqp +{ + + struct channel + { + private: + struct internal + { + const connection& connection; + channel_number handle; + + internal(const ::cppamqp::connection& p_connection, channel_number p_handle); + ~internal(); + }; + + private: + friend struct connection; + + std::shared_ptr _internal; + + inline channel(const connection& p_connection, channel_number p_handle); + + public: + inline channel(); + inline channel(channel&& other); + inline ~channel(); + + inline operator bool () const; + inline void operator = (channel&& other); + + inline channel_number handle () const; + inline const connection& connection () const; + + queue_declaration declare_queue (const std::string& name, const queue_flags& flags); + void bind_queue (const std::string& queue, const std::string& exchange, const std::string& routing_key); + void publish (const std::string& exchange, const std::string& routing_key, const publish_flags& flags, const std::string& message, const publish_options* options = nullptr); + std::string consume (const std::string& queue, const std::string& consumer_tag, const consume_flags& flags); + void close (int status = AMQP_REPLY_SUCCESS); + }; + +} \ No newline at end of file diff --git a/include/cppamqp/channel.inl b/include/cppamqp/channel.inl new file mode 100644 index 0000000..996487e --- /dev/null +++ b/include/cppamqp/channel.inl @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace cppamqp +{ + + channel::channel(const cppamqp::connection& p_connection, channel_number p_handle) + : _internal(new internal(p_connection, p_handle)) + { } + + channel::channel() + { } + + channel::channel(channel&& other) + : _internal(std::move(other._internal)) + { } + + channel::~channel() + { } + + channel::operator bool() const + { return static_cast(_internal); } + + void channel::operator =(channel&& other) + { _internal = std::move(other._internal); } + + channel_number channel::handle() const + { + if (!_internal) + throw exception("channel is closed!"); + return _internal->handle; + } + + const connection& channel::connection() const + { + if (!_internal) + throw exception("channel is closed!"); + return _internal->connection; + } + +} \ No newline at end of file diff --git a/include/cppamqp/config.h b/include/cppamqp/config.h new file mode 100644 index 0000000..04cc552 --- /dev/null +++ b/include/cppamqp/config.h @@ -0,0 +1,4 @@ +#pragma once + +#include +#include \ No newline at end of file diff --git a/include/cppamqp/connection.fwd.h b/include/cppamqp/connection.fwd.h new file mode 100644 index 0000000..d2ac201 --- /dev/null +++ b/include/cppamqp/connection.fwd.h @@ -0,0 +1,8 @@ +#pragma once + +namespace cppamqp +{ + + struct connection; + +} \ No newline at end of file diff --git a/include/cppamqp/connection.h b/include/cppamqp/connection.h new file mode 100644 index 0000000..2426aa4 --- /dev/null +++ b/include/cppamqp/connection.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace cppamqp +{ + + struct connection + { + private: + static inline const std::string& default_vhost (); + static inline const std::string& default_username (); + static inline const std::string& default_password (); + static inline const std::chrono::milliseconds& default_consume_timeout (); + + private: + struct internal + { + amqp_connection_state_t connection; + int reply_code; + bool auto_close { false }; + + internal(); + ~internal(); + }; + + private: + std::unique_ptr _internal; + + public: + inline amqp_connection_state_t& handle (); + inline const amqp_connection_state_t& handle () const; + + void tcp_connect( + const std::string& hostname, + uint port); + + void login_plain( + const std::string& p_username = default_username(), + const std::string& p_password = default_password(), + const std::string& p_vhost = default_vhost(), + int p_max_channels = AMQP_DEFAULT_MAX_CHANNELS, + int p_max_frame_size = AMQP_DEFAULT_FRAME_SIZE); + + void login_external( + const std::string& p_identify, + const std::string& p_vhost = default_vhost(), + int p_max_channels = AMQP_DEFAULT_MAX_CHANNELS, + int p_max_frame_size = AMQP_DEFAULT_FRAME_SIZE); + + channel open_channel (channel_number p_channel); + consume_result consume_message(const std::chrono::milliseconds& p_timeout = default_consume_timeout()); + void close (int p_status = AMQP_REPLY_SUCCESS, bool p_force = false); + }; + +} \ No newline at end of file diff --git a/include/cppamqp/connection.inl b/include/cppamqp/connection.inl new file mode 100644 index 0000000..5196f9e --- /dev/null +++ b/include/cppamqp/connection.inl @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace cppamqp +{ + + const std::string& connection::default_vhost() + { + static const std::string value("/"); + return value; + } + + const std::string& connection::default_username() + { + static const std::string value("guest"); + return value; + } + + const std::string& connection::default_password() + { + static const std::string value("guest"); + return value; + } + + const std::chrono::milliseconds& connection::default_consume_timeout() + { + static const std::chrono::milliseconds value(-1); + return value; + } + + amqp_connection_state_t& connection::handle() + { + if (!_internal) + throw exception("connection is closed"); + return _internal->connection; + } + + const amqp_connection_state_t& connection::handle() const + { + if (!_internal) + throw exception("connection is closed"); + return _internal->connection; + } + +} \ No newline at end of file diff --git a/include/cppamqp/consume_result.h b/include/cppamqp/consume_result.h new file mode 100644 index 0000000..19b4cf0 --- /dev/null +++ b/include/cppamqp/consume_result.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace cppamqp +{ + + struct consume_result + { + private: + const connection& _connection; + + public: + consume_result_type type; + message message; + channel_number channel; + std::string consumer_tag; + uint64_t delivery_tag; + bool redelivered; + std::string exchange; + std::string routing_key; + + inline consume_result(const connection& p_connection, consume_result_type p_type); + inline consume_result(const connection& p_connection, consume_result_type p_type, const amqp_message_t& p_message); + inline consume_result(const connection& p_connection, const amqp_envelope_t& p_envelope); + inline consume_result(consume_result&& other) = default; + inline consume_result(const consume_result& other) = default; + + inline void ack(); + inline void nack(bool requeue); + }; + +} \ No newline at end of file diff --git a/include/cppamqp/consume_result.inl b/include/cppamqp/consume_result.inl new file mode 100644 index 0000000..6e19b24 --- /dev/null +++ b/include/cppamqp/consume_result.inl @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include + +namespace cppamqp +{ + + consume_result::consume_result(const connection& p_connection, consume_result_type p_type) + : _connection (p_connection) + , type (p_type) + { } + + consume_result::consume_result(const connection& p_connection, consume_result_type p_type, const amqp_message_t& p_message) + : _connection (p_connection) + , type (p_type) + , message (p_message) + { } + + consume_result::consume_result(const connection& p_connection, const amqp_envelope_t& p_envelope) + : _connection (p_connection) + , type (consume_result_type::success) + , message (p_envelope.message) + , channel (p_envelope.channel) + , consumer_tag (__impl::from_bytes(p_envelope.consumer_tag)) + , delivery_tag (p_envelope.delivery_tag) + , redelivered (p_envelope.redelivered) + , exchange (__impl::from_bytes(p_envelope.exchange)) + , routing_key (__impl::from_bytes(p_envelope.routing_key)) + { } + + void consume_result::ack() + { + __impl::check_and_raise( + amqp_basic_ack(_connection.handle(), channel, delivery_tag, false), + "error while ack message", + false); + } + + void consume_result::nack(bool requeue) + { + __impl::check_and_raise( + amqp_basic_nack(_connection.handle(), channel, delivery_tag, false, requeue), + "error while nack message", + false); + } + +} \ No newline at end of file diff --git a/include/cppamqp/enums.h b/include/cppamqp/enums.h new file mode 100644 index 0000000..24f7a98 --- /dev/null +++ b/include/cppamqp/enums.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +namespace cppamqp +{ + + enum class delivery_mode + { + non_persistent = AMQP_DELIVERY_NONPERSISTENT, + persistent = AMQP_DELIVERY_PERSISTENT, + }; + + enum class consume_result_type + { + success, + timeout, + acknowledge, + connection_closed_by_peer, + could_not_deliver_return_to_sender, + }; + + enum class queue_flag + { + passive, + durable, + exclusive, + auto_delete, + }; + using queue_flags = utl::shifted_flags; + + enum class publish_flag + { + mandatory, + immediate, + }; + using publish_flags = utl::shifted_flags; + + enum class consume_flag + { + no_local, + no_ack, + exclusive, + }; + using consume_flags = utl::shifted_flags; + +} \ No newline at end of file diff --git a/include/cppamqp/exception.h b/include/cppamqp/exception.h new file mode 100644 index 0000000..389e574 --- /dev/null +++ b/include/cppamqp/exception.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include +#include +#include + +namespace cppamqp +{ + + struct exception + : public utl::exception + { + amqp_status_enum status { AMQP_STATUS_OK }; + + inline exception(const std::string& p_message); + inline exception(amqp_status_enum p_status); + inline exception(const std::string& p_message, amqp_status_enum p_status); + }; + +} \ No newline at end of file diff --git a/include/cppamqp/exception.inl b/include/cppamqp/exception.inl new file mode 100644 index 0000000..2867774 --- /dev/null +++ b/include/cppamqp/exception.inl @@ -0,0 +1,22 @@ +#pragma once + +#include + +namespace cppamqp +{ + + exception::exception(const std::string& p_message) : + utl::exception (p_message) + { } + + exception::exception(amqp_status_enum p_status) : + utl::exception (amqp_error_string2(p_status)), + status (p_status) + { } + + exception::exception(const std::string& p_message, amqp_status_enum p_status) : + utl::exception (p_message), + status (p_status) + { } + +} \ No newline at end of file diff --git a/include/cppamqp/helper.h b/include/cppamqp/helper.h new file mode 100644 index 0000000..ca05186 --- /dev/null +++ b/include/cppamqp/helper.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +namespace cppamqp +{ + + namespace __impl + { + void check_and_raise (int returnCode, std::string msg, bool logOnly); + void check_and_raise (const amqp_rpc_reply_t& reply, std::string msg, bool logOnly); + inline amqp_bytes_t make_bytes (const std::string& s); + inline std::string from_bytes (const amqp_bytes_t& data); + } + +} \ No newline at end of file diff --git a/include/cppamqp/helper.inl b/include/cppamqp/helper.inl new file mode 100644 index 0000000..5a1429c --- /dev/null +++ b/include/cppamqp/helper.inl @@ -0,0 +1,22 @@ +#pragma once + +#include + +namespace cppamqp +{ + + namespace __impl + { + amqp_bytes_t make_bytes(const std::string& s) + { + return amqp_bytes_t { + s.size(), + const_cast(s.c_str()), + }; + } + + std::string from_bytes(const amqp_bytes_t& data) + { return std::string(static_cast(data.bytes), data.len); } + } + +} \ No newline at end of file diff --git a/include/cppamqp/message.h b/include/cppamqp/message.h new file mode 100644 index 0000000..2acc94c --- /dev/null +++ b/include/cppamqp/message.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + +namespace cppamqp +{ + + struct message + { + publish_options options; + std::string body; + + inline message(); + inline message(const amqp_message_t& msg); + inline message(message&& other) = default; + inline message(const message& other) = default; + }; + +} \ No newline at end of file diff --git a/include/cppamqp/message.inl b/include/cppamqp/message.inl new file mode 100644 index 0000000..74a48c9 --- /dev/null +++ b/include/cppamqp/message.inl @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +namespace cppamqp +{ + + message::message() + { } + + message::message(const amqp_message_t& msg) + : options (msg.properties) + , body (__impl::from_bytes(msg.body)) + { } + +} \ No newline at end of file diff --git a/include/cppamqp/publish_options.h b/include/cppamqp/publish_options.h new file mode 100644 index 0000000..679e3d0 --- /dev/null +++ b/include/cppamqp/publish_options.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include + +namespace cppamqp +{ + + struct publish_options + { + private: + mutable amqp_basic_properties_t _properties; + + public: + utl::nullable content_type; + utl::nullable content_encoding; + utl::nullable delivery_mode; + utl::nullable priority; + utl::nullable correlation_id; + utl::nullable reply_to; + utl::nullable expiration; + utl::nullable message_id; + utl::nullable timestamp; + utl::nullable type; + utl::nullable user_id; + utl::nullable app_id; + utl::nullable cluster_id; + + inline publish_options(); + inline publish_options(const amqp_basic_properties_t& prop); + inline publish_options(publish_options&& other) = default; + inline publish_options(const publish_options& other) = default; + + const amqp_basic_properties_t* properties () const; + void reset (); + }; + +} \ No newline at end of file diff --git a/include/cppamqp/publish_options.inl b/include/cppamqp/publish_options.inl new file mode 100644 index 0000000..c74e1ef --- /dev/null +++ b/include/cppamqp/publish_options.inl @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + +namespace cppamqp +{ + + publish_options::publish_options() + { } + + publish_options::publish_options(const amqp_basic_properties_t& prop) + { + if (prop._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) content_type = __impl::from_bytes (prop.content_type); + if (prop._flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) content_encoding = __impl::from_bytes (prop.content_encoding); + if (prop._flags & AMQP_BASIC_DELIVERY_MODE_FLAG) delivery_mode = static_cast(prop.delivery_mode); + if (prop._flags & AMQP_BASIC_PRIORITY_FLAG) priority = prop.priority; + if (prop._flags & AMQP_BASIC_CORRELATION_ID_FLAG) correlation_id = __impl::from_bytes (prop.correlation_id); + if (prop._flags & AMQP_BASIC_REPLY_TO_FLAG) reply_to = __impl::from_bytes (prop.reply_to); + if (prop._flags & AMQP_BASIC_EXPIRATION_FLAG) expiration = __impl::from_bytes (prop.expiration); + if (prop._flags & AMQP_BASIC_MESSAGE_ID_FLAG) message_id = __impl::from_bytes (prop.message_id); + if (prop._flags & AMQP_BASIC_TIMESTAMP_FLAG) timestamp = prop.timestamp; + if (prop._flags & AMQP_BASIC_TYPE_FLAG) type = __impl::from_bytes (prop.type); + if (prop._flags & AMQP_BASIC_USER_ID_FLAG) user_id = __impl::from_bytes (prop.user_id); + if (prop._flags & AMQP_BASIC_APP_ID_FLAG) app_id = __impl::from_bytes (prop.app_id); + if (prop._flags & AMQP_BASIC_CLUSTER_ID_FLAG) cluster_id = __impl::from_bytes (prop.cluster_id); + } + +} \ No newline at end of file diff --git a/include/cppamqp/types.h b/include/cppamqp/types.h new file mode 100644 index 0000000..c5cb7d1 --- /dev/null +++ b/include/cppamqp/types.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace cppamqp +{ + + struct queue_declaration + { + std::string name; + uint message_count; + uint consumer_count; + }; + + using channel_number = amqp_channel_t; + +} \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..7b2d0c3 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,36 @@ +# Initialize ###################################################################################### + +Include ( cotire OPTIONAL ) +Include ( pedantic OPTIONAL ) +Include ( strip_symbols OPTIONAL ) + +Option ( BUILD_SHARED_CPPAMQP "Build cppamqp shared library" OFF ) + +Set ( BUILD_SHARED_LIBS ${BUILD_SHARED_CPPAMQP} ) +Set ( CMAKE_CXX_STANDARD 17 ) +Set ( CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PEDANTIC_C_FLAGS}" ) +Set ( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PEDANTIC_CXX_FLAGS}" ) + +# Dependencies #################################################################################### + +Find_Package ( cpputils REQUIRED ) + +# Project: cppamqp ############################################################################### + +Project ( cppamqp VERSION 1.0.0.0 LANGUAGES CXX ) +File ( GLOB_RECURSE SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ) +Add_Library ( cppamqp ${SOURCE_FILES} ) +Target_Include_Directories ( + cppamqp + PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include + ) +Target_Link_Libraries ( + cppamqp + cpputils + ) +If ( __COTIRE_INCLUDED ) + Cotire ( cppamqp ) +EndIf ( ) +If ( __STRIP_SYMBOLS_INCLUDED AND BUILD_SHARED_LIBS ) + Strip_Symbols ( cppamqp DBG_FILE ) +EndIf () diff --git a/src/channel.cpp b/src/channel.cpp new file mode 100644 index 0000000..601cd93 --- /dev/null +++ b/src/channel.cpp @@ -0,0 +1,126 @@ +#include +#include +#include +#include + +using namespace ::cppamqp; + +channel::internal::internal(const cppamqp::connection& p_connection, channel_number p_handle) + : connection(p_connection) + , handle (p_handle) +{ + if (handle <= 0) + throw utl::argument_exception("handle", "handle must be greater than 0"); + amqp_channel_open(connection.handle(), handle); + __impl::check_and_raise( + amqp_get_rpc_reply(connection.handle()), + std::string("unable to open channel ") + std::to_string(handle), + false); +} + +channel::internal::~internal() +{ + if (handle <= 0) + return; + __impl::check_and_raise( + amqp_channel_close(connection.handle(), handle, AMQP_REPLY_SUCCESS), + "unable to close amqp channel", + true); +} + +queue_declaration channel::declare_queue(const std::string& name, const queue_flags& flags) +{ + auto ret = amqp_queue_declare( + connection().handle(), + handle(), + __impl::make_bytes(name), + flags.is_set(queue_flag::passive) ? 1 : 0, + flags.is_set(queue_flag::durable) ? 1 : 0, + flags.is_set(queue_flag::exclusive) ? 1 : 0, + flags.is_set(queue_flag::auto_delete) ? 1 : 0, + amqp_empty_table); + __impl::check_and_raise( + amqp_get_rpc_reply(connection().handle()), + std::string("unable to declare queue '") + name + "'", + false); + if (!ret) + throw exception(std::string("unable to declare queue '") + name + "'"); + return queue_declaration + { + std::string(static_cast(ret->queue.bytes), ret->queue.len), + ret->message_count, + ret->consumer_count + }; +} + +void channel::bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key) +{ + amqp_queue_bind( + connection().handle(), + handle(), + __impl::make_bytes(queue), + __impl::make_bytes(exchange), + __impl::make_bytes(routing_key), + amqp_empty_table); + __impl::check_and_raise( + amqp_get_rpc_reply(connection().handle()), + static_cast(std::ostringstream() << + "unable to bind queue '" << queue << + "' to '" << exchange << + "' using '" << routing_key << + "'").str(), + false); +} + +void channel::publish( + const std::string& exchange, + const std::string& routing_key, + const publish_flags& flags, + const std::string& message, + const publish_options* options) +{ + __impl::check_and_raise( + amqp_basic_publish( + connection().handle(), + handle(), + __impl::make_bytes(exchange), + __impl::make_bytes(routing_key), + flags.is_set(publish_flag::mandatory) ? 1 : 0, + flags.is_set(publish_flag::immediate) ? 1 : 0, + options ? options->properties() : nullptr, + __impl::make_bytes(message)), + "error while publishing message", + false); +} + +std::string channel::consume(const std::string& queue, const std::string& consumer_tag, const consume_flags& flags) +{ + auto ret = amqp_basic_consume( + connection().handle(), + handle(), + __impl::make_bytes(queue), + __impl::make_bytes(consumer_tag), + flags.is_set(consume_flag::no_local) ? 1 : 0, + flags.is_set(consume_flag::no_ack) ? 1 : 0, + flags.is_set(consume_flag::exclusive) ? 1 : 0, + amqp_empty_table); + __impl::check_and_raise( + amqp_get_rpc_reply(connection().handle()), + static_cast(std::ostringstream() << "unable to consume from queue '" << queue << "'").str(), + false); + if (!ret) + throw exception(std::string("unable to consume from queue '") + queue + "'"); + return __impl::from_bytes(ret->consumer_tag); +} + +void channel::close(int status) +{ + if (!_internal) + return; + __impl::check_and_raise( + amqp_channel_close(connection().handle(), handle(), status), + "unable to close channel", + false); + _internal->handle = 0; + _internal.reset(); +} \ No newline at end of file diff --git a/src/connection.cpp b/src/connection.cpp new file mode 100644 index 0000000..74161df --- /dev/null +++ b/src/connection.cpp @@ -0,0 +1,164 @@ +#include + +#include +#include +#include +#include +#include +#include + +using namespace ::cppamqp; + +connection::internal::internal() : + connection (amqp_new_connection()), + reply_code (AMQP_REPLY_SUCCESS) +{ + if (!connection) + throw cppamqp::exception("unable to create connection"); +} + +connection::internal::~internal() +{ + if (auto_close) + { + auto_close = false; + __impl::check_and_raise( + amqp_connection_close(connection, reply_code), + "error while closing connection", + true); + } + __impl::check_and_raise( + amqp_destroy_connection(connection), + "error while destroying connection", + true); +} + + +struct internal_envelope : public amqp_envelope_t +{ + internal_envelope() + { memset(this, 0, sizeof(*this)); } + + ~internal_envelope() + { amqp_destroy_envelope(this); } +}; + +struct internal_message : public amqp_message_t +{ + internal_message() + { memset(this, 0, sizeof(*this)); } + + ~internal_message() + { amqp_destroy_message(this); } +}; + +void connection::tcp_connect(const std::string& hostname, uint port) +{ + _internal.reset(new internal()); + + auto socket = amqp_tcp_socket_new(handle()); + if (!socket) + throw exception(std::string("unable to open tcp socket to [") + hostname + "]:" + std::to_string(port)); + + __impl::check_and_raise( + amqp_socket_open(socket, hostname.c_str(), static_cast(port)), + "error while opening socket", + false); + + _internal->auto_close = true; +} + +void connection::login_plain( + const std::string& username, + const std::string& password, + const std::string& vhost, + int maxChannels, + int maxFrameSize) +{ + __impl::check_and_raise( + amqp_login(handle(), vhost.c_str(), maxChannels, maxFrameSize, 0, AMQP_SASL_METHOD_PLAIN, username.c_str(), password.c_str()), + "error while login plain", + false); +} + +void connection::login_external( + const std::string& identify, + const std::string& vhost, + int maxChannels, + int maxFrameSize) +{ + __impl::check_and_raise( + amqp_login(handle(), vhost.c_str(), maxChannels, maxFrameSize, 0, AMQP_SASL_METHOD_EXTERNAL, identify.c_str()), + "error while login external", + false); +} + +channel connection::open_channel(channel_number c) + { return channel(*this, c); } + +consume_result connection::consume_message(const std::chrono::milliseconds& timeout) +{ + ::timeval tv = utl::duration_cast<::timeval>(timeout); + internal_envelope envelope; + amqp_maybe_release_buffers(handle()); + auto ret = amqp_consume_message( + handle(), + &envelope, + timeout.count() >= 0 ? &tv : nullptr, + 0); + + if (ret.reply_type == AMQP_RESPONSE_NORMAL) + return consume_result(*this, envelope); + + if ( ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION + && ret.library_error == AMQP_STATUS_TIMEOUT) // TODO: test this case!!!! + return consume_result(*this, consume_result_type::timeout); + + if ( ret.reply_type != AMQP_RESPONSE_LIBRARY_EXCEPTION + || ret.library_error != AMQP_STATUS_UNEXPECTED_STATE) + __impl::check_and_raise(ret, "error while consuming message", false); + + amqp_frame_t frame; + memset(&tv, 0, sizeof(tv)); + __impl::check_and_raise( + amqp_simple_wait_frame_noblock(handle(), &frame, &tv), + "error wile waiting for frame", + false); + if (frame.frame_type != AMQP_FRAME_METHOD) + throw exception("unexpected or unknown frame type"); + switch(frame.payload.method.id) + { + case AMQP_BASIC_ACK_METHOD: + return consume_result(*this, consume_result_type::acknowledge); + + case AMQP_BASIC_RETURN_METHOD: + { + internal_message message; + __impl::check_and_raise( + amqp_read_message(handle(), frame.channel, &message, 0), + "error while reading returned message", + false); + return consume_result(*this, consume_result_type::could_not_deliver_return_to_sender, message); + } + + case AMQP_CHANNEL_CLOSE_METHOD: + throw exception("channel is closed"); + + case AMQP_CONNECTION_CLOSE_METHOD: + close(AMQP_REPLY_SUCCESS, true); + return consume_result(*this, consume_result_type::connection_closed_by_peer); + + default: + throw exception("received unexpected frame method"); + } +} + +void connection::close(int status, bool force) +{ + if (!_internal) + return; + if (force) + _internal->auto_close = false; + _internal->reply_code = status; + _internal.reset(); +} \ No newline at end of file diff --git a/src/helper.cpp b/src/helper.cpp new file mode 100644 index 0000000..5de3d17 --- /dev/null +++ b/src/helper.cpp @@ -0,0 +1,57 @@ +#include +#include +#include + +void ::cppamqp::__impl::check_and_raise(int returnCode, std::string msg, bool logOnly) +{ + if (returnCode == AMQP_STATUS_OK) + return; + if (logOnly) + { + if (!msg.empty()) + msg += " - "; + msg += amqp_error_string2(returnCode); + log_global_message(error, msg); + } + else + throw cppamqp::exception(static_cast(returnCode)); +} + +void ::cppamqp::__impl::check_and_raise(const amqp_rpc_reply_t& reply, std::string msg, bool logOnly) +{ + switch(reply.reply_type) + { + case AMQP_RESPONSE_NORMAL: + return; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + { + if (!msg.empty()) + msg += " - "; + msg += std::string("Server error: ID=") + std::to_string(reply.reply.id); + if (logOnly) { + log_global_message(error, msg); + } else { + throw cppamqp::exception(msg); + } + } + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + check_and_raise(reply.library_error, msg, logOnly); + break; + + default: + { + if (!msg.empty()) + msg += " - "; + msg += "unknown reply"; + if (logOnly) { + log_global_message(error, msg); + } else { + throw cppamqp::exception(msg); + } + } + break; + } +} diff --git a/src/publish_options.cpp b/src/publish_options.cpp new file mode 100644 index 0000000..5cdd6de --- /dev/null +++ b/src/publish_options.cpp @@ -0,0 +1,58 @@ +#include +#include + +using namespace ::cppamqp; + +template +inline T simpleConvert(const S& value) + { return static_cast(value); } + +template<> +inline amqp_bytes_t simpleConvert(const std::string& value) + { return __impl::make_bytes(value); } + +template +inline void addProperty(amqp_basic_properties_t& properties, amqp_flags_t flag, T amqp_basic_properties_t::*member, const utl::nullable& value) +{ + if (value) + { + properties.*member = simpleConvert(value()); + properties._flags |= flag; + } +} + +const amqp_basic_properties_t* publish_options::properties() const +{ + _properties._flags = 0; + addProperty(_properties, AMQP_BASIC_CONTENT_TYPE_FLAG, &amqp_basic_properties_t::content_type, content_type); + addProperty(_properties, AMQP_BASIC_CONTENT_ENCODING_FLAG, &amqp_basic_properties_t::content_encoding, content_encoding); + addProperty(_properties, AMQP_BASIC_DELIVERY_MODE_FLAG, &amqp_basic_properties_t::delivery_mode, delivery_mode); + addProperty(_properties, AMQP_BASIC_PRIORITY_FLAG, &amqp_basic_properties_t::priority, priority); + addProperty(_properties, AMQP_BASIC_CORRELATION_ID_FLAG, &amqp_basic_properties_t::correlation_id, correlation_id); + addProperty(_properties, AMQP_BASIC_REPLY_TO_FLAG, &amqp_basic_properties_t::reply_to, reply_to); + addProperty(_properties, AMQP_BASIC_EXPIRATION_FLAG, &amqp_basic_properties_t::expiration, expiration); + addProperty(_properties, AMQP_BASIC_MESSAGE_ID_FLAG, &amqp_basic_properties_t::message_id, message_id); + addProperty(_properties, AMQP_BASIC_TIMESTAMP_FLAG, &amqp_basic_properties_t::timestamp, timestamp); + addProperty(_properties, AMQP_BASIC_TYPE_FLAG, &amqp_basic_properties_t::type, type); + addProperty(_properties, AMQP_BASIC_USER_ID_FLAG, &amqp_basic_properties_t::user_id, user_id); + addProperty(_properties, AMQP_BASIC_APP_ID_FLAG, &amqp_basic_properties_t::app_id, app_id); + addProperty(_properties, AMQP_BASIC_CLUSTER_ID_FLAG, &amqp_basic_properties_t::cluster_id, cluster_id); + return &_properties; +} + +void publish_options::reset() +{ + content_type.reset(); + content_encoding.reset(); + delivery_mode.reset(); + priority.reset(); + correlation_id.reset(); + reply_to.reset(); + expiration.reset(); + message_id.reset(); + timestamp.reset(); + type.reset(); + user_id.reset(); + app_id.reset(); + cluster_id.reset(); +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100644 index 0000000..5a5e4f4 --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1,29 @@ +# Initialize ###################################################################################### + +Include ( cotire OPTIONAL ) +Include ( pedantic OPTIONAL ) +Include ( cmake_tests OPTIONAL ) + +Set ( CMAKE_CXX_STANDARD 17 ) +Set ( CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PEDANTIC_C_FLAGS}" ) +Set ( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PEDANTIC_CXX_FLAGS}" ) + +# Project: test_cppamqp ########################################################################## + +Project ( test_cppamqp ) +File ( GLOB_RECURSE SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ) +Add_Executable ( test_cppamqp EXCLUDE_FROM_ALL ${SOURCE_FILES} ) +Target_Link_Libraries ( + test_cppamqp + cppamqp + gtest + gmock + gmock_main + pthread + ) +If ( __COTIRE_INCLUDED ) + Cotire ( test_cppamqp ) +EndIf ( ) +If ( __CMAKE_TESTS_INCLUDED ) + Add_CMake_Test ( cppamqp test_cppamqp ) +EndIf ( ) diff --git a/test/amqp.cpp b/test/amqp.cpp new file mode 100644 index 0000000..8fafe3b --- /dev/null +++ b/test/amqp.cpp @@ -0,0 +1,722 @@ +#include +#include + +#include "mock.h" + +using namespace ::testing; +using namespace ::cppamqp; + +using StrictAmqpMock = StrictMock; + +amqp_envelope_t DefaultEnvelope { + /* .channel */ 3, + /* .consumer_tag */ { 15, const_cast("my_consumer_tag") }, + /* .delivery_tag */ 12345678, + /* .redelivered */ true, + /* .exchange */ { 11, const_cast("my_exchange") }, + /* .routing_key */ { 14, const_cast("my_routing_key") }, + /* .message */ { + /* .properties */ { + /* ._flags */ 0, + /* .content_type */ { 0, nullptr }, + /* .content_encoding */ { 0, nullptr }, + /* .headers */ { 0, nullptr }, + /* .delivery_mode */ 0, + /* .priority */ 0, + /* .correlation_id */ { 0, nullptr }, + /* .reply_to */ { 0, nullptr }, + /* .expiration */ { 0, nullptr }, + /* .message_id */ { 0, nullptr }, + /* .timestamp */ 0, + /* .type */ { 0, nullptr }, + /* .user_id */ { 0, nullptr }, + /* .app_id */ { 0, nullptr }, + /* .cluster_id */ { 0, nullptr }, + }, + /* .body */ { 19, const_cast("this_is_the_message") }, + /* .pool */ { + /* .pagesize */ 0, + /* .pages */ { 0, nullptr }, + /* .large_blocks */ { 0, nullptr }, + /* .next_page */ 0, + /* .alloc_block */ nullptr, + /* .alloc_used */ 0 + } + } +}; + +bool operator ==(const timeval& a, const timeval& b) + { return a.tv_sec == b.tv_sec && a.tv_usec == b.tv_usec; } + +MATCHER_P(BytesEq, data, "") +{ + if ( data == nullptr + && arg.len == 0) + return true; + std::string d(data); + return arg.len == d.size() + && memcmp(arg.bytes, d.data(), arg.len) == 0; +} + +TEST(AmqpTest, Connection_connect_newConnectionFailed) +{ + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_new_connection()) + .WillOnce(Return(nullptr)); + + connection con; + EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); +} + +TEST(AmqpTest, Connection_connect_newSocketFailed) +{ + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_new_connection()) + .WillOnce(Return(reinterpret_cast(1))); + EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) + .WillOnce(Return(nullptr)); + EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) + .WillOnce(Return(0)); + + connection con; + EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); +} + +TEST(AmqpTest, Connection_connect_openSocketFailed) +{ + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_new_connection()) + .WillOnce(Return(reinterpret_cast(1))); + EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) + .WillOnce(Return(reinterpret_cast(2))); + EXPECT_CALL(mock, amqp_socket_open(reinterpret_cast(2), StrEq("localhost"), 1234)) + .WillOnce(Return(666)); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) + .WillOnce(Return(0)); + + connection con; + EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); +} + +TEST(AmqpTest, Connection_connect_success) +{ + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_new_connection()) + .WillOnce(Return(reinterpret_cast(1))); + EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) + .WillOnce(Return(reinterpret_cast(2))); + EXPECT_CALL(mock, amqp_socket_open(reinterpret_cast(2), StrEq("localhost"), 1234)) + .WillOnce(Return(0)); + EXPECT_CALL(mock, amqp_connection_close(reinterpret_cast(1), 200)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) + .WillOnce(Return(0)); + + connection con; + EXPECT_NO_THROW(con.tcp_connect("localhost", 1234)); +} + +TEST(AmqpTest, Connection_loginPlain_failed) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_login_plain(AmqpMock::defaultConnectionState, StrEq("vhost"), 100, 200, 0, AMQP_SASL_METHOD_PLAIN, StrEq("username"), StrEq("password"))) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(con.login_plain("username", "password", "vhost", 100, 200), cppamqp::exception); +} + +TEST(AmqpTest, Connection_loginPlain_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_login_plain(AmqpMock::defaultConnectionState, StrEq("vhost"), 100, 200, 0, AMQP_SASL_METHOD_PLAIN, StrEq("username"), StrEq("password"))) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_NO_THROW(con.login_plain("username", "password", "vhost", 100, 200)); +} + +TEST(AmqpTest, Connection_openChannel_failed) +{ + std::string channelId("test_channel"); + amqp_channel_open_ok_t channelOk { { + channelId.size(), + const_cast(channelId.c_str()) + } }; + + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_channel_open(AmqpMock::defaultConnectionState, 3)) + .WillOnce(Return(&channelOk)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(con.open_channel(3), cppamqp::exception); +} + +TEST(AmqpTest, Connection_openChannel_success) +{ + std::string channelId("test_channel"); + amqp_channel_open_ok_t channelOk { { + channelId.size(), + const_cast(channelId.c_str()) + } }; + + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_channel_open(AmqpMock::defaultConnectionState, 3)) + .WillOnce(Return(&channelOk)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, AMQP_REPLY_SUCCESS)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_NO_THROW(con.open_channel(3)); +} + +TEST(AmqpTest, Connection_consumeMessage_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll(SetArgPointee<1>(DefaultEnvelope), Return(AmqpMock::defaultRpcReply))); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + auto mr = con.consume_message(); + EXPECT_EQ(consume_result_type::success, mr.type); + EXPECT_EQ(std::string("this_is_the_message"), mr.message.body); + EXPECT_EQ(3, mr.channel); + EXPECT_EQ(std::string("my_consumer_tag"), mr.consumer_tag); + EXPECT_EQ(12345678, mr.delivery_tag); + EXPECT_EQ(true, mr.redelivered); + EXPECT_EQ(std::string("my_exchange"), mr.exchange); + EXPECT_EQ(std::string("my_routing_key"), mr.routing_key); +} + +TEST(AmqpTest, Connection_consumeMessage_nonLibraryError) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce( + DoAll(SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_SERVER_EXCEPTION), + { 0, nullptr }, + 666 }))); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + EXPECT_THROW(con.consume_message(), cppamqp::exception); +} + +TEST(AmqpTest, Connection_consumeMessage_nonUnexpectedState) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce( + DoAll(SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 }))); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + EXPECT_THROW(con.consume_message(), cppamqp::exception); +} + +TEST(AmqpTest, Connection_consumeMessage_waitFrameError) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(Return(666)); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + EXPECT_THROW(con.consume_message(), cppamqp::exception); +} + +TEST(AmqpTest, Connection_consumeMessage_noMethodFrame) +{ + amqp_frame_t frame { + /* .frame_type */ AMQP_FRAME_HEADER, + /* .channel */ 3, + /* .payload */ { { 0, nullptr } } + }; + + connection con; + con.tcp_connect("localhost", 1234); + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(DoAll( + SetArgPointee<1>(frame), + Return(0))); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + EXPECT_THROW(con.consume_message(), cppamqp::exception); +} + +TEST(AmqpTest, Connection_consumeMessage_ackMessage) +{ + amqp_frame_t frame { + /* .frame_type */ AMQP_FRAME_METHOD, + /* .channel */ 3, + /* .payload */ { { AMQP_BASIC_ACK_METHOD, nullptr } } + }; + + connection con; + con.tcp_connect("localhost", 1234); + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(DoAll( + SetArgPointee<1>(frame), + Return(0))); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + auto mr = con.consume_message(); + EXPECT_EQ(consume_result_type::acknowledge, mr.type); +} + +TEST(AmqpTest, Connection_consumeMessage_channelClosedMessage) +{ + amqp_frame_t frame { + /* .frame_type */ AMQP_FRAME_METHOD, + /* .channel */ 3, + /* .payload */ { { AMQP_CHANNEL_CLOSE_METHOD, nullptr } } + }; + + connection con; + con.tcp_connect("localhost", 1234); + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(DoAll( + SetArgPointee<1>(frame), + Return(0))); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + EXPECT_THROW(con.consume_message(), cppamqp::exception); +} + +TEST(AmqpTest, Connection_consumeMessage_connectionClosedMessage) +{ + amqp_frame_t frame { + /* .frame_type */ AMQP_FRAME_METHOD, + /* .channel */ 3, + /* .payload */ { { AMQP_CONNECTION_CLOSE_METHOD, nullptr } } + }; + + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(DoAll( + SetArgPointee<1>(frame), + Return(0))); + EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) + .WillOnce(Return(0)); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + auto mr = con.consume_message(); + EXPECT_EQ(consume_result_type::connection_closed_by_peer, mr.type); +} + +TEST(AmqpTest, Connection_consumeMessage_returnToSender) +{ + amqp_frame_t frame { + /* .frame_type */ AMQP_FRAME_METHOD, + /* .channel */ 3, + /* .payload */ { { AMQP_BASIC_RETURN_METHOD, nullptr } } + }; + + connection con; + con.tcp_connect("localhost", 1234); + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) + .Times(1); + EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) + .WillOnce(DoAll( + SetArgPointee<1>(DefaultEnvelope), + Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + AMQP_STATUS_UNEXPECTED_STATE }))); + EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) + .WillOnce(DoAll( + SetArgPointee<1>(frame), + Return(0))); + EXPECT_CALL(mock, amqp_read_message(AmqpMock::defaultConnectionState, 3, NotNull(), 0)) + .WillOnce(DoAll( + SetArgPointee<2>(DefaultEnvelope.message), + Return(AmqpMock::defaultRpcReply))); + EXPECT_CALL(mock, amqp_destroy_message(NotNull())) + .Times(1); + EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) + .Times(1); + + auto mr = con.consume_message(); + EXPECT_EQ(consume_result_type::could_not_deliver_return_to_sender, mr.type); + EXPECT_EQ(std::string("this_is_the_message"), mr.message.body); +} + +TEST(AmqpTest, Connection_close_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_connection_close(AmqpMock::defaultConnectionState, 123)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) + .WillOnce(Return(0)); + + con.close(123); +} + +TEST(AmqpTest, Connection_closeForced_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) + .WillOnce(Return(0)); + + con.close(123, true); +} + +TEST(AmqpTest, Channel_declareQueue_declareFailed) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), false, true, false, true, _)) + .WillOnce(Return(nullptr)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_THROW(channel.declare_queue("test_queue", queue_flags({ queue_flag::durable, queue_flag::auto_delete })), cppamqp::exception); +} + +TEST(AmqpTest, Channel_declareQueue_rpcReplyError) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), true, false, false, true, _)) + .WillOnce(Return(nullptr)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(channel.declare_queue("test_queue", queue_flags({ queue_flag::passive, queue_flag::auto_delete })), cppamqp::exception); +} + +TEST(AmqpTest, Channel_declareQueue_success) +{ + std::string name("blub"); + amqp_queue_declare_ok_t queueDeclareOk { + { name.size(), const_cast(name.data()) }, + 123, + 456 + }; + + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), true, false, true, false, _)) + .WillOnce(Return(&queueDeclareOk)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + auto qDecl = channel.declare_queue("test_queue", queue_flags({ queue_flag::passive, queue_flag::exclusive })); + EXPECT_EQ(name, qDecl.name); + EXPECT_EQ(123, qDecl.message_count); + EXPECT_EQ(456, qDecl.consumer_count); +} + +TEST(AmqpTest, Channel_bindQueue_rpcReplyError) +{ + amqp_queue_bind_ok_t dummy { 0 }; + + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_queue_bind(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), BytesEq("my_exchange"), BytesEq("the_routing_key"), _)) + .WillOnce(Return(&dummy)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(channel.bind_queue("test_queue", "my_exchange", "the_routing_key"), cppamqp::exception); +} + +TEST(AmqpTest, Channel_bindQueue_success) +{ + amqp_queue_bind_ok_t dummy { 0 }; + + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_queue_bind(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), BytesEq("my_exchange"), BytesEq("the_routing_key"), _)) + .WillOnce(Return(&dummy)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_NO_THROW(channel.bind_queue("test_queue", "my_exchange", "the_routing_key")); +} + +TEST(AmqpTest, Channel_publish_failed) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_basic_publish(AmqpMock::defaultConnectionState, 3, BytesEq("my_exchange"), BytesEq("my_routing_key"), false, true, nullptr, BytesEq("the_message"))) + .WillOnce(Return(666)); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(channel.publish("my_exchange", "my_routing_key", publish_flags({ publish_flag::immediate }), "the_message", nullptr), cppamqp::exception); +} + +TEST(AmqpTest, Channel_publish_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_basic_publish(AmqpMock::defaultConnectionState, 3, BytesEq("my_exchange"), BytesEq("my_routing_key"), true, false, NotNull(), BytesEq("the_message"))) + .WillOnce(Return(0)); + + publish_options po; + EXPECT_NO_THROW(channel.publish("my_exchange", "my_routing_key", publish_flags({ publish_flag::mandatory }), "the_message", &po)); +} + +TEST(AmqpTest, Channel_consume_consumeFailed) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), true, false, false, _)) + .WillOnce(Return(nullptr)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_THROW(channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::no_local)), cppamqp::exception); +} + +TEST(AmqpTest, Channel_consume_rpcReplyError) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + amqp_basic_consume_ok_t consumeOk { { 7, const_cast("new_tag") } }; + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), false, true, false, _)) + .WillOnce(Return(&consumeOk)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::no_ack)), cppamqp::exception); +} + +TEST(AmqpTest, Channel_consume_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + amqp_basic_consume_ok_t consumeOk { { 7, const_cast("new_tag") } }; + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), false, false, true, _)) + .WillOnce(Return(&consumeOk)); + EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + auto tag = channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::exclusive)); + EXPECT_EQ(std::string("new_tag"), tag); +} + +TEST(AmqpTest, Channel_close_failed) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, 500)) + .WillOnce(Return(amqp_rpc_reply_t { + static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), + { 0, nullptr }, + 666 })); + EXPECT_CALL(mock, amqp_error_string2(666)) + .WillOnce(Return("error from hell >:D")); + + EXPECT_THROW(channel.close(500), cppamqp::exception); +} + +TEST(AmqpTest, Channel_close_success) +{ + connection con; + con.tcp_connect("localhost", 1234); + channel channel = con.open_channel(3); + + StrictAmqpMock mock; + InSequence seq; + EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, 200)) + .WillOnce(Return(AmqpMock::defaultRpcReply)); + + EXPECT_NO_THROW(channel.close(200)); +} \ No newline at end of file diff --git a/test/mock.cpp b/test/mock.cpp new file mode 100644 index 0000000..d3368cc --- /dev/null +++ b/test/mock.cpp @@ -0,0 +1,120 @@ +#include + +#include "mock.h" + +const amqp_table_t amqp_empty_table = { 0, nullptr }; + +AmqpMock* amqp_mock_instance = nullptr; + +amqp_connection_state_t AmqpMock::defaultConnectionState = reinterpret_cast(1); +amqp_socket_t* AmqpMock::defaultSocket = reinterpret_cast(2); +amqp_rpc_reply_t AmqpMock::defaultRpcReply { + static_cast(AMQP_RESPONSE_NORMAL), + { 0, nullptr }, + 0 +}; + +void AmqpMock::setInstance(AmqpMock* value) +{ + amqp_mock_instance = value; +} + +void AmqpMock::clearInstance(AmqpMock* value) +{ + if (amqp_mock_instance == value) + amqp_mock_instance = nullptr; +} + +const char* amqp_error_string2(int err) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_error_string2(err) : nullptr); } + +amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_channel_open(state, channel) : nullptr); } + +amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_get_rpc_reply(state) : AmqpMock::defaultRpcReply); } + +amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, int code) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_channel_close(state, channel, code) : amqp_rpc_reply_t { }); } + +amqp_queue_declare_ok_t* amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_queue_declare(state, channel, queue, passive, durable, exclusive, auto_delete, arguments) : nullptr); } + +void amqp_release_buffers(amqp_connection_state_t state) + { if (amqp_mock_instance) amqp_mock_instance->amqp_release_buffers(state); } + +amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, struct timeval *timeout, int flags) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_consume_message(state, envelope, timeout, flags) : AmqpMock::defaultRpcReply); } + +int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *tv) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_simple_wait_frame_noblock(state, decoded_frame, tv) : 0); } + +amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, amqp_channel_t channel, amqp_message_t *message, int flags) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_read_message(state, channel, message, flags) : AmqpMock::defaultRpcReply); } + +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_connection_close(state, code) : AmqpMock::defaultRpcReply); } + +void amqp_destroy_envelope(amqp_envelope_t *envelope) + { if (amqp_mock_instance) amqp_mock_instance->amqp_destroy_envelope(envelope); } + +void amqp_destroy_message(amqp_message_t *message) + { if (amqp_mock_instance) amqp_mock_instance->amqp_destroy_message(message); } + +int amqp_destroy_connection(amqp_connection_state_t state) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_destroy_connection(state) : 0); } + +void amqp_maybe_release_buffers(amqp_connection_state_t state) + { if (amqp_mock_instance) amqp_mock_instance->amqp_maybe_release_buffers(state); } + +amqp_connection_state_t amqp_new_connection(void) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_new_connection() : AmqpMock::defaultConnectionState); } + +amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_tcp_socket_new(state) : AmqpMock::defaultSocket); } + +int amqp_socket_open(amqp_socket_t *self, const char *host, int port) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_socket_open(self, host, port) : 0); } + +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wvarargs" +amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ...) +{ + if (!amqp_mock_instance) + return AmqpMock::defaultRpcReply; + amqp_rpc_reply_t ret; + va_list args; + va_start(args, sasl_method); + switch (sasl_method) + { + case AMQP_SASL_METHOD_PLAIN: + { + const char* username = va_arg(args, const char*); + const char* password = va_arg(args, const char*); + ret = amqp_mock_instance->amqp_login_plain(state, vhost, channel_max, frame_max, heartbeat, sasl_method, username, password); + } + break; + + case AMQP_SASL_METHOD_EXTERNAL: + { + const char* token = va_arg(args, const char*); + ret = amqp_mock_instance->amqp_login_external(state, vhost, channel_max, frame_max, heartbeat, sasl_method, token); + } + break; + + default: + ret = AmqpMock::defaultRpcReply; + } + va_end(args); + return ret; +} +#pragma clang diagnostic pop + +amqp_queue_bind_ok_t* amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_queue_bind(state, channel, queue, exchange, routing_key, arguments) : nullptr); } + +int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_basic_publish(state, channel, exchange, routing_key, mandatory, immediate, properties, body) : 0); } + +amqp_basic_consume_ok_t* amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments) + { return (amqp_mock_instance ? amqp_mock_instance->amqp_basic_consume(state, channel, queue, consumer_tag, no_local, no_ack, exclusive, arguments) : nullptr); } \ No newline at end of file diff --git a/test/mock.h b/test/mock.h new file mode 100644 index 0000000..091cb96 --- /dev/null +++ b/test/mock.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include + +#include + +struct AmqpMock +{ +private: + static void setInstance(AmqpMock* value); + static void clearInstance(AmqpMock* value); + +public: + MOCK_METHOD1(amqp_error_string2, const char* (int err)); + MOCK_METHOD2(amqp_channel_open, amqp_channel_open_ok_t* (amqp_connection_state_t state, amqp_channel_t channel)); + MOCK_METHOD1(amqp_get_rpc_reply, amqp_rpc_reply_t (amqp_connection_state_t state)); + MOCK_METHOD3(amqp_channel_close, amqp_rpc_reply_t (amqp_connection_state_t state, amqp_channel_t channel, int code)); + MOCK_METHOD8(amqp_queue_declare, amqp_queue_declare_ok_t* (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments)); + MOCK_METHOD1(amqp_release_buffers, void (amqp_connection_state_t state)); + MOCK_METHOD4(amqp_consume_message, amqp_rpc_reply_t (amqp_connection_state_t state, amqp_envelope_t *envelope, struct timeval *timeout, int flags)); + MOCK_METHOD3(amqp_simple_wait_frame_noblock, int (amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *tv)); + MOCK_METHOD4(amqp_read_message, amqp_rpc_reply_t (amqp_connection_state_t state, amqp_channel_t channel, amqp_message_t *message, int flags)); + MOCK_METHOD2(amqp_connection_close, amqp_rpc_reply_t (amqp_connection_state_t state, int code)); + MOCK_METHOD1(amqp_destroy_envelope, void (amqp_envelope_t *envelope)); + MOCK_METHOD1(amqp_destroy_message, void (amqp_message_t *message)); + MOCK_METHOD1(amqp_destroy_connection, int (amqp_connection_state_t state)); + MOCK_METHOD1(amqp_maybe_release_buffers, void (amqp_connection_state_t state)); + MOCK_METHOD0(amqp_new_connection, amqp_connection_state_t (void)); + MOCK_METHOD1(amqp_tcp_socket_new, amqp_socket_t* (amqp_connection_state_t state)); + MOCK_METHOD3(amqp_socket_open, int (amqp_socket_t *self, const char *host, int port)); + MOCK_METHOD8(amqp_login_plain, amqp_rpc_reply_t (amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, const char* user, const char* pw)); + MOCK_METHOD7(amqp_login_external, amqp_rpc_reply_t (amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, const char* token)); + MOCK_METHOD6(amqp_queue_bind, amqp_queue_bind_ok_t* (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments)); + MOCK_METHOD8(amqp_basic_publish, int (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body)); + MOCK_METHOD8(amqp_basic_consume, amqp_basic_consume_ok_t* (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments)); + + static amqp_connection_state_t defaultConnectionState; + static amqp_socket_t* defaultSocket; + static amqp_rpc_reply_t defaultRpcReply; + + AmqpMock() + { setInstance(this); } + + ~AmqpMock() + { clearInstance(this); } +}; \ No newline at end of file