Skip to content

Commit

Permalink
Merge pull request #73 from kseager/sw-reduction
Browse files Browse the repository at this point in the history
Collectives update: new recursive doubling reduction algorithm
  • Loading branch information
jdinan committed Dec 14, 2015
2 parents fbc1e3c + a8434c4 commit 7d5c025
Show file tree
Hide file tree
Showing 14 changed files with 476 additions and 64 deletions.
3 changes: 2 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ AC_CHECK_SIZEOF([double])
AC_CHECK_SIZEOF([long double])
AC_CHECK_SIZEOF([void*])

C_LOG_MAXPES=32
C_BCAST_SYNC_SIZE=1
C_REDUCE_SYNC_SIZE=`echo "$C_BCAST_SYNC_SIZE 2 + p" | dc`
C_REDUCE_SYNC_SIZE=`echo "$C_BCAST_SYNC_SIZE 2 + $C_LOG_MAXPES + p" | dc`
C_BARRIER_SYNC_SIZE=`echo "$ac_cv_sizeof_int 8 * $ac_cv_sizeof_long / p" | dc`
tree_sync_size=`echo "$C_BCAST_SYNC_SIZE 3 + p" | dc`
recdbl_sync_size=`echo "$ac_cv_sizeof_int 8 * $ac_cv_sizeof_long / p" | dc`
Expand Down
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ libsma_la_SOURCES = \
synchronization_c.c \
collectives_c.c \
lock_c.c \
cache_management_c.c
cache_management_c.c \
op.h

if HAVE_FORTRAN
libsma_la_SOURCES += \
Expand Down
176 changes: 163 additions & 13 deletions src/collectives.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "shmem_internal.h"
#include "shmem_collectives.h"
#include "shmem.h"
#include "shmem_internal_op.h"

coll_type_t shmem_internal_barrier_type = AUTO;
coll_type_t shmem_internal_bcast_type = AUTO;
Expand Down Expand Up @@ -71,9 +72,11 @@ shmem_internal_collectives_init(int requested_crossover,

/* initialize barrier_all psync array */
shmem_internal_barrier_all_psync =
shmem_internal_shmalloc(sizeof(long) * _SHMEM_BARRIER_SYNC_SIZE);
shmem_internal_shmalloc(sizeof(long) * SHMEM_BARRIER_SYNC_SIZE);
if (NULL == shmem_internal_barrier_all_psync) return -1;
memset(shmem_internal_barrier_all_psync, 0, sizeof(long) * _SHMEM_BARRIER_SYNC_SIZE);

for (i = 0; i < SHMEM_BARRIER_SYNC_SIZE; i++)
shmem_internal_barrier_all_psync[i] = SHMEM_SYNC_VALUE;

/* initialize the binomial tree for collective operations over
entire tree */
Expand Down Expand Up @@ -140,6 +143,8 @@ shmem_internal_collectives_init(int requested_crossover,
shmem_internal_reduce_type = LINEAR;
} else if (0 == strcmp(type, "tree")) {
shmem_internal_reduce_type = TREE;
} else if (0 == strcmp(type, "recdbl")) {
shmem_internal_reduce_type = RECDBL;
} else {
fprintf(stderr, "[%03d] Bad reduction algorithm %s\n",
shmem_internal_my_pe, type);
Expand Down Expand Up @@ -188,7 +193,7 @@ shmem_internal_barrier_linear(int PE_start, int logPE_stride, int PE_size, long
int stride = 1 << logPE_stride;

/* need 1 slot */
assert(_SHMEM_BARRIER_SYNC_SIZE >= 1);
assert(SHMEM_BARRIER_SYNC_SIZE >= 1);

shmem_internal_quiet();

Expand Down Expand Up @@ -235,7 +240,7 @@ shmem_internal_barrier_tree(int PE_start, int logPE_stride, int PE_size, long *p
int parent, num_children, *children;

/* need 1 slot */
assert(_SHMEM_BARRIER_SYNC_SIZE >= 1);
assert(SHMEM_BARRIER_SYNC_SIZE >= 1);

shmem_internal_quiet();

Expand Down Expand Up @@ -326,7 +331,7 @@ shmem_internal_barrier_dissem(int PE_start, int logPE_stride, int PE_size, long
2^(sizeof(int)*8-1)-1, so make the math a bit easier and assume
2^(sizeof(int) * 8), which means log2(num_procs) is always less
than sizeof(int) * 8. */
assert(_SHMEM_BARRIER_SYNC_SIZE >= (sizeof(int) * 8) / sizeof(long));
assert(SHMEM_BARRIER_SYNC_SIZE >= (sizeof(int) * 8) / sizeof(long));

for (i = 0, distance = 1 ; distance < PE_size ; ++i, distance <<= 1) {
to = ((coll_rank + distance) % PE_size);
Expand Down Expand Up @@ -367,7 +372,7 @@ shmem_internal_bcast_linear(void *target, const void *source, size_t len,
long completion = 0;

/* need 1 slot */
assert(_SHMEM_BCAST_SYNC_SIZE >= 1);
assert(SHMEM_BCAST_SYNC_SIZE >= 1);

if (real_root == shmem_internal_my_pe) {
int i, pe;
Expand Down Expand Up @@ -428,7 +433,7 @@ shmem_internal_bcast_tree(void *target, const void *source, size_t len,
const void *send_buf = source;

/* need 1 slot */
assert(_SHMEM_BCAST_SYNC_SIZE >= 1);
assert(SHMEM_BCAST_SYNC_SIZE >= 1);

if (PE_size == shmem_internal_num_pes && 0 == PE_root) {
/* we're the full tree, use the binomial tree */
Expand Down Expand Up @@ -519,7 +524,7 @@ shmem_internal_op_to_all_linear(void *target, void *source, int count, int type_
long completion = 0;

/* need 2 slots, plus bcast */
assert(_SHMEM_REDUCE_SYNC_SIZE >= 2 + _SHMEM_BCAST_SYNC_SIZE);
assert(SHMEM_REDUCE_SYNC_SIZE >= 2 + SHMEM_BCAST_SYNC_SIZE);

if (PE_start == shmem_internal_my_pe) {
int pe, i;
Expand Down Expand Up @@ -580,7 +585,7 @@ shmem_internal_op_to_all_tree(void *target, void *source, int count, int type_si
int parent, num_children, *children;

/* need 2 slots, plus bcast */
assert(_SHMEM_REDUCE_SYNC_SIZE >= 2 + _SHMEM_BCAST_SYNC_SIZE);
assert(SHMEM_REDUCE_SYNC_SIZE >= 2 + SHMEM_BCAST_SYNC_SIZE);

if (PE_size == shmem_internal_num_pes) {
/* we're the full tree, use the binomial tree */
Expand Down Expand Up @@ -641,6 +646,151 @@ shmem_internal_op_to_all_tree(void *target, void *source, int count, int type_si
}



void
shmem_internal_op_to_all_recdbl_sw(void *target, void *source, int count, int type_size,
int PE_start, int logPE_stride, int PE_size,
void *pWrk, long *pSync,
shm_internal_op_t op, shm_internal_datatype_t datatype)
{
int stride = 1 << logPE_stride;
int my_id = ((shmem_internal_my_pe - PE_start) / stride);
long one = 1, neg_one = -1, buff = 0;
int log2_proc = 1, pow2_proc = 2;
int i = PE_size >> 1;
int wrk_size = type_size*count;
void * const current_target = malloc(wrk_size);
int peer = 0;
long completion = 0;
long * pSync_extra_peer = pSync + SHMEM_REDUCE_SYNC_SIZE - 2;

/***********************************
*
* input checks and var init
*
* **************************************/

if (PE_size == 1) {
memcpy(target, source, type_size*count);
return;
}

while (i != 1) {
i >>= 1;
pow2_proc <<= 1;
log2_proc++;
}

/*Currently SHMEM_REDUCE_SYNC_SIZE assumes space for 2^32 PEs; this
parameter may be changed if need-be */
assert(log2_proc <= (SHMEM_REDUCE_SYNC_SIZE - 2));

if (current_target)
memcpy(current_target, (void *) source, wrk_size);
else
RAISE_ERROR(1);

/***********************************
*
* alg: reduce N number of PE's into a power of two recursive doubling algorithm
* have extra_peers do the operation with one of the power of two PE's so the information
* is in the power of two algorithm, at the end, update extra_peers with answer found
* by power of two team
*
* -target is used as "temp" buffer -- current_target tracks latest result
* give partner current_result,
*
* **************************************/

/*extra peer exchange: grab information from extra_peer so its part of pairwise exchange*/
if (my_id >= pow2_proc) {
peer = (my_id - pow2_proc) * stride + PE_start;
shmem_internal_put_nb(target, current_target, wrk_size, peer,
&completion);
shmem_internal_put_wait(&completion);
shmem_internal_fence();

buff = neg_one;
shmem_internal_put_small(pSync_extra_peer, &buff, sizeof(long),
peer);
shmem_internal_fence();
buff = neg_one;
SHMEM_WAIT_UNTIL(pSync_extra_peer, SHMEM_CMP_EQ, buff);

} else {
if ((PE_size - pow2_proc) > my_id) {
peer = (my_id + pow2_proc) * stride + PE_start;
buff = neg_one;
SHMEM_WAIT_UNTIL(pSync_extra_peer, SHMEM_CMP_EQ, buff);

shmem_internal_reduce_local(op, datatype, count, target, current_target);
}

/* Pairwise exchange: (only for PE's that are within the power of 2 set) with every iteration,
the information from each previous exchange is passed forward in the new interation */

long *step_psync;

for (i = 0; i < log2_proc; i++) {

peer = (my_id ^ (1 << i)) * stride + PE_start;
step_psync = &pSync[i];

if (my_id < peer) {
shmem_internal_put_small(step_psync, &one,
sizeof(int), peer);
SHMEM_WAIT(step_psync, 0);
shmem_internal_put_nb(target, current_target,
wrk_size, peer, &completion);
shmem_internal_put_wait(&completion);
shmem_internal_fence();
shmem_internal_atomic_small(step_psync, &one,
sizeof(int), peer,
SHM_INTERNAL_SUM, DTYPE_INT);
} else {
SHMEM_WAIT(step_psync, 0);
shmem_internal_put_nb(target, current_target,
wrk_size, peer, &completion);
shmem_internal_put_wait(&completion);
shmem_internal_fence();
shmem_internal_put_small(step_psync,
&one, sizeof(int), peer);
SHMEM_WAIT(step_psync, 1);
}

/* perform op */
shmem_internal_reduce_local(op, datatype, count,
target, current_target);

}

shmem_internal_quiet();

/*update extra peer with the final result from the pairwise exchange */
if ((PE_size - pow2_proc) > my_id) {
peer = (my_id + pow2_proc) * stride + PE_start;

shmem_internal_put_nb(target, current_target, wrk_size,
peer, &completion);
shmem_internal_put_wait(&completion);
shmem_internal_fence();

buff = neg_one;
shmem_internal_put_small(pSync_extra_peer, &buff,
sizeof(long), peer);
shmem_internal_fence();
}

memcpy(target, current_target, wrk_size);
}

free(current_target);

for (i = 0; i < SHMEM_REDUCE_SYNC_SIZE; i++)
pSync[i] = SHMEM_SYNC_VALUE;
}


/*****************************************
*
* COLLECT (variable size)
Expand All @@ -657,7 +807,7 @@ shmem_internal_collect_linear(void *target, const void *source, size_t len,
long completion = 0;

/* need 3 slots, plus bcast */
assert(_SHMEM_COLLECT_SYNC_SIZE >= 3 + _SHMEM_BCAST_SYNC_SIZE);
assert(SHMEM_COLLECT_SYNC_SIZE >= 3 + SHMEM_BCAST_SYNC_SIZE);

if (PE_size == 1) {
if (target != source) memcpy(target, source, len);
Expand Down Expand Up @@ -738,7 +888,7 @@ shmem_internal_fcollect_linear(void *target, const void *source, size_t len,
long completion = 0;

/* need 1 slot, plus bcast */
assert(_SHMEM_COLLECT_SYNC_SIZE >= 1 + _SHMEM_BCAST_SYNC_SIZE);
assert(SHMEM_COLLECT_SYNC_SIZE >= 1 + SHMEM_BCAST_SYNC_SIZE);

if (PE_start == shmem_internal_my_pe) {
/* Copy data into the target */
Expand Down Expand Up @@ -794,7 +944,7 @@ shmem_internal_fcollect_ring(void *target, const void *source, size_t len,
long zero = 0, one = 1;

/* need 1 slot */
assert(_SHMEM_COLLECT_SYNC_SIZE >= 1);
assert(SHMEM_COLLECT_SYNC_SIZE >= 1);

/* copy my portion to the right place */
memcpy((char*) target + (my_id * len), source, len);
Expand Down Expand Up @@ -851,7 +1001,7 @@ shmem_internal_fcollect_recdbl(void *target, const void *source, size_t len,
2^(sizeof(int)*8-1)-1, so make the math a bit easier and assume
2^(sizeof(int) * 8), which means log2(num_procs) is always less
than sizeof(int) * 8. */
assert(_SHMEM_COLLECT_SYNC_SIZE >= (sizeof(int) * 8) / sizeof(long));
assert(SHMEM_COLLECT_SYNC_SIZE >= (sizeof(int) * 8) / sizeof(long));
assert(0 == (PE_size & (PE_size - 1)));

/* copy my portion to the right place */
Expand Down
17 changes: 9 additions & 8 deletions src/collectives_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@ shmem_longdouble_min_to_all(long double *target, long double *source, int nreduc
SHMEM_ERR_CHECK_INITIALIZED();

shmem_internal_op_to_all(target, source, nreduce, sizeof(long double),
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_MIN, SHM_INTERNAL_LONG_DOUBLE);
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_MIN, SHM_INTERNAL_LONG_DOUBLE);
}


Expand Down Expand Up @@ -484,11 +484,12 @@ shmem_longdouble_max_to_all(long double *target, long double *source, int nreduc
SHMEM_ERR_CHECK_INITIALIZED();

shmem_internal_op_to_all(target, source, nreduce, sizeof(long double),
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_MAX, SHM_INTERNAL_LONG_DOUBLE);
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_MAX, SHM_INTERNAL_LONG_DOUBLE);
}



void
shmem_short_max_to_all(short *target, short *source, int nreduce,
int PE_start, int logPE_stride, int PE_size,
Expand Down Expand Up @@ -575,8 +576,8 @@ shmem_longdouble_sum_to_all(long double *target, long double *source, int nreduc
SHMEM_ERR_CHECK_INITIALIZED();

shmem_internal_op_to_all(target, source, nreduce, sizeof(long double),
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG_DOUBLE);
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG_DOUBLE);
}


Expand Down Expand Up @@ -692,8 +693,8 @@ shmem_longdouble_prod_to_all(long double *target, long double *source, int nredu
SHMEM_ERR_CHECK_INITIALIZED();

shmem_internal_op_to_all(target, source, nreduce, sizeof(long double),
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_PROD, SHM_INTERNAL_LONG_DOUBLE);
PE_start, logPE_stride, PE_size,
pWrk, pSync, SHM_INTERNAL_PROD, SHM_INTERNAL_LONG_DOUBLE);
}


Expand Down
2 changes: 1 addition & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ shmem_internal_init(int tl_requested, int *tl_provided)
printf("SMA_REDUCE_ALGORITHM %s\n",
(NULL == getenv("SMA_REDUCE_ALGORITHM") ? "auto" :
getenv("SMA_REDUCE_ALGORITHM")));
printf("\tAlgorithm for reductions. Options are auto, linear, tree\n");
printf("\tAlgorithm for reductions. Options are auto, linear, tree, recdbl\n");
printf("SMA_COLLECT_ALGORITHM %s\n",
(NULL == getenv("SMA_COLLECT_ALGORITHM") ? "auto" :
getenv("SMA_COLLECT_ALGORITHM")));
Expand Down
Loading

0 comments on commit 7d5c025

Please sign in to comment.