Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

155 righe
5.1 KiB

  1. #include <cppamqp/types.inl>
  2. #include <cppamqp/helper.inl>
  3. #include <cppamqp/channel.inl>
  4. #include <cppamqp/exception.inl>
  5. #include <cppamqp/connection.inl>
  6. using namespace ::cppamqp;
  7. channel::internal::internal(const cppamqp::connection& p_connection, channel_number p_handle)
  8. : connection(p_connection)
  9. , handle (p_handle)
  10. {
  11. if (handle <= 0)
  12. throw utl::argument_exception("handle", "handle must be greater than 0");
  13. amqp_channel_open(connection.handle(), handle);
  14. __impl::check_and_raise(
  15. amqp_get_rpc_reply(connection.handle()),
  16. std::string("unable to open channel ") + std::to_string(handle),
  17. false);
  18. }
  19. channel::internal::~internal()
  20. {
  21. if (handle <= 0 || !static_cast<bool>(connection))
  22. return;
  23. __impl::check_and_raise(
  24. amqp_channel_close(connection.handle(), handle, AMQP_REPLY_SUCCESS),
  25. "unable to close amqp channel",
  26. true);
  27. }
  28. queue_declaration channel::declare_queue(const std::string& name, const queue_flags& flags, const table& args)
  29. {
  30. size_t i = 0;
  31. amqp_table_t arg_table;
  32. amqp_table_entry_t entries[args.size()];
  33. arg_table.num_entries = static_cast<int>(args.size());
  34. arg_table.entries = &entries[0];
  35. for (auto& kvp : args)
  36. {
  37. arg_table.entries[i].key = __impl::make_bytes(kvp.first);
  38. arg_table.entries[i].value = kvp.second.get();
  39. ++i;
  40. }
  41. auto ret = amqp_queue_declare(
  42. connection().handle(),
  43. handle(),
  44. __impl::make_bytes(name),
  45. flags.is_set(queue_flag::passive) ? 1 : 0,
  46. flags.is_set(queue_flag::durable) ? 1 : 0,
  47. flags.is_set(queue_flag::exclusive) ? 1 : 0,
  48. flags.is_set(queue_flag::auto_delete) ? 1 : 0,
  49. arg_table);
  50. __impl::check_and_raise(
  51. amqp_get_rpc_reply(connection().handle()),
  52. std::string("unable to declare queue '") + name + "'",
  53. false);
  54. if (!ret)
  55. throw exception(std::string("unable to declare queue '") + name + "'");
  56. return queue_declaration
  57. {
  58. std::string(static_cast<const char*>(ret->queue.bytes), ret->queue.len),
  59. ret->message_count,
  60. ret->consumer_count
  61. };
  62. }
  63. void channel::bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key)
  64. {
  65. amqp_queue_bind(
  66. connection().handle(),
  67. handle(),
  68. __impl::make_bytes(queue),
  69. __impl::make_bytes(exchange),
  70. __impl::make_bytes(routing_key),
  71. amqp_empty_table);
  72. __impl::check_and_raise(
  73. amqp_get_rpc_reply(connection().handle()),
  74. static_cast<std::ostringstream&>(std::ostringstream() <<
  75. "unable to bind queue '" << queue <<
  76. "' to '" << exchange <<
  77. "' using '" << routing_key <<
  78. "'").str(),
  79. false);
  80. }
  81. void channel::publish(
  82. const std::string& exchange,
  83. const std::string& routing_key,
  84. const publish_flags& flags,
  85. const std::string& message,
  86. const publish_options* options)
  87. {
  88. __impl::check_and_raise(
  89. amqp_basic_publish(
  90. connection().handle(),
  91. handle(),
  92. __impl::make_bytes(exchange),
  93. __impl::make_bytes(routing_key),
  94. flags.is_set(publish_flag::mandatory) ? 1 : 0,
  95. flags.is_set(publish_flag::immediate) ? 1 : 0,
  96. options ? options->properties() : nullptr,
  97. __impl::make_bytes(message)),
  98. "error while publishing message",
  99. false);
  100. }
  101. std::string channel::consume(const std::string& queue, const std::string& consumer_tag, const consume_flags& flags)
  102. {
  103. auto ret = amqp_basic_consume(
  104. connection().handle(),
  105. handle(),
  106. __impl::make_bytes(queue),
  107. __impl::make_bytes(consumer_tag),
  108. flags.is_set(consume_flag::no_local) ? 1 : 0,
  109. flags.is_set(consume_flag::no_ack) ? 1 : 0,
  110. flags.is_set(consume_flag::exclusive) ? 1 : 0,
  111. amqp_empty_table);
  112. __impl::check_and_raise(
  113. amqp_get_rpc_reply(connection().handle()),
  114. static_cast<std::ostringstream&>(std::ostringstream() << "unable to consume from queue '" << queue << "'").str(),
  115. false);
  116. if (!ret)
  117. throw exception(std::string("unable to consume from queue '") + queue + "'");
  118. return __impl::from_bytes(ret->consumer_tag);
  119. }
  120. void channel::qos(uint32_t prefetch_size, uint16_t prefetch_count, bool global)
  121. {
  122. auto ret = amqp_basic_qos(
  123. connection().handle(),
  124. handle(),
  125. prefetch_size,
  126. prefetch_count,
  127. global);
  128. __impl::check_and_raise(
  129. amqp_get_rpc_reply(connection().handle()),
  130. static_cast<std::ostringstream&>(std::ostringstream() << "unable to set QOS values").str(),
  131. false);
  132. if (!ret)
  133. throw exception("unable to set QOS values");
  134. }
  135. void channel::close(int status)
  136. {
  137. if (!_internal)
  138. return;
  139. __impl::check_and_raise(
  140. amqp_channel_close(connection().handle(), handle(), status),
  141. "unable to close channel",
  142. false);
  143. _internal->handle = 0;
  144. _internal.reset();
  145. }