Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for data races when threads are dynamically created #49

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions src/tracer/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ WRAPPERS_CORE = \
core_SRCS = \
calltrace.c calltrace.h \
signals.c signals.h \
pthread_redirect.c pthread_redirect.h \
xml-parse.c xml-parse.h \
UF_gcc_instrument.c UF_gcc_instrument.h \
UF_xl_instrument.c UF_xl_instrument.h \
Expand Down
10 changes: 9 additions & 1 deletion src/tracer/calltrace.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "trace_macros.h"
#include "wrapper.h"
#include "common_hwc.h"
#include "pthread_redirect.h"

extern pthread_rwlock_t pThread_mtx_change_number_threads;

//#define DEBUG
//#define MPICALLER_DEBUG
Expand Down Expand Up @@ -87,14 +90,19 @@ void Extrae_trace_callers (iotimer_t time, int offset, int type)
{
if (Trace_Caller[type][current_deep-offset])
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
TRACE_EVENT(time, CALLER_EVENT_TYPE(type, current_deep-offset+1), (UINT64)ip);
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
}
#if defined(SAMPLING_SUPPORT)
else if (type == CALLER_SAMPLING)
{
if (Trace_Caller[type][current_deep-offset])
if (Trace_Caller[type][current_deep-offset]) {
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
SAMPLE_EVENT_NOHWC(time, SAMPLING_EV+current_deep-offset+1, (UINT64) ip);
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
}
#endif
}
Expand Down
16 changes: 15 additions & 1 deletion src/tracer/clocks/clock.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
# error "Unhandled clock type"
#endif

#include "pthread_redirect.h"
static pthread_rwlock_t pThread_mtx_extrae_last_read_clock = PTHREAD_RWLOCK_INITIALIZER;

static UINT64 *_extrae_last_read_clock = NULL;
static unsigned ClockType = REAL_CLOCK;
iotimer_t (*get_clock)();
Expand All @@ -89,7 +92,11 @@ unsigned Clock_getType (void)
/* We obtain the last read time */
UINT64 Clock_getLastReadTime (unsigned thread)
{
return _extrae_last_read_clock[thread];
UINT64 ret;
mtx_rw_rdlock(&pThread_mtx_extrae_last_read_clock);
ret = _extrae_last_read_clock[thread];
mtx_rw_unlock(&pThread_mtx_extrae_last_read_clock);
return ret;
}

/* We obtain the current time, but we don't store it in the last read time */
Expand All @@ -103,24 +110,31 @@ UINT64 Clock_getCurrentTime_nstore (void)
UINT64 Clock_getCurrentTime (unsigned thread)
{
UINT64 tmp = Clock_getCurrentTime_nstore ();
mtx_rw_rdlock(&pThread_mtx_extrae_last_read_clock);
_extrae_last_read_clock[thread] = tmp;
mtx_rw_unlock(&pThread_mtx_extrae_last_read_clock);
return tmp;
}

void Clock_AllocateThreads (unsigned numthreads)
{
mtx_rw_wrlock(&pThread_mtx_extrae_last_read_clock);
_extrae_last_read_clock = (UINT64*) realloc (_extrae_last_read_clock,
sizeof(UINT64)*numthreads);
if (_extrae_last_read_clock == NULL)
{
fprintf (stderr, PACKAGE_NAME": Cannot allocate timing memory for %u threads\n", numthreads);
mtx_rw_unlock(&pThread_mtx_extrae_last_read_clock);
exit (-1);
}
mtx_rw_unlock(&pThread_mtx_extrae_last_read_clock);
}

void Clock_CleanUp (void)
{
mtx_rw_wrlock(&pThread_mtx_extrae_last_read_clock);
xfree (_extrae_last_read_clock);
mtx_rw_unlock(&pThread_mtx_extrae_last_read_clock);
}

void Clock_Initialize (unsigned numthreads)
Expand Down
57 changes: 46 additions & 11 deletions src/tracer/hwc/common_hwc.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
#if defined(ENABLE_PEBS_SAMPLING)
# include "sampling-intel-pebs.h"
#endif
#include <pthread.h>
#include "pthread_redirect.h"

pthread_mutex_t pThread_mtx_HWC_current_changetype = PTHREAD_MUTEX_INITIALIZER;
extern pthread_rwlock_t pThread_mtx_change_number_threads;

/*------------------------------------------------ Global Variables ---------*/
int HWCEnabled = FALSE; /* Have the HWC been started? */
Expand Down Expand Up @@ -103,7 +108,11 @@ int HWC_IsEnabled()
*/
int HWC_Get_Current_Set (int threadid)
{
return HWC_current_set[threadid];
int ret;
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
ret = HWC_current_set[threadid];
mtx_rw_unlock(&pThread_mtx_change_number_threads);
return ret;
}

/*
Expand Down Expand Up @@ -194,9 +203,11 @@ void HWC_Stop_Current_Set (UINT64 time, int thread_id)
{
/* make sure we don't loose the current counter values */
Extrae_counters_at_Time_Wrapper(time);


mtx_rw_rdlock(&pThread_mtx_change_number_threads);
/* Actually stop the counters */
HWCBE_STOP_SET (time, HWC_current_set[thread_id], thread_id);
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
}

Expand All @@ -209,8 +220,10 @@ void HWC_Start_Current_Set (UINT64 countglops, UINT64 time, int thread_id)
/* If there are less than 2 sets, don't do anything! */
if (HWC_num_sets > 0)
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
/* Actually start the counters */
HWCBE_START_SET (countglops, time, HWC_current_set[thread_id], thread_id);
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
}

Expand All @@ -226,11 +239,13 @@ void HWC_Start_Next_Set (UINT64 countglops, UINT64 time, int thread_id)
{
HWC_Stop_Current_Set (time, thread_id);

mtx_rw_rdlock(&pThread_mtx_change_number_threads);
/* Move to the next set */
if (HWC_current_changeto == CHANGE_SEQUENTIAL)
HWC_current_set[thread_id] = (HWC_current_set[thread_id] + 1) % HWC_num_sets;
else if (HWC_current_changeto == CHANGE_RANDOM)
HWC_current_set[thread_id] = random()%HWC_num_sets;
mtx_rw_unlock(&pThread_mtx_change_number_threads);

HWC_Start_Current_Set (countglops, time, thread_id);
}
Expand All @@ -247,11 +262,13 @@ void HWC_Start_Previous_Set (UINT64 countglops, UINT64 time, int thread_id)
{
HWC_Stop_Current_Set (time, thread_id);

mtx_rw_rdlock(&pThread_mtx_change_number_threads);
/* Move to the previous set */
if (HWC_current_changeto == CHANGE_SEQUENTIAL)
HWC_current_set[thread_id] = ((HWC_current_set[thread_id] - 1) < 0) ? (HWC_num_sets - 1) : (HWC_current_set[thread_id] - 1) ;
else if (HWC_current_changeto == CHANGE_RANDOM)
HWC_current_set[thread_id] = random()%HWC_num_sets;
mtx_rw_unlock(&pThread_mtx_change_number_threads);

HWC_Start_Current_Set (countglops, time, thread_id);
}
Expand Down Expand Up @@ -291,12 +308,13 @@ static inline int CheckForHWCSetChange_TIME (UINT64 countglops, UINT64 time, int
int ret = 0;

// fprintf (stderr, "HWC_current_timebegin[%d]=%llu HWC_current_changeat=%llu time = %llu\n", THREADID, HWC_current_timebegin[threadid], HWC_current_changeat, time);

mtx_rw_rdlock(&pThread_mtx_change_number_threads);
if (HWC_current_timebegin[threadid] + HWC_current_changeat < time)
{
HWC_Start_Next_Set (countglops, time, threadid);
ret = 1;
}
mtx_rw_unlock(&pThread_mtx_change_number_threads);
return ret;
}

Expand All @@ -309,12 +327,16 @@ static inline int CheckForHWCSetChange_TIME (UINT64 countglops, UINT64 time, int
*/
int HWC_Check_Pending_Set_Change (UINT64 countglops, UINT64 time, int thread_id)
{
if (HWC_current_changetype == CHANGE_GLOPS)
mtx_lock(&pThread_mtx_HWC_current_changetype);
if (HWC_current_changetype == CHANGE_GLOPS) {
mtx_unlock(&pThread_mtx_HWC_current_changetype);
return CheckForHWCSetChange_GLOPS(countglops, time, thread_id);
else if (HWC_current_changetype == CHANGE_TIME)
} else if (HWC_current_changetype == CHANGE_TIME) {
mtx_unlock(&pThread_mtx_HWC_current_changetype);
return CheckForHWCSetChange_TIME(countglops, time, thread_id);
else
return 0;
}
mtx_unlock(&pThread_mtx_HWC_current_changetype);
return 0;
}

/**
Expand All @@ -324,7 +346,6 @@ int HWC_Check_Pending_Set_Change (UINT64 countglops, UINT64 time, int thread_id)
void HWC_Initialize (int options)
{
int num_threads = Backend_getMaximumOfThreads();

HWC_current_set = (int *)malloc(sizeof(int) * num_threads);
ASSERT(HWC_current_set != NULL, "Cannot allocate memory for HWC_current_set");
memset (HWC_current_set, 0, sizeof(int) * num_threads);
Expand Down Expand Up @@ -393,8 +414,9 @@ void HWC_Start_Counters (int num_threads, UINT64 time, int forked)
HWC_Accum_Reset(i);
}

if (HWC_num_sets <= 0)
if (HWC_num_sets <= 0) {
return;
}

HWCEnabled = TRUE;
}
Expand All @@ -403,6 +425,7 @@ void HWC_Start_Counters (int num_threads, UINT64 time, int forked)
HWCEnabled = HWCBE_START_COUNTERS_THREAD (time, 0, forked);

/* Inherit hwc set change values from thread 0 */
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
for (i = 1; i < num_threads; i++)
{
/*
Expand All @@ -414,6 +437,7 @@ void HWC_Start_Counters (int num_threads, UINT64 time, int forked)
HWC_current_timebegin[i] = HWC_current_timebegin[0];
HWC_current_glopsbegin[i] = HWC_current_glopsbegin[0];
}
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}

/**
Expand All @@ -424,7 +448,6 @@ void HWC_Start_Counters (int num_threads, UINT64 time, int forked)
void HWC_Restart_Counters (int old_num_threads, int new_num_threads)
{
int i;

#if defined(PAPI_COUNTERS)
for (i = 0; i < HWC_num_sets; i++)
HWCBE_PAPI_Allocate_eventsets_per_thread (i, old_num_threads, new_num_threads);
Expand Down Expand Up @@ -588,12 +611,14 @@ int HWC_Read (unsigned int tid, UINT64 time, long long *store_buffer)

if (HWCEnabled)
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
if (!HWC_Thread_Initialized[tid])
HWCBE_START_COUNTERS_THREAD(time, tid, FALSE);
TOUCH_LASTFIELD( store_buffer );

read_ok = HWCBE_READ (tid, store_buffer);
reset_ok = (Reset_After_Read ? HWCBE_RESET (tid) : TRUE);
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
return (HWCEnabled && read_ok && reset_ok);
}
Expand Down Expand Up @@ -629,6 +654,7 @@ int HWC_Accum (unsigned int tid, UINT64 time)

if (HWCEnabled)
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
if (!HWC_Thread_Initialized[tid])
HWCBE_START_COUNTERS_THREAD(time, tid, FALSE);
TOUCH_LASTFIELD( Accumulated_HWC[tid] );
Expand All @@ -642,6 +668,7 @@ int HWC_Accum (unsigned int tid, UINT64 time)
#endif

Accumulated_HWC_Valid[tid] = TRUE;
mtx_rw_unlock(&pThread_mtx_change_number_threads);
}
return (HWCEnabled && accum_ok);
}
Expand All @@ -665,7 +692,11 @@ int HWC_Accum_Reset (unsigned int tid)
/** Returns whether Accumulated_HWC contains valid values or not */
int HWC_Accum_Valid_Values (unsigned int tid)
{
return ( HWCEnabled ? Accumulated_HWC_Valid[tid] : 0 );
int ret;
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
ret = ( HWCEnabled ? Accumulated_HWC_Valid[tid] : 0 );
mtx_rw_unlock(&pThread_mtx_change_number_threads);
return ret;
}

/**
Expand All @@ -677,7 +708,9 @@ int HWC_Accum_Copy_Here (unsigned int tid, long long *store_buffer)
{
if (HWCEnabled)
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
memcpy(store_buffer, Accumulated_HWC[tid], MAX_HWC * sizeof(long long));
mtx_rw_unlock(&pThread_mtx_change_number_threads);
return 1;
}
else return 0;
Expand All @@ -693,10 +726,12 @@ int HWC_Accum_Add_Here (unsigned int tid, long long *store_buffer)
int i;
if (HWCEnabled)
{
mtx_rw_rdlock(&pThread_mtx_change_number_threads);
for (i=0; i<MAX_HWC; i++)
{
store_buffer[i] += Accumulated_HWC[tid][i];
}
mtx_rw_unlock(&pThread_mtx_change_number_threads);
return 1;
}
else return 0;
Expand Down
3 changes: 3 additions & 0 deletions src/tracer/hwc/common_hwc.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "num_hwc.h"
#include "hwc_version.h"
#include "hwc.h"
#include "pthread_redirect.h"

/*------------------------------------------------ Structures ---------------*/

Expand Down Expand Up @@ -79,6 +80,8 @@ extern unsigned long long * HWC_current_glopsbegin;
extern enum ChangeType_t HWC_current_changetype;
extern int * HWC_current_set;

extern pthread_mutex_t pThread_mtx_HWC_current_changetype;

/*------------------------------------------------ Useful macros ------------*/

/**
Expand Down
Loading