Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 

126 rader
4.2 KiB

  1. #include <cppamqp/helper.inl>
  2. #include <cppamqp/channel.inl>
  3. #include <cppamqp/exception.inl>
  4. #include <cppamqp/connection.inl>
  5. using namespace ::cppamqp;
  6. channel::internal::internal(const cppamqp::connection& p_connection, channel_number p_handle)
  7. : connection(p_connection)
  8. , handle (p_handle)
  9. {
  10. if (handle <= 0)
  11. throw utl::argument_exception("handle", "handle must be greater than 0");
  12. amqp_channel_open(connection.handle(), handle);
  13. __impl::check_and_raise(
  14. amqp_get_rpc_reply(connection.handle()),
  15. std::string("unable to open channel ") + std::to_string(handle),
  16. false);
  17. }
  18. channel::internal::~internal()
  19. {
  20. if (handle <= 0)
  21. return;
  22. __impl::check_and_raise(
  23. amqp_channel_close(connection.handle(), handle, AMQP_REPLY_SUCCESS),
  24. "unable to close amqp channel",
  25. true);
  26. }
  27. queue_declaration channel::declare_queue(const std::string& name, const queue_flags& flags)
  28. {
  29. auto ret = amqp_queue_declare(
  30. connection().handle(),
  31. handle(),
  32. __impl::make_bytes(name),
  33. flags.is_set(queue_flag::passive) ? 1 : 0,
  34. flags.is_set(queue_flag::durable) ? 1 : 0,
  35. flags.is_set(queue_flag::exclusive) ? 1 : 0,
  36. flags.is_set(queue_flag::auto_delete) ? 1 : 0,
  37. amqp_empty_table);
  38. __impl::check_and_raise(
  39. amqp_get_rpc_reply(connection().handle()),
  40. std::string("unable to declare queue '") + name + "'",
  41. false);
  42. if (!ret)
  43. throw exception(std::string("unable to declare queue '") + name + "'");
  44. return queue_declaration
  45. {
  46. std::string(static_cast<const char*>(ret->queue.bytes), ret->queue.len),
  47. ret->message_count,
  48. ret->consumer_count
  49. };
  50. }
  51. void channel::bind_queue(const std::string& queue, const std::string& exchange, const std::string& routing_key)
  52. {
  53. amqp_queue_bind(
  54. connection().handle(),
  55. handle(),
  56. __impl::make_bytes(queue),
  57. __impl::make_bytes(exchange),
  58. __impl::make_bytes(routing_key),
  59. amqp_empty_table);
  60. __impl::check_and_raise(
  61. amqp_get_rpc_reply(connection().handle()),
  62. static_cast<std::ostringstream&>(std::ostringstream() <<
  63. "unable to bind queue '" << queue <<
  64. "' to '" << exchange <<
  65. "' using '" << routing_key <<
  66. "'").str(),
  67. false);
  68. }
  69. void channel::publish(
  70. const std::string& exchange,
  71. const std::string& routing_key,
  72. const publish_flags& flags,
  73. const std::string& message,
  74. const publish_options* options)
  75. {
  76. __impl::check_and_raise(
  77. amqp_basic_publish(
  78. connection().handle(),
  79. handle(),
  80. __impl::make_bytes(exchange),
  81. __impl::make_bytes(routing_key),
  82. flags.is_set(publish_flag::mandatory) ? 1 : 0,
  83. flags.is_set(publish_flag::immediate) ? 1 : 0,
  84. options ? options->properties() : nullptr,
  85. __impl::make_bytes(message)),
  86. "error while publishing message",
  87. false);
  88. }
  89. std::string channel::consume(const std::string& queue, const std::string& consumer_tag, const consume_flags& flags)
  90. {
  91. auto ret = amqp_basic_consume(
  92. connection().handle(),
  93. handle(),
  94. __impl::make_bytes(queue),
  95. __impl::make_bytes(consumer_tag),
  96. flags.is_set(consume_flag::no_local) ? 1 : 0,
  97. flags.is_set(consume_flag::no_ack) ? 1 : 0,
  98. flags.is_set(consume_flag::exclusive) ? 1 : 0,
  99. amqp_empty_table);
  100. __impl::check_and_raise(
  101. amqp_get_rpc_reply(connection().handle()),
  102. static_cast<std::ostringstream&>(std::ostringstream() << "unable to consume from queue '" << queue << "'").str(),
  103. false);
  104. if (!ret)
  105. throw exception(std::string("unable to consume from queue '") + queue + "'");
  106. return __impl::from_bytes(ret->consumer_tag);
  107. }
  108. void channel::close(int status)
  109. {
  110. if (!_internal)
  111. return;
  112. __impl::check_and_raise(
  113. amqp_channel_close(connection().handle(), handle(), status),
  114. "unable to close channel",
  115. false);
  116. _internal->handle = 0;
  117. _internal.reset();
  118. }