Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge develop to main #18

Merged
merged 7 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Wait Queue, a Header-Only C++ 20 MPMC Thread-Safe Queue
# Wait Queue, a Header-Only C++ 20 MPMC Thread-Safe Queue With Shutdown Semantics

#### Unit Test and Documentation Generation Workflow Status

Expand All @@ -16,8 +16,12 @@

`wait_queue` is a multi-reader, multi-writer FIFO thread-safe wait queue (often called MPMC for multiple producer / multiple consumer) for transferring data between threads. It is templatized on the type of data passed through the queue as well as the queue container type. Data is passed with value semantics, either by copying or by moving (as opposed to a queue that transfers data by pointer or reference). The wait queue has both wait and no-wait pop semantics. A fixed size container (e.g. a `ring_span`) can be used, eliminating any and all dynamic memory management (useful in embedded or deterministic environments). Similarly, a circular buffer that only allocates on construction can be used, which eliminates dynamic memory management when pushing or popping values on or off the queue.

Shutdown semantics are available through `std::stop_token` facilities. A `std::stop_token` can be passed in through the constructors, allowing shutdown to be requested externally to the `wait_queue`, or shutdown can be requested through the `wait_queue request_stop` method.

Thanks go to [Louis Langholtz](https://github.com/louis-langholtz) for adding DBC (Design by Contract) asserts and comments.

Concepts and various type constraints have been added. Enhancements are always appreciated.

## Generated Documentation

The generated Doxygen documentation for `wait_queue` is [here](https://connectivecpp.github.io/wait-queue/).
Expand All @@ -28,11 +32,11 @@ The `wait_queue` header file does not have any third-party dependencies. It uses

## C++ Standard

`wait_queue` uses C++ 20 features, including `std::stop_token`, `std::stop_source`, `std::condition_variable_any`, `std::scoped_lock`, and `concepts` / `requires`.
`wait_queue` uses C++ 20 features, including `std::stop_token`, `std::stop_source`, `std::condition_variable_any`, `std::scoped_lock`, `concepts`, and `requires` clauses.

## Supported Compilers

Continuous integration workflows build and unit test on g++ (through Ubuntu) and MSVC (through Windows). Note that clang support for C++ 20 `std::jthread` and `std::stop_token` is still experimental (and possibly incomplete) as of May 2024, so has not (yet) been tested with `wait_queue`.
Continuous integration workflows build and unit test on g++ (through Ubuntu) and MSVC (through Windows). Note that clang support for C++ 20 `std::jthread` and `std::stop_token` is still experimental (and possibly incomplete) as of Sep 2024, so has not (yet) been tested with `wait_queue`.

## Unit Test Dependencies

Expand Down
152 changes: 103 additions & 49 deletions include/queue/wait_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@
*
* The container type must support the following (depending on which
* methods are called): default construction, construction with an initial size,
* @c push_back (preferably overloaded for both copy and move),
* @c push_back (preferably overloaded for both copy and move semantics),
* @c emplace_back (with a template parameter pack), @c front, @c pop_front,
* @c empty, and @c size. The container must also have a @c size_type
* defined.
* defined. Type constraints and concepts are defined for the various
* type requirements.
*
* Iterators on a @c wait_queue are not supported, due to obvious difficulties
* with maintaining consistency and integrity. The @c apply method can be used to
Expand Down Expand Up @@ -157,12 +158,43 @@
#include <condition_variable>
#include <stop_token> // std::stop_source, std::stop_token
#include <optional>
#include <utility> // std::move, std::move_if_noexcept
#include <type_traits> // for noexcept specs
#include <utility> // std::move, std::move_if_noexcept, std::forward
#include <type_traits> // for requires clauses and noexcept specs
// #include <concepts>

namespace chops {

// requirements for wait_queue container

template <typename Ctr, typename T>
concept supports_push_back = requires (Ctr ctr, T val) {
ctr.push_back(val);
};

template <typename Ctr, typename ... Args>
concept supports_emplace_back = requires (Ctr ctr, Args&& ... args) {
ctr.emplace_back(args ...);
};

template <typename Ctr>
concept supports_empty = requires (Ctr ctr) {
ctr.empty();
};

template <typename Ctr>
concept supports_pop_front = requires (Ctr ctr) {
ctr.pop_front();
};

template <typename Ctr>
concept supports_size = requires (Ctr ctr) {
ctr.size();
};

// declaration for wait_queue

template <typename T, typename Container = std::deque<T> >
requires std::is_copy_constructible_v<T> || std::is_move_constructible_v<T>
class wait_queue {
private:
mutable std::mutex m_mut;
Expand Down Expand Up @@ -195,8 +227,10 @@ class wait_queue {
* @post @c stop_requested returns @c false.
*/
wait_queue()
// noexcept(std::is_nothrow_constructible<Container>::value)
: m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()) {
requires std::is_default_constructible_v<Container>
// noexcept(std::is_nothrow_constructible_v<Container>)
: m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token())
{
assert(empty());
assert(size() == size_type(0));
assert(!stop_requested());
Expand All @@ -212,8 +246,10 @@ class wait_queue {
* @post @c size returns 0.
*/
wait_queue(std::stop_token stop_tok)
// noexcept(std::is_nothrow_constructible<Container>::value)
: m_stop_tok(stop_tok) {
requires std::is_default_constructible_v<Container>
// noexcept(std::is_nothrow_constructible_v<Container, std::stop_token>)
: m_stop_tok(stop_tok)
{
assert(empty());
assert(size() == size_type(0));
}
Expand All @@ -236,15 +272,16 @@ class wait_queue {
* @param container Container object to be moved from (or copied from if not
* movable).
*
* @post @c empty returns @c true if @c beg equals @c end otherwise returns @c false.
* @post @c size returns the distance between @c beg and @c end parameters.
* @post @c empty and @c size match moved (or copied) in container.
*/
wait_queue(Container&& container)
// noexcept(std::is_ something movable or maybe copyable )
requires std::is_move_constructible_v<Container> ||
std::is_copy_constructible_v<Container>
// noexcept(std::is_nnthrow_constructible_v<Container, Container&&>)
: m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
m_data_queue(std::move(container)) {
// assert(empty() == (beg == end));
// assert((size() == size_type(0)) == (beg == end)); // std::distance constrains beg, end.
m_data_queue(std::move(container))
{
// not easily assertible until contracts added to C++
}

/**
Expand All @@ -257,14 +294,14 @@ class wait_queue {
* @param container Container object to be moved from (or copied from if not
* movable).
*
* @post @c empty returns @c true if @c beg equals @c end otherwise returns @c false.
* @post @c size returns the distance between @c beg and @c end parameters.
* @post @c empty and @c size match moved (or copied) in container.
*/
wait_queue(std::stop_token stop_tok, Container&& container)
// noexcept(std::is_nothrow_constructible<Container, Iter, Iter>::value)
: m_stop_tok(stop_tok), m_data_queue(std::move(container)) {
// assert(empty() == (beg == end));
// assert((size() == size_type(0)) == (beg == end)); // std::distance constrains beg, end.
requires std::is_move_constructible_v<Container> ||
std::is_copy_constructible_v<Container>
// noexcept(std::is_nothrow_constructible_v<Container, std::stop_token, Container&&>)
: m_stop_tok(stop_tok), m_data_queue(std::move(container))
{
}

/**
Expand All @@ -290,9 +327,11 @@ class wait_queue {
* @post @c size returns 0 or @c sz depending on container used.
*/
wait_queue(size_type sz)
// noexcept(std::is_nothrow_constructible<Container, size_type>::value)
requires std::is_constructible_v<Container, size_type>
// noexcept(std::is_nothrow_constructible_v<Container, size_type>)
: m_stop_src(std::stop_source{}), m_stop_tok((*m_stop_src).get_token()),
m_data_queue(sz) {
m_data_queue(sz)
{
assert((sz != size_type(0)) || empty());
assert((size() == size_type(0)) || (size() == sz));
}
Expand All @@ -310,8 +349,10 @@ class wait_queue {
* @post @c size returns 0 or @c sz depending on container used.
*/
wait_queue(std::stop_token stop_tok, size_type sz)
// noexcept(std::is_nothrow_constructible<Container, size_type>::value)
: m_stop_tok((*m_stop_src).get_token()), m_data_queue(sz) {
requires std::is_constructible_v<Container, std::stop_token, size_type>
// noexcept(std::is_nothrow_constructible_v<Container, std::stop_token, size_type>)
: m_stop_tok((*m_stop_src).get_token()), m_data_queue(sz)
{
assert((sz != size_type(0)) || empty());
assert((size() == size_type(0)) || (size() == sz));
}
Expand Down Expand Up @@ -341,13 +382,12 @@ class wait_queue {
* external @c std::stop_token was passed in.
*/
auto request_stop() noexcept
-> bool {

-> bool
{
if (m_stop_src) {
return (*m_stop_src).request_stop();
}
return false;

}

/**
Expand All @@ -364,9 +404,11 @@ class wait_queue {
* @post If @c true is returned and @c empty is @c false, one of any threads waiting for a
* value will be unblocked.
*/
auto push(const T& val) /* noexcept(std::is_nothrow_copy_constructible<T>::value) */
-> bool {
auto push(const T& val) /* noexcept(std::is_nothrow_copy_constructible_v<T>) */
-> bool
requires supports_push_back<Container, T>

{
if (m_stop_tok.stop_requested()) {
return false;
}
Expand All @@ -386,9 +428,11 @@ class wait_queue {
* @post If @c true is returned and @c empty is @c false, one of any threads waiting for a
* value will be unblocked.
*/
auto push(T&& val) /* noexcept(std::is_nothrow_move_constructible<T>::value) */
-> bool {
auto push(T&& val) /* noexcept(std::is_nothrow_move_constructible_v<T>) */
-> bool
requires supports_push_back<Container, T>

{
if (m_stop_tok.stop_requested()) {
return false;
}
Expand Down Expand Up @@ -416,17 +460,18 @@ class wait_queue {
* value will be unblocked.
*/
template <typename ... Args>
auto emplace_push(Args &&... args) /* noexcept(std::is_nothrow_constructible<T, Args...>::value)*/
-> bool {

auto emplace_push(Args &&... args) /* noexcept(std::is_nothrow_constructible_v<T, Args...>)*/
-> bool
requires supports_emplace_back<Container, Args...>

{
if (m_stop_tok.stop_requested()) {
return false;
}
lock_guard lk{m_mut};
m_data_queue.emplace_back(std::forward<Args>(args)...);
m_data_cond.notify_one();
return true;

}

/**
Expand All @@ -443,9 +488,11 @@ class wait_queue {
* @post If a non empty value is returned, until a push function is called, @c size is one
* less than before this function was called.
*/
auto wait_and_pop() /* noexcept(std::is_nothrow_constructible<T>::value) */
-> std::optional<T> {
[[nodiscard]] auto wait_and_pop() /* noexcept(std::is_nothrow_constructible_v<T>) */
-> std::optional<T>
requires supports_empty<Container> && supports_pop_front<Container>

{
std::unique_lock<std::mutex> lk{m_mut};
if (!m_data_cond.wait ( lk, m_stop_tok, [this] { return !m_data_queue.empty(); } )) {
return std::optional<T> {}; // queue was request to stop, no data available
Expand All @@ -472,9 +519,10 @@ class wait_queue {
* @post If a non empty value is returned, until a push function is called, @c size is one
* less than before this function was called.
*/
auto try_pop() /* noexcept(std::is_nothrow_constructible<T>::value) */
-> std::optional<T> {

[[nodiscard]] auto try_pop() /* noexcept(std::is_nothrow_constructible_v<T>) */
-> std::optional<T>
requires supports_empty<Container> && supports_pop_front<Container>
{
if (m_stop_tok.stop_requested()) {
return std::optional<T> {};
}
Expand Down Expand Up @@ -520,9 +568,11 @@ class wait_queue {
* same @c wait_queue since it results in recursive mutex locks.
*/
template <typename F>
auto apply(F&& func) const /* noexcept(std::is_nothrow_invocable<F&&, const T&>::value) */
-> void {
auto apply(F&& func) const /* noexcept(std::is_nothrow_invocable_v<F&&, const T&>) */
-> void
requires std::is_invocable_v<F, T>

{
lock_guard lk{m_mut};
for (const T& elem : m_data_queue) {
func(elem);
Expand All @@ -536,21 +586,23 @@ class wait_queue {
*
* @return @c true if the @c stop_requested has been called.
*/
auto stop_requested() const noexcept
-> bool {
[[nodiscard]] auto stop_requested() const noexcept
-> bool

{
return m_stop_tok.stop_requested();

}

/**
* Query whether the @c wait_queue is empty or not.
*
* @return @c true if the @c wait_queue is empty.
*/
auto empty() const /* noexcept */
-> bool {
[[nodiscard]] auto empty() const /* noexcept */
-> bool
requires supports_empty<Container>

{
lock_guard lk{m_mut};
return m_data_queue.empty();

Expand All @@ -561,9 +613,11 @@ class wait_queue {
*
* @return Number of elements in the @c wait_queue.
*/
auto size() const /* noexcept */
-> size_type {
[[nodiscard]] auto size() const /* noexcept */
-> size_type
requires supports_size<Container>

{
lock_guard lk{m_mut};
return m_data_queue.size();

Expand Down