Skip to content

Commit

Permalink
threadset: avoid thread oversubscription
Browse files Browse the repository at this point in the history
By having another queue of idle threads.
  • Loading branch information
kleisauke committed Sep 9, 2024
1 parent f91679a commit b7198e2
Showing 1 changed file with 117 additions and 54 deletions.
171 changes: 117 additions & 54 deletions libvips/iofuncs/threadset.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ typedef struct _VipsThreadExec {
} VipsThreadExec;

struct _VipsThreadset {
/* A asynchronous queue of VipsThreadExec.
/* An asynchronous queue of tasks.
*/
GAsyncQueue *queue;
GAsyncQueue *task_queue;

/* An asynchronous queue of idle threads.
*/
GAsyncQueue *idle_thread_queue;

/* A semaphore used to indicate when the last thread has
* completed its execution.
Expand All @@ -83,42 +87,88 @@ struct _VipsThreadset {
int n_threads_highwater;
int max_threads;

/* The current number of idle threads.
*/
int idle_threads;

/* Set by our controller to request exit.
*/
gboolean exit;
};

/* The maximum relative time (in microseconds) that a thread waits
* for work before being stopped, when set->max_threads is 0.
* for work before being stopped.
*/
static const guint64 max_idle_time = 15 * G_TIME_SPAN_SECOND;

/* The maximum number of idle threads.
*/
static const int max_idle_threads = 2;

static gboolean
vips_threadset_reuse_wait(VipsThreadset *set)
{
gpointer result;

/* A superfluous thread? Leave this thread.
*/
if (set->idle_threads >= max_idle_threads)
return FALSE;

set->idle_threads++;

g_async_queue_unlock(set->task_queue);

/* Wait for at least 15 seconds before leaving this thread.
*/
result = g_async_queue_timeout_pop(set->idle_thread_queue, max_idle_time);

g_async_queue_lock(set->task_queue);

set->idle_threads--;

return result != NULL;
}

/* The thread work function.
*/
static void *
vips_threadset_work(void *pointer)
{
VipsThreadset *set = (VipsThreadset *) pointer;
gboolean signal_last;

VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self());

for (;;) {
g_async_queue_lock(set->queue);
g_async_queue_lock(set->task_queue);

for (;;) {
/* Pop a task from the queue. If the number of threads is limited,
* this will block until a task becomes available. Otherwise, it
* waits for at least 15 seconds before receiving work.
* waits for at least 1/2 second before being marked as idle.
*/
VipsThreadExec *task = set->max_threads > 0
? g_async_queue_pop_unlocked(set->queue)
: g_async_queue_timeout_pop_unlocked(set->queue, max_idle_time);
? g_async_queue_pop_unlocked(set->task_queue)
: g_async_queue_timeout_pop_unlocked(set->task_queue,
G_USEC_PER_SEC / 2);

/* No task available or request to exit? Leave this thread.
/* Request to exit? Leave this thread.
*/
if (task == NULL || set->exit)
if (set->exit)
break;

g_async_queue_unlock(set->queue);
/* No task available? Wait for being reused.
*/
if (task == NULL) {
if (!vips_threadset_reuse_wait(set))
break;

continue;
}

/* A task was received and there was no request to exit.
*/
g_async_queue_unlock(set->task_queue);

/* If we're profiling, attach a prof struct to this thread.
*/
Expand All @@ -134,16 +184,19 @@ vips_threadset_work(void *pointer)
*/
vips_thread_shutdown();
VIPS_FREE(task);

g_async_queue_lock(set->task_queue);
}

/* Timed-out or exit has been requested, decrement number of threads.
*/
set->n_threads--;
gboolean signal_last = set->exit && set->n_threads == 0;
VIPS_DEBUG_MSG("vips_threadset_work: stopping %p (%d remaining)\n",
g_thread_self(), set->n_threads);
signal_last = set->exit && set->n_threads == 0;
VIPS_DEBUG_MSG(
"vips_threadset_work: stopping %p (%d remaining, %d idle)\n",
g_thread_self(), set->n_threads, set->idle_threads);

g_async_queue_unlock(set->queue);
g_async_queue_unlock(set->task_queue);

/* We are the last thread: tell the main thread.
*/
Expand All @@ -153,32 +206,49 @@ vips_threadset_work(void *pointer)
return NULL;
}

/* Add a new idle thread to the set.
/* Add a new thread to the set.
*/
static int
static gboolean
vips_threadset_add_thread(VipsThreadset *set)
{
GThread *thread;
gboolean reused = FALSE;

/* There are already sufficient threads running.
*/
if (set->max_threads > 0 &&
set->n_threads >= set->max_threads)
return 0;
return TRUE;

if (!(thread = vips_g_thread_new("libvips worker",
vips_threadset_work, set)))
return -1;
g_async_queue_lock(set->idle_thread_queue);

/* Ensure idle threads are freed on exit.
*/
g_thread_unref(thread);
if (g_async_queue_length_unlocked(set->idle_thread_queue) < 0) {
g_async_queue_push_unlocked(set->idle_thread_queue,
GUINT_TO_POINTER(1));

set->n_threads++;
set->n_threads_highwater =
VIPS_MAX(set->n_threads_highwater, set->n_threads);
reused = TRUE;
}

return 0;
g_async_queue_unlock(set->idle_thread_queue);

if (!reused) {
/* No idle thread was found, we have to start a new one.
*/
GThread *thread;

if (!(thread = vips_g_thread_new("libvips worker",
vips_threadset_work, set)))
return FALSE;

/* Ensure threads are freed on exit.
*/
g_thread_unref(thread);

set->n_threads++;
set->n_threads_highwater =
VIPS_MAX(set->n_threads_highwater, set->n_threads);
}

return TRUE;
}

/**
Expand All @@ -202,25 +272,19 @@ vips_threadset_new(int max_threads)
VipsThreadset *set;

set = g_new0(VipsThreadset, 1);
set->queue = g_async_queue_new();
set->task_queue = g_async_queue_new();
set->idle_thread_queue = g_async_queue_new();
vips_semaphore_init(&set->finish, 0, "finish");
set->max_threads = max_threads;

if (set->max_threads > 0) {
g_async_queue_lock(set->queue);

if (set->max_threads > 0)
for (int i = 0; i < set->max_threads; i++) {
if (vips_threadset_add_thread(set)) {
g_async_queue_unlock(set->queue);

if (!vips_threadset_add_thread(set)) {
vips_threadset_free(set);
return NULL;
}
}

g_async_queue_unlock(set->queue);
}

return set;
}

Expand All @@ -245,18 +309,13 @@ vips_threadset_run(VipsThreadset *set,
{
VipsThreadExec *task;

g_async_queue_lock(set->queue);

/* The queue length is calculated as the number of tasks in the queue
* minus the number of waiting threads.
*/
const int queue_length = g_async_queue_length_unlocked(set->queue);
g_async_queue_lock(set->task_queue);

/* Create a new thread if there are no waiting threads in the queue.
*/
if (queue_length >= 0)
if (vips_threadset_add_thread(set)) {
g_async_queue_unlock(set->queue);
if (g_async_queue_length_unlocked(set->task_queue) >= 0)
if (!vips_threadset_add_thread(set)) {
g_async_queue_unlock(set->task_queue);

/* Thread create has failed.
*/
Expand All @@ -270,8 +329,8 @@ vips_threadset_run(VipsThreadset *set,
task->func = func;
task->data = data;

g_async_queue_push_unlocked(set->queue, task);
g_async_queue_unlock(set->queue);
g_async_queue_push_unlocked(set->task_queue, task);
g_async_queue_unlock(set->task_queue);

return 0;
}
Expand All @@ -290,7 +349,7 @@ vips_threadset_free(VipsThreadset *set)

VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set);

g_async_queue_lock(set->queue);
g_async_queue_lock(set->task_queue);

set->exit = TRUE;

Expand All @@ -299,10 +358,13 @@ vips_threadset_free(VipsThreadset *set)
/* Send dummy data to the queue, causing threads to wake up and check
* the above set->exit condition.
*/
for (int i = 0; i < set->idle_threads; i++)
g_async_queue_push(set->idle_thread_queue, GUINT_TO_POINTER(1));

for (int i = 0; i < n_threads; i++)
g_async_queue_push_unlocked(set->queue, GUINT_TO_POINTER(1));
g_async_queue_push_unlocked(set->task_queue, GUINT_TO_POINTER(1));

g_async_queue_unlock(set->queue);
g_async_queue_unlock(set->task_queue);

/* Wait for the last thread to finish.
*/
Expand All @@ -314,6 +376,7 @@ vips_threadset_free(VipsThreadset *set)
printf("vips_threadset_free: peak of %d threads\n",
set->n_threads_highwater);

VIPS_FREEF(g_async_queue_unref, set->queue);
VIPS_FREEF(g_async_queue_unref, set->task_queue);
VIPS_FREEF(g_async_queue_unref, set->idle_thread_queue);
VIPS_FREE(set);
}

0 comments on commit b7198e2

Please sign in to comment.