Skip to content

Commit

Permalink
add wait queue
Browse files Browse the repository at this point in the history
  • Loading branch information
astrophysik committed Aug 9, 2024
1 parent 08e4a29 commit e43b37a
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 13 deletions.
11 changes: 11 additions & 0 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ function get_hash_of_class (object $klass) ::: int;

function strlen ($str ::: string) ::: int;

function rand() ::: int;

// === Fork =======================================================================================

/** @kphp-extern-func-info interruptible cpp_template_call */
Expand All @@ -87,6 +89,15 @@ function sched_yield() ::: void;
/** @kphp-extern-func-info interruptible */
function sched_yield_sleep($timeout_ns ::: int) ::: void;

function wait_queue_create (array<future<any> | false> $request_ids = []) ::: future_queue<^1[*][*]>;

function wait_queue_push (future_queue<any> &$queue_id, future<any> | false $request_ids) ::: void;

function wait_queue_empty (future_queue<any> $queue_id) ::: bool;

/** @kphp-extern-func-info interruptible */
function wait_queue_next (future_queue<any> $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false;

// === Rpc ========================================================================================

/** @kphp-tl-class */
Expand Down
2 changes: 1 addition & 1 deletion runtime-core/memory-resource/resource_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class resource_allocator {
}

static constexpr size_t max_value_type_size() {
return 128U;
return 256U;
}

friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept {
Expand Down
44 changes: 42 additions & 2 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@
#include <coroutine>
#include <cstdint>
#include <memory>

#include <utility>

#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/header.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

template<class T>
concept Awaitable = requires(T && awaitable, std::coroutine_handle<> coro) {
Expand Down Expand Up @@ -199,3 +201,41 @@ class wait_fork_t {
}
}
};

// ================================================================================================

class wait_queue_next_t {
int64_t queue_id;
wait_for_timer_t timer_awaiter;

public:
explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept
: queue_id(queue_id_)
, timer_awaiter(timeout_) {}

bool await_ready() const noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty() || queue.val()->has_ready_fork();
}
return true;
}

void await_suspend(std::coroutine_handle<> coro) noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->subscribe_coro_handle(coro);
timer_awaiter.await_suspend(coro);
}

Optional<int64_t> await_resume() noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->move_coro_handle();
auto ready_fork = queue.val()->pop();
if (ready_fork.has_value()) {
timer_awaiter.cancel();
// todo set here info about prev fork_id
return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second));
} else {
return {};
}
}
};
33 changes: 33 additions & 0 deletions runtime-light/stdlib/fork/fork-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

task_t<void> f$sched_yield() noexcept {
co_await wait_for_reschedule_t{};
Expand All @@ -22,3 +23,35 @@ task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept {
}
co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast<uint64_t>(duration_ns)}};
}

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept {
return WaitQueueContext::get().create_queue(fork_ids);
}

void f$wait_queue_push(int64_t queue_id, Optional<int64_t> fork_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) {
auto task = ForkComponentContext::get().pop_fork(fork_id.val());
if (task.has_value()) {
php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id);
queue.val()->push(fork_id.val(), std::move(task.val()));
}
}
}

bool f$wait_queue_empty(int64_t queue_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty();
}
return false;
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue_id, double timeout) noexcept {
if (WaitQueueContext::get().get_queue(queue_id).has_value()) {
if (timeout < 0.0) {
timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT;
}
const auto timeout_ns{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})};
co_return co_await wait_queue_next_t{queue_id, timeout_ns};
}
co_return Optional<int64_t>{};
}
11 changes: 9 additions & 2 deletions runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork-context.h"

namespace fork_api_impl_ {

Expand All @@ -32,7 +31,7 @@ requires(is_optional<T>::value) task_t<T> f$wait(int64_t fork_id, double timeout
co_return T{};
}
const auto timeout_ns{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})};
co_return co_await wait_fork_t<internal_optional_type_t<T>>{*std::move(task_opt), timeout_ns};
co_return co_await wait_fork_t<internal_optional_type_t<T>>{std::move(task_opt.val()), timeout_ns};
}

template<typename T>
Expand All @@ -43,3 +42,11 @@ requires(is_optional<T>::value) task_t<T> f$wait(Optional<int64_t> fork_id_opt,
task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept;

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept;

void f$wait_queue_push(int64_t queue, Optional<int64_t> fork_id) noexcept;

bool f$wait_queue_empty(int64_t queue_id) noexcept;

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept;
10 changes: 7 additions & 3 deletions runtime-light/stdlib/fork/fork-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"
#include "runtime-light/utils/concepts.h"

class ForkComponentContext {
friend WaitQueueContext;
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

static constexpr auto FORK_ID_INIT = 1;

WaitQueueContext wait_queue_context;
unordered_map<int64_t, task_t<fork_result>> forks_;
int64_t next_fork_id_{FORK_ID_INIT};

public:
explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: forks_(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource}) {}
: wait_queue_context(memory_resource)
, forks_(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource}) {}

static ForkComponentContext &get() noexcept;

Expand All @@ -35,13 +39,13 @@ class ForkComponentContext {
return fork_id;
}

std::optional<task_t<fork_result>> pop_fork(int64_t fork_id) noexcept {
Optional<task_t<fork_result>> pop_fork(int64_t fork_id) noexcept {
if (const auto it_fork{forks_.find(fork_id)}; it_fork != forks_.cend()) {
php_debug("ForkComponentContext: pop fork %" PRId64, fork_id);
auto fork{std::move(it_fork->second)};
forks_.erase(it_fork);
return {std::move(fork)};
}
return std::nullopt;
return {};
}
};
29 changes: 29 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/fork/wait-queue-context.h"

#include "runtime-light/component/component.h"
#include "runtime-light/stdlib/fork/fork-context.h"

WaitQueueContext &WaitQueueContext::get() noexcept {
return ForkComponentContext::get().wait_queue_context;
}

int64_t WaitQueueContext::create_queue(const array<Optional<int64_t>> &fork_ids) noexcept {
auto &memory_resource{get_component_context()->runtime_allocator.memory_resource};
unordered_map<int64_t, task_t<fork_result>> forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource});
std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) {
Optional<int64_t> fork_id = it.get_value();
if (fork_id.has_value()) {
if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) {
forks[fork_id.val()] = std::move(task.val());
}
}
});
int64_t queue_id{++next_wait_queue_id};
wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks)));
php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size);
return queue_id;
}
37 changes: 37 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <cstdint>

#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/stdlib/fork/wait-queue.h"
#include "runtime-light/utils/concepts.h"

class WaitQueueContext {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, WaitQueue> wait_queues;
static constexpr auto WAIT_QUEUE_INIT_ID = 0;
int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID};

public:
explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: wait_queues(unordered_map<int64_t, WaitQueue>::allocator_type{memory_resource}) {}

static WaitQueueContext &get() noexcept;

int64_t create_queue(const array<Optional<int64_t>> &fork_ids) noexcept;

Optional<WaitQueue *> get_queue(int64_t queue_id) noexcept {
if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) {
return &it->second;
}
return {};
}
};
95 changes: 95 additions & 0 deletions runtime-light/stdlib/fork/wait-queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <algorithm>
#include <coroutine>
#include <cstdint>

#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/concepts.h"

class WaitQueue {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

template<typename T>
using deque = memory_resource::stl::deque<T, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, task_t<fork_result>> forks;
unordered_map<int64_t, task_t<fork_result>::awaiter_t> fork_awaiters;
deque<std::coroutine_handle<>> awaited_handles;

public:
WaitQueue(const WaitQueue &) = delete;
WaitQueue &operator=(const WaitQueue &) = delete;

WaitQueue(WaitQueue &&other) noexcept
: forks(std::move(other.forks))
, fork_awaiters(std::move(other.fork_awaiters))
, awaited_handles(std::move(other.awaited_handles)) {}

explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map<int64_t, task_t<fork_result>> &&forks_) noexcept
: forks(std::move(forks_))
, fork_awaiters(unordered_map<int64_t, task_t<fork_result>::awaiter_t>::allocator_type{memory_resource})
, awaited_handles(deque<std::coroutine_handle<>>::allocator_type{memory_resource}) {
for (auto &fork : forks) {
fork_awaiters.emplace(fork.first, std::addressof(fork.second));
}
}

void push(int64_t fork_id, task_t<fork_result> &&fork) noexcept {
auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork));
auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second));
if (!awaited_handles.empty()) {
awaiter_id->second.await_suspend(awaited_handles.front());
}
}

Optional<std::pair<int64_t, task_t<fork_result>>> pop() noexcept {
auto it = std::find_if(forks.begin(), forks.end(), [](std::pair<const int64_t, task_t<fork_result>> &fork) { return fork.second.done(); });
if (it != forks.end()) {
auto fork = std::move(*it);
forks.erase(fork.first);
fork_awaiters.erase(fork.first);
return fork;
} else {
return {};
}
}

void subscribe_coro_handle(std::coroutine_handle<> coro) noexcept {
if (awaited_handles.empty()) {
std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); });
}
awaited_handles.push_back(coro);
}

void move_coro_handle() noexcept {
if (!awaited_handles.empty()) {
awaited_handles.pop_front();
}
std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front();
std::for_each(fork_awaiters.begin(), fork_awaiters.end(),
[&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter);});
}

bool has_ready_fork() const noexcept {
return std::any_of(forks.begin(), forks.end(), [](const auto &fork) {
return fork.second.done();
});
}

size_t size() const noexcept {
return forks.size();
}

bool empty() const noexcept {
return forks.empty();
}
};
3 changes: 2 additions & 1 deletion runtime-light/stdlib/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

int64_t f$rand() {
std::random_device rd;
int64_t dice_roll = rd();
std::uniform_int_distribution<> dis(0, rd.max());
int64_t dice_roll = dis(rd);
return dice_roll;
}
1 change: 1 addition & 0 deletions runtime-light/stdlib/stdlib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ prepend(
superglobals.cpp
fork/fork-api.cpp
fork/fork-context.cpp
fork/wait-queue-context.cpp
rpc/rpc-api.cpp
rpc/rpc-context.cpp
rpc/rpc-extra-headers.cpp
Expand Down
Loading

0 comments on commit e43b37a

Please sign in to comment.