diff --git a/libvips/iofuncs/threadset.c b/libvips/iofuncs/threadset.c index 336b30e8a..086800a5d 100644 --- a/libvips/iofuncs/threadset.c +++ b/libvips/iofuncs/threadset.c @@ -75,22 +75,14 @@ struct _VipsThreadset { */ GAsyncQueue *idle_thread_queue; - /* A semaphore used to indicate when the last thread has - * completed its execution. - */ - VipsSemaphore finish; - - /* The current number of threads, the highwater mark, and - * the max we allow before blocking thread creation. + /* The current number of (idle-)threads, the highwater mark, + * and the max we allow before blocking thread creation. */ int n_threads; int n_threads_highwater; + int n_idle_threads; int max_threads; - /* The current number of idle threads. - */ - int idle_threads; - /* Set by our controller to request exit. */ gboolean exit; @@ -112,10 +104,10 @@ vips_threadset_reuse_wait(VipsThreadset *set) /* A superfluous thread? Leave this thread. */ - if (set->idle_threads >= max_idle_threads) + if (set->n_idle_threads >= max_idle_threads) return FALSE; - set->idle_threads++; + set->n_idle_threads++; g_async_queue_unlock(set->task_queue); @@ -125,18 +117,26 @@ vips_threadset_reuse_wait(VipsThreadset *set) g_async_queue_lock(set->task_queue); - set->idle_threads--; + set->n_idle_threads--; return result != NULL; } +static void +vips_threadset_free_internal(VipsThreadset *set) +{ + VIPS_FREEF(g_async_queue_unref, set->task_queue); + VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue); + VIPS_FREE(set); +} + /* The thread work function. */ static void * vips_threadset_work(void *pointer) { VipsThreadset *set = (VipsThreadset *) pointer; - gboolean signal_last; + gboolean cleanup = FALSE; VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self()); @@ -154,8 +154,12 @@ vips_threadset_work(void *pointer) /* Request to exit? Leave this thread. */ - if (set->exit) + if (set->exit) { + /* The last thread should cleanup the set. + */ + cleanup = set->n_threads == 1; break; + } /* No task available? Wait for being reused. */ @@ -191,17 +195,14 @@ vips_threadset_work(void *pointer) /* Timed-out or exit has been requested, decrement number of threads. */ set->n_threads--; - signal_last = set->exit && set->n_threads == 0; VIPS_DEBUG_MSG( "vips_threadset_work: stopping %p (%d remaining, %d idle)\n", g_thread_self(), set->n_threads, set->idle_threads); g_async_queue_unlock(set->task_queue); - /* We are the last thread: tell the main thread. - */ - if (signal_last) - vips_semaphore_up(&set->finish); + if (cleanup) + vips_threadset_free_internal(set); return NULL; } @@ -274,7 +275,6 @@ vips_threadset_new(int max_threads) set = g_new0(VipsThreadset, 1); set->task_queue = g_async_queue_new(); set->idle_thread_queue = g_async_queue_new(); - vips_semaphore_init(&set->finish, 0, "finish"); set->max_threads = max_threads; if (set->max_threads > 0) @@ -339,44 +339,37 @@ vips_threadset_run(VipsThreadset *set, * vips_threadset_free: * @set: the threadset to free * - * Free a threadset. This call will block until all pending tasks are - * finished. + * Free a threadset. This call returns immediately. */ void vips_threadset_free(VipsThreadset *set) { - int n_threads; - VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set); g_async_queue_lock(set->task_queue); + if (vips__leak) + printf("vips_threadset_free: peak of %d threads\n", + set->n_threads_highwater); + set->exit = TRUE; - n_threads = set->n_threads; + /* No threads left, we cleanup. + */ + if (set->n_threads == 0) { + g_async_queue_unlock(set->task_queue); + vips_threadset_free_internal(set); + return; + } /* Send dummy data to the queue, causing threads to wake up and check * the above set->exit condition. */ - for (int i = 0; i < set->idle_threads; i++) + for (int i = 0; i < set->n_idle_threads; i++) g_async_queue_push(set->idle_thread_queue, GUINT_TO_POINTER(1)); - for (int i = 0; i < n_threads; i++) + for (int i = 0; i < set->n_threads; i++) g_async_queue_push_unlocked(set->task_queue, GUINT_TO_POINTER(1)); g_async_queue_unlock(set->task_queue); - - /* Wait for the last thread to finish. - */ - if (n_threads > 0) - vips_semaphore_down(&set->finish); - vips_semaphore_destroy(&set->finish); - - if (vips__leak) - printf("vips_threadset_free: peak of %d threads\n", - set->n_threads_highwater); - - VIPS_FREEF(g_async_queue_unref, set->task_queue); - VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue); - VIPS_FREE(set); }