From bb932305089f706e22fe10f4e5e9d866804c28fe Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Thu, 5 Sep 2024 14:12:18 +0200 Subject: [PATCH] threadset: revise/simplify on top of `GAsyncQueue` The previous implementation was prone to race conditions under rare circumstances. --- ChangeLog | 1 + libvips/include/vips/internal.h | 1 - libvips/iofuncs/threadset.c | 332 +++++++++++++++++--------------- 3 files changed, 174 insertions(+), 160 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7152f809db..d1ed5ecedd 100644 --- a/ChangeLog +++ b/ChangeLog @@ -7,6 +7,7 @@ - threadpool: fix a race condition in error handling [kleisauke] - disable GLib cast checks and asserts for plain builds [kleisauke] - fix jpeg in tiff for high Q [nahilsobh] +- threadset: fix a race condition during thread exit [kleisauke] 11/8/24 8.15.3 diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index 678140746e..4eb6070d4a 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -151,7 +151,6 @@ extern gboolean vips__cache_trace; void vips__thread_init(void); void vips__threadpool_init(void); void vips__threadpool_shutdown(void); -int vips__thread_execute(const char *name, GFunc func, gpointer data); VIPS_API void vips__worker_lock(GMutex *mutex); void vips__cache_init(void); diff --git a/libvips/iofuncs/threadset.c b/libvips/iofuncs/threadset.c index 153fd2b43e..10b48a332b 100644 --- a/libvips/iofuncs/threadset.c +++ b/libvips/iofuncs/threadset.c @@ -52,48 +52,40 @@ #include #include -typedef struct _VipsThreadsetMember { - /* The set we are part of. - */ - VipsThreadset *set; - - /* The underlying glib thread object. - */ - GThread *thread; - - /* The task the thread should run next. +typedef struct _VipsThreadExec { + /* The source of this function. */ const char *domain; - GFunc func; - void *data; - void *user_data; - /* The thread waits on this when it's free. + /* The function to execute within the thread. */ - VipsSemaphore idle; + GFunc func; - /* Set by our controller to request exit. + /* User data that is handed over to func when it is called. */ - gboolean kill; -} VipsThreadsetMember; + gpointer data; +} VipsThreadExec; struct _VipsThreadset { - GMutex *lock; - - /* All the VipsThreadsetMember we have created. + /* An asynchronous queue of tasks. */ - GSList *members; + GAsyncQueue *queue; - /* The set of currently idle threads. + /* Idle threads wait on this semaphore. */ - GSList *free; + VipsSemaphore idle; - /* The current number of threads, the highwater mark, and - * the max we allow before blocking thread creation. + /* The current number of (idle-)threads, the highwater mark, + * and the max we allow before blocking thread creation. */ int n_threads; int n_threads_highwater; + int n_idle_threads; int max_threads; + + /* Set by our controller to request exit. + */ + gboolean exit; }; /* The maximum relative time (in microseconds) that a thread waits @@ -101,113 +93,157 @@ struct _VipsThreadset { */ static const gint64 max_idle_time = 15 * G_TIME_SPAN_SECOND; +/* The maximum number of idle threads. + */ +static const int max_idle_threads = 2; + +static gboolean +vips_threadset_reuse_wait(VipsThreadset *set) +{ + int result; + + /* A superfluous thread? Leave this thread. + */ + if (++set->n_idle_threads > max_idle_threads) + return FALSE; + + g_async_queue_unlock(set->queue); + + /* Wait for at least 15 seconds before leaving this thread. + */ + result = vips_semaphore_down_timeout(&set->idle, max_idle_time); + + g_async_queue_lock(set->queue); + + return result != -1; +} + +static void +vips_threadset_free_internal(VipsThreadset *set) +{ + VIPS_FREEF(g_async_queue_unref, set->queue); + vips_semaphore_destroy(&set->idle); + VIPS_FREE(set); +} + /* The thread work function. */ static void * vips_threadset_work(void *pointer) { - VipsThreadsetMember *member = (VipsThreadsetMember *) pointer; - VipsThreadset *set = member->set; + VipsThreadset *set = (VipsThreadset *) pointer; + gboolean cleanup = FALSE; - VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", member); + VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self()); + + g_async_queue_lock(set->queue); for (;;) { - /* Wait for at least 15 seconds to be given work. + /* Pop a task from the queue. If the number of threads is limited, + * this will block until a task becomes available. Otherwise, it + * waits for at least 1/2 second before being marked as idle. */ - if (vips_semaphore_down_timeout(&member->idle, - max_idle_time) == -1) - break; + VipsThreadExec *task = set->max_threads > 0 + ? g_async_queue_pop_unlocked(set->queue) + : g_async_queue_timeout_pop_unlocked(set->queue, + G_USEC_PER_SEC / 2); - /* Killed or no task available? Leave this thread. + /* Request to exit? Leave this thread. */ - if (member->kill || - !member->func) + if (set->exit) { + /* The last thread should cleanup the set. + */ + cleanup = set->n_threads == 1; break; + } + + /* No task available? Wait for being reused. + */ + if (task == NULL) { + if (!vips_threadset_reuse_wait(set)) { + set->n_idle_threads--; + break; + } + + continue; + } + + /* A task was received and there was no request to exit. + */ + g_async_queue_unlock(set->queue); /* If we're profiling, attach a prof struct to this thread. */ if (vips__thread_profile) - vips__thread_profile_attach(member->domain); + vips__thread_profile_attach(task->domain); /* Execute the task. */ - member->func(member->data, member->user_data); + task->func(task->data, NULL); /* Free any thread-private resources -- they will not be * useful for the next task to use this thread. */ vips_thread_shutdown(); + VIPS_FREE(task); - member->domain = NULL; - member->func = NULL; - member->data = NULL; - member->user_data = NULL; - - /* We are free ... back on the free list! - */ - g_mutex_lock(set->lock); - set->free = g_slist_prepend(set->free, member); - g_mutex_unlock(set->lock); + g_async_queue_lock(set->queue); } - /* Timed-out or kill has been requested ... remove from both free - * and member list. + /* Timed-out or exit has been requested, decrement number of threads. */ - g_mutex_lock(set->lock); - set->free = g_slist_remove(set->free, member); - set->members = g_slist_remove(set->members, member); - set->n_threads -= 1; - VIPS_DEBUG_MSG("vips_threadset_work: stopping %p (%d remaining)\n", - member, set->n_threads); - g_mutex_unlock(set->lock); + set->n_threads--; + VIPS_DEBUG_MSG( + "vips_threadset_work: stopping %p (%d remaining, %d idle)\n", + g_thread_self(), set->n_threads, set->n_idle_threads); - vips_semaphore_destroy(&member->idle); + g_async_queue_unlock(set->queue); - VIPS_FREE(member); + if (cleanup) + vips_threadset_free_internal(set); return NULL; } -/* Create a new idle member for the set. +/* Add a new thread to the set. */ -static VipsThreadsetMember * -vips_threadset_add(VipsThreadset *set) +static gboolean +vips_threadset_add_thread(VipsThreadset *set) { - VipsThreadsetMember *member; + gboolean reused = FALSE; - if (set->max_threads && - set->n_threads >= set->max_threads) { - vips_error("VipsThreadset", - "%s", _("threadset is exhausted")); - return NULL; - } + /* There are already sufficient threads running. + */ + if (set->max_threads > 0 && + set->n_threads >= set->max_threads) + return TRUE; - member = g_new0(VipsThreadsetMember, 1); - member->set = set; + if (set->n_idle_threads > 0) { + vips_semaphore_up(&set->idle); - vips_semaphore_init(&member->idle, 0, "idle"); + set->n_idle_threads--; + reused = TRUE; + } - if (!(member->thread = vips_g_thread_new("libvips worker", - vips_threadset_work, member))) { - vips_semaphore_destroy(&member->idle); - VIPS_FREE(member); + if (!reused) { + /* No idle thread was found, we have to start a new one. + */ + GThread *thread; - return NULL; - } + if (!(thread = vips_g_thread_new("libvips worker", + vips_threadset_work, set))) + return FALSE; - /* Ensure idle threads are freed on exit, this - * ref is increased before the thread is joined. - */ - g_thread_unref(member->thread); + /* Ensure threads are freed on exit. + */ + g_thread_unref(thread); - g_mutex_lock(set->lock); - set->members = g_slist_prepend(set->members, member); - set->n_threads += 1; - set->n_threads_highwater = - VIPS_MAX(set->n_threads_highwater, set->n_threads); - g_mutex_unlock(set->lock); + set->n_threads++; + set->n_threads_highwater = + VIPS_MAX(set->n_threads_highwater, set->n_threads); + } - return member; + return TRUE; } /** @@ -220,8 +256,8 @@ vips_threadset_add(VipsThreadset *set) * vips_threadset_run(), with no limit on the number of threads. * * If @max_threads is > 0, then that many threads will be created by - * vips_threadset_new() during startup and vips_threadset_run() will fail if - * no free threads are available. + * vips_threadset_new() during startup and vips_threadset_run() will + * not spawn any additional threads. * * Returns: the new threadset. */ @@ -231,19 +267,16 @@ vips_threadset_new(int max_threads) VipsThreadset *set; set = g_new0(VipsThreadset, 1); - set->lock = vips_g_mutex_new(); + set->queue = g_async_queue_new(); + vips_semaphore_init(&set->idle, 0, "idle"); set->max_threads = max_threads; if (set->max_threads > 0) for (int i = 0; i < set->max_threads; i++) { - VipsThreadsetMember *member; - - if (!(member = vips_threadset_add(set))) { + if (!vips_threadset_add_thread(set)) { vips_threadset_free(set); return NULL; } - - set->free = g_slist_prepend(set->free, member); } return set; @@ -256,8 +289,9 @@ vips_threadset_new(int max_threads) * @func: the task to execute * @data: the task's data * - * Execute a task in a thread. If there are no idle threads, create a new one, - * provided we are under @max_threads. + * Execute a task in a thread. If there are no idle threads and the maximum + * thread limit specified by @max_threads has not been reached, a new thread + * will be spawned. * * See also: vips_threadset_new(). * @@ -267,91 +301,71 @@ int vips_threadset_run(VipsThreadset *set, const char *domain, GFunc func, gpointer data) { - VipsThreadsetMember *member; + VipsThreadExec *task; - member = NULL; + g_async_queue_lock(set->queue); - /* Try to get an idle thread. + /* Create a new thread if there are no waiting threads in the queue. */ - g_mutex_lock(set->lock); - if (set->free) { - member = (VipsThreadsetMember *) set->free->data; - set->free = g_slist_remove(set->free, member); - } - g_mutex_unlock(set->lock); + if (g_async_queue_length_unlocked(set->queue) >= 0) + if (!vips_threadset_add_thread(set)) { + g_async_queue_unlock(set->queue); - /* None? Make a new idle but not free member. - */ - if (!member) - member = vips_threadset_add(set); + /* Thread create has failed. + */ + return -1; + } - /* Still nothing? Thread create has failed. + /* Allocate the task and push it into the queue. */ - if (!member) - return -1; + task = g_new0(VipsThreadExec, 1); + task->domain = domain; + task->func = func; + task->data = data; - /* Allocate the task and set it going. - */ - member->domain = domain; - member->func = func; - member->data = data; - member->user_data = NULL; - vips_semaphore_up(&member->idle); + g_async_queue_push_unlocked(set->queue, task); + g_async_queue_unlock(set->queue); return 0; } -/* Kill a member. - */ -static void -vips_threadset_kill_member(VipsThreadsetMember *member) -{ - GThread *thread; - - thread = g_thread_ref(member->thread); - member->kill = TRUE; - - vips_semaphore_up(&member->idle); - - (void) g_thread_join(thread); - - /* member is freed on thread exit. - */ -} - /** * vips_threadset_free: * @set: the threadset to free * - * Free a threadset. This call will block until all pending tasks are - * finished. + * Free a threadset. This call returns immediately. */ void vips_threadset_free(VipsThreadset *set) { VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set); - /* Try to get and finish a thread. - */ - for (;;) { - VipsThreadsetMember *member; + g_async_queue_lock(set->queue); - member = NULL; - g_mutex_lock(set->lock); - if (set->members) - member = (VipsThreadsetMember *) set->members->data; - g_mutex_unlock(set->lock); + if (vips__leak) + printf("vips_threadset_free: peak of %d threads\n", + set->n_threads_highwater); - if (!member) - break; + set->exit = TRUE; - vips_threadset_kill_member(member); + /* No threads left, we cleanup. + */ + if (set->n_threads == 0) { + g_async_queue_unlock(set->queue); + vips_threadset_free_internal(set); + return; } - if (vips__leak) - printf("vips_threadset_free: peak of %d threads\n", - set->n_threads_highwater); + /* Wake up idle threads, if any. + */ + if (set->n_idle_threads > 0) + vips_semaphore_upn(&set->idle, set->n_idle_threads); - VIPS_FREEF(vips_g_mutex_free, set->lock); - VIPS_FREE(set); + /* Send dummy data to the queue, causing threads to wake up and check + * the above set->exit condition. + */ + for (int i = 0; i < set->n_threads; i++) + g_async_queue_push_unlocked(set->queue, GUINT_TO_POINTER(1)); + + g_async_queue_unlock(set->queue); }