.. _qu_mpsc_bounded:

tmc::qu_mpsc_bounded
-----------------------------------------------------------------------------------

Async MPSC bounded queue using purely zero-copy operation. It is linearizable / FIFO. It can be configured by overriding the Config template parameter.

Producer (unrestricted) operations:

* ``co_await`` :cpp:func:`push()<tmc::qu_mpsc_bounded::push>`
* :cpp:func:`close()<tmc::qu_mpsc_bounded::close>`
* :cpp:func:`close_resume_inline()<tmc::qu_mpsc_bounded::close_resume_inline>`

Consumer operations:

* ``co_await`` :cpp:func:`pull()<tmc::qu_mpsc_bounded::pull>`
* :cpp:func:`try_pull()<tmc::qu_mpsc_bounded::try_pull>`
* :cpp:func:`empty()<tmc::qu_mpsc_bounded::empty>`

Usage Examples
-----------------------------------------------------------------------------------

.. tab:: Using pull()

   :cpp:func:`pull()<tmc::qu_mpsc_bounded::pull>` suspends until data is available. It returns a scoped zero-copy handle whose ``operator bool()`` / ``has_value()`` indicates the result.

   .. code-block:: cpp

      #include "tmc/task.hpp"
      #include "tmc/spawn.hpp"
      #include "tmc/spawn_group.hpp"
      #include "tmc/qu_mpsc_bounded.hpp"

      tmc::task<void> producer(tmc::qu_mpsc_bounded<int>& q) {
        for (int i = 0; i < 100; ++i) {
          // If the queue is full, this will suspend until a slot becomes free
          co_await q.push(i);
        }
      }

      tmc::task<void> consumer(tmc::qu_mpsc_bounded<int>& q) {
        // Loop automatically breaks once the queue empties after close()
        while (auto data = co_await q.pull()) {
          // v is a zero-copy reference to a T located in the queue storage
          int& v = data.value();
          // do something with v
        }
      }

      tmc::task<void> queue_quickstart() {
        // Set queue capacity
        tmc::qu_mpsc_bounded<int> q(10);

        // Start the consumer first
        auto cons = tmc::spawn(consumer(q)).fork();

        // Start multiple producers and wait for them to finish
        auto sg = tmc::spawn_group();
        for (size_t i = 0; i < 10; ++i) {
          sg.add(producer(q));
        }
        co_await std::move(sg);

        // Close the queue and wait for the consumer to drain all data
        q.close();
        co_await std::move(cons);
      }

.. tab:: Using try_pull()

   :cpp:func:`try_pull()<tmc::qu_mpsc_bounded::try_pull>` is non-suspending and
   must be polled. It returns a scoped zero-copy handle whose ``status()`` (or
   ``operator bool()``) indicates the result.

   .. code-block:: cpp

      // Drains all data currently in the queue. Returns true if the queue was empty afterward,
      // and false if the queue was closed afterward.
      tmc::task<bool> consume_all_data(tmc::qu_mpsc_bounded<int>& q) {
        while(true) {
          auto data = q.try_pull();
          switch (data.status()) {
            case tmc::qu_mpsc_bounded_err::OK: {
              // v is a zero-copy reference to a T located in the queue storage
              int& v = data.value();
              // do something with v
              break;
            }
            case tmc::qu_mpsc_bounded_err::EMPTY:
              // No data available right now. Try again later.
              co_return true;
            case tmc::qu_mpsc_bounded_err::CLOSED:
              // The queue has been closed and drained. Do not try again later.
              co_return false;
            default:
              std::unreachable();
          }
        }
      }

API Reference
-----------------------------------------------------------------------------------
.. doxygenclass:: tmc::qu_mpsc_bounded
  :members:

.. doxygenenum:: tmc::qu_mpsc_bounded_err

.. doxygenstruct:: tmc::qu_mpsc_bounded_default_config
  :members:
