diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index 84c75f5936..c27ab80eb3 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -21,7 +21,6 @@ #include "runtime-light/stdlib/fork/fork-context.h" #include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/context.h" -#include "runtime-light/stdlib/fork/wait-queue-context.h" template concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) { @@ -435,43 +434,5 @@ class wait_with_timeout_t { } }; -// ================================================================================================ - -class wait_queue_next_t { - int64_t queue_id; - wait_for_timer_t timer_awaiter; - -public: - explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept - : queue_id(queue_id_) - , timer_awaiter(timeout_) {} - - bool await_ready() const noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { - return queue.val()->empty() || queue.val()->has_ready_fork(); - } - return true; - } - - void await_suspend(std::coroutine_handle<> coro) noexcept { - auto queue = WaitQueueContext::get().get_queue(queue_id); - queue.val()->push_coro_handle(coro); - timer_awaiter.await_suspend(coro); - } - - Optional await_resume() noexcept { - auto queue = WaitQueueContext::get().get_queue(queue_id); - queue.val()->pop_coro_handle(); - auto ready_fork = queue.val()->pop_fork(); - if (ready_fork.has_value()) { - timer_awaiter.cancel(); - // todo set here info about prev fork_id - return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second)); - } else { - return {}; - } - } -}; - template wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t; diff --git a/runtime-light/stdlib/fork/fork-api.cpp b/runtime-light/stdlib/fork/fork-api.cpp index 8bb49e744a..55160350c8 100644 --- a/runtime-light/stdlib/fork/fork-api.cpp +++ b/runtime-light/stdlib/fork/fork-api.cpp @@ -24,34 +24,15 @@ task_t f$sched_yield_sleep(int64_t duration_ns) noexcept { co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast(duration_ns)}}; } -int64_t f$wait_queue_create(array> fork_ids) noexcept { - return WaitQueueContext::get().create_queue(fork_ids); -} - -void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { - auto task = ForkComponentContext::get().pop_fork(fork_id.val()); - if (task.has_value()) { - php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id); - queue.val()->push_fork(fork_id.val(), std::move(task.val())); - } - } -} - -bool f$wait_queue_empty(int64_t queue_id) noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { - return queue.val()->empty(); - } - return false; -} - task_t> f$wait_queue_next(int64_t queue_id, double timeout) noexcept { - if (WaitQueueContext::get().get_queue(queue_id).has_value()) { - if (timeout < 0.0) { - timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; - } - const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; - co_return co_await wait_queue_next_t{queue_id, timeout_ns}; + static_assert(CancellableAwaitable); + auto queue = WaitQueueContext::get().get_queue(queue_id); + if (!queue.has_value()) { + co_return Optional{}; } - co_return Optional{}; + const auto timeout_ns{timeout > 0 && timeout <= fork_api_impl_::MAX_TIMEOUT_S + ? std::chrono::duration_cast(std::chrono::duration{timeout}) + : fork_api_impl_::DEFAULT_TIMEOUT_NS}; + auto result_opt{co_await wait_with_timeout_t{queue.val()->pop(), timeout_ns}}; + co_return result_opt.has_value() ? result_opt.value() : Optional{}; } diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index a5a58c2171..2c3cc87dea 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -45,10 +45,21 @@ task_t f$sched_yield() noexcept; task_t f$sched_yield_sleep(int64_t duration_ns) noexcept; -int64_t f$wait_queue_create(array> fork_ids) noexcept; +inline int64_t f$wait_queue_create(array> fork_ids) noexcept { + return WaitQueueContext::get().create_queue(fork_ids); +} -void f$wait_queue_push(int64_t queue, Optional fork_id) noexcept; +inline void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { + queue.val()->push(fork_id.val()); + } +} -bool f$wait_queue_empty(int64_t queue_id) noexcept; +inline bool f$wait_queue_empty(int64_t queue_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty(); + } + return false; +} task_t> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept; diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 57c67ec1af..6249e2a1af 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -11,21 +11,24 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" #include "runtime-light/utils/concepts.h" constexpr int64_t INVALID_FORK_ID = -1; class ForkComponentContext { + enum class ForkStatus { available, reserved }; + template using unordered_map = memory_resource::stl::unordered_map; static constexpr auto FORK_ID_INIT = 0; - unordered_map> forks; + unordered_map, ForkStatus>> forks; int64_t next_fork_id{FORK_ID_INIT + 1}; int64_t push_fork(task_t &&task) noexcept { - return forks.emplace(next_fork_id, std::move(task)), next_fork_id++; + return forks.emplace(next_fork_id, std::make_pair(std::move(task), ForkStatus::available)), next_fork_id++; } task_t pop_fork(int64_t fork_id) noexcept { @@ -34,21 +37,48 @@ class ForkComponentContext { php_critical_error("can't find fork %" PRId64, fork_id); } auto fork{std::move(it_fork->second)}; + php_assert(fork.second == ForkStatus::available); forks.erase(it_fork); - return fork; + return std::move(fork.first); + } + + void reserve_fork(int64_t fork_id) noexcept { + if (const auto it = forks.find(fork_id); it != forks.end()) { + it->second.second = ForkStatus::reserved; + } + } + + void unreserve_fork(int64_t fork_id) noexcept { + if (const auto it = forks.find(fork_id); it != forks.end()) { + it->second.second = ForkStatus::available; + } } friend class start_fork_t; template friend class wait_fork_t; + friend class wait_queue_t; public: + WaitQueueContext wait_queue_context; + explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : forks(unordered_map>::allocator_type{memory_resource}) {} + : forks(unordered_map, ForkStatus>>::allocator_type{memory_resource}) + , wait_queue_context(memory_resource) {} static ForkComponentContext &get() noexcept; bool contains(int64_t fork_id) const noexcept { - return forks.contains(fork_id); + if (const auto it = forks.find(fork_id); it != forks.cend()) { + return it->second.second == ForkStatus::available; + } + return false; + } + + bool is_ready(int64_t fork_id) const noexcept { + if (const auto it = forks.find(fork_id); it != forks.cend()) { + return it->second.first.done(); + } + return false; } }; diff --git a/runtime-light/stdlib/fork/wait-queue-context.cpp b/runtime-light/stdlib/fork/wait-queue-context.cpp index 864764b8ba..bf805e07c1 100644 --- a/runtime-light/stdlib/fork/wait-queue-context.cpp +++ b/runtime-light/stdlib/fork/wait-queue-context.cpp @@ -5,6 +5,7 @@ #include "runtime-light/stdlib/fork/wait-queue-context.h" #include "runtime-light/component/component.h" +#include "runtime-light/coroutine/awaitable.h" #include "runtime-light/stdlib/fork/fork-context.h" WaitQueueContext &WaitQueueContext::get() noexcept { @@ -13,17 +14,15 @@ WaitQueueContext &WaitQueueContext::get() noexcept { int64_t WaitQueueContext::create_queue(const array> &fork_ids) noexcept { auto &memory_resource{get_component_context()->runtime_allocator.memory_resource}; - unordered_map> forks(unordered_map>::allocator_type{memory_resource}); - std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) { + unordered_set forks_ids(unordered_set::allocator_type{memory_resource}); + std::for_each(fork_ids.begin(), fork_ids.end(), [&forks_ids](const auto &it) { Optional fork_id = it.get_value(); if (fork_id.has_value()) { - if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) { - forks[fork_id.val()] = std::move(task.val()); - } + forks_ids.insert(fork_id.val()); } }); int64_t queue_id{++next_wait_queue_id}; - wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks))); + wait_queues.emplace(queue_id, wait_queue_t(memory_resource, std::move(forks_ids))); php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size); return queue_id; } diff --git a/runtime-light/stdlib/fork/wait-queue-context.h b/runtime-light/stdlib/fork/wait-queue-context.h index 89c98dd708..f8485c6437 100644 --- a/runtime-light/stdlib/fork/wait-queue-context.h +++ b/runtime-light/stdlib/fork/wait-queue-context.h @@ -9,28 +9,31 @@ #include "runtime-core/core-types/decl/optional.h" #include "runtime-core/memory-resource/resource_allocator.h" #include "runtime-core/memory-resource/unsynchronized_pool_resource.h" -#include "runtime-light/stdlib/fork/wait-queue.h" #include "runtime-light/utils/concepts.h" +#include "runtime-light/stdlib/fork/wait-queue.h" class WaitQueueContext { template using unordered_map = memory_resource::stl::unordered_map; - unordered_map wait_queues; + template + using unordered_set = memory_resource::stl::unordered_set; + + unordered_map wait_queues; static constexpr auto WAIT_QUEUE_INIT_ID = 0; int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID}; public: explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : wait_queues(unordered_map::allocator_type{memory_resource}) {} + : wait_queues(unordered_map::allocator_type{memory_resource}) {} static WaitQueueContext &get() noexcept; int64_t create_queue(const array> &fork_ids) noexcept; - Optional get_queue(int64_t queue_id) noexcept { + Optional get_queue(int64_t queue_id) noexcept { if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) { - return &it->second; + return std::addressof(it->second); } return {}; } diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp index 0fb8b6e850..d53aa87fe7 100644 --- a/runtime-light/stdlib/fork/wait-queue.cpp +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -4,15 +4,57 @@ #include "runtime-light/stdlib/fork/wait-queue.h" -#include "runtime-light/component/component.h" - -void WaitQueue::resume_awaited_handles_if_empty() { - if (forks.empty()) { - while (!awaited_handles.empty()) { - auto handle = awaited_handles.front(); - awaited_handles.pop_front(); - CoroutineScheduler::get().suspend({handle, WaitEvent::Rechedule{}}); - } +#include "runtime-light/stdlib/fork/fork-context.h" + +wait_queue_t::wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept + : forks_ids(std::move(forks_ids_)) + , awaiters(deque::allocator_type{memory_resource}) { + std::for_each(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { ForkComponentContext::get().reserve_fork(fork_id); }); +} + +void wait_queue_t::push(int64_t fork_id) noexcept { + forks_ids.insert(fork_id); + ForkComponentContext::get().reserve_fork(fork_id); + if (!awaiters.empty()) { + auto &task = ForkComponentContext::get().forks[fork_id].first; + task_t::awaiter_t awaiter{std::addressof(task)}; + awaiter.await_suspend(awaiters.front().awaited_handle); } } +void wait_queue_t::insert_awaiter(awaiter_t awaiter) noexcept { + if (awaiters.empty()) { + std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) { + task_t::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)}; + task_awaiter.await_suspend(awaiter.awaited_handle); + }); + } + awaiters.push_back(awaiter); +} + +void wait_queue_t::erase_awaiter(awaiter_t awaiter) noexcept { + auto it = std::find(awaiters.begin(), awaiters.end(), awaiter); + if (it != awaiters.end()) { + awaiters.erase(it); + } + std::coroutine_handle<> next_awaiter = awaiters.empty() ? std::noop_coroutine() : awaiters.front().awaited_handle; + std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) { + task_t::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)}; + task_awaiter.await_suspend(next_awaiter); + }); +} + +int64_t wait_queue_t::pop_ready_fork() noexcept { + auto it = std::find_if(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); }); + if (it == forks_ids.end()) { + php_critical_error("there is no fork to pop from queue"); + } + int64_t ready_fork = *it; + forks_ids.erase(it); + ForkComponentContext::get().unreserve_fork(ready_fork); + return ready_fork; +} + +bool wait_queue_t::has_ready_fork() const noexcept { + return std::any_of(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); }); +} diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h index 8116a5f633..3c6e71b445 100644 --- a/runtime-light/stdlib/fork/wait-queue.h +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -4,93 +4,79 @@ #pragma once -#include #include -#include -#include "runtime-core/memory-resource/resource_allocator.h" -#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" -#include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/allocator/allocator.h" #include "runtime-light/utils/concepts.h" -class WaitQueue { - template - using unordered_map = memory_resource::stl::unordered_map; +class wait_queue_t { + template + using unordered_set = memory_resource::stl::unordered_set; template using deque = memory_resource::stl::deque; - unordered_map> forks; - unordered_map::awaiter_t> fork_awaiters; - deque> awaited_handles; - - void resume_awaited_handles_if_empty(); - public: - WaitQueue(const WaitQueue &) = delete; - WaitQueue &operator=(const WaitQueue &) = delete; - WaitQueue &operator=(WaitQueue &&) = delete; - - WaitQueue(WaitQueue &&other) noexcept - : forks(std::move(other.forks)) - , fork_awaiters(std::move(other.fork_awaiters)) - , awaited_handles(std::move(other.awaited_handles)) {} - - explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map> &&forks_) noexcept - : forks(std::move(forks_)) - , fork_awaiters(unordered_map::awaiter_t>::allocator_type{memory_resource}) - , awaited_handles(deque>::allocator_type{memory_resource}) { - for (auto &fork : forks) { - fork_awaiters.emplace(fork.first, std::addressof(fork.second)); + struct awaiter_t { + wait_queue_t *wait_queue; + std::coroutine_handle<> awaited_handle; + explicit awaiter_t(wait_queue_t *wait_queue_) noexcept + : wait_queue(wait_queue_) + , awaited_handle(std::noop_coroutine()) {} + + bool await_ready() const noexcept { + return wait_queue->has_ready_fork(); } - } - void push_fork(int64_t fork_id, task_t &&fork) noexcept { - auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork)); - auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second)); - if (!awaited_handles.empty()) { - awaiter_id->second.await_suspend(awaited_handles.front()); + void await_suspend(std::coroutine_handle<> coro) noexcept { + awaited_handle = coro; + wait_queue->insert_awaiter(*this); } - } - Optional>> pop_fork() noexcept { - auto it = std::find_if(forks.begin(), forks.end(), [](std::pair> &fork) { return fork.second.done(); }); - if (it != forks.end()) { - auto fork = std::move(*it); - forks.erase(fork.first); - fork_awaiters.erase(fork.first); - resume_awaited_handles_if_empty(); - return fork; - } else { - return {}; + int64_t await_resume() noexcept { + wait_queue->erase_awaiter(*this); + return wait_queue->pop_ready_fork(); } - } - void push_coro_handle(std::coroutine_handle<> coro) noexcept { - if (awaited_handles.empty()) { - std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); }); + bool resumable() noexcept { + return wait_queue->has_ready_fork(); } - awaited_handles.push_back(coro); - } - void pop_coro_handle() noexcept { - if (!awaited_handles.empty()) { - awaited_handles.pop_front(); + void cancel() noexcept { + wait_queue->erase_awaiter(*this); } - std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front(); - std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter); }); - } - bool has_ready_fork() const noexcept { - return std::any_of(forks.begin(), forks.end(), [](const auto &fork) { return fork.second.done(); }); - } + bool operator==(const awaiter_t & other) const { + return awaited_handle == other.awaited_handle; + } + }; + + wait_queue_t(const wait_queue_t &) = delete; + wait_queue_t &operator=(const wait_queue_t &) = delete; + wait_queue_t &operator=(wait_queue_t &&) = delete; + + wait_queue_t(wait_queue_t &&other) noexcept + : forks_ids(std::move(other.forks_ids)) + , awaiters(std::move(other.awaiters)) {} + + explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept; - size_t size() const noexcept { - return forks.size(); + void push(int64_t fork_id) noexcept; + + awaiter_t pop() noexcept { + return awaiter_t{this}; } bool empty() const noexcept { - return forks.empty(); + return forks_ids.empty(); } + +private: + unordered_set forks_ids; + deque awaiters; + + void insert_awaiter(awaiter_t awaiter) noexcept; + void erase_awaiter(awaiter_t awaiter) noexcept; + int64_t pop_ready_fork() noexcept; + bool has_ready_fork() const noexcept; };