[DISTRIBUTED] Send data in chunks and serializer improvements.
- The distributed runtime was limited to messages of at most INT_MAX bytes
(both for MPI and gRPC). With this commit, data is serialized into chunks
which are then sent to the workers, so that larger messages can be sent
(a minimal sketch of the pattern follows this list).
- Added a new command-line argument to set the maximum chunk size for
distributed messages (default ~2GB).
- Use a pointer to hold the buffer inside the serialization iterator.
- The DaphneSerializer std::vector buffer used reserve() and capacity() to
manage the available space. However, we must not write into a vector's
memory (even if reserved) without first resizing it, so the reserve() calls
were replaced with resize(). Similar replacements were made in the MPI and
gRPC workers (see the illustration after this list).
- Due to the use of resize(), it is best to use the smallest possible chunk size.
- Added a test case for distributed chunked messages.
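
For orientation, here is a minimal, self-contained sketch of the chunked-send pattern this commit introduces. It is not DAPHNE code: `sendInChunks` and the `sendChunk` callback are placeholders for the actual MPI/gRPC calls, and the real code iterates a `DaphneSerializerChunks<DT>` object instead of a pre-serialized buffer (see Broadcast.h and Distribute.h below).

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Sketch only: split an already-serialized byte buffer into chunks of at
// most maxChunkSize bytes and pass each chunk to a send callback (e.g. an
// MPI send or a gRPC stream write).
void sendInChunks(const std::vector<char> &serialized, std::size_t maxChunkSize,
                  const std::function<void(const char *, std::size_t)> &sendChunk) {
    std::size_t offset = 0;
    while (offset < serialized.size()) {
        const std::size_t len = std::min(maxChunkSize, serialized.size() - offset);
        sendChunk(serialized.data() + offset, len);
        offset += len;
    }
}
```

In the diffs below, the chunk iterator's `first`/`second` members appear to carry the chunk length and a pointer to the chunk buffer, respectively, which is what gets handed to `MPIHelper::broadcastData`/`sendData` or written to the gRPC stream.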
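
And a generic C++ illustration (again not DAPHNE code) of the reserve() vs. resize() point: reserve() only allocates capacity, so writing through data() beyond size() is undefined behavior, while resize() also makes the elements valid.

```cpp
#include <cstring>
#include <vector>

void writeDouble(double value, std::vector<char> &buf) {
    // Wrong: reserve() leaves size() at 0, so writing through data()
    // is undefined behavior even though the capacity exists.
    // buf.reserve(sizeof(double));
    // std::memcpy(buf.data(), &value, sizeof(double));

    // Correct: resize() value-initializes the bytes, making the write valid.
    buf.resize(sizeof(double));
    std::memcpy(buf.data(), &value, sizeof(double));
}
```

Since resize() value-initializes the new bytes, keeping chunks small keeps that overhead low, which is presumably why the smallest workable chunk size is recommended above.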
aristotelis96 committed Oct 18, 2023
1 parent f475e18 commit 5cca09c
Showing 20 changed files with 481 additions and 245 deletions.
2 changes: 2 additions & 0 deletions src/api/cli/DaphneUserConfig.h
@@ -28,6 +28,7 @@ class DaphneLogger;
#include <string>
#include <memory>
#include <map>
#include <limits>

/*
* Container to pass around user configuration
@@ -66,6 +67,7 @@ struct DaphneUserConfig {
QueueTypeOption queueSetupScheme = CENTRALIZED;
VictimSelectionLogic victimSelection = SEQPRI;
ALLOCATION_TYPE distributedBackEndSetup= ALLOCATION_TYPE::DIST_MPI; // default value
size_t max_distributed_serialization_chunk_size = std::numeric_limits<int>::max() - 1024; // 2GB (-1KB to make up for gRPC headers etc.) - which is the maximum size allowed by gRPC / MPI. TODO: Investigate what the optimal value might be.
int numberOfThreads = -1;
int minimumTaskSize = 1;
// minimum considered log level (e.g., no logging below ERROR (essentially suppressing WARN, INFO, DEBUG and TRACE)
14 changes: 11 additions & 3 deletions src/api/internal/daphne_internal.cpp
@@ -122,12 +122,19 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
static opt<ALLOCATION_TYPE> distributedBackEndSetup("dist_backend", cat(distributedBackEndSetupOptions),
desc("Choose the options for the distribution backend:"),
values(
clEnumValN(ALLOCATION_TYPE::DIST_MPI, "MPI", "Use message passing interface for internode data exchange"),
clEnumValN(ALLOCATION_TYPE::DIST_GRPC_SYNC, "sync-gRPC", "Use remote procedure call (synchronous gRPC with threading) for internode data exchange (default)"),
clEnumValN(ALLOCATION_TYPE::DIST_MPI, "MPI", "Use message passing interface for internode data exchange (default)"),
clEnumValN(ALLOCATION_TYPE::DIST_GRPC_SYNC, "sync-gRPC", "Use remote procedure call (synchronous gRPC with threading) for internode data exchange"),
clEnumValN(ALLOCATION_TYPE::DIST_GRPC_ASYNC, "async-gRPC", "Use remote procedure call (asynchronous gRPC) for internode data exchange")
),
init(ALLOCATION_TYPE::DIST_GRPC_SYNC)
init(ALLOCATION_TYPE::DIST_MPI)
);
static opt<size_t> maxDistrChunkSize("max-distr-chunk-size", cat(distributedBackEndSetupOptions),
desc(
"Define the maximum chunk size per message for the distributed runtime (in bytes)"
"(default is close to maximum allowed ~2GB)"
),
init(std::numeric_limits<int>::max() - 1024)
);


// Scheduling options
@@ -385,6 +392,7 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
if(user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_MPI && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_SYNC && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_ASYNC)
spdlog::warn("No backend has been selected. Will use the default 'MPI'");
}
user_config.max_distributed_serialization_chunk_size = maxDistrChunkSize;
for (auto explain : explainArgList) {
switch (explain) {
case kernels:
113 changes: 72 additions & 41 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -72,15 +72,11 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
{
size_t messageLength=0;
std::vector<char> dataToSend;
double val = 1;
if (isScalar){
auto ptr = (double*)(&mat);
double val = *ptr;
val = *ptr;
mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
dataToSend.reserve(sizeof(double));
messageLength = DaphneSerializer<double>::serialize(val, dataToSend);
}
else {
messageLength = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, dataToSend);
}
std::vector<int> targetGroup; // We will not be able to take the advantage of broadcast if some mpi processes have the data

@@ -91,15 +87,30 @@

if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;

// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(mat) ?
dctx->config.max_distributed_serialization_chunk_size :
DaphneSerializer<DT>::length(mat);

MPIHelper::initiateStreaming(rank, min_chunk_size);
targetGroup.push_back(rank);
}

if((int)targetGroup.size()==MPIHelper::getCommSize() - 1){ // exclude coordinator
MPIHelper::sendData(messageLength, dataToSend.data());
if (isScalar){
std::vector<char> buffer;
auto length = DaphneSerializer<double>::serialize(val, buffer);
MPIHelper::broadcastData(length, buffer.data());
} else {
auto serializer = DaphneSerializerChunks<DT>(mat, dctx->config.max_distributed_serialization_chunk_size);
for (auto it = serializer.begin(); it != serializer.end(); ++it)
MPIHelper::broadcastData(it->first, it->second->data());
}
}
else{
for(int i=0;i<(int)targetGroup.size();i++){
MPIHelper::distributeData(messageLength, dataToSend.data(), targetGroup.at(i));
}
for(int i=0;i<(int)targetGroup.size();i++)
MPIHelper::sendData(messageLength, dataToSend.data(), targetGroup.at(i));
}
for(int i=0;i<(int)targetGroup.size();i++)
{
@@ -139,34 +150,48 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>
auto workers = ctx->getWorkers();

distributed::Data protoMsg;

std::vector<char> buffer;
double val = 1;
if (isScalar) {
auto ptr = (double*)(&mat);
double val = *ptr;
auto length = DaphneSerializer<double>::serialize(val, buffer);
protoMsg.set_bytes(buffer.data(), length);

val = *ptr;
// Need matrix for metadata, type of matrix does not really matter.
mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
}
else { // Not scalar
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
size_t length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, buffer);
protoMsg.set_bytes(buffer.data(), length);
}
}
LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::BROADCAST, mat, dctx);

while(partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;

auto address = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();

StoredInfo storedInfo({dp->dp_id});
caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
}

caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(mat) ?
dctx->config.max_distributed_serialization_chunk_size :
DaphneSerializer<DT>::length(mat);

// First send chunk size
protoMsg.set_bytes(&min_chunk_size, sizeof(size_t));
caller.sendDataStream(address, protoMsg);
if (isScalar) {
std::vector<char> buffer;
auto length = DaphneSerializer<double>::serialize(val, buffer);
protoMsg.set_bytes(buffer.data(), length);

caller.sendDataStream(address, protoMsg);
} else{
auto serializer = DaphneSerializerChunks<DT>(mat, min_chunk_size);
for (auto it = serializer.begin(); it != serializer.end(); ++it){
protoMsg.set_bytes(it->second->data(), it->first);
caller.sendDataStream(address, protoMsg);
}
}
}
caller.writesDone();

while (!caller.isQueueEmpty()){
auto response = caller.getNextResult();
auto dp_id = response.storedInfo.dp_id;
@@ -196,26 +221,16 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>
{
auto ctx = DistributedContext::get(dctx);
auto workers = ctx->getWorkers();

distributed::Data protoMsg;

std::vector<std::thread> threads_vector;
std::vector<char> buffer;
double val = 1;
if (isScalar) {
auto ptr = (double*)(&mat);
double val = *ptr;
auto length = DaphneSerializer<double>::serialize(val, buffer);
protoMsg.set_bytes(buffer.data(), length);

val = *ptr;
// Need matrix for metadata, type of matrix does not really matter.
mat = DataObjectFactory::create<DenseMatrix<double>>(0, 0, false);
}
else { // Not scalar
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
size_t length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(mat, buffer);
protoMsg.set_bytes(buffer.data(), length);
}
LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::BROADCAST, mat, dctx);

while(partioner.HasNextChunk()){
@@ -224,19 +239,35 @@
continue;

auto workerAddr = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
std::thread t([=]()
std::thread t([=, &mat]()
{
// TODO Consider saving channels inside DaphneContext
grpc::ChannelArguments ch_args;
ch_args.SetMaxSendMessageSize(-1);
ch_args.SetMaxReceiveMessageSize(-1);
auto channel = grpc::CreateCustomChannel(workerAddr, grpc::InsecureChannelCredentials(), ch_args);
auto stub = distributed::Worker::NewStub(channel);

distributed::StoredData storedData;
grpc::ClientContext grpc_ctx;
stub->Store(&grpc_ctx, protoMsg, &storedData);
auto writer = stub->Store(&grpc_ctx, &storedData);
distributed::Data protoMsg;

if (isScalar){
std::vector<char> buffer;
auto length = DaphneSerializer<double>::serialize(val, buffer);
protoMsg.set_bytes(buffer.data(), length);

writer->Write(protoMsg);
} else {
auto serializer = DaphneSerializerChunks<DT>(mat, dctx->config.max_distributed_serialization_chunk_size);
for (auto it = serializer.begin(); it != serializer.end(); ++it){
protoMsg.set_bytes(it->second->data(), it->first);
writer->Write(protoMsg);
}
}
writer->WritesDone();
auto status = writer->Finish();

DistributedData newData;
newData.identifier = storedData.identifier();
newData.numRows = storedData.num_rows();
66 changes: 47 additions & 19 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -63,7 +63,7 @@ void distribute(DT *mat, DCTX(dctx))
template<class DT>
struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
{
static void apply(DT *mat, DCTX(dctx)) {
static void apply(DT *mat, DCTX(dctx)) {
std::vector<char> dataToSend;
std::vector<int> targetGroup;

@@ -77,9 +77,17 @@ struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
continue;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
auto len = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(slicedMat, dataToSend);
MPIHelper::distributeData(len, dataToSend.data(),rank);
targetGroup.push_back(rank);

// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(slicedMat) ?
dctx->config.max_distributed_serialization_chunk_size :
DaphneSerializer<DT>::length(slicedMat);
MPIHelper::initiateStreaming(rank, min_chunk_size);
auto serializer = DaphneSerializerChunks<DT>(slicedMat, min_chunk_size);
for (auto it = serializer.begin(); it != serializer.end(); ++it){
MPIHelper::sendData(it->first, it->second->data(), rank);
}
targetGroup.push_back(rank);
}
for(size_t i=0;i<targetGroup.size();i++)
{
@@ -113,7 +121,6 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>
struct StoredInfo {
size_t dp_id;
};

DistributedGRPCCaller<StoredInfo, distributed::Data, distributed::StoredData> caller(dctx);

assert(mat != nullptr);
@@ -130,14 +137,27 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT>
std::vector<char> buffer;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
auto length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(slicedMat, buffer);
protoMsg.set_bytes(buffer.data(), length);

StoredInfo storedInfo({dp->dp_id});
caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);

auto address = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();

caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
auto min_chunk_size = dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(mat) ?
dctx->config.max_distributed_serialization_chunk_size :
DaphneSerializer<DT>::length(mat);

protoMsg.set_bytes(&min_chunk_size, sizeof(size_t));
caller.sendDataStream(address, protoMsg);

auto serializer = DaphneSerializerChunks<DT>(slicedMat, min_chunk_size);
for (auto it = serializer.begin(); it != serializer.end(); ++it){
protoMsg.set_bytes(it->second->data(), it->first);
caller.sendDataStream(address, protoMsg);
}
}
caller.writesDone();


// get results
@@ -180,24 +200,32 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT>
// Skip if already placed at workers
if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;
distributed::Data protoMsg;


std::vector<char> buffer;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
// DT is const Structure, but we only provide template specialization for structure.
// TODO should we implement an additional specialization or remove constness from template parameter?
auto length = DaphneSerializer<typename std::remove_const<DT>::type>::serialize(slicedMat, buffer);
protoMsg.set_bytes(buffer.data(), length);


auto workerAddr = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
std::thread t([=]()
std::thread t([=, &mat]()
{
auto stub = ctx->stubs[workerAddr].get();

distributed::StoredData storedData;
grpc::ClientContext grpc_ctx;
auto status = stub->Store(&grpc_ctx, protoMsg, &storedData);

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
auto serializer = DaphneSerializerChunks<DT>(slicedMat, dctx->config.max_distributed_serialization_chunk_size);

distributed::Data protoMsg;

// Send chunks
auto writer = stub->Store(&grpc_ctx, &storedData);
for (auto it = serializer.begin(); it != serializer.end(); ++it){
protoMsg.set_bytes(it->second->data(), it->first);
writer->Write(protoMsg);
}
writer->WritesDone();
auto status = writer->Finish();
if (!status.ok())
throw std::runtime_error(status.error_message());

@@ -95,12 +95,12 @@ struct DistributedCompute<ALLOCATION_TYPE::DIST_MPI, DTRes, const Structure>
task.mlir_code = mlirCode;
task.serialize(taskBuffer);
auto len = task.sizeInBytes();
MPIHelper::distributeTask(len, taskBuffer.data(),rank);
MPIHelper::sendTask(len, taskBuffer.data(), rank);
}

for (size_t rank = 1; rank < worldSize; rank++){
auto buffer = MPIHelper::getComputeResults(rank);
std::vector<WorkerImpl::StoredInfo> infoVec = MPIHelper::constructStoredInfoVector(std::string(buffer.data()));
std::vector<WorkerImpl::StoredInfo> infoVec = MPIHelper::constructStoredInfoVector(buffer);
size_t idx = 0;
for (auto info : infoVec){
auto resMat = *res[idx++];