From b7198e256f8369234598e2d7d3e5b35698cebfc5 Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Mon, 9 Sep 2024 13:54:53 +0200 Subject: [PATCH] threadset: avoid thread oversubscription By having another queue of idle threads. --- libvips/iofuncs/threadset.c | 171 ++++++++++++++++++++++++------------ 1 file changed, 117 insertions(+), 54 deletions(-) diff --git a/libvips/iofuncs/threadset.c b/libvips/iofuncs/threadset.c index 787094788..336b30e8a 100644 --- a/libvips/iofuncs/threadset.c +++ b/libvips/iofuncs/threadset.c @@ -67,9 +67,13 @@ typedef struct _VipsThreadExec { } VipsThreadExec; struct _VipsThreadset { - /* A asynchronous queue of VipsThreadExec. + /* An asynchronous queue of tasks. */ - GAsyncQueue *queue; + GAsyncQueue *task_queue; + + /* An asynchronous queue of idle threads. + */ + GAsyncQueue *idle_thread_queue; /* A semaphore used to indicate when the last thread has * completed its execution. @@ -83,42 +87,88 @@ struct _VipsThreadset { int n_threads_highwater; int max_threads; + /* The current number of idle threads. + */ + int idle_threads; + /* Set by our controller to request exit. */ gboolean exit; }; /* The maximum relative time (in microseconds) that a thread waits - * for work before being stopped, when set->max_threads is 0. + * for work before being stopped. */ static const guint64 max_idle_time = 15 * G_TIME_SPAN_SECOND; +/* The maximum number of idle threads. + */ +static const int max_idle_threads = 2; + +static gboolean +vips_threadset_reuse_wait(VipsThreadset *set) +{ + gpointer result; + + /* A superfluous thread? Leave this thread. + */ + if (set->idle_threads >= max_idle_threads) + return FALSE; + + set->idle_threads++; + + g_async_queue_unlock(set->task_queue); + + /* Wait for at least 15 seconds before leaving this thread. + */ + result = g_async_queue_timeout_pop(set->idle_thread_queue, max_idle_time); + + g_async_queue_lock(set->task_queue); + + set->idle_threads--; + + return result != NULL; +} + /* The thread work function. */ static void * vips_threadset_work(void *pointer) { VipsThreadset *set = (VipsThreadset *) pointer; + gboolean signal_last; VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self()); - for (;;) { - g_async_queue_lock(set->queue); + g_async_queue_lock(set->task_queue); + for (;;) { /* Pop a task from the queue. If the number of threads is limited, * this will block until a task becomes available. Otherwise, it - * waits for at least 15 seconds before receiving work. + * waits for at least 1/2 second before being marked as idle. */ VipsThreadExec *task = set->max_threads > 0 - ? g_async_queue_pop_unlocked(set->queue) - : g_async_queue_timeout_pop_unlocked(set->queue, max_idle_time); + ? g_async_queue_pop_unlocked(set->task_queue) + : g_async_queue_timeout_pop_unlocked(set->task_queue, + G_USEC_PER_SEC / 2); - /* No task available or request to exit? Leave this thread. + /* Request to exit? Leave this thread. */ - if (task == NULL || set->exit) + if (set->exit) break; - g_async_queue_unlock(set->queue); + /* No task available? Wait for being reused. + */ + if (task == NULL) { + if (!vips_threadset_reuse_wait(set)) + break; + + continue; + } + + /* A task was received and there was no request to exit. + */ + g_async_queue_unlock(set->task_queue); /* If we're profiling, attach a prof struct to this thread. */ @@ -134,16 +184,19 @@ vips_threadset_work(void *pointer) */ vips_thread_shutdown(); VIPS_FREE(task); + + g_async_queue_lock(set->task_queue); } /* Timed-out or exit has been requested, decrement number of threads. */ set->n_threads--; - gboolean signal_last = set->exit && set->n_threads == 0; - VIPS_DEBUG_MSG("vips_threadset_work: stopping %p (%d remaining)\n", - g_thread_self(), set->n_threads); + signal_last = set->exit && set->n_threads == 0; + VIPS_DEBUG_MSG( + "vips_threadset_work: stopping %p (%d remaining, %d idle)\n", + g_thread_self(), set->n_threads, set->idle_threads); - g_async_queue_unlock(set->queue); + g_async_queue_unlock(set->task_queue); /* We are the last thread: tell the main thread. */ @@ -153,32 +206,49 @@ vips_threadset_work(void *pointer) return NULL; } -/* Add a new idle thread to the set. +/* Add a new thread to the set. */ -static int +static gboolean vips_threadset_add_thread(VipsThreadset *set) { - GThread *thread; + gboolean reused = FALSE; /* There are already sufficient threads running. */ if (set->max_threads > 0 && set->n_threads >= set->max_threads) - return 0; + return TRUE; - if (!(thread = vips_g_thread_new("libvips worker", - vips_threadset_work, set))) - return -1; + g_async_queue_lock(set->idle_thread_queue); - /* Ensure idle threads are freed on exit. - */ - g_thread_unref(thread); + if (g_async_queue_length_unlocked(set->idle_thread_queue) < 0) { + g_async_queue_push_unlocked(set->idle_thread_queue, + GUINT_TO_POINTER(1)); - set->n_threads++; - set->n_threads_highwater = - VIPS_MAX(set->n_threads_highwater, set->n_threads); + reused = TRUE; + } - return 0; + g_async_queue_unlock(set->idle_thread_queue); + + if (!reused) { + /* No idle thread was found, we have to start a new one. + */ + GThread *thread; + + if (!(thread = vips_g_thread_new("libvips worker", + vips_threadset_work, set))) + return FALSE; + + /* Ensure threads are freed on exit. + */ + g_thread_unref(thread); + + set->n_threads++; + set->n_threads_highwater = + VIPS_MAX(set->n_threads_highwater, set->n_threads); + } + + return TRUE; } /** @@ -202,25 +272,19 @@ vips_threadset_new(int max_threads) VipsThreadset *set; set = g_new0(VipsThreadset, 1); - set->queue = g_async_queue_new(); + set->task_queue = g_async_queue_new(); + set->idle_thread_queue = g_async_queue_new(); vips_semaphore_init(&set->finish, 0, "finish"); set->max_threads = max_threads; - if (set->max_threads > 0) { - g_async_queue_lock(set->queue); - + if (set->max_threads > 0) for (int i = 0; i < set->max_threads; i++) { - if (vips_threadset_add_thread(set)) { - g_async_queue_unlock(set->queue); - + if (!vips_threadset_add_thread(set)) { vips_threadset_free(set); return NULL; } } - g_async_queue_unlock(set->queue); - } - return set; } @@ -245,18 +309,13 @@ vips_threadset_run(VipsThreadset *set, { VipsThreadExec *task; - g_async_queue_lock(set->queue); - - /* The queue length is calculated as the number of tasks in the queue - * minus the number of waiting threads. - */ - const int queue_length = g_async_queue_length_unlocked(set->queue); + g_async_queue_lock(set->task_queue); /* Create a new thread if there are no waiting threads in the queue. */ - if (queue_length >= 0) - if (vips_threadset_add_thread(set)) { - g_async_queue_unlock(set->queue); + if (g_async_queue_length_unlocked(set->task_queue) >= 0) + if (!vips_threadset_add_thread(set)) { + g_async_queue_unlock(set->task_queue); /* Thread create has failed. */ @@ -270,8 +329,8 @@ vips_threadset_run(VipsThreadset *set, task->func = func; task->data = data; - g_async_queue_push_unlocked(set->queue, task); - g_async_queue_unlock(set->queue); + g_async_queue_push_unlocked(set->task_queue, task); + g_async_queue_unlock(set->task_queue); return 0; } @@ -290,7 +349,7 @@ vips_threadset_free(VipsThreadset *set) VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set); - g_async_queue_lock(set->queue); + g_async_queue_lock(set->task_queue); set->exit = TRUE; @@ -299,10 +358,13 @@ vips_threadset_free(VipsThreadset *set) /* Send dummy data to the queue, causing threads to wake up and check * the above set->exit condition. */ + for (int i = 0; i < set->idle_threads; i++) + g_async_queue_push(set->idle_thread_queue, GUINT_TO_POINTER(1)); + for (int i = 0; i < n_threads; i++) - g_async_queue_push_unlocked(set->queue, GUINT_TO_POINTER(1)); + g_async_queue_push_unlocked(set->task_queue, GUINT_TO_POINTER(1)); - g_async_queue_unlock(set->queue); + g_async_queue_unlock(set->task_queue); /* Wait for the last thread to finish. */ @@ -314,6 +376,7 @@ vips_threadset_free(VipsThreadset *set) printf("vips_threadset_free: peak of %d threads\n", set->n_threads_highwater); - VIPS_FREEF(g_async_queue_unref, set->queue); + VIPS_FREEF(g_async_queue_unref, set->task_queue); + VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue); VIPS_FREE(set); }