tmc::qu_spsc_bounded#

Async SPSC (single-producer, single-consumer) bounded queue that uses purely zero-copy operations. It is linearizable / FIFO. It can be configured by overriding the Config template parameter.
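As a rough illustration of overriding Config, the sketch below defines a standalone config type with the two members documented for qu_spsc_bounded_default_config in the API Reference. The name polling_padded_config is hypothetical, and it is an assumption that the queue reads only these two members; deriving from the default config may be the safer choice if the library adds more.

```cpp
#include <cstddef>

// Hypothetical config for a consumer that only polls with try_pull()
// and wants each element padded to its own 64-byte region.
// Assumption: the queue reads only these two static members.
struct polling_padded_config {
  // Disables the suspending pull(); the producer no longer has to
  // check for a waiting consumer.
  static constexpr bool ConsumerCanSuspend = false;
  // Level 0 pads elements; level 1 (the documented default) packs them.
  static constexpr std::size_t PackingLevel = 0;
};

// Assumed usage (requires "tmc/qu_spsc_bounded.hpp"):
//   tmc::qu_spsc_bounded<int, polling_padded_config> q(1024);
```

With ConsumerCanSuspend set to false, the consumer would be expected to use try_pull() exclusively.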

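Producer operations:

  • push()

  • try_push()

  • push_bulk()

  • close()

  • close_resume_inline()

Consumer operations:

  • pull()

  • try_pull()

  • empty()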

Usage Examples#

pull() suspends until data is available. It returns a scoped zero-copy handle whose operator bool() / has_value() indicates the result.

#include "tmc/task.hpp"
#include "tmc/spawn_tuple.hpp"
#include "tmc/qu_spsc_bounded.hpp"

tmc::task<void> producer(tmc::qu_spsc_bounded<int>& q) {
  for (int i = 0; i < 100; ++i) {
    // If the queue is full, this will suspend until a slot becomes free
    co_await q.push(i);
  }
  q.close();
}

tmc::task<void> consumer(tmc::qu_spsc_bounded<int>& q) {
  // Loop automatically breaks once the queue drains after close()
  while (auto data = co_await q.pull()) {
    // v is a zero-copy reference to a T located in the queue storage
    int& v = data.value();
    // do something with v
  }
}

tmc::task<void> queue_quickstart() {
  // Set queue capacity
  tmc::qu_spsc_bounded<int> q(10);
  co_await tmc::spawn_tuple(producer(q), consumer(q));
}

try_pull() is non-suspending and must be polled. It returns a scoped zero-copy handle whose status() (or operator bool()) indicates the result.

#include <utility> // std::unreachable (C++23)

// Drains all data currently in the queue. Returns true if the queue was empty afterward,
// and false if the queue was closed afterward.
tmc::task<bool> consume_all_data(tmc::qu_spsc_bounded<int>& q) {
  while(true) {
    auto data = q.try_pull();
    switch (data.status()) {
      case tmc::qu_spsc_bounded_err::OK: {
        // v is a zero-copy reference to a T located in the queue storage
        int& v = data.value();
        // do something with v
        break;
      }
      case tmc::qu_spsc_bounded_err::EMPTY:
        // No data available right now. Try again later.
        co_return true;
      case tmc::qu_spsc_bounded_err::CLOSED:
        // The queue has been closed and drained. Do not try again later.
        co_return false;
      default:
        std::unreachable();
    }
  }
}

API Reference#

template<typename T, typename Config = tmc::qu_spsc_bounded_default_config>
class qu_spsc_bounded#

Public Functions

inline explicit qu_spsc_bounded(size_t Capacity) noexcept#

Constructs a qu_spsc_bounded with the given capacity.

template<typename ...Args>
inline aw_push<Args...> push(Args&&... ConstructArgs) noexcept#

Enqueues a new value in the queue by in-place construction, forwarding ConstructArgs to T’s constructor. Only safe to call from the single producer.

Returns an awaitable. If the queue is full, the producer suspends until the consumer reads a value and frees a slot. Otherwise it completes synchronously.

The awaited result is bool: true if the value was enqueued, false if the queue was closed and the value was not enqueued.

If a consumer is currently suspended waiting for a value, it will be resumed once the value is enqueued.

You must not call this after calling close().

LIFETIME REQUIREMENT: the returned awaitable holds the arguments by reference (T& for lvalues, T&& for rvalues / temporaries). If you pass a temporary into this, you must co_await it immediately, in the same full-expression, so that the temporary is still alive when the awaitable reads it.

// Safe: the temporary T lives until the end of the
// full-expression, which includes the co_await.
co_await q.push(T{...});

// Unsafe: `a` holds a dangling reference to the temporary T
auto a = q.push(T{...});
co_await std::move(a);

// Safe: passing a reference to a named variable
auto v = T{...};
auto a = q.push(std::move(v));
co_await std::move(a);
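The lifetime rule above can be observed without the queue. The following is a minimal sketch using hypothetical stand-ins (tracer, ref_holder, hold), none of which are tmc types. It checks whether a temporary is still alive when a by-reference holder is used immediately versus after being stored. The dangling reference is never dereferenced; only a flag is inspected.

```cpp
#include <utility>

// Hypothetical helper: flips a flag so destruction can be observed.
struct tracer {
  bool* alive;
  explicit tracer(bool* flag) : alive(flag) { *alive = true; }
  tracer(const tracer&) = delete;
  ~tracer() { *alive = false; }
};

// Holds its argument by reference, like the awaitable described above:
// T& for lvalues, T&& for rvalues.
template <typename T>
struct ref_holder {
  T&& ref;
};

template <typename T>
ref_holder<T> hold(T&& arg) {
  return ref_holder<T>{std::forward<T>(arg)};
}

// The holder is consumed in the same full-expression that created the
// temporary, so the temporary is still alive at the point of use.
inline bool alive_when_used_immediately() {
  bool alive = false;
  auto consume = [&](auto&&) { return alive; };
  return consume(hold(tracer{&alive}));
}

// The holder is stored first. The temporary is destroyed at the end of
// the declaration statement, so `h.ref` dangles afterwards.
inline bool alive_after_storing() {
  bool alive = false;
  auto h = hold(tracer{&alive});
  (void)h; // never dereferenced; we only inspect the flag
  return alive;
}
```

This mirrors the safe and unsafe push() patterns: the first function corresponds to awaiting in place, the second to storing the awaitable built from a temporary.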

template<typename ...Args>
inline bool try_push(Args&&... ConstructArgs) noexcept#

Non-suspending counterpart to push(). Constructs a new value in the queue, forwarding ConstructArgs to T’s constructor. Only safe to call from the single producer.

Returns true if the value was successfully pushed, or false if the queue was full (in which case no value is enqueued, ConstructArgs are not used, and the queue is not modified).

If a consumer is currently suspended waiting for a value, it will be resumed if a value was enqueued.

You must not call this after calling close().

template<typename It>
inline aw_push_bulk<std::remove_cvref_t<It>> push_bulk(It &&Items, size_t Count) noexcept#

Moves Count values from the iterator Items into the queue. Only safe to call from the single producer. Count must be no greater than the queue capacity passed to the constructor; if more elements need to be submitted, call this function in a loop.

Returns an awaitable that must be co_awaited. If the queue does not have Count free slots, the producer suspends until enough slots have been freed by the consumer to fit all Count elements at once. Otherwise it completes synchronously.

If a consumer is currently suspended waiting for a value, it will be resumed once the values are enqueued.

You must not call this after calling close().
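Because Count may not exceed the queue capacity, larger batches have to be split by the caller. The sketch below shows one way to compute the pieces; chunk and split_into_chunks are hypothetical helpers, not part of the library, and the push_bulk call appears only in a comment.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical description of one push_bulk() call: where it starts in
// the source sequence and how many elements it submits.
struct chunk {
  std::size_t offset;
  std::size_t count;
};

// Splits `total` elements into pieces of at most `capacity` elements.
inline std::vector<chunk> split_into_chunks(std::size_t total,
                                            std::size_t capacity) {
  std::vector<chunk> out;
  if (capacity == 0) {
    return out; // nothing can ever be submitted
  }
  for (std::size_t off = 0; off < total; off += capacity) {
    out.push_back({off, std::min(capacity, total - off)});
  }
  return out;
}

// Assumed usage inside the producer coroutine:
//   for (chunk c : split_into_chunks(items.size(), capacity)) {
//     co_await q.push_bulk(items.begin() + c.offset, c.count);
//   }
```

For 25 elements and a capacity of 10, this yields three calls covering 10, 10, and 5 elements.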

template<typename It>
inline aw_push_bulk<std::remove_cvref_t<It>> push_bulk(It &&Begin, It &&End) noexcept#

Calculates the number of elements via size_t Count = End - Begin; and moves them from the iterator Begin into the queue. Only safe to call from the single producer. The number of elements must be no greater than the queue capacity passed to the constructor; if more elements need to be submitted, call this function in a loop.

Returns an awaitable that must be co_awaited. See the (Items, Count) overload for suspension behavior.

If a consumer is currently suspended waiting for a value, it will be resumed once the values are enqueued.

You must not call this after calling close().

template<typename Range>
inline auto push_bulk(Range &&R) noexcept#

Calculates the number of elements via size_t Count = R.end() - R.begin(); and moves them from the beginning of the range into the queue. Only safe to call from the single producer. The number of elements must be no greater than the queue capacity passed to the constructor; if more elements need to be submitted, call this function in a loop.

Returns an awaitable that must be co_awaited. See the (Items, Count) overload for suspension behavior.

If a consumer is currently suspended waiting for a value, it will be resumed once the values are enqueued.

You must not call this after calling close().

inline void close() noexcept#

Closes the queue. May only be called from the single producer. After close() returns, the producer must not call push(), try_push(), or push_bulk() again. Calls to pull() and try_pull() continue to return data until every queued value has been consumed; after that, all subsequent calls immediately return an empty scope. If the queue is already empty when close() is called, a waiting consumer is resumed immediately with an empty scope.

close() is idempotent.

inline void close_resume_inline() noexcept#

Closes the queue and resumes any waiting consumer inline on the caller’s thread instead of posting its continuation to its continuation executor. This should only be used when the caller knows that the waiting consumer may safely run on the caller’s thread.

Behaves like close() in all other respects. close_resume_inline() is idempotent. May only be called from the single producer.

inline bool empty()#

Returns true if the queue appears to be empty. A closed-and-drained queue is considered non-empty, and this will return false so that the consumer will call try_pull() / pull() and observe the CLOSED status. This is an unsynchronized read (like try_pull()), so it is only a hint. Only safe to call from the single consumer.

inline aw_pull pull() noexcept#

Await to dequeue. Returns a pull_zc_scope which provides a scoped zero-copy reference to a value in the queue storage. When the scope is destroyed, the referenced value will be destroyed and the queue slot freed for reuse. Only safe to call from the single consumer.

The returned scope’s has_value() / operator bool() returns true if a value was dequeued, or false if the queue was closed and drained.

This scope must be released before the next call to try_pull() or pull(). It must also be released before the queue is destroyed.

May suspend until a value is available, or until close() is called.

inline try_pull_zc_scope try_pull()#

Attempts to immediately dequeue, returning a try_pull_zc_scope which provides a scoped zero-copy reference to a value in the queue storage. When the scope is destroyed, the referenced value will be destroyed and the queue slot freed for reuse. Only safe to call from the single consumer.

The returned scope’s status() returns:

  • qu_spsc_bounded_err::OK - a value was dequeued

  • qu_spsc_bounded_err::EMPTY - no value is currently available

  • qu_spsc_bounded_err::CLOSED - the queue has been closed and drained

The returned scope’s has_value() / operator bool() returns true if a value was dequeued, or false if the queue was empty or closed.

This scope must be released before the next call to try_pull() or pull(). It must also be released before the queue is destroyed.
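The relationship between queue contents, close(), and the reported status can be summarized as a small model. This is only a sketch of the documented behavior, not the library's implementation; pull_status, model_try_pull_status, and model_empty are hypothetical names.

```cpp
#include <cstddef>

// Hypothetical mirror of the three documented status values.
enum class pull_status { OK, EMPTY, CLOSED };

// Models which status a non-suspending pull would report, given how
// many values are queued and whether close() has been called.
constexpr pull_status model_try_pull_status(std::size_t queued, bool closed) {
  if (queued > 0) {
    return pull_status::OK; // queued data is still delivered after close()
  }
  return closed ? pull_status::CLOSED : pull_status::EMPTY;
}

// Models the documented empty() hint: a closed-and-drained queue is
// reported as non-empty so the consumer goes on to observe CLOSED.
constexpr bool model_empty(std::size_t queued, bool closed) {
  return queued == 0 && !closed;
}
```

In particular, CLOSED is reported only once the queue has been both closed and fully drained.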

inline ~qu_spsc_bounded()#

Destroys the queue and any contained values that have not yet been consumed.

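Before destroying this, you must ensure:

  • Every pull_zc_scope / try_pull_zc_scope obtained from the queue has been released.

  • No producer or consumer will call any method of the queue again.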

The recommended teardown sequence is:

  1. Stop submitting new push() calls.

  2. close() the queue.

  3. Drain via pull() / try_pull() until CLOSED.

  4. Ensure no further queue method calls will occur (e.g. by joining all producer and consumer coroutines).

  5. Destroy the queue.

class aw_pull : private tmc::detail::AwaitTagNoGroupCoAwait#

Returns a pull_zc_scope when awaited.

template<typename ...Args>
class aw_push : private tmc::detail::AwaitTagNoGroupCoAwait#

Returns void when awaited.

template<typename It>
class aw_push_bulk : private tmc::detail::AwaitTagNoGroupCoAwait#

Returns void when awaited.

class pull_zc_scope#

A zero-copy handle to an object in the queue’s storage. The object is exclusively available to this handle. When this handle is destroyed, the queued object will be destroyed and the queue slot will be freed for reuse. Returned by co_await pull().

If the queue has been closed and is drained, pull() will resume with an empty pull_zc_scope (operator bool returns false).

Public Functions

inline pull_zc_scope() noexcept#

Constructs an empty scope. Evaluates to false when converted to bool.

inline explicit operator bool() const noexcept#

Returns true if this scope holds a value from the queue.

inline bool has_value() const noexcept#

Returns true if this scope holds a value from the queue.

inline T &value() noexcept#

Returns a reference to the object in the queue storage. Only valid to call if operator bool() is true.

inline T &operator*() noexcept#

Returns a reference to the object in the queue storage. Only valid to call if operator bool() is true.

inline T *operator->() noexcept#

Returns a pointer to the object in the queue storage. Only valid to call if operator bool() is true.

TMC_FORCE_INLINE inline ~pull_zc_scope()#

Destroys the object in the queue storage and releases the queue slot.
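To make the scoped zero-copy idea concrete, here is a minimal single-threaded sketch of a handle whose destructor destroys the referenced object and frees its slot. one_slot and its nested scope are hypothetical illustrations and say nothing about how the library implements pull_zc_scope.

```cpp
#include <new>
#include <string>
#include <utility>

// Hypothetical one-element storage; a stand-in for a single queue slot.
template <typename T>
class one_slot {
  alignas(T) unsigned char storage_[sizeof(T)];
  bool occupied_ = false;

  T* object() noexcept { return std::launder(reinterpret_cast<T*>(storage_)); }

public:
  // Scoped handle: destroys the object and frees the slot on destruction.
  class scope {
    one_slot* owner_; // nullptr means "no value"

  public:
    explicit scope(one_slot* owner) noexcept : owner_(owner) {}
    scope(scope&& other) noexcept
        : owner_(std::exchange(other.owner_, nullptr)) {}
    scope(const scope&) = delete;
    scope& operator=(const scope&) = delete;
    ~scope() {
      if (owner_ != nullptr) {
        owner_->object()->~T();
        owner_->occupied_ = false;
      }
    }
    explicit operator bool() const noexcept { return owner_ != nullptr; }
    // Only valid when the scope holds a value.
    T& value() noexcept { return *owner_->object(); }
  };

  one_slot() = default;
  one_slot(const one_slot&) = delete;
  one_slot& operator=(const one_slot&) = delete;
  ~one_slot() {
    if (occupied_) object()->~T();
  }

  // Constructs T in place; the arguments are untouched if the slot is full.
  template <typename... Args>
  bool try_emplace(Args&&... args) {
    if (occupied_) return false;
    ::new (static_cast<void*>(storage_)) T(std::forward<Args>(args)...);
    occupied_ = true;
    return true;
  }

  // The previous scope must be released before calling this again.
  scope try_take() noexcept { return scope(occupied_ ? this : nullptr); }
  bool occupied() const noexcept { return occupied_; }
};

// Usage: the slot becomes reusable only after the scope is destroyed.
inline bool slot_is_freed_by_scope() {
  one_slot<std::string> slot;
  slot.try_emplace("hello");
  {
    auto held = slot.try_take();
    if (!held || held.value() != "hello" || !slot.occupied()) return false;
  } // ~scope runs here: the string is destroyed and the slot is freed
  return !slot.occupied() && slot.try_emplace("again");
}
```

The value is read in place through the handle and never copied out, which is why the handle must be released before the slot can be reused.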

class try_pull_zc_scope#

A zero-copy handle to an object in the queue’s storage. The object is exclusively available to this handle. When this handle is destroyed, the queued object will be destroyed and the queue slot will be freed for reuse. Returned by try_pull().

The status of the pull is exposed via status(): qu_spsc_bounded_err::OK if a value is held, EMPTY if no value was available, or CLOSED if the queue has been closed and drained.

Public Functions

inline try_pull_zc_scope() noexcept#

Constructs an empty scope (status EMPTY). Evaluates to false when converted to bool.

inline explicit operator bool() const noexcept#

Returns true if this scope holds a value from the queue (status == OK).

inline bool has_value() const noexcept#

Returns true if this scope holds a value from the queue (status == OK).

inline tmc::qu_spsc_bounded_err status() const noexcept#

Returns the status of this pull: OK, EMPTY, or CLOSED.

inline T &value() noexcept#

Returns a reference to the object in the queue storage. Only valid to call if status() is OK / operator bool() is true.

inline T &operator*() noexcept#

Returns a reference to the object in the queue storage. Only valid to call if status() is OK / operator bool() is true.

inline T *operator->() noexcept#

Returns a pointer to the object in the queue storage. Only valid to call if status() is OK / operator bool() is true.

inline ~try_pull_zc_scope()#

Destroys the object in the queue storage and releases the queue slot.

enum class tmc::qu_spsc_bounded_err#

Status code returned by qu_spsc_bounded.try_pull().status()

Values:

enumerator OK#

enumerator EMPTY#

enumerator CLOSED#

struct qu_spsc_bounded_default_config#

Public Static Attributes

static constexpr bool ConsumerCanSuspend = true#

If true, enables the suspending pull() operation. This costs the producer an additional locked operation to check for a waiting consumer.

static constexpr size_t PackingLevel = 1#

At level 0, queue elements will be padded up to the next multiple of 64 bytes, which reduces false sharing between neighboring elements. At level 1, no padding will be applied. The SPSC queue is packed by default to improve cache locality for the single producer.
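The effect of the two packing levels on per-element footprint can be worked out with simple arithmetic. The helper below is a hypothetical sketch that assumes only the element itself is rounded up; any per-slot bookkeeping the library may add is not modeled.

```cpp
#include <cstddef>

// Hypothetical model of the per-element footprint for each packing level.
// Assumption: padding applies to the element size alone.
constexpr std::size_t padded_slot_size(std::size_t elem_size,
                                       std::size_t packing_level) {
  constexpr std::size_t line = 64; // padding granularity stated above
  if (packing_level >= 1) {
    return elem_size; // level 1: packed, no padding
  }
  // Level 0: round up to the next multiple of 64 bytes.
  return (elem_size + line - 1) / line * line;
}
```

Under these assumptions, a 4-byte int occupies 64 bytes at level 0 and 4 bytes at level 1, so a packed queue fits 16 times as many ints per 64-byte region.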