tmc::qu_mpsc_unbounded#
Async MPSC unbounded queue using purely zero-copy operation. It is linearizable / FIFO. It can be configured by overriding the Config template parameter.
Producer (unrestricted) operations:
Consumer operations:
co_awaitpull()
Usage Examples#
pull() suspends until data is available. It returns a scoped zero-copy handle whose operator bool() / has_value() indicates the result.
#include "tmc/task.hpp"
#include "tmc/spawn.hpp"
#include "tmc/spawn_group.hpp"
#include "tmc/qu_mpsc_unbounded.hpp"
tmc::task<void> producer(tmc::qu_mpsc_unbounded<int>& q) {
for (int i = 0; i < 100; ++i) {
q.post(i);
}
}
tmc::task<void> consumer(tmc::qu_mpsc_unbounded<int>& q) {
// Loop automatically breaks once the queue empties after close()
while (auto data = co_await q.pull()) {
// v is a zero-copy reference to a T located in the queue storage
int& v = data.value();
// do something with v
}
}
tmc::task<void> queue_quickstart() {
tmc::qu_mpsc_unbounded<int> q;
// Start the consumer first
auto cons = tmc::spawn(consumer(q)).fork();
// Start multiple producers and wait for them to finish
auto sg = tmc::spawn_group();
for (size_t i = 0; i < 10; ++i) {
sg.add(producer(q));
}
co_await std::move(sg);
// Close the queue and wait for the consumer to drain all data
q.close();
co_await std::move(cons);
}
try_pull() is non-suspending and
must be polled. It returns a scoped zero-copy handle whose status() (or
operator bool()) indicates the result.
// Drains all data currently in the queue. Returns true if the queue was empty afterward,
// and false if the queue was closed afterward.
tmc::task<bool> consume_all_data(tmc::qu_mpsc_unbounded<int>& q) {
while(true) {
auto data = q.try_pull();
switch (data.status()) {
case tmc::qu_mpsc_unbounded_err::OK: {
// v is a zero-copy reference to a T located in the queue storage
int& v = data.value();
// do something with v
break;
}
case tmc::qu_mpsc_unbounded_err::EMPTY:
// No data available right now. Try again later.
co_return true;
case tmc::qu_mpsc_unbounded_err::CLOSED:
// The queue has been closed and drained. Do not try again later.
co_return false;
default:
std::unreachable();
}
}
}
API Reference#
-
template<typename T, typename Config = tmc::qu_mpsc_unbounded_default_config>
class qu_mpsc_unbounded# Public Functions
-
template<typename ...Args>
inline bool post(Args&&... ConstructArgs) noexcept# If the queue is open, this will always return true, indicating that an object of type T was enqueued by in-place construction, forwarding
ConstructArgsto T’s constructor.If the queue is closed, this will return false, and the object will not be enqueued.
If a consumer is currently suspended waiting for a value, it will be resumed once the the value is enqueued.
-
template<typename It>
inline bool post_bulk(It &&Items, size_t Count) noexcept# If the queue is open, this will always return true, indicating that Count elements, starting from the Begin iterator, were enqueued.
If the queue is closed, this will return false, and no items will be enqueued.
If a consumer is currently suspended waiting for a value, it will be resumed once the the values are enqueued.
Each item is moved (not copied) from the iterator into the queue.
The closed check is performed first, then space is pre-allocated, then all Count items are moved into the queue. Thus, there cannot be a partial success - either all or none of the items will be moved.
-
template<typename It>
inline bool post_bulk(It &&Begin, It &&End) noexcept# Calculates the number of elements via
size_t Count = End - Begin;If the queue is open, this will always return true, indicating that Count elements, starting from the Begin iterator, were enqueued.
If the queue is closed, this will return false, and no items will be enqueued.
If a consumer is currently suspended waiting for a value, it will be resumed once the the values are enqueued.
Each item is moved (not copied) from the iterator into the queue.
The closed check is performed first, then space is pre-allocated, then all Count items are moved into the queue. Thus, there cannot be a partial success - either all or none of the items will be moved.
-
template<typename Range>
inline bool post_bulk(Range &&R) noexcept# Calculates the number of elements via
size_t Count = Range.end() - Range.begin();If the queue is open, this will always return true, indicating that Count elements from the beginning of the range were enqueued.
If the queue is closed, this will return false, and no items will be enqueued.
If a consumer is currently suspended waiting for a value, it will be resumed once the the values are enqueued.
Each item is moved (not copied) from the iterator into the queue.
The closed check is performed first, then space is pre-allocated, then all Count items are moved into the queue. Thus, there cannot be a partial success - either all or none of the items will be moved.
-
inline void close() noexcept#
All future calls to
post()andpost_bulk()will immediately return false. Calls topull()andtry_pull()will continue to read data until all messages have been consumed, at which point all subsequent calls will immediately return an empty scope. If the queue was already empty, any waiting consumers will be awoken immediately and return an empty scope.close()is idempotent and safe to call from any thread.
-
inline void close_resume_inline() noexcept#
Closes the queue and resumes any waiting consumer inline on the caller’s thread instead of posting its continuation to its continuation executor. This should only be used when the caller knows that the waiting consumer may safely run on the caller’s thread.
Behaves like
close()in all other respects.close_resume_inline()is idempotent and safe to call from any thread.
-
inline bool empty()#
Returns true if the queue appears to be empty. A closed-and-drained queue is considered non-empty, and this will return false so that the consumer will call
try_pull()/pull()and observe the CLOSED status. This is an unsynchronized read (liketry_pull()), so it is only a hint. Only safe to call from the single consumer.
-
inline aw_pull pull() noexcept#
Await to dequeue. Returns a
pull_zc_scopewhich provides a scoped zero-copy reference to a value in the queue storage. When the scope is destroyed, the referenced value will be destroyed and the queue slot freed for reuse. Only safe to call from the single consumer.The returned scope’s
has_value()/operator bool()returns true if a value was dequeued, or false if the queue was closed and drained.This scope must be released before the next call to
try_pull()orpull(). It must also be released before the queue is destroyed.May suspend until a value is available, or until
close()is called.
-
inline try_pull_zc_scope try_pull()#
Attempts to immediately dequeue, returning a
try_pull_zc_scopewhich provides a scoped zero-copy reference to a value in the queue storage. When the scope is destroyed, the referenced value will be destroyed and the queue slot freed for reuse. Only safe to call from the single consumer.The returned scope’s
status()returns:qu_mpsc_unbounded_err::OK - a value was dequeued
qu_mpsc_unbounded_err::EMPTY - no value is currently available
qu_mpsc_unbounded_err::CLOSED - the queue has been closed and drained
The returned scope’s
has_value()/operator bool()returns true if a value was dequeued, or false if the queue was empty or closed.This scope must be released before the next call to
try_pull()orpull(). It must also be released before the queue is destroyed.
-
inline ~qu_mpsc_unbounded()#
Destroys the queue and any contained values that have not yet been consumed.
Before destroying this, you must ensure:
No producer is currently calling post() or post_bulk().
No consumer is calling or suspended in pull() / try_pull().
No pull_zc_scope / try_pull_zc_scope from this queue is alive.
No other thread is calling any other member function.
The recommended teardown sequence is:
Stop submitting new post() calls.
close() the queue.
Drain via pull() / try_pull() until CLOSED.
Ensure no further queue method calls will occur (e.g. by joining all producer and consumer coroutines).
Destroy the queue.
-
class aw_pull : private tmc::detail::AwaitTagNoGroupCoAwait#
Returns a
pull_zc_scopewhen awaited.
-
class pull_zc_scope#
A zero-copy handle to an object in the queue’s storage. The object is exclusively available to this handle. When this handle is destroyed, the queued object will be destroyed and the queue slot will be freed for reuse. Returned by
co_await pull().If the queue has been closed and is drained,
pull()will resume with an emptypull_zc_scope(operator bool returns false).Public Functions
-
inline pull_zc_scope() noexcept#
Constructs an empty scope. Evaluates to false when converted to bool.
-
inline explicit operator bool() const noexcept#
Returns true if this scope holds a value from the queue.
-
inline bool has_value() const noexcept#
Returns true if this scope holds a value from the queue.
-
inline T &value() noexcept#
Returns a reference to the object in the queue storage. Only valid to call if
operator bool()is true.
-
inline T &operator*() noexcept#
Returns a reference to the object in the queue storage. Only valid to call if
operator bool()is true.
-
inline T *operator->() noexcept#
Returns a pointer to the object in the queue storage. Only valid to call if
operator bool()is true.
-
TMC_FORCE_INLINE inline ~pull_zc_scope()#
Destroys the object in the queue storage and releases the queue slot.
-
inline pull_zc_scope() noexcept#
-
class try_pull_zc_scope#
A zero-copy handle to an object in the queue’s storage. The object is exclusively available to this handle. When this handle is destroyed, the queued object will be destroyed and the queue slot will be freed for reuse. Returned by
try_pull().The status of the pull is exposed via
status():qu_mpsc_unbounded_err::OKif a value is held,EMPTYif no value was available, orCLOSEDif the queue has been closed and drained.Public Functions
-
inline try_pull_zc_scope() noexcept#
Constructs an empty scope (status EMPTY). Evaluates to false when converted to bool.
-
inline explicit operator bool() const noexcept#
Returns true if this scope holds a value from the queue (status == OK).
-
inline bool has_value() const noexcept#
Returns true if this scope holds a value from the queue (status == OK).
-
inline tmc::qu_mpsc_unbounded_err status() const noexcept#
Returns the status of this pull: OK, EMPTY, or CLOSED.
-
inline T &value() noexcept#
Returns a reference to the object in the queue storage. Only valid to call if
status()is OK /operator bool()is true.
-
inline T &operator*() noexcept#
Returns a reference to the object in the queue storage. Only valid to call if
status()is OK /operator bool()is true.
-
inline T *operator->() noexcept#
Returns a pointer to the object in the queue storage. Only valid to call if
status()is OK /operator bool()is true.
-
inline ~try_pull_zc_scope()#
Destroys the object in the queue storage and releases the queue slot.
-
inline try_pull_zc_scope() noexcept#
-
template<typename ...Args>
-
enum class tmc::qu_mpsc_unbounded_err#
Status code returned by qu_mpsc_unbounded.try_pull().status()
Values:
-
enumerator OK#
-
enumerator EMPTY#
-
enumerator CLOSED#
-
enumerator OK#
-
struct qu_mpsc_unbounded_default_config#
Subclassed by tmc::detail::braid_qu_config
Public Static Attributes
-
static constexpr bool ConsumerCanSuspend = true#
If true, enables the suspending
pull()operation. This costs each producer an additional locked operation to check for a waiting consumer.
-
static constexpr size_t BlockSize = 4096#
The number of elements that can be stored in each block in the qu_mpsc_unbounded linked list.
-
static constexpr size_t PackingLevel = 0#
At level 0, queue elements will be padded up to the next increment of 64 bytes. This reduces false sharing between neighboring elements. At level 1, no padding will be applied.
-
static constexpr bool EmbedFirstBlock = false#
If true, the first storage block will be a member of the qu_mpsc_unbounded object (instead of dynamically allocated). Subsequent storage blocks are always dynamically allocated.
-
static constexpr bool ConsumerCanSuspend = true#