Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add background thread to update device op queue upon receiving kernel interrupt #189

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/acl_globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ int acl_present_board_is_valid(void);
// Can't use ACL after this.
// Undoes acl_init().
void acl_reset(void);
// Version of reset used in unit test only
void acl_reset_join_thread(void);
Comment on lines +25 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is used in the unit tests only it should not be in the include or src directories, but under test.


// Initializes the HAL and loads the builtin system definition.
//
Expand Down
3 changes: 3 additions & 0 deletions include/acl_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,9 @@ typedef struct _cl_platform_id
// The device operation queue.
// These are the operations that can run immediately on the device.
acl_device_op_queue_t device_op_queue;
// Thread used to update device_op_queue when kernel interrupt triggers
acl_thread_t device_op_queue_update_thread;
bool outstanding_interrupt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for my understanding: Reads/writes to this variable are guarded by the global mutex.


// Limits. See clGetDeviceInfo for semantics.
unsigned int max_param_size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ int acl_sem_destroy(acl_sem_t *sem);
// See this Microsoft Research paper on how to implement condition
// variables with only semaphores
// http://research.microsoft.com/pubs/64242/implementingcvs.pdf
// It's veyr instructive, but we can't use its implementation because:
// It's very instructive, but we can't use its implementation because:
// - The signaler acquires a mutex
// - It keeps an explicit linked list of waiters
//
Expand Down
29 changes: 29 additions & 0 deletions src/acl_globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,35 @@ void acl_reset(void) {
acl_platform.initialized = 0;
}

// This function should only be used in the unit test
void acl_reset_join_thread(void) {
{
std::scoped_lock lock{acl_mutex_wrapper};

l_reset_present_board();

acl_platform.offline_device = "";
acl_platform.num_devices = 0;
for (unsigned i = 0; i < ACL_MAX_DEVICE; ++i) {
acl_platform.device[i] = _cl_device_id();
}
acl_platform.initialized = 0;
Comment on lines +239 to +246
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (almost) duplicates acl_reset(). Can that function be called instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember seeing some issue if I call acl_signal_device_update without the lock or in another critical section, but I can't remember clearly if that's the case. I'll try to think through it again to see if using acl_reset here is okay.

acl_signal_device_update();
}
// Each unit test test groups are sequentially run and acl_init and acl_reset
// is called once at the start (setup) and end (teardown) of the test group.
// As acl_init wouldn't be called before acl_reset finished, it is okay to
// block here to wait for the device op queue update thread to finish here.

// Note that the join has to be called without holding the acl global lock, if
// reset acquires lock and wait, the device op queue update thread will try to
// obtain the lock forever, resulting in deadlock in the unit test.
if (acl_platform.device_op_queue_update_thread != 0) {
acl_thread_join(&acl_platform.device_op_queue_update_thread);
acl_platform.device_op_queue_update_thread = 0;
}
}

////////////////////////////////////////////////////
// Static functions

Expand Down
4 changes: 4 additions & 0 deletions src/acl_kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3176,6 +3176,10 @@ void acl_receive_kernel_update(int activation_id, cl_int status) {
std::unique_lock lock{acl_mutex_wrapper, std::defer_lock};
if (!acl_is_inside_sig()) {
lock.lock();
} else {
// Let the device op queue update thread know there is an interrupt from
// the kernel interrupt signal handler
acl_platform.outstanding_interrupt = 1;
}

if (activation_id >= 0 && activation_id < doq->max_ops) {
Expand Down
24 changes: 24 additions & 0 deletions src/acl_platform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ static void l_initialize_devices(const acl_system_def_t *present_board_def,
int offline_mode, unsigned int num_devices,
const cl_device_id *devices);
static void l_add_device(int idx);
void *l_eagerly_update_device_op_queue(void *arg);

//////////////////////////////
// OpenCL API
Expand Down Expand Up @@ -412,6 +413,10 @@ void acl_init_platform(void) {

// Device operation queue.
acl_init_device_op_queue(&acl_platform.device_op_queue);
// Send off device_op_queue update thread
acl_platform.outstanding_interrupt = 0;
acl_thread_create(&acl_platform.device_op_queue_update_thread, 0,
l_eagerly_update_device_op_queue, NULL);

// Initialize sampler allocator.
for (int i = 0; i < ACL_MAX_SAMPLER; i++) {
Expand Down Expand Up @@ -737,6 +742,25 @@ static void l_add_device(int idx) {
device->address_bits = 64; // Yes, our devices are 64-bit.
}

void *l_eagerly_update_device_op_queue(void *arg) {
while (true) {
std::scoped_lock lock{acl_mutex_wrapper};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused how this is not blocking runtime progress. If the mutex is acquired first and the wait for a device update is second, won't the mutex be locked most of the time, such that other threads trying to acquire the mutex cannot make progress?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the thread waits it releases the lock so that other threads can make progress, are you referring to the time between the lock and wait during which other threads will be blocked?


// Sleep if no interrupt happening
acl_wait_for_device_update(NULL);
Copy link
Contributor Author

@sophimao sophimao Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The side effect of using this existing call is that the thread gets woken up not just when there is a mmd triggered kernel interrupt, but also when all the signal handler functions are called (acl_receive_kernel_update, acl_set_execution_status, acl_receive_device_exception, and acl_schedule_printf_buffer_pickup). But this is already better than always polling, I wonder if it is necessary to fine grain this even further (might be risky)?

Copy link
Contributor Author

@sophimao sophimao Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can potentially

  • Introduce a new acl_condvar_s, or
  • Use a semaphore (is this doable?)

To signal to the device op queue update thread, to avoid waking up from event status update calls, etc.

However, the device op queue update thread might still need to acquire the global lock when doing the update, so that no other thread is modifying the shared resources (which are quite a lot...).

Copy link
Contributor

@pcolberg pcolberg Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are good questions! However, instead of considering the options for the choice of syncronisation mechanism, I suggest taking a step back and considering the data structures that are underlying this change and how they might be refactored to avoid (too much) locking due to concurrent access.

The primary goal here is to make forward progress in processing the device operations queue, correct? With this implementation, the queue is written to by multiple threads and read from by multiple threads (some spawned by the user and the background thread spawned by the runtime). Is there any advantage in processing the queue from multiple threads? To avoid contention over the global mutex, could the processing, i.e. reading from the queue be taken over by the background thread only? Such that the user's (potentially) multiple threads only feed, i.e. write to the queue?

If the queue is indeed the central data structure to this change, then the task becomes to find a thread-safe data structure. In the simplest implementation this could be a achieved using a dedicated mutex. Beyond that, third-party implementations are available, some of them lock-free, with different tradeoffs to be evaluated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! Yes, the primary goal is to make forward progress in processing the device operation queue. In the model you suggested, is it kind of like treating the device op queue as a buffer for consumer (runtime spawned thread) and producer (user threads)? This conceptually makes sense, just that I'm not sure if there are other runtime constructs (events, command queue, etc.) modified during the process of updating the device op queue, would that pose any obstacle to the consumer-producer model?

Also I'm thinking, if at the end of the day we are actively making progress on the device op queue, maybe it would be possible to remove some of the acl_idle_updates calls in clEnqueue... used to nudge the device op queue. I'm not sure if this idea is practical in any sense or if it will cause any issue though...

Copy link
Contributor

@pcolberg pcolberg Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the model you suggested, is it kind of like treating the device op queue as a buffer for consumer (runtime spawned thread) and producer (user threads)?

Yes, I am thinking of a multi-producer single-consumer queue. The runtime is thread-safe, but it is not efficient since it was designed for a single thread only. The addition of a background thread means a significant overhaul of this design.

This conceptually makes sense, just that I'm not sure if there are other runtime constructs (events, command queue, etc.) modified during the process of updating the device op queue, would that pose any obstacle to the consumer-producer model?

Indeed, that is the next question. As the background thread consumes new operations from the queue, can it perform its work without constantly obtaining the global mutex for other resources, thus blocking the foreground/user threads feeding the queue? What resources does the background thread need to perform its work? Which of these are shared with foreground threads and how frequently?

Also I'm thinking, if at the end of the day we are actively making progress on the device op queue, maybe it would be possible to remove some of the acl_idle_updates calls in clEnqueue... used to nudge the device op queue. I'm not sure if this idea is practical in any sense or if it will cause any issue though...

I am not certain either, but removing these is likely needed to avoid blocking the background thread. Maybe a good start is to draft a summary of all the tasks which the runtime currently performs in the "background" to make forward progress?


if (!acl_platform.initialized) {
break;
}
if (acl_platform.outstanding_interrupt) {
acl_print_debug_msg("Serving outstanding kernel interrupt...\n");
acl_update_device_op_queue(&(acl_platform.device_op_queue));
acl_platform.outstanding_interrupt = 0;
}
}
return NULL;
}

// These functions check to see if a given object is known to the system.
// acl_*_is_valid( * );
// This is simple because everything is statically allocated.
Expand Down
11 changes: 10 additions & 1 deletion src/acl_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <acl_context.h>
#include <acl_hal.h>
#include <acl_thread.h>
#include <acl_util.h>

ACL_TLS int acl_global_lock_count = 0;
ACL_TLS int acl_inside_sig_flag = 0;
Expand Down Expand Up @@ -55,7 +56,7 @@ void acl_mutex_wrapper_t::resume_lock(int lock_count) {

void acl_wait_for_device_update(cl_context context) {
acl_assert_locked();
if (acl_get_hal()->get_debug_verbosity &&
if (acl_context_is_valid(context) && acl_get_hal()->get_debug_verbosity &&
acl_get_hal()->get_debug_verbosity() > 0) {
unsigned timeout = 5; // Seconds
// Keep waiting until signal is received
Expand Down Expand Up @@ -102,6 +103,14 @@ __attribute__((constructor)) static void l_global_lock_init() {
}

__attribute__((destructor)) static void l_global_lock_uninit() {
if (acl_get_platform()->device_op_queue_update_thread) {
{
std::scoped_lock lock{acl_mutex_wrapper};
acl_get_platform()->initialized = 0;
acl_signal_condvar(&l_acl_global_condvar); // wake up waiting thread
}
acl_thread_join(&acl_get_platform()->device_op_queue_update_thread);
}
Comment on lines +106 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the thread joined in the library destructor? From my naïve understanding, I would expect the thread to be created on clCreateContext() and joined on (the last) clReleaseContext()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally included the thread as a member of acl_platform so I created it when initializing the platform. However the acl_reset function is never called in a user flow so I had to put it in the destructor... I agree that it's probably better to do this when creating and releasing context, but need to add some flag indicating the first creation and the last release.

acl_reset_condvar(&l_acl_global_condvar);
}

Expand Down
2 changes: 1 addition & 1 deletion test/acl_command_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ MT_TEST_GROUP(acl_command_queue) {
syncThreads();

if (threadNum() == 0) {
ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}

acl_test_run_standard_teardown_checks();
Expand Down
2 changes: 1 addition & 1 deletion test/acl_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ MT_TEST_GROUP(Context) {
syncThreads();

if (threadNum() == 0) {
ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}
acl_test_run_standard_teardown_checks();
}
Expand Down
2 changes: 1 addition & 1 deletion test/acl_device_op_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ TEST_GROUP(device_op) {

virtual void teardown() {
unload();
acl_test_teardown_generic_system();
acl_mutex_wrapper.unlock();
acl_test_teardown_generic_system();
acl_test_run_standard_teardown_checks();
}

Expand Down
2 changes: 1 addition & 1 deletion test/acl_device_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ syncThreads();
void teardown() {
syncThreads();
if (threadNum() == 0) {
ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}
acl_test_run_standard_teardown_checks();
}
Expand Down
4 changes: 2 additions & 2 deletions test/acl_event_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ MT_TEST_GROUP(acl_event) {
syncThreads();

if (threadNum() == 0) {
ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}

acl_test_run_standard_teardown_checks();
Expand Down Expand Up @@ -129,7 +129,7 @@ MT_TEST_GROUP(acl_event_default_config) {

syncThreads();
if (threadNum() == 0) {
ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}

acl_test_run_standard_teardown_checks();
Expand Down
12 changes: 9 additions & 3 deletions test/acl_globals_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,9 @@ TEST(acl_globals_undef, valid_init_simple) {
CHECK(0 != acl_present_board_def());
CHECK(0 != acl_present_board_is_valid());
// Teardown
acl_reset();
acl_mutex_wrapper.unlock();
acl_reset_join_thread();
acl_mutex_wrapper.lock();
CHECK(0 == acl_present_board_def());
CHECK(0 == acl_present_board_is_valid());
}
Expand All @@ -726,7 +728,9 @@ TEST(acl_globals_undef, valid_init_empty) {
CHECK(0 != acl_present_board_def());
CHECK(0 != acl_present_board_is_valid());
// Teardown
acl_reset();
acl_mutex_wrapper.unlock();
acl_reset_join_thread();
acl_mutex_wrapper.lock();
CHECK(0 == acl_present_board_def());
CHECK(0 == acl_present_board_is_valid());
}
Expand All @@ -735,6 +739,8 @@ TEST(acl_globals_undef, valid_init_complex) {
CHECK_EQUAL(1, acl_init(&acltest_complex_system));
CHECK(0 != acl_present_board_def());
// Teardown
acl_reset();
acl_mutex_wrapper.unlock();
acl_reset_join_thread();
acl_mutex_wrapper.lock();
CHECK_EQUAL(0, acl_present_board_def());
}
9 changes: 7 additions & 2 deletions test/acl_platform_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ TEST(offline_device, offline_hal) {
cl_bool result;

acl_test_setenv(m_env, offline_device);
ACL_LOCKED(acl_reset());
acl_reset_join_thread();
ACL_LOCKED(result = acl_init_from_hal_discovery());
CHECK_EQUAL(CL_TRUE, result);
// Exercise the offline HAL: printing, and the timestamps.
Expand All @@ -390,6 +390,11 @@ TEST(offline_device, offline_hal) {
ACL_LOCKED(now = acl_get_hal()->get_timestamp());
ACL_LOCKED(acl_print_debug_msg("offline hal time is %08x%08x", (now >> 32),
(now & 0xffffffff)));

syncThreads();
if (threadNum() == 0) {
acl_test_teardown_system();
}
}

struct live_info_t {
Expand Down Expand Up @@ -435,7 +440,7 @@ MT_TEST_GROUP(track_object) {
syncThreads();
if (threadNum() == 0) {
acl_test_unsetenv(m_offline_env);
ACL_LOCKED(acl_reset());
acl_reset_join_thread();
}
acl_test_run_standard_teardown_checks();
}
Expand Down
2 changes: 1 addition & 1 deletion test/acl_support_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ TEST_GROUP(support){void setup(){acl_mutex_wrapper.lock();
acl_test_setup_generic_system();
}
void teardown() {
acl_test_teardown_generic_system();
acl_mutex_wrapper.unlock();
acl_test_teardown_generic_system();
acl_test_run_standard_teardown_checks();
}

Expand Down
4 changes: 2 additions & 2 deletions test/acl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ void acl_test_teardown_sample_default_board_system(void) {

void acl_test_teardown_generic_system(void) { acl_test_teardown_system(); }
void acl_test_teardown_system(void) {
acl_reset_join_thread();
acl_mutex_wrapper.lock();
acl_reset();
acl_reset_hal();
acltest_hal_teardown();
acl_mutex_wrapper.unlock();
Expand Down Expand Up @@ -358,7 +358,7 @@ static void l_load_example_binary(void) {
acl_test_setenv(envvar_program_lib, program_lib_old_value);
}

ACL_LOCKED(acl_test_teardown_generic_system());
acl_test_teardown_generic_system();
}

// Return a context properties array that specifies preloaded binary only.
Expand Down