Skip to content

Commit

Permalink
version 2
Browse files Browse the repository at this point in the history
  • Loading branch information
astrophysik committed Aug 14, 2024
1 parent 345fffa commit 9eebcb3
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 160 deletions.
39 changes: 0 additions & 39 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

template<class T>
concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) {
Expand Down Expand Up @@ -435,43 +434,5 @@ class wait_with_timeout_t {
}
};

// ================================================================================================

class wait_queue_next_t {
int64_t queue_id;
wait_for_timer_t timer_awaiter;

public:
explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept
: queue_id(queue_id_)
, timer_awaiter(timeout_) {}

bool await_ready() const noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty() || queue.val()->has_ready_fork();
}
return true;
}

void await_suspend(std::coroutine_handle<> coro) noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->push_coro_handle(coro);
timer_awaiter.await_suspend(coro);
}

Optional<int64_t> await_resume() noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->pop_coro_handle();
auto ready_fork = queue.val()->pop_fork();
if (ready_fork.has_value()) {
timer_awaiter.cancel();
// todo set here info about prev fork_id
return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second));
} else {
return {};
}
}
};

template<class T>
wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t<T>;
37 changes: 9 additions & 28 deletions runtime-light/stdlib/fork/fork-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,15 @@ task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept {
co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast<uint64_t>(duration_ns)}};
}

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept {
return WaitQueueContext::get().create_queue(fork_ids);
}

void f$wait_queue_push(int64_t queue_id, Optional<int64_t> fork_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) {
auto task = ForkComponentContext::get().pop_fork(fork_id.val());
if (task.has_value()) {
php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id);
queue.val()->push_fork(fork_id.val(), std::move(task.val()));
}
}
}

bool f$wait_queue_empty(int64_t queue_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty();
}
return false;
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue_id, double timeout) noexcept {
if (WaitQueueContext::get().get_queue(queue_id).has_value()) {
if (timeout < 0.0) {
timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT;
}
const auto timeout_ns{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})};
co_return co_await wait_queue_next_t{queue_id, timeout_ns};
static_assert(CancellableAwaitable<wait_queue_t::awaiter_t>);
auto queue = WaitQueueContext::get().get_queue(queue_id);
if (!queue.has_value()) {
co_return Optional<int64_t>{};
}
co_return Optional<int64_t>{};
const auto timeout_ns{timeout > 0 && timeout <= fork_api_impl_::MAX_TIMEOUT_S
? std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})
: fork_api_impl_::DEFAULT_TIMEOUT_NS};
auto result_opt{co_await wait_with_timeout_t{queue.val()->pop(), timeout_ns}};
co_return result_opt.has_value() ? result_opt.value() : Optional<int64_t>{};
}
17 changes: 14 additions & 3 deletions runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,21 @@ task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept;

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept;
inline int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept {
return WaitQueueContext::get().create_queue(fork_ids);
}

void f$wait_queue_push(int64_t queue, Optional<int64_t> fork_id) noexcept;
inline void f$wait_queue_push(int64_t queue_id, Optional<int64_t> fork_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) {
queue.val()->push(fork_id.val());
}
}

bool f$wait_queue_empty(int64_t queue_id) noexcept;
inline bool f$wait_queue_empty(int64_t queue_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty();
}
return false;
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept;
40 changes: 35 additions & 5 deletions runtime-light/stdlib/fork/fork-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"
#include "runtime-light/utils/concepts.h"

constexpr int64_t INVALID_FORK_ID = -1;

class ForkComponentContext {
enum class ForkStatus { available, reserved };

template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

static constexpr auto FORK_ID_INIT = 0;

unordered_map<int64_t, task_t<fork_result>> forks;
unordered_map<int64_t, std::pair<task_t<fork_result>, ForkStatus>> forks;
int64_t next_fork_id{FORK_ID_INIT + 1};

int64_t push_fork(task_t<fork_result> &&task) noexcept {
return forks.emplace(next_fork_id, std::move(task)), next_fork_id++;
return forks.emplace(next_fork_id, std::make_pair(std::move(task), ForkStatus::available)), next_fork_id++;
}

task_t<fork_result> pop_fork(int64_t fork_id) noexcept {
Expand All @@ -34,21 +37,48 @@ class ForkComponentContext {
php_critical_error("can't find fork %" PRId64, fork_id);
}
auto fork{std::move(it_fork->second)};
php_assert(fork.second == ForkStatus::available);
forks.erase(it_fork);
return fork;
return std::move(fork.first);
}

void reserve_fork(int64_t fork_id) noexcept {
if (const auto it = forks.find(fork_id); it != forks.end()) {
it->second.second = ForkStatus::reserved;
}
}

void unreserve_fork(int64_t fork_id) noexcept {
if (const auto it = forks.find(fork_id); it != forks.end()) {
it->second.second = ForkStatus::available;
}
}

friend class start_fork_t;
template<typename>
friend class wait_fork_t;
friend class wait_queue_t;

public:
WaitQueueContext wait_queue_context;

explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource}) {}
: forks(unordered_map<int64_t, std::pair<task_t<fork_result>, ForkStatus>>::allocator_type{memory_resource})
, wait_queue_context(memory_resource) {}

static ForkComponentContext &get() noexcept;

bool contains(int64_t fork_id) const noexcept {
return forks.contains(fork_id);
if (const auto it = forks.find(fork_id); it != forks.cend()) {
return it->second.second == ForkStatus::available;
}
return false;
}

bool is_ready(int64_t fork_id) const noexcept {
if (const auto it = forks.find(fork_id); it != forks.cend()) {
return it->second.first.done();
}
return false;
}
};
11 changes: 5 additions & 6 deletions runtime-light/stdlib/fork/wait-queue-context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "runtime-light/stdlib/fork/wait-queue-context.h"

#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/stdlib/fork/fork-context.h"

WaitQueueContext &WaitQueueContext::get() noexcept {
Expand All @@ -13,17 +14,15 @@ WaitQueueContext &WaitQueueContext::get() noexcept {

int64_t WaitQueueContext::create_queue(const array<Optional<int64_t>> &fork_ids) noexcept {
auto &memory_resource{get_component_context()->runtime_allocator.memory_resource};
unordered_map<int64_t, task_t<fork_result>> forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource});
std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) {
unordered_set<int64_t> forks_ids(unordered_set<int64_t>::allocator_type{memory_resource});
std::for_each(fork_ids.begin(), fork_ids.end(), [&forks_ids](const auto &it) {
Optional<int64_t> fork_id = it.get_value();
if (fork_id.has_value()) {
if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) {
forks[fork_id.val()] = std::move(task.val());
}
forks_ids.insert(fork_id.val());
}
});
int64_t queue_id{++next_wait_queue_id};
wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks)));
wait_queues.emplace(queue_id, wait_queue_t(memory_resource, std::move(forks_ids)));
php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size);
return queue_id;
}
13 changes: 8 additions & 5 deletions runtime-light/stdlib/fork/wait-queue-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,31 @@
#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/stdlib/fork/wait-queue.h"
#include "runtime-light/utils/concepts.h"
#include "runtime-light/stdlib/fork/wait-queue.h"

class WaitQueueContext {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, WaitQueue> wait_queues;
template<hashable T>
using unordered_set = memory_resource::stl::unordered_set<T, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, wait_queue_t> wait_queues;
static constexpr auto WAIT_QUEUE_INIT_ID = 0;
int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID};

public:
explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: wait_queues(unordered_map<int64_t, WaitQueue>::allocator_type{memory_resource}) {}
: wait_queues(unordered_map<int64_t, wait_queue_t>::allocator_type{memory_resource}) {}

static WaitQueueContext &get() noexcept;

int64_t create_queue(const array<Optional<int64_t>> &fork_ids) noexcept;

Optional<WaitQueue *> get_queue(int64_t queue_id) noexcept {
Optional<wait_queue_t *> get_queue(int64_t queue_id) noexcept {
if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) {
return &it->second;
return std::addressof(it->second);
}
return {};
}
Expand Down
60 changes: 51 additions & 9 deletions runtime-light/stdlib/fork/wait-queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,57 @@

#include "runtime-light/stdlib/fork/wait-queue.h"

#include "runtime-light/component/component.h"

void WaitQueue::resume_awaited_handles_if_empty() {
if (forks.empty()) {
while (!awaited_handles.empty()) {
auto handle = awaited_handles.front();
awaited_handles.pop_front();
CoroutineScheduler::get().suspend({handle, WaitEvent::Rechedule{}});
}
#include "runtime-light/stdlib/fork/fork-context.h"

wait_queue_t::wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set<int64_t> &&forks_ids_) noexcept
: forks_ids(std::move(forks_ids_))
, awaiters(deque<awaiter_t>::allocator_type{memory_resource}) {
std::for_each(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { ForkComponentContext::get().reserve_fork(fork_id); });
}

void wait_queue_t::push(int64_t fork_id) noexcept {
forks_ids.insert(fork_id);
ForkComponentContext::get().reserve_fork(fork_id);
if (!awaiters.empty()) {
auto &task = ForkComponentContext::get().forks[fork_id].first;
task_t<fork_result>::awaiter_t awaiter{std::addressof(task)};
awaiter.await_suspend(awaiters.front().awaited_handle);
}
}

void wait_queue_t::insert_awaiter(awaiter_t awaiter) noexcept {
if (awaiters.empty()) {
std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) {
task_t<fork_result>::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)};
task_awaiter.await_suspend(awaiter.awaited_handle);
});
}
awaiters.push_back(awaiter);
}

void wait_queue_t::erase_awaiter(awaiter_t awaiter) noexcept {
auto it = std::find(awaiters.begin(), awaiters.end(), awaiter);
if (it != awaiters.end()) {
awaiters.erase(it);
}
std::coroutine_handle<> next_awaiter = awaiters.empty() ? std::noop_coroutine() : awaiters.front().awaited_handle;
std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) {
task_t<fork_result>::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)};
task_awaiter.await_suspend(next_awaiter);
});
}

int64_t wait_queue_t::pop_ready_fork() noexcept {
auto it = std::find_if(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); });
if (it == forks_ids.end()) {
php_critical_error("there is no fork to pop from queue");
}
int64_t ready_fork = *it;
forks_ids.erase(it);
ForkComponentContext::get().unreserve_fork(ready_fork);
return ready_fork;
}

bool wait_queue_t::has_ready_fork() const noexcept {
return std::any_of(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); });
}
Loading

0 comments on commit 9eebcb3

Please sign in to comment.