Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

threadpool.c: threadpool implementation #207

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .indent.pro
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,6 @@
/* project-specific types */
-TFILE_INFO
-TPROCESS_INFO
-Tpackaged_task
-Tthreadpool

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ To build it you are going to need the following tools:

As well as the following libraries:
* libcrypto
* libpthread
### Build from github repository
In order to build from the github repository directly, you are also going
to need
Expand Down
19 changes: 18 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ AM_CONDITIONAL([HAVE_XXD], [test -n "$XXD"])

dnl Checks for libraries
AC_CHECK_LIB(crypto, EVP_sha1,, AC_MSG_ERROR([cannot find libcrypto]))
AC_CHECK_LIB(pthread, pthread_create,, AC_MSG_ERROR([cannot find libpthread]))

dnl Checks for headers
AC_CHECK_HEADER([errno.h],, AC_MSG_ERROR([cannot find errno.h]))
Expand All @@ -65,6 +66,7 @@ AC_CHECK_HEADER([sys/sysinfo.h],, AC_MSG_ERROR([cannot find sys/sysinfo.h]))
AC_CHECK_HEADER([sys/types.h],, AC_MSG_ERROR([cannot find sys/types.h]))
AC_CHECK_HEADER([sys/wait.h],, AC_MSG_ERROR([cannot find sys/wait.h]))
AC_CHECK_HEADER([sysexits.h],, AC_MSG_ERROR([cannot find sysexits.h]))
AC_CHECK_HEADER([pthread.h],, AC_MSG_ERROR([cannot find pthread.h]))
AC_CHECK_HEADER([time.h],, AC_MSG_ERROR([cannot find time.h]))
AC_CHECK_HEADER([unistd.h],, AC_MSG_ERROR([cannot find unistd.h]))

Expand All @@ -77,8 +79,11 @@ AC_CHECK_TYPE([ptrdiff_t],, AC_MSG_ERROR([cannot find ptrdiff_t]), [#include <st
AC_CHECK_TYPE([struct ptrace_syscall_info],, AC_MSG_ERROR([cannot find struct ptrace_syscall_info]), [#include <linux/ptrace.h>])
AC_CHECK_TYPE([time_t],, AC_MSG_ERROR([cannot find time_t]), [#include <time.h>])
AC_CHECK_TYPE([uint8_t],, AC_MSG_ERROR([cannot find uint8_t]), [#include <stdint.h>])
AC_CHECK_TYPE([pthread_t],, AC_MSG_ERROR([cannot find pthread_t]), [#include <pthread.h>])
AC_CHECK_TYPE([pthread_mutex_t],, AC_MSG_ERROR([cannot find pthread_mutex_t]), [#include <pthread.h>])
AC_CHECK_TYPE([pthread_cond_t],, AC_MSG_ERROR([cannot find pthread_cond_t]), [#include <pthread.h>])


AC_CHECK_DECL([PATH_MAX],, AC_MSG_ERROR([cannot find PATH_MAX]), [#include <linux/limits.h>])
AC_CHECK_DECL([ARG_MAX],, AC_MSG_ERROR([cannot find ARG_MAX]), [#include <linux/limits.h>])
AC_CHECK_DECL([AT_FDCWD],, AC_MSG_ERROR([cannot find AT_FDCWD]), [#include <fcntl.h>])
AC_CHECK_DECL([ENOENT],, AC_MSG_ERROR([cannot find ENOENT]), [#include <errno.h>])
Expand Down Expand Up @@ -164,6 +169,18 @@ AC_CHECK_FUNC([time],, AC_MSG_ERROR([cannot find time(2)]))
AC_CHECK_FUNC([wait],, AC_MSG_ERROR([cannot find wait(2)]))
AC_CHECK_FUNC([waitpid],, AC_MSG_ERROR([cannot find waitpid(2)]))

AC_CHECK_FUNC([pthread_mutex_lock],, AC_MSG_ERROR([cannot find pthread_mutex_lock(3)]))
AC_CHECK_FUNC([pthread_cond_wait],, AC_MSG_ERROR([cannot find pthread_cond_wait(3)]))
AC_CHECK_FUNC([pthread_mutex_unlock],, AC_MSG_ERROR([cannot find pthread_mutex_unlock(3)]))
AC_CHECK_FUNC([pthread_mutex_init],, AC_MSG_ERROR([cannot find pthread_mutex_init(3)]))
AC_CHECK_FUNC([pthread_cond_init],, AC_MSG_ERROR([cannot find pthread_cond_init(3)]))
AC_CHECK_FUNC([pthread_create],, AC_MSG_ERROR([cannot find pthread_create(3)]))
AC_CHECK_FUNC([pthread_cond_signal],, AC_MSG_ERROR([cannot find pthread_cond_signal(3)]))
AC_CHECK_FUNC([pthread_cond_broadcast],, AC_MSG_ERROR([cannot find pthread_cond_broadcast(3)]))
AC_CHECK_FUNC([pthread_join],, AC_MSG_ERROR([cannot find pthread_join(3)]))
AC_CHECK_FUNC([pthread_mutex_destroy],, AC_MSG_ERROR([cannot find pthread_mutex_destroy(3)]))
AC_CHECK_FUNC([pthread_cond_destroy],, AC_MSG_ERROR([cannot find pthread_cond_destroy(3)]))

dnl We need a C compiler
AC_PROG_CC

Expand Down
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ build_recorder_SOURCES = \
record.c \
schema.c \
tracer.c \
threadpool.c \
$(EMPTY)

SCHEMA = ../doc/build-recorder-schema.ttl
Expand Down
81 changes: 81 additions & 0 deletions src/threadpool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

/*
Copyright (C) 2022 Valasiadis Fotios
SPDX-License-Identifier: LGPL-2.1-or-later
*/

#include <stdlib.h>
#include <error.h>
#include <errno.h>
#include "threadpool.h"
#include <stdio.h>

static void *
worker(void *arg)
{
threadpool *pool = (threadpool *) arg;

while (1) {
pthread_mutex_lock(&pool->lock);
while (!pool->stop && pool->numtasks == -1) {
pthread_cond_wait(&pool->cond, &pool->lock);
}
if (pool->stop && pool->numtasks == -1) {
pthread_mutex_unlock(&pool->lock);
return NULL;
}
packaged_task task = pool->tasks[pool->numtasks--];

pthread_mutex_unlock(&pool->lock);

task.fun(task.arg);
}
}

void
threadpool_new(threadpool *self, int workers_size)
{
pthread_mutex_init(&self->lock, NULL);
pthread_cond_init(&self->cond, NULL);

self->workers = malloc(workers_size * sizeof (pthread_t));
self->workers_size = workers_size;
self->stop = 0;

self->tasks_size = 256;
self->tasks = malloc(self->tasks_size * sizeof (packaged_task));
self->numtasks = -1;

while (workers_size--) {
if (pthread_create(self->workers + workers_size, NULL, worker, self) <
0) {
error(EXIT_FAILURE, errno, "on threadpool_new pthread_create");
}
}
}

void
threadpool_enqueue(threadpool *self, packaged_task task)
{
pthread_mutex_lock(&self->lock);
self->tasks[++self->numtasks] = task;
pthread_mutex_unlock(&self->lock);

pthread_cond_signal(&self->cond);
}

void
threadpool_destroy(threadpool *self)
{
pthread_mutex_lock(&self->lock);
self->stop = 1;
pthread_mutex_unlock(&self->lock);

pthread_cond_broadcast(&self->cond);
while (self->workers_size--) {
pthread_join(self->workers[self->workers_size], NULL);
}

pthread_mutex_destroy(&self->lock);
pthread_cond_destroy(&self->cond);
}
31 changes: 31 additions & 0 deletions src/threadpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

/*
Copyright (C) 2022 Valasiadis Fotios
SPDX-License-Identifier: LGPL-2.1-or-later
*/

#include <pthread.h>

typedef struct {
void (*fun)(void *);
void *arg;
} packaged_task;

typedef struct {
pthread_t *workers;
int workers_size;

packaged_task *tasks;
int numtasks;
int tasks_size;

pthread_mutex_t lock;
pthread_cond_t cond;
char stop;
} threadpool;

void threadpool_new(threadpool *self, int workers_size);

void threadpool_enqueue(threadpool *self, packaged_task task);

void threadpool_destroy(threadpool *self);