From e43b37a70481ebebe2487cc9e6a9886b35c16973 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 8 Aug 2024 17:00:08 +0300 Subject: [PATCH] add wait queue --- builtin-functions/kphp-light/functions.txt | 11 +++ .../memory-resource/resource_allocator.h | 2 +- runtime-light/coroutine/awaitable.h | 44 ++++++++- runtime-light/stdlib/fork/fork-api.cpp | 33 +++++++ runtime-light/stdlib/fork/fork-api.h | 11 ++- runtime-light/stdlib/fork/fork-context.h | 10 +- .../stdlib/fork/wait-queue-context.cpp | 29 ++++++ .../stdlib/fork/wait-queue-context.h | 37 ++++++++ runtime-light/stdlib/fork/wait-queue.h | 95 +++++++++++++++++++ runtime-light/stdlib/interface.cpp | 3 +- runtime-light/stdlib/stdlib.cmake | 1 + tests/phpt/fork/001_basic.php | 8 +- 12 files changed, 271 insertions(+), 13 deletions(-) create mode 100644 runtime-light/stdlib/fork/wait-queue-context.cpp create mode 100644 runtime-light/stdlib/fork/wait-queue-context.h create mode 100644 runtime-light/stdlib/fork/wait-queue.h diff --git a/builtin-functions/kphp-light/functions.txt b/builtin-functions/kphp-light/functions.txt index c9f5980d70..ba7c6b58e7 100644 --- a/builtin-functions/kphp-light/functions.txt +++ b/builtin-functions/kphp-light/functions.txt @@ -76,6 +76,8 @@ function get_hash_of_class (object $klass) ::: int; function strlen ($str ::: string) ::: int; +function rand() ::: int; + // === Fork ======================================================================================= /** @kphp-extern-func-info interruptible cpp_template_call */ @@ -87,6 +89,15 @@ function sched_yield() ::: void; /** @kphp-extern-func-info interruptible */ function sched_yield_sleep($timeout_ns ::: int) ::: void; +function wait_queue_create (array | false> $request_ids = []) ::: future_queue<^1[*][*]>; + +function wait_queue_push (future_queue &$queue_id, future | false $request_ids) ::: void; + +function wait_queue_empty (future_queue $queue_id) ::: bool; + +/** @kphp-extern-func-info interruptible */ +function wait_queue_next (future_queue $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false; + // === Rpc ======================================================================================== /** @kphp-tl-class */ diff --git a/runtime-core/memory-resource/resource_allocator.h b/runtime-core/memory-resource/resource_allocator.h index eca8f3b1a1..ce4b341960 100644 --- a/runtime-core/memory-resource/resource_allocator.h +++ b/runtime-core/memory-resource/resource_allocator.h @@ -46,7 +46,7 @@ class resource_allocator { } static constexpr size_t max_value_type_size() { - return 128U; + return 256U; } friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept { diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index f468796117..99f993c5af 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -9,17 +9,19 @@ #include #include #include + #include #include "runtime-core/core-types/decl/optional.h" #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/component/component.h" #include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork-context.h" -#include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/header.h" #include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/context.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" template concept Awaitable = requires(T && awaitable, std::coroutine_handle<> coro) { @@ -199,3 +201,41 @@ class wait_fork_t { } } }; + +// ================================================================================================ + +class wait_queue_next_t { + int64_t queue_id; + wait_for_timer_t timer_awaiter; + +public: + explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept + : queue_id(queue_id_) + , timer_awaiter(timeout_) {} + + bool await_ready() const noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty() || queue.val()->has_ready_fork(); + } + return true; + } + + void await_suspend(std::coroutine_handle<> coro) noexcept { + auto queue = WaitQueueContext::get().get_queue(queue_id); + queue.val()->subscribe_coro_handle(coro); + timer_awaiter.await_suspend(coro); + } + + Optional await_resume() noexcept { + auto queue = WaitQueueContext::get().get_queue(queue_id); + queue.val()->move_coro_handle(); + auto ready_fork = queue.val()->pop(); + if (ready_fork.has_value()) { + timer_awaiter.cancel(); + // todo set here info about prev fork_id + return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second)); + } else { + return {}; + } + } +}; diff --git a/runtime-light/stdlib/fork/fork-api.cpp b/runtime-light/stdlib/fork/fork-api.cpp index b8e68746ce..2d0b877bed 100644 --- a/runtime-light/stdlib/fork/fork-api.cpp +++ b/runtime-light/stdlib/fork/fork-api.cpp @@ -10,6 +10,7 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" task_t f$sched_yield() noexcept { co_await wait_for_reschedule_t{}; @@ -22,3 +23,35 @@ task_t f$sched_yield_sleep(int64_t duration_ns) noexcept { } co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast(duration_ns)}}; } + +int64_t f$wait_queue_create(array> fork_ids) noexcept { + return WaitQueueContext::get().create_queue(fork_ids); +} + +void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { + auto task = ForkComponentContext::get().pop_fork(fork_id.val()); + if (task.has_value()) { + php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id); + queue.val()->push(fork_id.val(), std::move(task.val())); + } + } +} + +bool f$wait_queue_empty(int64_t queue_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty(); + } + return false; +} + +task_t> f$wait_queue_next(int64_t queue_id, double timeout) noexcept { + if (WaitQueueContext::get().get_queue(queue_id).has_value()) { + if (timeout < 0.0) { + timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; + } + const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; + co_return co_await wait_queue_next_t{queue_id, timeout_ns}; + } + co_return Optional{}; +} diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 9ccb268e64..43c34fa46b 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -11,7 +11,6 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork-context.h" namespace fork_api_impl_ { @@ -32,7 +31,7 @@ requires(is_optional::value) task_t f$wait(int64_t fork_id, double timeout co_return T{}; } const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; - co_return co_await wait_fork_t>{*std::move(task_opt), timeout_ns}; + co_return co_await wait_fork_t>{std::move(task_opt.val()), timeout_ns}; } template @@ -43,3 +42,11 @@ requires(is_optional::value) task_t f$wait(Optional fork_id_opt, task_t f$sched_yield() noexcept; task_t f$sched_yield_sleep(int64_t duration_ns) noexcept; + +int64_t f$wait_queue_create(array> fork_ids) noexcept; + +void f$wait_queue_push(int64_t queue, Optional fork_id) noexcept; + +bool f$wait_queue_empty(int64_t queue_id) noexcept; + +task_t> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept; diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 79bd0f6f24..fbc8ef224b 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -11,20 +11,24 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" #include "runtime-light/utils/concepts.h" class ForkComponentContext { + friend WaitQueueContext; template using unordered_map = memory_resource::stl::unordered_map; static constexpr auto FORK_ID_INIT = 1; + WaitQueueContext wait_queue_context; unordered_map> forks_; int64_t next_fork_id_{FORK_ID_INIT}; public: explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : forks_(unordered_map>::allocator_type{memory_resource}) {} + : wait_queue_context(memory_resource) + , forks_(unordered_map>::allocator_type{memory_resource}) {} static ForkComponentContext &get() noexcept; @@ -35,13 +39,13 @@ class ForkComponentContext { return fork_id; } - std::optional> pop_fork(int64_t fork_id) noexcept { + Optional> pop_fork(int64_t fork_id) noexcept { if (const auto it_fork{forks_.find(fork_id)}; it_fork != forks_.cend()) { php_debug("ForkComponentContext: pop fork %" PRId64, fork_id); auto fork{std::move(it_fork->second)}; forks_.erase(it_fork); return {std::move(fork)}; } - return std::nullopt; + return {}; } }; diff --git a/runtime-light/stdlib/fork/wait-queue-context.cpp b/runtime-light/stdlib/fork/wait-queue-context.cpp new file mode 100644 index 0000000000..864764b8ba --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue-context.cpp @@ -0,0 +1,29 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/fork/wait-queue-context.h" + +#include "runtime-light/component/component.h" +#include "runtime-light/stdlib/fork/fork-context.h" + +WaitQueueContext &WaitQueueContext::get() noexcept { + return ForkComponentContext::get().wait_queue_context; +} + +int64_t WaitQueueContext::create_queue(const array> &fork_ids) noexcept { + auto &memory_resource{get_component_context()->runtime_allocator.memory_resource}; + unordered_map> forks(unordered_map>::allocator_type{memory_resource}); + std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) { + Optional fork_id = it.get_value(); + if (fork_id.has_value()) { + if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) { + forks[fork_id.val()] = std::move(task.val()); + } + } + }); + int64_t queue_id{++next_wait_queue_id}; + wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks))); + php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size); + return queue_id; +} diff --git a/runtime-light/stdlib/fork/wait-queue-context.h b/runtime-light/stdlib/fork/wait-queue-context.h new file mode 100644 index 0000000000..89c98dd708 --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue-context.h @@ -0,0 +1,37 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "runtime-core/core-types/decl/optional.h" +#include "runtime-core/memory-resource/resource_allocator.h" +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-light/stdlib/fork/wait-queue.h" +#include "runtime-light/utils/concepts.h" + +class WaitQueueContext { + template + using unordered_map = memory_resource::stl::unordered_map; + + unordered_map wait_queues; + static constexpr auto WAIT_QUEUE_INIT_ID = 0; + int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID}; + +public: + explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept + : wait_queues(unordered_map::allocator_type{memory_resource}) {} + + static WaitQueueContext &get() noexcept; + + int64_t create_queue(const array> &fork_ids) noexcept; + + Optional get_queue(int64_t queue_id) noexcept { + if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) { + return &it->second; + } + return {}; + } +}; diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h new file mode 100644 index 0000000000..553cefd012 --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -0,0 +1,95 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include + +#include "runtime-core/memory-resource/resource_allocator.h" +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/utils/concepts.h" + +class WaitQueue { + template + using unordered_map = memory_resource::stl::unordered_map; + + template + using deque = memory_resource::stl::deque; + + unordered_map> forks; + unordered_map::awaiter_t> fork_awaiters; + deque> awaited_handles; + +public: + WaitQueue(const WaitQueue &) = delete; + WaitQueue &operator=(const WaitQueue &) = delete; + + WaitQueue(WaitQueue &&other) noexcept + : forks(std::move(other.forks)) + , fork_awaiters(std::move(other.fork_awaiters)) + , awaited_handles(std::move(other.awaited_handles)) {} + + explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map> &&forks_) noexcept + : forks(std::move(forks_)) + , fork_awaiters(unordered_map::awaiter_t>::allocator_type{memory_resource}) + , awaited_handles(deque>::allocator_type{memory_resource}) { + for (auto &fork : forks) { + fork_awaiters.emplace(fork.first, std::addressof(fork.second)); + } + } + + void push(int64_t fork_id, task_t &&fork) noexcept { + auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork)); + auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second)); + if (!awaited_handles.empty()) { + awaiter_id->second.await_suspend(awaited_handles.front()); + } + } + + Optional>> pop() noexcept { + auto it = std::find_if(forks.begin(), forks.end(), [](std::pair> &fork) { return fork.second.done(); }); + if (it != forks.end()) { + auto fork = std::move(*it); + forks.erase(fork.first); + fork_awaiters.erase(fork.first); + return fork; + } else { + return {}; + } + } + + void subscribe_coro_handle(std::coroutine_handle<> coro) noexcept { + if (awaited_handles.empty()) { + std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); }); + } + awaited_handles.push_back(coro); + } + + void move_coro_handle() noexcept { + if (!awaited_handles.empty()) { + awaited_handles.pop_front(); + } + std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front(); + std::for_each(fork_awaiters.begin(), fork_awaiters.end(), + [&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter);}); + } + + bool has_ready_fork() const noexcept { + return std::any_of(forks.begin(), forks.end(), [](const auto &fork) { + return fork.second.done(); + }); + } + + size_t size() const noexcept { + return forks.size(); + } + + bool empty() const noexcept { + return forks.empty(); + } +}; diff --git a/runtime-light/stdlib/interface.cpp b/runtime-light/stdlib/interface.cpp index f99c7ad954..c1e07d1c38 100644 --- a/runtime-light/stdlib/interface.cpp +++ b/runtime-light/stdlib/interface.cpp @@ -14,6 +14,7 @@ int64_t f$rand() { std::random_device rd; - int64_t dice_roll = rd(); + std::uniform_int_distribution<> dis(0, rd.max()); + int64_t dice_roll = dis(rd); return dice_roll; } diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index d7956932a3..28d4195c2b 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -9,6 +9,7 @@ prepend( superglobals.cpp fork/fork-api.cpp fork/fork-context.cpp + fork/wait-queue-context.cpp rpc/rpc-api.cpp rpc/rpc-context.cpp rpc/rpc-extra-headers.cpp diff --git a/tests/phpt/fork/001_basic.php b/tests/phpt/fork/001_basic.php index 65e245dc70..546c67bae7 100644 --- a/tests/phpt/fork/001_basic.php +++ b/tests/phpt/fork/001_basic.php @@ -1,4 +1,4 @@ -@ok k2_skip +@ok