您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

982 行
32 KiB

  1. // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
  2. // ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
  3. // BSD license, available at the top of concurrentqueue.h.
  4. // Uses Jeff Preshing's semaphore implementation (under the terms of its
  5. // separate zlib license, embedded below).
  6. #pragma once
  7. #include "concurrentqueue.h"
  8. #include <type_traits>
  9. #include <cerrno>
  10. #include <memory>
  11. #include <chrono>
  12. #include <ctime>
  13. #if defined(_WIN32)
  14. // Avoid including windows.h in a header; we only need a handful of
  15. // items, so we'll redeclare them here (this is relatively safe since
  16. // the API generally has to remain stable between Windows versions).
  17. // I know this is an ugly hack but it still beats polluting the global
  18. // namespace with thousands of generic names or adding a .cpp for nothing.
  19. extern "C" {
  20. struct _SECURITY_ATTRIBUTES;
  21. __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
  22. __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
  23. __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
  24. __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
  25. }
  26. #elif defined(__MACH__)
  27. #include <mach/mach.h>
  28. #elif defined(__unix__)
  29. #include <semaphore.h>
  30. #endif
  31. namespace moodycamel
  32. {
  33. namespace details
  34. {
  35. // Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
  36. // portable + lightweight semaphore implementations, originally from
  37. // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
  38. // LICENSE:
  39. // Copyright (c) 2015 Jeff Preshing
  40. //
  41. // This software is provided 'as-is', without any express or implied
  42. // warranty. In no event will the authors be held liable for any damages
  43. // arising from the use of this software.
  44. //
  45. // Permission is granted to anyone to use this software for any purpose,
  46. // including commercial applications, and to alter it and redistribute it
  47. // freely, subject to the following restrictions:
  48. //
  49. // 1. The origin of this software must not be misrepresented; you must not
  50. // claim that you wrote the original software. If you use this software
  51. // in a product, an acknowledgement in the product documentation would be
  52. // appreciated but is not required.
  53. // 2. Altered source versions must be plainly marked as such, and must not be
  54. // misrepresented as being the original software.
  55. // 3. This notice may not be removed or altered from any source distribution.
  56. namespace mpmc_sema
  57. {
  58. #if defined(_WIN32)
  59. class Semaphore
  60. {
  61. private:
  62. void* m_hSema;
  63. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  64. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  65. public:
  66. Semaphore(int initialCount = 0)
  67. {
  68. assert(initialCount >= 0);
  69. const long maxLong = 0x7fffffff;
  70. m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
  71. }
  72. ~Semaphore()
  73. {
  74. CloseHandle(m_hSema);
  75. }
  76. void wait()
  77. {
  78. const unsigned long infinite = 0xffffffff;
  79. WaitForSingleObject(m_hSema, infinite);
  80. }
  81. bool try_wait()
  82. {
  83. const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
  84. return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
  85. }
  86. bool timed_wait(std::uint64_t usecs)
  87. {
  88. const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
  89. return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
  90. }
  91. void signal(int count = 1)
  92. {
  93. ReleaseSemaphore(m_hSema, count, nullptr);
  94. }
  95. };
  96. #elif defined(__MACH__)
  97. //---------------------------------------------------------
  98. // Semaphore (Apple iOS and OSX)
  99. // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
  100. //---------------------------------------------------------
  101. class Semaphore
  102. {
  103. private:
  104. semaphore_t m_sema;
  105. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  106. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  107. public:
  108. Semaphore(int initialCount = 0)
  109. {
  110. assert(initialCount >= 0);
  111. semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
  112. }
  113. ~Semaphore()
  114. {
  115. semaphore_destroy(mach_task_self(), m_sema);
  116. }
  117. void wait()
  118. {
  119. semaphore_wait(m_sema);
  120. }
  121. bool try_wait()
  122. {
  123. return timed_wait(0);
  124. }
  125. bool timed_wait(std::uint64_t timeout_usecs)
  126. {
  127. mach_timespec_t ts;
  128. ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
  129. ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
  130. // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
  131. kern_return_t rc = semaphore_timedwait(m_sema, ts);
  132. return rc != KERN_OPERATION_TIMED_OUT && rc != KERN_ABORTED;
  133. }
  134. void signal()
  135. {
  136. semaphore_signal(m_sema);
  137. }
  138. void signal(int count)
  139. {
  140. while (count-- > 0)
  141. {
  142. semaphore_signal(m_sema);
  143. }
  144. }
  145. };
  146. #elif defined(__unix__)
  147. //---------------------------------------------------------
  148. // Semaphore (POSIX, Linux)
  149. //---------------------------------------------------------
  150. class Semaphore
  151. {
  152. private:
  153. sem_t m_sema;
  154. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  155. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  156. public:
  157. Semaphore(int initialCount = 0)
  158. {
  159. assert(initialCount >= 0);
  160. sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
  161. }
  162. ~Semaphore()
  163. {
  164. sem_destroy(&m_sema);
  165. }
  166. void wait()
  167. {
  168. // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
  169. int rc;
  170. do {
  171. rc = sem_wait(&m_sema);
  172. } while (rc == -1 && errno == EINTR);
  173. }
  174. bool try_wait()
  175. {
  176. int rc;
  177. do {
  178. rc = sem_trywait(&m_sema);
  179. } while (rc == -1 && errno == EINTR);
  180. return !(rc == -1 && errno == EAGAIN);
  181. }
  182. bool timed_wait(std::uint64_t usecs)
  183. {
  184. struct timespec ts;
  185. const int usecs_in_1_sec = 1000000;
  186. const int nsecs_in_1_sec = 1000000000;
  187. clock_gettime(CLOCK_REALTIME, &ts);
  188. ts.tv_sec += usecs / usecs_in_1_sec;
  189. ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
  190. // sem_timedwait bombs if you have more than 1e9 in tv_nsec
  191. // so we have to clean things up before passing it in
  192. if (ts.tv_nsec >= nsecs_in_1_sec) {
  193. ts.tv_nsec -= nsecs_in_1_sec;
  194. ++ts.tv_sec;
  195. }
  196. int rc;
  197. do {
  198. rc = sem_timedwait(&m_sema, &ts);
  199. } while (rc == -1 && errno == EINTR);
  200. return !(rc == -1 && errno == ETIMEDOUT);
  201. }
  202. void signal()
  203. {
  204. sem_post(&m_sema);
  205. }
  206. void signal(int count)
  207. {
  208. while (count-- > 0)
  209. {
  210. sem_post(&m_sema);
  211. }
  212. }
  213. };
  214. #else
  215. #error Unsupported platform! (No semaphore wrapper available)
  216. #endif
  217. //---------------------------------------------------------
  218. // LightweightSemaphore
  219. //---------------------------------------------------------
  220. class LightweightSemaphore
  221. {
  222. public:
  223. typedef std::make_signed<std::size_t>::type ssize_t;
  224. private:
  225. std::atomic<ssize_t> m_count;
  226. Semaphore m_sema;
  227. bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
  228. {
  229. ssize_t oldCount;
  230. // Is there a better way to set the initial spin count?
  231. // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
  232. // as threads start hitting the kernel semaphore.
  233. int spin = 10000;
  234. while (--spin >= 0)
  235. {
  236. oldCount = m_count.load(std::memory_order_relaxed);
  237. if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  238. return true;
  239. std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
  240. }
  241. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  242. if (oldCount > 0)
  243. return true;
  244. if (timeout_usecs < 0)
  245. {
  246. m_sema.wait();
  247. return true;
  248. }
  249. if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
  250. return true;
  251. // At this point, we've timed out waiting for the semaphore, but the
  252. // count is still decremented indicating we may still be waiting on
  253. // it. So we have to re-adjust the count, but only if the semaphore
  254. // wasn't signaled enough times for us too since then. If it was, we
  255. // need to release the semaphore too.
  256. while (true)
  257. {
  258. oldCount = m_count.load(std::memory_order_acquire);
  259. if (oldCount >= 0 && m_sema.try_wait())
  260. return true;
  261. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  262. return false;
  263. }
  264. }
  265. ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
  266. {
  267. assert(max > 0);
  268. ssize_t oldCount;
  269. int spin = 10000;
  270. while (--spin >= 0)
  271. {
  272. oldCount = m_count.load(std::memory_order_relaxed);
  273. if (oldCount > 0)
  274. {
  275. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  276. if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  277. return oldCount - newCount;
  278. }
  279. std::atomic_signal_fence(std::memory_order_acquire);
  280. }
  281. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  282. if (oldCount <= 0)
  283. {
  284. if (timeout_usecs < 0)
  285. m_sema.wait();
  286. else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
  287. {
  288. while (true)
  289. {
  290. oldCount = m_count.load(std::memory_order_acquire);
  291. if (oldCount >= 0 && m_sema.try_wait())
  292. break;
  293. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  294. return 0;
  295. }
  296. }
  297. }
  298. if (max > 1)
  299. return 1 + tryWaitMany(max - 1);
  300. return 1;
  301. }
  302. public:
  303. LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
  304. {
  305. assert(initialCount >= 0);
  306. }
  307. bool tryWait()
  308. {
  309. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  310. while (oldCount > 0)
  311. {
  312. if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  313. return true;
  314. }
  315. return false;
  316. }
  317. void wait()
  318. {
  319. if (!tryWait())
  320. waitWithPartialSpinning();
  321. }
  322. bool wait(std::int64_t timeout_usecs)
  323. {
  324. return tryWait() || waitWithPartialSpinning(timeout_usecs);
  325. }
  326. // Acquires between 0 and (greedily) max, inclusive
  327. ssize_t tryWaitMany(ssize_t max)
  328. {
  329. assert(max >= 0);
  330. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  331. while (oldCount > 0)
  332. {
  333. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  334. if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  335. return oldCount - newCount;
  336. }
  337. return 0;
  338. }
  339. // Acquires at least one, and (greedily) at most max
  340. ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
  341. {
  342. assert(max >= 0);
  343. ssize_t result = tryWaitMany(max);
  344. if (result == 0 && max > 0)
  345. result = waitManyWithPartialSpinning(max, timeout_usecs);
  346. return result;
  347. }
  348. ssize_t waitMany(ssize_t max)
  349. {
  350. ssize_t result = waitMany(max, -1);
  351. assert(result > 0);
  352. return result;
  353. }
  354. void signal(ssize_t count = 1)
  355. {
  356. assert(count >= 0);
  357. ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
  358. ssize_t toRelease = -oldCount < count ? -oldCount : count;
  359. if (toRelease > 0)
  360. {
  361. m_sema.signal((int)toRelease);
  362. }
  363. }
  364. ssize_t availableApprox() const
  365. {
  366. ssize_t count = m_count.load(std::memory_order_relaxed);
  367. return count > 0 ? count : 0;
  368. }
  369. };
  370. } // end namespace mpmc_sema
  371. } // end namespace details
  372. // This is a blocking version of the queue. It has an almost identical interface to
  373. // the normal non-blocking version, with the addition of various wait_dequeue() methods
  374. // and the removal of producer-specific dequeue methods.
  375. template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
  376. class BlockingConcurrentQueue
  377. {
  378. private:
  379. typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
  380. typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;
  381. public:
  382. typedef typename ConcurrentQueue::producer_token_t producer_token_t;
  383. typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
  384. typedef typename ConcurrentQueue::index_t index_t;
  385. typedef typename ConcurrentQueue::size_t size_t;
  386. typedef typename std::make_signed<size_t>::type ssize_t;
  387. static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
  388. static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
  389. static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
  390. static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
  391. static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
  392. static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
  393. static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
  394. public:
  395. // Creates a queue with at least `capacity` element slots; note that the
  396. // actual number of elements that can be inserted without additional memory
  397. // allocation depends on the number of producers and the block size (e.g. if
  398. // the block size is equal to `capacity`, only a single block will be allocated
  399. // up-front, which means only a single producer will be able to enqueue elements
  400. // without an extra allocation -- blocks aren't shared between producers).
  401. // This method is not thread safe -- it is up to the user to ensure that the
  402. // queue is fully constructed before it starts being used by other threads (this
  403. // includes making the memory effects of construction visible, possibly with a
  404. // memory barrier).
  405. explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
  406. : inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
  407. {
  408. assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
  409. if (!sema) {
  410. MOODYCAMEL_THROW(std::bad_alloc());
  411. }
  412. }
  413. BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
  414. : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
  415. {
  416. assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
  417. if (!sema) {
  418. MOODYCAMEL_THROW(std::bad_alloc());
  419. }
  420. }
  421. // Disable copying and copy assignment
  422. BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
  423. BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
  424. // Moving is supported, but note that it is *not* a thread-safe operation.
  425. // Nobody can use the queue while it's being moved, and the memory effects
  426. // of that move must be propagated to other threads before they can use it.
  427. // Note: When a queue is moved, its tokens are still valid but can only be
  428. // used with the destination queue (i.e. semantically they are moved along
  429. // with the queue itself).
  430. BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
  431. : inner(std::move(other.inner)), sema(std::move(other.sema))
  432. { }
  433. inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
  434. {
  435. return swap_internal(other);
  436. }
  437. // Swaps this queue's state with the other's. Not thread-safe.
  438. // Swapping two queues does not invalidate their tokens, however
  439. // the tokens that were created for one queue must be used with
  440. // only the swapped queue (i.e. the tokens are tied to the
  441. // queue's movable state, not the object itself).
  442. inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
  443. {
  444. swap_internal(other);
  445. }
  446. private:
  447. BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
  448. {
  449. if (this == &other) {
  450. return *this;
  451. }
  452. inner.swap(other.inner);
  453. sema.swap(other.sema);
  454. return *this;
  455. }
  456. public:
  457. // Enqueues a single item (by copying it).
  458. // Allocates memory if required. Only fails if memory allocation fails (or implicit
  459. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
  460. // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  461. // Thread-safe.
  462. inline bool enqueue(T const& item)
  463. {
  464. if ((details::likely)(inner.enqueue(item))) {
  465. sema->signal();
  466. return true;
  467. }
  468. return false;
  469. }
  470. // Enqueues a single item (by moving it, if possible).
  471. // Allocates memory if required. Only fails if memory allocation fails (or implicit
  472. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
  473. // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  474. // Thread-safe.
  475. inline bool enqueue(T&& item)
  476. {
  477. if ((details::likely)(inner.enqueue(std::move(item)))) {
  478. sema->signal();
  479. return true;
  480. }
  481. return false;
  482. }
  483. // Enqueues a single item (by copying it) using an explicit producer token.
  484. // Allocates memory if required. Only fails if memory allocation fails (or
  485. // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  486. // Thread-safe.
  487. inline bool enqueue(producer_token_t const& token, T const& item)
  488. {
  489. if ((details::likely)(inner.enqueue(token, item))) {
  490. sema->signal();
  491. return true;
  492. }
  493. return false;
  494. }
  495. // Enqueues a single item (by moving it, if possible) using an explicit producer token.
  496. // Allocates memory if required. Only fails if memory allocation fails (or
  497. // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  498. // Thread-safe.
  499. inline bool enqueue(producer_token_t const& token, T&& item)
  500. {
  501. if ((details::likely)(inner.enqueue(token, std::move(item)))) {
  502. sema->signal();
  503. return true;
  504. }
  505. return false;
  506. }
  507. // Enqueues several items.
  508. // Allocates memory if required. Only fails if memory allocation fails (or
  509. // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
  510. // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  511. // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
  512. // Thread-safe.
  513. template<typename It>
  514. inline bool enqueue_bulk(It itemFirst, size_t count)
  515. {
  516. if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
  517. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  518. return true;
  519. }
  520. return false;
  521. }
  522. // Enqueues several items using an explicit producer token.
  523. // Allocates memory if required. Only fails if memory allocation fails
  524. // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  525. // Note: Use std::make_move_iterator if the elements should be moved
  526. // instead of copied.
  527. // Thread-safe.
  528. template<typename It>
  529. inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
  530. {
  531. if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
  532. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  533. return true;
  534. }
  535. return false;
  536. }
  537. // Enqueues a single item (by copying it).
  538. // Does not allocate memory. Fails if not enough room to enqueue (or implicit
  539. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
  540. // is 0).
  541. // Thread-safe.
  542. inline bool try_enqueue(T const& item)
  543. {
  544. if (inner.try_enqueue(item)) {
  545. sema->signal();
  546. return true;
  547. }
  548. return false;
  549. }
  550. // Enqueues a single item (by moving it, if possible).
  551. // Does not allocate memory (except for one-time implicit producer).
  552. // Fails if not enough room to enqueue (or implicit production is
  553. // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
  554. // Thread-safe.
  555. inline bool try_enqueue(T&& item)
  556. {
  557. if (inner.try_enqueue(std::move(item))) {
  558. sema->signal();
  559. return true;
  560. }
  561. return false;
  562. }
  563. // Enqueues a single item (by copying it) using an explicit producer token.
  564. // Does not allocate memory. Fails if not enough room to enqueue.
  565. // Thread-safe.
  566. inline bool try_enqueue(producer_token_t const& token, T const& item)
  567. {
  568. if (inner.try_enqueue(token, item)) {
  569. sema->signal();
  570. return true;
  571. }
  572. return false;
  573. }
  574. // Enqueues a single item (by moving it, if possible) using an explicit producer token.
  575. // Does not allocate memory. Fails if not enough room to enqueue.
  576. // Thread-safe.
  577. inline bool try_enqueue(producer_token_t const& token, T&& item)
  578. {
  579. if (inner.try_enqueue(token, std::move(item))) {
  580. sema->signal();
  581. return true;
  582. }
  583. return false;
  584. }
  585. // Enqueues several items.
  586. // Does not allocate memory (except for one-time implicit producer).
  587. // Fails if not enough room to enqueue (or implicit production is
  588. // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
  589. // Note: Use std::make_move_iterator if the elements should be moved
  590. // instead of copied.
  591. // Thread-safe.
  592. template<typename It>
  593. inline bool try_enqueue_bulk(It itemFirst, size_t count)
  594. {
  595. if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
  596. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  597. return true;
  598. }
  599. return false;
  600. }
  601. // Enqueues several items using an explicit producer token.
  602. // Does not allocate memory. Fails if not enough room to enqueue.
  603. // Note: Use std::make_move_iterator if the elements should be moved
  604. // instead of copied.
  605. // Thread-safe.
  606. template<typename It>
  607. inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
  608. {
  609. if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
  610. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  611. return true;
  612. }
  613. return false;
  614. }
  615. // Attempts to dequeue from the queue.
  616. // Returns false if all producer streams appeared empty at the time they
  617. // were checked (so, the queue is likely but not guaranteed to be empty).
  618. // Never allocates. Thread-safe.
  619. template<typename U>
  620. inline bool try_dequeue(U& item)
  621. {
  622. if (sema->tryWait()) {
  623. while (!inner.try_dequeue(item)) {
  624. continue;
  625. }
  626. return true;
  627. }
  628. return false;
  629. }
  630. // Attempts to dequeue from the queue using an explicit consumer token.
  631. // Returns false if all producer streams appeared empty at the time they
  632. // were checked (so, the queue is likely but not guaranteed to be empty).
  633. // Never allocates. Thread-safe.
  634. template<typename U>
  635. inline bool try_dequeue(consumer_token_t& token, U& item)
  636. {
  637. if (sema->tryWait()) {
  638. while (!inner.try_dequeue(token, item)) {
  639. continue;
  640. }
  641. return true;
  642. }
  643. return false;
  644. }
  645. // Attempts to dequeue several elements from the queue.
  646. // Returns the number of items actually dequeued.
  647. // Returns 0 if all producer streams appeared empty at the time they
  648. // were checked (so, the queue is likely but not guaranteed to be empty).
  649. // Never allocates. Thread-safe.
  650. template<typename It>
  651. inline size_t try_dequeue_bulk(It itemFirst, size_t max)
  652. {
  653. size_t count = 0;
  654. max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  655. while (count != max) {
  656. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  657. }
  658. return count;
  659. }
  660. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  661. // Returns the number of items actually dequeued.
  662. // Returns 0 if all producer streams appeared empty at the time they
  663. // were checked (so, the queue is likely but not guaranteed to be empty).
  664. // Never allocates. Thread-safe.
  665. template<typename It>
  666. inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
  667. {
  668. size_t count = 0;
  669. max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  670. while (count != max) {
  671. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  672. }
  673. return count;
  674. }
  675. // Blocks the current thread until there's something to dequeue, then
  676. // dequeues it.
  677. // Never allocates. Thread-safe.
  678. template<typename U>
  679. inline void wait_dequeue(U& item)
  680. {
  681. sema->wait();
  682. while (!inner.try_dequeue(item)) {
  683. continue;
  684. }
  685. }
  686. // Blocks the current thread until either there's something to dequeue
  687. // or the timeout (specified in microseconds) expires. Returns false
  688. // without setting `item` if the timeout expires, otherwise assigns
  689. // to `item` and returns true.
  690. // Using a negative timeout indicates an indefinite timeout,
  691. // and is thus functionally equivalent to calling wait_dequeue.
  692. // Never allocates. Thread-safe.
  693. template<typename U>
  694. inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
  695. {
  696. if (!sema->wait(timeout_usecs)) {
  697. return false;
  698. }
  699. while (!inner.try_dequeue(item)) {
  700. continue;
  701. }
  702. return true;
  703. }
  704. // Blocks the current thread until either there's something to dequeue
  705. // or the timeout expires. Returns false without setting `item` if the
  706. // timeout expires, otherwise assigns to `item` and returns true.
  707. // Never allocates. Thread-safe.
  708. template<typename U, typename Rep, typename Period>
  709. inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
  710. {
  711. return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  712. }
  713. // Blocks the current thread until there's something to dequeue, then
  714. // dequeues it using an explicit consumer token.
  715. // Never allocates. Thread-safe.
  716. template<typename U>
  717. inline void wait_dequeue(consumer_token_t& token, U& item)
  718. {
  719. sema->wait();
  720. while (!inner.try_dequeue(token, item)) {
  721. continue;
  722. }
  723. }
  724. // Blocks the current thread until either there's something to dequeue
  725. // or the timeout (specified in microseconds) expires. Returns false
  726. // without setting `item` if the timeout expires, otherwise assigns
  727. // to `item` and returns true.
  728. // Using a negative timeout indicates an indefinite timeout,
  729. // and is thus functionally equivalent to calling wait_dequeue.
  730. // Never allocates. Thread-safe.
  731. template<typename U>
  732. inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
  733. {
  734. if (!sema->wait(timeout_usecs)) {
  735. return false;
  736. }
  737. while (!inner.try_dequeue(token, item)) {
  738. continue;
  739. }
  740. return true;
  741. }
  742. // Blocks the current thread until either there's something to dequeue
  743. // or the timeout expires. Returns false without setting `item` if the
  744. // timeout expires, otherwise assigns to `item` and returns true.
  745. // Never allocates. Thread-safe.
  746. template<typename U, typename Rep, typename Period>
  747. inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
  748. {
  749. return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  750. }
  751. // Attempts to dequeue several elements from the queue.
  752. // Returns the number of items actually dequeued, which will
  753. // always be at least one (this method blocks until the queue
  754. // is non-empty) and at most max.
  755. // Never allocates. Thread-safe.
  756. template<typename It>
  757. inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
  758. {
  759. size_t count = 0;
  760. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  761. while (count != max) {
  762. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  763. }
  764. return count;
  765. }
  766. // Attempts to dequeue several elements from the queue.
  767. // Returns the number of items actually dequeued, which can
  768. // be 0 if the timeout expires while waiting for elements,
  769. // and at most max.
  770. // Using a negative timeout indicates an indefinite timeout,
  771. // and is thus functionally equivalent to calling wait_dequeue_bulk.
  772. // Never allocates. Thread-safe.
  773. template<typename It>
  774. inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
  775. {
  776. size_t count = 0;
  777. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
  778. while (count != max) {
  779. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  780. }
  781. return count;
  782. }
  783. // Attempts to dequeue several elements from the queue.
  784. // Returns the number of items actually dequeued, which can
  785. // be 0 if the timeout expires while waiting for elements,
  786. // and at most max.
  787. // Never allocates. Thread-safe.
  788. template<typename It, typename Rep, typename Period>
  789. inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
  790. {
  791. return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  792. }
  793. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  794. // Returns the number of items actually dequeued, which will
  795. // always be at least one (this method blocks until the queue
  796. // is non-empty) and at most max.
  797. // Never allocates. Thread-safe.
  798. template<typename It>
  799. inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
  800. {
  801. size_t count = 0;
  802. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  803. while (count != max) {
  804. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  805. }
  806. return count;
  807. }
  808. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  809. // Returns the number of items actually dequeued, which can
  810. // be 0 if the timeout expires while waiting for elements,
  811. // and at most max.
  812. // Using a negative timeout indicates an indefinite timeout,
  813. // and is thus functionally equivalent to calling wait_dequeue_bulk.
  814. // Never allocates. Thread-safe.
  815. template<typename It>
  816. inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
  817. {
  818. size_t count = 0;
  819. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
  820. while (count != max) {
  821. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  822. }
  823. return count;
  824. }
  825. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  826. // Returns the number of items actually dequeued, which can
  827. // be 0 if the timeout expires while waiting for elements,
  828. // and at most max.
  829. // Never allocates. Thread-safe.
  830. template<typename It, typename Rep, typename Period>
  831. inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
  832. {
  833. return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  834. }
  835. // Returns an estimate of the total number of elements currently in the queue. This
  836. // estimate is only accurate if the queue has completely stabilized before it is called
  837. // (i.e. all enqueue and dequeue operations have completed and their memory effects are
  838. // visible on the calling thread, and no further operations start while this method is
  839. // being called).
  840. // Thread-safe.
  841. inline size_t size_approx() const
  842. {
  843. return (size_t)sema->availableApprox();
  844. }
  845. // Returns true if the underlying atomic variables used by
  846. // the queue are lock-free (they should be on most platforms).
  847. // Thread-safe.
  848. static bool is_lock_free()
  849. {
  850. return ConcurrentQueue::is_lock_free();
  851. }
  852. private:
  853. template<typename U>
  854. static inline U* create()
  855. {
  856. auto p = (Traits::malloc)(sizeof(U));
  857. return p != nullptr ? new (p) U : nullptr;
  858. }
  859. template<typename U, typename A1>
  860. static inline U* create(A1&& a1)
  861. {
  862. auto p = (Traits::malloc)(sizeof(U));
  863. return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
  864. }
  865. template<typename U>
  866. static inline void destroy(U* p)
  867. {
  868. if (p != nullptr) {
  869. p->~U();
  870. }
  871. (Traits::free)(p);
  872. }
  873. private:
  874. ConcurrentQueue inner;
  875. std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
  876. };
  877. template<typename T, typename Traits>
  878. inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
  879. {
  880. a.swap(b);
  881. }
  882. } // end namespace moodycamel