Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

142 рядки
4.7 KiB

  1. #include <cppamqp/helper.inl>
  2. #include <cppamqp/channel.inl>
  3. #include <cppamqp/exception.inl>
  4. #include <cppamqp/connection.inl>
  5. using namespace ::cppamqp;
  6. channel::internal::internal(const cppamqp::connection& p_connection, channel_number p_handle)
  7. : connection(p_connection)
  8. , handle (p_handle)
  9. {
  10. if (handle <= 0)
  11. throw utl::argument_exception("handle", "handle must be greater than 0");
  12. amqp_channel_open(connection.handle(), handle);
  13. __impl::check_and_raise(
  14. amqp_get_rpc_reply(connection.handle()),
  15. std::string("unable to open channel ") + std::to_string(handle),
  16. false);
  17. }
  18. channel::internal::~internal()
  19. {
  20. if (handle <= 0 || !static_cast<bool>(connection))
  21. return;
  22. __impl::check_and_raise(
  23. amqp_channel_close(connection.handle(), handle, AMQP_REPLY_SUCCESS),
  24. "unable to close amqp channel",
  25. true);
  26. }
  27. queue_declaration channel::declare_queue(const std::string& name, const queue_flags& flags)
  28. {
  29. auto ret = amqp_queue_declare(
  30. connection().handle(),
  31. handle(),
  32. __impl::make_bytes(name),
  33. flags.is_set(queue_flag::passive) ? 1 : 0,
  34. flags.is_set(queue_flag::durable) ? 1 : 0,
  35. flags.is_set(queue_flag::exclusive) ? 1 : 0,
  36. flags.is_set(queue_flag::auto_delete) ? 1 : 0,
  37. amqp_empty_table);
  38. __impl::check_and_raise(
  39. amqp_get_rpc_reply(connection().handle()),
  40. std::string("unable to declare queue '") + name + "'",
  41. false);
  42. if (!ret)
  43. throw exception(std::string("unable to declare queue '") + name + "'");
  44. return queue_declaration
  45. {
  46. std::string(static_cast<const char*>(ret->queue.bytes), ret->queue.len),
  47. ret->message_count,
  48. ret->consumer_count
  49. };
  50. }
  51. void channel::bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key)
  52. {
  53. amqp_queue_bind(
  54. connection().handle(),
  55. handle(),
  56. __impl::make_bytes(queue),
  57. __impl::make_bytes(exchange),
  58. __impl::make_bytes(routing_key),
  59. amqp_empty_table);
  60. __impl::check_and_raise(
  61. amqp_get_rpc_reply(connection().handle()),
  62. static_cast<std::ostringstream&>(std::ostringstream() <<
  63. "unable to bind queue '" << queue <<
  64. "' to '" << exchange <<
  65. "' using '" << routing_key <<
  66. "'").str(),
  67. false);
  68. }
  69. void channel::publish(
  70. const std::string& exchange,
  71. const std::string& routing_key,
  72. const publish_flags& flags,
  73. const std::string& message,
  74. const publish_options* options)
  75. {
  76. __impl::check_and_raise(
  77. amqp_basic_publish(
  78. connection().handle(),
  79. handle(),
  80. __impl::make_bytes(exchange),
  81. __impl::make_bytes(routing_key),
  82. flags.is_set(publish_flag::mandatory) ? 1 : 0,
  83. flags.is_set(publish_flag::immediate) ? 1 : 0,
  84. options ? options->properties() : nullptr,
  85. __impl::make_bytes(message)),
  86. "error while publishing message",
  87. false);
  88. }
  89. std::string channel::consume(const std::string& queue, const std::string& consumer_tag, const consume_flags& flags)
  90. {
  91. auto ret = amqp_basic_consume(
  92. connection().handle(),
  93. handle(),
  94. __impl::make_bytes(queue),
  95. __impl::make_bytes(consumer_tag),
  96. flags.is_set(consume_flag::no_local) ? 1 : 0,
  97. flags.is_set(consume_flag::no_ack) ? 1 : 0,
  98. flags.is_set(consume_flag::exclusive) ? 1 : 0,
  99. amqp_empty_table);
  100. __impl::check_and_raise(
  101. amqp_get_rpc_reply(connection().handle()),
  102. static_cast<std::ostringstream&>(std::ostringstream() << "unable to consume from queue '" << queue << "'").str(),
  103. false);
  104. if (!ret)
  105. throw exception(std::string("unable to consume from queue '") + queue + "'");
  106. return __impl::from_bytes(ret->consumer_tag);
  107. }
  108. void channel::qos(uint32_t prefetch_size, uint16_t prefetch_count, bool global)
  109. {
  110. auto ret = amqp_basic_qos(
  111. connection().handle(),
  112. handle(),
  113. prefetch_size,
  114. prefetch_count,
  115. global);
  116. __impl::check_and_raise(
  117. amqp_get_rpc_reply(connection().handle()),
  118. static_cast<std::ostringstream&>(std::ostringstream() << "unable to set QOS values").str(),
  119. false);
  120. if (!ret)
  121. throw exception("unable to set QOS values");
  122. }
  123. void channel::close(int status)
  124. {
  125. if (!_internal)
  126. return;
  127. __impl::check_and_raise(
  128. amqp_channel_close(connection().handle(), handle(), status),
  129. "unable to close channel",
  130. false);
  131. _internal->handle = 0;
  132. _internal.reset();
  133. }