Skip to content

Commit

Permalink
Add task arena worker block time (#1352)
Browse files Browse the repository at this point in the history
* Add `threading_control::is_any_other_client_active()` to verify if other arenas are requesting workers.
* Add hybrid CPU detection at initialization.
* On non-hybrid CPUs, make workers block for 1ms before leaving the arena, unless
  - more work becomes available in the arena (-> unblock and stay in the arena);
  - other arenas are requesting workers (-> unblock and leave the arena immediately).

Signed-off-by: Dmitri Mokhov <[email protected]>
Signed-off-by: pavelkumbrasev <[email protected]>
  • Loading branch information
dnmokhov authored Apr 24, 2024
1 parent 9501cd9 commit 92c8529
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -396,7 +396,7 @@ bool arena::is_top_priority() const {
}

bool arena::try_join() {
if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
if (is_joinable()) {
my_references += arena::ref_worker;
return true;
}
Expand Down
6 changes: 5 additions & 1 deletion src/tbb/arena.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -385,6 +385,10 @@ class arena: public padded<arena_base>

bool is_top_priority() const;

//! Tells whether one more worker may join this arena.
/** Compares the allotment read from my_num_workers_allotted (relaxed load)
    against the value returned by num_workers_active(). **/
bool is_joinable() const {
    return my_num_workers_allotted.load(std::memory_order_relaxed) > num_workers_active();
}

bool try_join();

void set_allotment(unsigned allotment);
Expand Down
4 changes: 3 additions & 1 deletion src/tbb/governor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,6 +138,8 @@ class governor {
//! Returns the waitpkg_enabled flag stored in cpu_features.
static bool wait_package_enabled() { return cpu_features.waitpkg_enabled; }
#endif

//! Returns the hybrid flag stored in cpu_features.
static bool hybrid_cpu() { return cpu_features.hybrid; }

//! Returns the value of is_rethrow_broken.
static bool rethrow_exception_broken() { return is_rethrow_broken; }

static bool is_itt_present() {
Expand Down
17 changes: 14 additions & 3 deletions src/tbb/misc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,6 +92,8 @@ void PrintExtraVersionInfo( const char* category, const char* format, ... ) {
//! check for transaction support.
#if _MSC_VER
#include <intrin.h> // for __cpuid
#elif __APPLE__
#include <sys/sysctl.h>
#endif

#if __TBB_x86_32 || __TBB_x86_64
Expand Down Expand Up @@ -131,13 +133,22 @@ void detect_cpu_features(cpu_features_type& cpu_features) {
#if __TBB_x86_32 || __TBB_x86_64
const int rtm_ebx_mask = 1 << 11;
const int waitpkg_ecx_mask = 1 << 5;
const int hybrid_edx_mask = 1 << 15;
int registers[4] = {0};

// Check RTM and WAITPKG
// Check RTM, WAITPKG, HYBRID
check_cpuid(7, 0, registers);
cpu_features.rtm_enabled = (registers[1] & rtm_ebx_mask) != 0;
cpu_features.waitpkg_enabled = (registers[2] & waitpkg_ecx_mask) != 0;
#endif /* (__TBB_x86_32 || __TBB_x86_64) */
cpu_features.hybrid = (registers[3] & hybrid_edx_mask) != 0;
#elif __APPLE__
// Check HYBRID (hw.nperflevels > 1)
uint64_t nperflevels = 0;
size_t nperflevels_size = sizeof(nperflevels);
if (!sysctlbyname("hw.nperflevels", &nperflevels, &nperflevels_size, nullptr, 0)) {
cpu_features.hybrid = (nperflevels > 1);
}
#endif
}

} // namespace r1
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/misc.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -211,6 +211,7 @@ T1 atomic_update(std::atomic<T1>& dst, T1 newValue, Pred compare) {
//! CPU capability flags; every flag starts out false until detect_cpu_features() sets it.
struct cpu_features_type {
    //! RTM capability flag
    bool rtm_enabled = false;
    //! WAITPKG capability flag
    bool waitpkg_enabled = false;
    //! Hybrid CPU flag
    bool hybrid = false;
};

void detect_cpu_features(cpu_features_type& cpu_features);
Expand Down
14 changes: 13 additions & 1 deletion src/tbb/thread_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -164,6 +164,18 @@ thread_dispatcher_client* thread_dispatcher::client_in_need(thread_dispatcher_cl
return client_in_need(my_client_list, my_next_client);
}

//! Checks whether at least one registered client reports itself as joinable.
/** Walks every list in my_client_list while holding my_list_mutex with
    is_writer=false, and stops at the first client whose is_joinable() is true. **/
bool thread_dispatcher::is_any_client_in_need() {
    client_list_mutex_type::scoped_lock reader_guard(my_list_mutex, /*is_writer=*/false);

    bool found_joinable = false;
    for (auto& clients_of_priority : my_client_list) {
        for (auto& candidate : clients_of_priority) {
            if (candidate.is_joinable()) {
                found_joinable = true;
                break;
            }
        }
        if (found_joinable) {
            // No need to inspect the remaining lists.
            break;
        }
    }
    return found_joinable;
}

//! Forwards the given delta unchanged to my_server->adjust_job_count_estimate().
void thread_dispatcher::adjust_job_count_estimate(int delta) {
my_server->adjust_job_count_estimate(delta);
}
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/thread_dispatcher.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@ class thread_dispatcher : no_copy, rml::tbb_client {
thread_dispatcher_client* create_client(arena& a);
void register_client(thread_dispatcher_client* client);
bool try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority);
bool is_any_client_in_need();

void adjust_job_count_estimate(int delta);
void release(bool blocking_terminate);
Expand Down
7 changes: 6 additions & 1 deletion src/tbb/thread_dispatcher_client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,11 @@ class thread_dispatcher_client : public d1::intrusive_list_node /* Need for list
//! Forwards to my_arena.try_join() and returns its result.
bool try_join() {
return my_arena.try_join();
}

//! Forwards to my_arena.is_joinable() and returns its result.
bool is_joinable() {
return my_arena.is_joinable();
}

//! Forwards the given thread_data to my_arena.process().
void process(thread_data& td) {
my_arena.process(td);
}
Expand Down
10 changes: 6 additions & 4 deletions src/tbb/thread_request_serializer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,16 +37,16 @@ void thread_request_serializer::update(int delta) {
if (prev_pending_delta == pending_delta_base) {
delta = int(my_pending_delta.exchange(pending_delta_base) & delta_mask) - int(pending_delta_base);
mutex_type::scoped_lock lock(my_mutex);
my_total_request += delta;
delta = limit_delta(delta, my_soft_limit, my_total_request);
my_total_request.store(my_total_request.load(std::memory_order_relaxed) + delta, std::memory_order_relaxed);
delta = limit_delta(delta, my_soft_limit, my_total_request.load(std::memory_order_relaxed));
my_thread_dispatcher.adjust_job_count_estimate(delta);
}
}

void thread_request_serializer::set_active_num_workers(int soft_limit) {
mutex_type::scoped_lock lock(my_mutex);
int delta = soft_limit - my_soft_limit;
delta = limit_delta(delta, my_total_request, soft_limit);
delta = limit_delta(delta, my_total_request.load(std::memory_order_relaxed), soft_limit);
my_thread_dispatcher.adjust_job_count_estimate(delta);
my_soft_limit = soft_limit;
}
Expand Down Expand Up @@ -109,6 +109,8 @@ void thread_request_serializer_proxy::set_active_num_workers(int soft_limit) {
}
}

//! Returns whatever my_serializer.num_workers_requested() reports.
int thread_request_serializer_proxy::num_workers_requested() { return my_serializer.num_workers_requested(); }

//! Passes the delta straight through to my_serializer.update().
void thread_request_serializer_proxy::update(int delta) { my_serializer.update(delta); }

void thread_request_serializer_proxy::enable_mandatory_concurrency(mutex_type::scoped_lock& lock) {
Expand Down
6 changes: 4 additions & 2 deletions src/tbb/thread_request_serializer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@ class thread_request_serializer : public thread_request_observer {
public:
thread_request_serializer(thread_dispatcher& td, int soft_limit);
void set_active_num_workers(int soft_limit);
//! Returns the current value of my_total_request, read with a relaxed load.
int num_workers_requested() { return my_total_request.load(std::memory_order_relaxed); }
//! True when my_soft_limit equals zero.
// NOTE(review): "avaliable" misspells "available"; left as is because renaming would change the interface.
bool is_no_workers_avaliable() { return my_soft_limit == 0; }

private:
Expand All @@ -48,7 +49,7 @@ class thread_request_serializer : public thread_request_observer {

thread_dispatcher& my_thread_dispatcher;
int my_soft_limit{ 0 };
int my_total_request{ 0 };
std::atomic<int> my_total_request{ 0 };
// my_pending_delta is set to pending_delta_base to have ability to hold negative values
// consider increasing the base once the number of threads can exceed 1 << 15
static constexpr std::uint64_t pending_delta_base = 1 << 15;
Expand All @@ -63,6 +64,7 @@ class thread_request_serializer_proxy : public thread_request_observer {
thread_request_serializer_proxy(thread_dispatcher& td, int soft_limit);
void register_mandatory_request(int mandatory_delta);
void set_active_num_workers(int soft_limit);
int num_workers_requested();

private:
void update(int delta) override;
Expand Down
10 changes: 9 additions & 1 deletion src/tbb/threading_control.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -164,6 +164,10 @@ void threading_control_impl::adjust_demand(threading_control_client tc_client, i
my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}

//! Reports whether some client could currently accept a worker.
/** The dispatcher is consulted only when the request serializer reports a
    positive number of requested workers; otherwise the answer is false. **/
bool threading_control_impl::is_any_other_client_active() {
    if (my_thread_request_serializer->num_workers_requested() <= 0) {
        // No workers are requested, so the dispatcher is not queried at all.
        return false;
    }
    return my_thread_dispatcher->is_any_client_in_need();
}

//! Returns a reference to the object that my_waiting_threads_monitor points to.
thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
return *my_waiting_threads_monitor;
}
Expand Down Expand Up @@ -389,6 +393,10 @@ void threading_control::adjust_demand(threading_control_client client, int manda
my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}

//! Forwards to my_pimpl->is_any_other_client_active() and returns its result.
bool threading_control::is_any_other_client_active() {
return my_pimpl->is_any_other_client_active();
}

//! Forwards to my_pimpl->get_waiting_threads_monitor() and returns the same reference.
thread_control_monitor& threading_control::get_waiting_threads_monitor() {
return my_pimpl->get_waiting_threads_monitor();
}
Expand Down
4 changes: 3 additions & 1 deletion src/tbb/threading_control.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 Intel Corporation
Copyright (c) 2022-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@ class threading_control_impl {
unsigned max_num_workers();

void adjust_demand(threading_control_client, int mandatory_delta, int workers_delta);
bool is_any_other_client_active();

thread_control_monitor& get_waiting_threads_monitor();

Expand Down Expand Up @@ -116,6 +117,7 @@ class threading_control {
static unsigned max_num_workers();

void adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta);
bool is_any_other_client_active();

thread_control_monitor& get_waiting_threads_monitor();

Expand Down
20 changes: 19 additions & 1 deletion src/tbb/waiters.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,6 +58,24 @@ class outermost_worker_waiter : public waiter_base {
__TBB_ASSERT(t == nullptr, nullptr);

if (is_worker_should_leave(slot)) {
if (!governor::hybrid_cpu()) {
static constexpr std::chrono::microseconds worker_wait_leave_duration(1000);
static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1), "Clock resolution is not enough for measured interval.");

for (auto t1 = std::chrono::steady_clock::now(), t2 = t1;
std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1) < worker_wait_leave_duration;
t2 = std::chrono::steady_clock::now())
{
if (!my_arena.is_empty() && !my_arena.is_recall_requested()) {
return true;
}

if (my_arena.my_threading_control->is_any_other_client_active()) {
break;
}
d0::yield();
}
}
// Leave dispatch loop
return false;
}
Expand Down

0 comments on commit 92c8529

Please sign in to comment.