Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at scan, implemented linear and ring algorithms #1154

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion mpp/shmemx.h4
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ include(shmemx_c_func.h4)dnl
/* SHMEMX constant(s) are included in MAX_HINTS value in shmem-def.h */
#define SHMEMX_MALLOC_NO_BARRIER (1l<<2)

/* C++ overloaded declarations */
/* C overloaded declarations */
#ifdef __cplusplus
} /* extern "C" */

Expand Down Expand Up @@ -68,6 +68,20 @@ static inline void shmemx_ibget(shmem_ctx_t ctx, $2 *target, const $2 *source,
}')dnl
SHMEM_CXX_DEFINE_FOR_RMA(`SHMEM_CXX_IBGET')

define(`SHMEM_CXX_SUM_EXSCAN',
`static inline int shmemx_sum_exscan(shmem_team_t team, $2* dest, const $2* source,
size_t nelems) {
return shmemx_$1_sum_exscan(team, dest, source, nelems);
}')dnl
SHMEM_CXX_DEFINE_FOR_COLL_SUM_PROD(`SHMEM_CXX_SUM_EXSCAN')

define(`SHMEM_CXX_SUM_INSCAN',
`static inline int shmemx_sum_inscan(shmem_team_t team, $2* dest, const $2* source,
size_t nelems) {
return shmemx_$1_sum_inscan(team, dest, source, nelems);
}')dnl
SHMEM_CXX_DEFINE_FOR_COLL_SUM_PROD(`SHMEM_CXX_SUM_INSCAN')

/* C11 Generic Macros */
#elif (defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(SHMEM_INTERNAL_INCLUDE))

Expand Down Expand Up @@ -105,6 +119,19 @@ SHMEM_BIND_C11_RMA(`SHMEM_C11_GEN_IBGET', `, \') \
uint64_t*: shmemx_signal_add \
)(__VA_ARGS__)

define(`SHMEM_C11_GEN_EXSCAN', ` $2*: shmemx_$1_sum_exscan')dnl
#define shmemx_sum_exscan(...) \
_Generic(SHMEM_C11_TYPE_EVAL_PTR(SHMEM_C11_ARG1(__VA_ARGS__)), \
SHMEM_BIND_C11_RMA(`SHMEM_C11_GEN_EXSCAN', `, \') \
)(__VA_ARGS__)

define(`SHMEM_C11_GEN_INSCAN', ` $2*: shmemx_$1_sum_inscan')dnl
#define shmemx_sum_inscan(...) \
_Generic(SHMEM_C11_TYPE_EVAL_PTR(SHMEM_C11_ARG1(__VA_ARGS__)), \
SHMEM_BIND_C11_RMA(`SHMEM_C11_GEN_INSCAN', `, \') \
)(__VA_ARGS__)


#endif /* C11 */

#endif /* SHMEMX_H */
10 changes: 10 additions & 0 deletions mpp/shmemx_c_func.h4
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ SH_PAD(`$1') ptrdiff_t tst, ptrdiff_t sst,
SH_PAD(`$1') size_t bsize, size_t nblocks, int pe)')dnl
SHMEM_DECLARE_FOR_SIZES(`SHMEM_C_CTX_IBGET_N')

define(`SHMEM_C_EXSCAN',
`SHMEM_FUNCTION_ATTRIBUTES int SHPRE()shmemx_$1_$4_exscan(shmem_team_t team, $2 *dest, const $2 *source, size_t nelems);')dnl

SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_C_EXSCAN', `sum')

define(`SHMEM_C_INSCAN',
`SHMEM_FUNCTION_ATTRIBUTES int SHPRE()shmemx_$1_$4_inscan(shmem_team_t team, $2 *dest, const $2 *source, size_t nelems);')dnl

SHMEM_BIND_C_COLL_SUM_PROD(`SHMEM_C_INSCAN', `sum')

/* Performance Counter Query Routines */
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_issued_write(shmem_ctx_t ctx, uint64_t *cntr_value);
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_issued_read(shmem_ctx_t ctx, uint64_t *cntr_value);
Expand Down
242 changes: 242 additions & 0 deletions src/collectives.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

#define SHMEM_INTERNAL_INCLUDE
#include "shmem.h"
#include "shmemx.h"
#include "shmem_internal.h"
#include "shmem_collectives.h"
#include "shmem_internal_op.h"

coll_type_t shmem_internal_barrier_type = AUTO;
coll_type_t shmem_internal_bcast_type = AUTO;
coll_type_t shmem_internal_reduce_type = AUTO;
coll_type_t shmem_internal_scan_type = AUTO;
coll_type_t shmem_internal_collect_type = AUTO;
coll_type_t shmem_internal_fcollect_type = AUTO;
long *shmem_internal_barrier_all_psync;
Expand Down Expand Up @@ -206,6 +208,18 @@ shmem_internal_collectives_init(void)
} else {
RAISE_WARN_MSG("Ignoring bad reduction algorithm '%s'\n", type);
}
}
if (shmem_internal_params.SCAN_ALGORITHM_provided) {
type = shmem_internal_params.SCAN_ALGORITHM;
if (0 == strcmp(type, "auto")) {
shmem_internal_scan_type = AUTO;
} else if (0 == strcmp(type, "linear")) {
shmem_internal_scan_type = LINEAR;
} else if (0 == strcmp(type, "ring")) {
shmem_internal_scan_type = RING;
} else {
RAISE_WARN_MSG("Ignoring bad scan algorithm '%s'\n", type);
}
}
if (shmem_internal_params.COLLECT_ALGORITHM_provided) {
type = shmem_internal_params.COLLECT_ALGORITHM;
Expand Down Expand Up @@ -971,6 +985,234 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun
}


/*****************************************
*
* SCAN
*
*****************************************/
void
shmem_internal_scan_linear(void *target, const void *source, size_t count, size_t type_size,
int PE_start, int PE_stride, int PE_size,
void *pWrk, long *pSync,
shm_internal_op_t op, shm_internal_datatype_t datatype, int scantype)
{

/* scantype is 0 for inscan and 1 for exscan */
long zero = 0, one = 1;
long completion = 0;
int free_source = 0;


if (count == 0) return;

int pe, i;

/* In-place scan: copy source data to a temporary buffer so we can use
* the symmetric buffer to accumulate scan data. */
if (target == source) {
void *tmp = malloc(count * type_size);

if (NULL == tmp)
RAISE_ERROR_MSG("Unable to allocate %zub temporary buffer\n", count*type_size);

shmem_internal_copy_self(tmp, target, count * type_size);
free_source = 1;
source = tmp;

shmem_internal_sync(PE_start, PE_stride, PE_size, pSync + 2);
}

if (PE_start == shmem_internal_my_pe) {


/* initialize target buffer. The put
will flush any atomic cache value that may currently
exist. */
if(scantype)
{
/* Exclude own value for EXSCAN */
//Create an array of size (count * type_size) of zeroes
uint8_t *zeroes = (uint8_t *) calloc(count, type_size);
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, zeroes, count * type_size,
shmem_internal_my_pe, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);
free(zeroes);
}


/* Send contribution to all */
for (pe = PE_start + PE_stride*scantype, i = scantype ;
i < PE_size ;
i++, pe += PE_stride) {

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
pe, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

}

for (pe = PE_start + PE_stride, i = 1 ;
i < PE_size ;
i++, pe += PE_stride) {
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe);
}

/* Wait for others to acknowledge initialization */
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, PE_size - 1);

/* reset pSync */
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);


/* Let everyone know sending can start */
for (pe = PE_start + PE_stride, i = 1 ;
i < PE_size ;
i++, pe += PE_stride) {
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe);
}
} else {

/* wait for clear to intialization */
SHMEM_WAIT(pSync, 0);

/* reset pSync */
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

/* Send contribution to all pes larger than itself */
for (pe = shmem_internal_my_pe + PE_stride*scantype, i = shmem_internal_my_pe + scantype ;
i < PE_size;
i++, pe += PE_stride) {

shmem_internal_atomicv(SHMEM_CTX_DEFAULT, target, source, count * type_size,
pe, op, datatype, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);

}

shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
PE_start, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG);

SHMEM_WAIT(pSync, 0);

/* reset pSync */
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

}

if (free_source)
free((void *)source);

}


void
shmem_internal_scan_ring(void *target, const void *source, size_t count, size_t type_size,
int PE_start, int PE_stride, int PE_size,
void *pWrk, long *pSync,
shm_internal_op_t op, shm_internal_datatype_t datatype, int scantype)
{

/* scantype is 0 for inscan and 1 for exscan */
long zero = 0, one = 1;
long completion = 0;
int free_source = 0;

/* In-place scan: copy source data to a temporary buffer so we can use
* the symmetric buffer to accumulate scan data. */
if (target == source) {
void *tmp = malloc(count * type_size);

if (NULL == tmp)
RAISE_ERROR_MSG("Unable to allocate %zub temporary buffer\n", count*type_size);

shmem_internal_copy_self(tmp, target, count * type_size);
free_source = 1;
source = tmp;

shmem_internal_sync(PE_start, PE_stride, PE_size, pSync + 2);
}


if (count == 0) return;

int pe, i;

if (PE_start == shmem_internal_my_pe) {

/* initialize target buffer. The put
will flush any atomic cache value that may currently
exist. */
if(scantype)
{
/* Exclude own value for EXSCAN */
//Create an array of size (count * type_size) of zeroes
uint8_t *zeroes = (uint8_t *) calloc(count, type_size);
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, zeroes, count * type_size,
shmem_internal_my_pe, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);
free(zeroes);
}

/* Send contribution to all */
for (pe = PE_start + PE_stride*scantype, i = scantype ;
i < PE_size ;
i++, pe += PE_stride) {

shmem_internal_put_nb(SHMEM_CTX_DEFAULT, target, source, count * type_size,
pe, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);
}

/* Let next pe know that it's safe to send to us */
if(shmem_internal_my_pe + PE_stride < PE_size)
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), shmem_internal_my_pe + PE_stride);

/* Wait for others to acknowledge sending data */
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, PE_size - 1);

/* reset pSync */
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

} else {
/* wait for clear to send */
SHMEM_WAIT(pSync, 0);

/* reset pSync */
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &zero, sizeof(zero), shmem_internal_my_pe);
SHMEM_WAIT_UNTIL(pSync, SHMEM_CMP_EQ, 0);

/* Send contribution to all pes larger than itself */
for (pe = shmem_internal_my_pe + PE_stride*scantype, i = shmem_internal_my_pe + scantype ;
i < PE_size;
i++, pe += PE_stride) {

shmem_internal_atomicv(SHMEM_CTX_DEFAULT, target, source, count * type_size,
pe, op, datatype, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
shmem_internal_fence(SHMEM_CTX_DEFAULT);
}

/* Let next pe know that it's safe to send to us */
if(shmem_internal_my_pe + PE_stride < PE_size)
shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), shmem_internal_my_pe + PE_stride);

shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one),
PE_start, SHM_INTERNAL_SUM, SHM_INTERNAL_LONG);
}

if (free_source)
free((void *)source);

}
/*****************************************
*
* COLLECT (variable size)
Expand Down
Loading
Loading