Skip to content

Commit

Permalink
threadset: avoid an unnecessary semaphore
Browse files Browse the repository at this point in the history
Instead, let the last thread cleanup the set, if we request to exit.
  • Loading branch information
kleisauke committed Sep 9, 2024
1 parent b7198e2 commit 049358e
Showing 1 changed file with 36 additions and 43 deletions.
79 changes: 36 additions & 43 deletions libvips/iofuncs/threadset.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,14 @@ struct _VipsThreadset {
*/
GAsyncQueue *idle_thread_queue;

/* A semaphore used to indicate when the last thread has
* completed its execution.
*/
VipsSemaphore finish;

/* The current number of threads, the highwater mark, and
* the max we allow before blocking thread creation.
/* The current number of (idle-)threads, the highwater mark,
* and the max we allow before blocking thread creation.
*/
int n_threads;
int n_threads_highwater;
int n_idle_threads;
int max_threads;

/* The current number of idle threads.
*/
int idle_threads;

/* Set by our controller to request exit.
*/
gboolean exit;
Expand All @@ -112,10 +104,10 @@ vips_threadset_reuse_wait(VipsThreadset *set)

/* A superfluous thread? Leave this thread.
*/
if (set->idle_threads >= max_idle_threads)
if (set->n_idle_threads >= max_idle_threads)
return FALSE;

set->idle_threads++;
set->n_idle_threads++;

g_async_queue_unlock(set->task_queue);

Expand All @@ -125,18 +117,26 @@ vips_threadset_reuse_wait(VipsThreadset *set)

g_async_queue_lock(set->task_queue);

set->idle_threads--;
set->n_idle_threads--;

return result != NULL;
}

static void
vips_threadset_free_internal(VipsThreadset *set)
{
VIPS_FREEF(g_async_queue_unref, set->task_queue);
VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue);
VIPS_FREE(set);
}

/* The thread work function.
*/
static void *
vips_threadset_work(void *pointer)
{
VipsThreadset *set = (VipsThreadset *) pointer;
gboolean signal_last;
gboolean cleanup = FALSE;

VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self());

Expand All @@ -154,8 +154,12 @@ vips_threadset_work(void *pointer)

/* Request to exit? Leave this thread.
*/
if (set->exit)
if (set->exit) {
/* The last thread should cleanup the set.
*/
cleanup = set->n_threads == 1;
break;
}

/* No task available? Wait for being reused.
*/
Expand Down Expand Up @@ -191,17 +195,14 @@ vips_threadset_work(void *pointer)
/* Timed-out or exit has been requested, decrement number of threads.
*/
set->n_threads--;
signal_last = set->exit && set->n_threads == 0;
VIPS_DEBUG_MSG(
"vips_threadset_work: stopping %p (%d remaining, %d idle)\n",
g_thread_self(), set->n_threads, set->idle_threads);

g_async_queue_unlock(set->task_queue);

/* We are the last thread: tell the main thread.
*/
if (signal_last)
vips_semaphore_up(&set->finish);
if (cleanup)
vips_threadset_free_internal(set);

return NULL;
}
Expand Down Expand Up @@ -274,7 +275,6 @@ vips_threadset_new(int max_threads)
set = g_new0(VipsThreadset, 1);
set->task_queue = g_async_queue_new();
set->idle_thread_queue = g_async_queue_new();
vips_semaphore_init(&set->finish, 0, "finish");
set->max_threads = max_threads;

if (set->max_threads > 0)
Expand Down Expand Up @@ -339,44 +339,37 @@ vips_threadset_run(VipsThreadset *set,
* vips_threadset_free:
* @set: the threadset to free
*
* Free a threadset. This call will block until all pending tasks are
* finished.
* Free a threadset. This call returns immediately.
*/
void
vips_threadset_free(VipsThreadset *set)
{
int n_threads;

VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set);

g_async_queue_lock(set->task_queue);

if (vips__leak)
printf("vips_threadset_free: peak of %d threads\n",
set->n_threads_highwater);

set->exit = TRUE;

n_threads = set->n_threads;
/* No threads left, we cleanup.
*/
if (set->n_threads == 0) {
g_async_queue_unlock(set->task_queue);
vips_threadset_free_internal(set);
return;
}

/* Send dummy data to the queue, causing threads to wake up and check
* the above set->exit condition.
*/
for (int i = 0; i < set->idle_threads; i++)
for (int i = 0; i < set->n_idle_threads; i++)
g_async_queue_push(set->idle_thread_queue, GUINT_TO_POINTER(1));

for (int i = 0; i < n_threads; i++)
for (int i = 0; i < set->n_threads; i++)
g_async_queue_push_unlocked(set->task_queue, GUINT_TO_POINTER(1));

g_async_queue_unlock(set->task_queue);

/* Wait for the last thread to finish.
*/
if (n_threads > 0)
vips_semaphore_down(&set->finish);
vips_semaphore_destroy(&set->finish);

if (vips__leak)
printf("vips_threadset_free: peak of %d threads\n",
set->n_threads_highwater);

VIPS_FREEF(g_async_queue_unref, set->task_queue);
VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue);
VIPS_FREE(set);
}

0 comments on commit 049358e

Please sign in to comment.