#include #include #include "mock.h" using namespace ::testing; using namespace ::cppamqp; using StrictAmqpMock = StrictMock; amqp_envelope_t DefaultEnvelope { /* .channel */ 3, /* .consumer_tag */ { 15, const_cast("my_consumer_tag") }, /* .delivery_tag */ 12345678, /* .redelivered */ true, /* .exchange */ { 11, const_cast("my_exchange") }, /* .routing_key */ { 14, const_cast("my_routing_key") }, /* .message */ { /* .properties */ { /* ._flags */ 0, /* .content_type */ { 0, nullptr }, /* .content_encoding */ { 0, nullptr }, /* .headers */ { 0, nullptr }, /* .delivery_mode */ 0, /* .priority */ 0, /* .correlation_id */ { 0, nullptr }, /* .reply_to */ { 0, nullptr }, /* .expiration */ { 0, nullptr }, /* .message_id */ { 0, nullptr }, /* .timestamp */ 0, /* .type */ { 0, nullptr }, /* .user_id */ { 0, nullptr }, /* .app_id */ { 0, nullptr }, /* .cluster_id */ { 0, nullptr }, }, /* .body */ { 19, const_cast("this_is_the_message") }, /* .pool */ { /* .pagesize */ 0, /* .pages */ { 0, nullptr }, /* .large_blocks */ { 0, nullptr }, /* .next_page */ 0, /* .alloc_block */ nullptr, /* .alloc_used */ 0 } } }; bool operator ==(const timeval& a, const timeval& b) { return a.tv_sec == b.tv_sec && a.tv_usec == b.tv_usec; } MATCHER_P(BytesEq, data, "") { if ( data == nullptr && arg.len == 0) return true; std::string d(data); return arg.len == d.size() && memcmp(arg.bytes, d.data(), arg.len) == 0; } TEST(AmqpTest, Connection_connect_newConnectionFailed) { StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_new_connection()) .WillOnce(Return(nullptr)); connection con; EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); } TEST(AmqpTest, Connection_connect_newSocketFailed) { StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_new_connection()) .WillOnce(Return(reinterpret_cast(1))); EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) .WillOnce(Return(nullptr)); EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) .WillOnce(Return(0)); connection con; EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); } TEST(AmqpTest, Connection_connect_openSocketFailed) { StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_new_connection()) .WillOnce(Return(reinterpret_cast(1))); EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) .WillOnce(Return(reinterpret_cast(2))); EXPECT_CALL(mock, amqp_socket_open(reinterpret_cast(2), StrEq("localhost"), 1234)) .WillOnce(Return(666)); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) .WillOnce(Return(0)); connection con; EXPECT_THROW(con.tcp_connect("localhost", 1234), cppamqp::exception); } TEST(AmqpTest, Connection_connect_success) { StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_new_connection()) .WillOnce(Return(reinterpret_cast(1))); EXPECT_CALL(mock, amqp_tcp_socket_new(reinterpret_cast(1))) .WillOnce(Return(reinterpret_cast(2))); EXPECT_CALL(mock, amqp_socket_open(reinterpret_cast(2), StrEq("localhost"), 1234)) .WillOnce(Return(0)); EXPECT_CALL(mock, amqp_connection_close(reinterpret_cast(1), 200)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_CALL(mock, amqp_destroy_connection(reinterpret_cast(1))) .WillOnce(Return(0)); connection con; EXPECT_NO_THROW(con.tcp_connect("localhost", 1234)); } TEST(AmqpTest, Connection_loginPlain_failed) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_login_plain(AmqpMock::defaultConnectionState, StrEq("vhost"), 100, 200, 0, AMQP_SASL_METHOD_PLAIN, StrEq("username"), StrEq("password"))) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(con.login_plain("username", "password", "vhost", 100, 200), cppamqp::exception); } TEST(AmqpTest, Connection_loginPlain_success) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_login_plain(AmqpMock::defaultConnectionState, StrEq("vhost"), 100, 200, 0, AMQP_SASL_METHOD_PLAIN, StrEq("username"), StrEq("password"))) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_NO_THROW(con.login_plain("username", "password", "vhost", 100, 200)); } TEST(AmqpTest, Connection_openChannel_failed) { std::string channelId("test_channel"); amqp_channel_open_ok_t channelOk { { channelId.size(), const_cast(channelId.c_str()) } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_channel_open(AmqpMock::defaultConnectionState, 3)) .WillOnce(Return(&channelOk)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(con.open_channel(3), cppamqp::exception); } TEST(AmqpTest, Connection_openChannel_success) { std::string channelId("test_channel"); amqp_channel_open_ok_t channelOk { { channelId.size(), const_cast(channelId.c_str()) } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_channel_open(AmqpMock::defaultConnectionState, 3)) .WillOnce(Return(&channelOk)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, AMQP_REPLY_SUCCESS)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_NO_THROW(con.open_channel(3)); } TEST(AmqpTest, Connection_consumeMessage_success) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll(SetArgPointee<1>(DefaultEnvelope), Return(AmqpMock::defaultRpcReply))); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); auto mr = con.consume_message(); EXPECT_EQ(consume_result_type::success, mr.type); EXPECT_EQ(std::string("this_is_the_message"), mr.message.body); EXPECT_EQ(3, mr.channel); EXPECT_EQ(std::string("my_consumer_tag"), mr.consumer_tag); EXPECT_EQ(12345678, mr.delivery_tag); EXPECT_EQ(true, mr.redelivered); EXPECT_EQ(std::string("my_exchange"), mr.exchange); EXPECT_EQ(std::string("my_routing_key"), mr.routing_key); } TEST(AmqpTest, Connection_consumeMessage_nonLibraryError) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce( DoAll(SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_SERVER_EXCEPTION), { 0, nullptr }, 666 }))); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); EXPECT_THROW(con.consume_message(), cppamqp::exception); } TEST(AmqpTest, Connection_consumeMessage_nonUnexpectedState) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce( DoAll(SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 }))); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); EXPECT_THROW(con.consume_message(), cppamqp::exception); } TEST(AmqpTest, Connection_consumeMessage_waitFrameError) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(Return(666)); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); EXPECT_THROW(con.consume_message(), cppamqp::exception); } TEST(AmqpTest, Connection_consumeMessage_noMethodFrame) { amqp_frame_t frame { /* .frame_type */ AMQP_FRAME_HEADER, /* .channel */ 3, /* .payload */ { { 0, nullptr } } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(DoAll( SetArgPointee<1>(frame), Return(0))); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); EXPECT_THROW(con.consume_message(), cppamqp::exception); } TEST(AmqpTest, Connection_consumeMessage_ackMessage) { amqp_frame_t frame { /* .frame_type */ AMQP_FRAME_METHOD, /* .channel */ 3, /* .payload */ { { AMQP_BASIC_ACK_METHOD, nullptr } } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(DoAll( SetArgPointee<1>(frame), Return(0))); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); auto mr = con.consume_message(); EXPECT_EQ(consume_result_type::acknowledge, mr.type); } TEST(AmqpTest, Connection_consumeMessage_channelClosedMessage) { amqp_frame_t frame { /* .frame_type */ AMQP_FRAME_METHOD, /* .channel */ 3, /* .payload */ { { AMQP_CHANNEL_CLOSE_METHOD, nullptr } } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(DoAll( SetArgPointee<1>(frame), Return(0))); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); EXPECT_THROW(con.consume_message(), cppamqp::exception); } TEST(AmqpTest, Connection_consumeMessage_connectionClosedMessage) { amqp_frame_t frame { /* .frame_type */ AMQP_FRAME_METHOD, /* .channel */ 3, /* .payload */ { { AMQP_CONNECTION_CLOSE_METHOD, nullptr } } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(DoAll( SetArgPointee<1>(frame), Return(0))); EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) .WillOnce(Return(0)); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); auto mr = con.consume_message(); EXPECT_EQ(consume_result_type::connection_closed_by_peer, mr.type); } TEST(AmqpTest, Connection_consumeMessage_returnToSender) { amqp_frame_t frame { /* .frame_type */ AMQP_FRAME_METHOD, /* .channel */ 3, /* .payload */ { { AMQP_BASIC_RETURN_METHOD, nullptr } } }; connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_maybe_release_buffers(AmqpMock::defaultConnectionState)) .Times(1); EXPECT_CALL(mock, amqp_consume_message(AmqpMock::defaultConnectionState, NotNull(), nullptr, 0)) .WillOnce(DoAll( SetArgPointee<1>(DefaultEnvelope), Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, AMQP_STATUS_UNEXPECTED_STATE }))); EXPECT_CALL(mock, amqp_simple_wait_frame_noblock(AmqpMock::defaultConnectionState, NotNull(), Pointee(timeval { 0, 0 }))) .WillOnce(DoAll( SetArgPointee<1>(frame), Return(0))); EXPECT_CALL(mock, amqp_read_message(AmqpMock::defaultConnectionState, 3, NotNull(), 0)) .WillOnce(DoAll( SetArgPointee<2>(DefaultEnvelope.message), Return(AmqpMock::defaultRpcReply))); EXPECT_CALL(mock, amqp_destroy_message(NotNull())) .Times(1); EXPECT_CALL(mock, amqp_destroy_envelope(NotNull())) .Times(1); auto mr = con.consume_message(); EXPECT_EQ(consume_result_type::could_not_deliver_return_to_sender, mr.type); EXPECT_EQ(std::string("this_is_the_message"), mr.message.body); } TEST(AmqpTest, Connection_close_success) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_connection_close(AmqpMock::defaultConnectionState, 123)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) .WillOnce(Return(0)); con.close(123); } TEST(AmqpTest, Connection_closeForced_success) { connection con; con.tcp_connect("localhost", 1234); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_destroy_connection(AmqpMock::defaultConnectionState)) .WillOnce(Return(0)); con.close(123, true); } TEST(AmqpTest, Channel_declareQueue_declareFailed) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), false, true, false, true, _)) .WillOnce(Return(nullptr)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_THROW(channel.declare_queue("test_queue", queue_flags({ queue_flag::durable, queue_flag::auto_delete })), cppamqp::exception); } TEST(AmqpTest, Channel_declareQueue_rpcReplyError) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), true, false, false, true, _)) .WillOnce(Return(nullptr)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(channel.declare_queue("test_queue", queue_flags({ queue_flag::passive, queue_flag::auto_delete })), cppamqp::exception); } TEST(AmqpTest, Channel_declareQueue_success) { std::string name("blub"); amqp_queue_declare_ok_t queueDeclareOk { { name.size(), const_cast(name.data()) }, 123, 456 }; connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_queue_declare(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), true, false, true, false, _)) .WillOnce(Return(&queueDeclareOk)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); auto qDecl = channel.declare_queue("test_queue", queue_flags({ queue_flag::passive, queue_flag::exclusive })); EXPECT_EQ(name, qDecl.name); EXPECT_EQ(123, qDecl.message_count); EXPECT_EQ(456, qDecl.consumer_count); } TEST(AmqpTest, Channel_bindQueue_rpcReplyError) { amqp_queue_bind_ok_t dummy { 0 }; connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_queue_bind(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), BytesEq("my_exchange"), BytesEq("the_routing_key"), _)) .WillOnce(Return(&dummy)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(channel.bind_queue("test_queue", "my_exchange", "the_routing_key"), cppamqp::exception); } TEST(AmqpTest, Channel_bindQueue_success) { amqp_queue_bind_ok_t dummy { 0 }; connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_queue_bind(AmqpMock::defaultConnectionState, 3, BytesEq("test_queue"), BytesEq("my_exchange"), BytesEq("the_routing_key"), _)) .WillOnce(Return(&dummy)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_NO_THROW(channel.bind_queue("test_queue", "my_exchange", "the_routing_key")); } TEST(AmqpTest, Channel_publish_failed) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_basic_publish(AmqpMock::defaultConnectionState, 3, BytesEq("my_exchange"), BytesEq("my_routing_key"), false, true, nullptr, BytesEq("the_message"))) .WillOnce(Return(666)); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(channel.publish("my_exchange", "my_routing_key", publish_flags({ publish_flag::immediate }), "the_message", nullptr), cppamqp::exception); } TEST(AmqpTest, Channel_publish_success) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_basic_publish(AmqpMock::defaultConnectionState, 3, BytesEq("my_exchange"), BytesEq("my_routing_key"), true, false, NotNull(), BytesEq("the_message"))) .WillOnce(Return(0)); publish_options po; EXPECT_NO_THROW(channel.publish("my_exchange", "my_routing_key", publish_flags({ publish_flag::mandatory }), "the_message", &po)); } TEST(AmqpTest, Channel_consume_consumeFailed) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), true, false, false, _)) .WillOnce(Return(nullptr)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_THROW(channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::no_local)), cppamqp::exception); } TEST(AmqpTest, Channel_consume_rpcReplyError) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); amqp_basic_consume_ok_t consumeOk { { 7, const_cast("new_tag") } }; StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), false, true, false, _)) .WillOnce(Return(&consumeOk)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::no_ack)), cppamqp::exception); } TEST(AmqpTest, Channel_consume_success) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); amqp_basic_consume_ok_t consumeOk { { 7, const_cast("new_tag") } }; StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_basic_consume(AmqpMock::defaultConnectionState, 3, BytesEq("my_queue"), BytesEq("my_consumer_tag"), false, false, true, _)) .WillOnce(Return(&consumeOk)); EXPECT_CALL(mock, amqp_get_rpc_reply(AmqpMock::defaultConnectionState)) .WillOnce(Return(AmqpMock::defaultRpcReply)); auto tag = channel.consume("my_queue", "my_consumer_tag", consume_flags(consume_flag::exclusive)); EXPECT_EQ(std::string("new_tag"), tag); } TEST(AmqpTest, Channel_close_failed) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, 500)) .WillOnce(Return(amqp_rpc_reply_t { static_cast(AMQP_RESPONSE_LIBRARY_EXCEPTION), { 0, nullptr }, 666 })); EXPECT_CALL(mock, amqp_error_string2(666)) .WillOnce(Return("error from hell >:D")); EXPECT_THROW(channel.close(500), cppamqp::exception); } TEST(AmqpTest, Channel_close_success) { connection con; con.tcp_connect("localhost", 1234); channel channel = con.open_channel(3); StrictAmqpMock mock; InSequence seq; EXPECT_CALL(mock, amqp_channel_close(AmqpMock::defaultConnectionState, 3, 200)) .WillOnce(Return(AmqpMock::defaultRpcReply)); EXPECT_NO_THROW(channel.close(200)); }