Skip to content

Commit

Permalink
[DAPHNE-daphne-eu#758] MetaDataObject refactoring, CSRMatrix support,…
Browse files Browse the repository at this point in the history
… Pinning

* This commit introduces the meta data object to the CSRMatrix data type
  To implement this change, handling of the AllocationDescriptors has been refactored out of DenseMatrix.

* Separate handling of ranges
  Since tracking of ranges of data is only used in the distributed setting for now, we will handle this separately and assume always a full allocation for local computation. This should result in less unnecessary "if range not null do this, else do that".

* Memory pinning
  To prevent excessive allocation ID lookups in the hot path, especially when using --vec, this change "pins" memory by allocation type of previous accesses.
  Simply put, as long as there is no different access type (e.g., call getValues() for host vs device memory) it is assumed, that the data is not changed and no query of the meta data object needs to be done.

Closes daphne-eu#758
  • Loading branch information
corepointer committed Oct 18, 2024
1 parent 053d5a7 commit a4ffaa3
Show file tree
Hide file tree
Showing 45 changed files with 648 additions and 533 deletions.
1 change: 1 addition & 0 deletions src/api/cli/DaphneUserConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct DaphneUserConfig {
bool use_cuda = false;
bool use_vectorized_exec = false;
bool use_distributed = false;
bool use_grpc_async = false;
bool use_obj_ref_mgnt = true;
bool use_ipa_const_propa = true;
bool use_phy_op_selection = true;
Expand Down
25 changes: 13 additions & 12 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT> {
LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
auto rank = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getRank();
auto rank = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getRank();

if (dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

// Minimum chunk size
Expand Down Expand Up @@ -114,12 +114,13 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT> {
WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
std::string address = std::to_string(rank);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
auto data = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData();
auto data = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData();
data.identifier = dataAcknowledgement.identifier;
data.numRows = dataAcknowledgement.numRows;
data.numCols = dataAcknowledgement.numCols;
data.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).updateDistributedData(data);
auto alloc = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0));
alloc->updateDistributedData(data);
}
}
};
Expand Down Expand Up @@ -150,12 +151,12 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {

while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

auto address = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
auto address = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();

StoredInfo storedInfo({dp->dp_id});
StoredInfo storedInfo({dp->getID()});
caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
auto min_chunk_size =
Expand Down Expand Up @@ -187,15 +188,15 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {
auto dp_id = response.storedInfo.dp_id;
auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

auto data = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData();
auto data = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData();

auto storedData = response.result;
data.identifier = storedData.identifier();
data.numRows = storedData.num_rows();
data.numCols = storedData.num_cols();
data.isPlacedAtWorker = true;

dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(data);
dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(data);
}
};
};
Expand All @@ -222,10 +223,10 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {

while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

auto workerAddr = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
auto workerAddr = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();
std::thread t([=, &mat]() {
// TODO Consider saving channels inside DaphneContext
grpc::ChannelArguments ch_args;
Expand Down Expand Up @@ -260,7 +261,7 @@ template <class DT> struct Broadcast<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
newData.numRows = storedData.num_rows();
newData.numCols = storedData.num_cols();
newData.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(newData);
dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(newData);
});
threads_vector.push_back(move(t));
}
Expand Down
34 changes: 16 additions & 18 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT> {

while (partioner.HasNextChunk()) {
DataPlacement *dp = partioner.GetNextChunk();
auto rank = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getRank();
auto rank = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getRank();

if (dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);

auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
// Minimum chunk size
auto min_chunk_size =
dctx->config.max_distributed_serialization_chunk_size < DaphneSerializer<DT>::length(slicedMat)
Expand All @@ -91,12 +90,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT> {
WorkerImpl::StoredInfo dataAcknowledgement = MPIHelper::getDataAcknowledgement(&rank);
std::string address = std::to_string(rank);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
auto data = dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).getDistributedData();
auto data = dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->getDistributedData();
data.identifier = dataAcknowledgement.identifier;
data.numRows = dataAcknowledgement.numRows;
data.numCols = dataAcknowledgement.numCols;
data.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorMPI &>(*(dp->allocation)).updateDistributedData(data);
dynamic_cast<AllocationDescriptorMPI *>(dp->getAllocation(0))->updateDistributedData(data);
}
}
};
Expand All @@ -121,17 +120,17 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {
while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;
distributed::Data protoMsg;

std::vector<char> buffer;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);

StoredInfo storedInfo({dp->dp_id});
StoredInfo storedInfo({dp->getID()});

auto address = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
auto address = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();

caller.asyncStoreCall(address, storedInfo);
// Minimum chunk size
Expand Down Expand Up @@ -161,12 +160,12 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DT> {

auto dp = mat->getMetaDataObject()->getDataPlacementByID(dp_id);

auto data = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData();
auto data = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData();
data.identifier = storedData.identifier();
data.numRows = storedData.num_rows();
data.numCols = storedData.num_cols();
data.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(data);
dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(data);
}
}
};
Expand All @@ -188,22 +187,21 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
if (dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

std::vector<char> buffer;

auto workerAddr = dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).getLocation();
auto workerAddr = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();
std::thread t([=, &mat]() {
auto stub = ctx->stubs[workerAddr].get();

distributed::StoredData storedData;
grpc::ClientContext grpc_ctx;

auto slicedMat = mat->sliceRow(dp->range->r_start, dp->range->r_start + dp->range->r_len);
auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
auto serializer =
DaphneSerializerChunks<DT>(slicedMat, dctx->config.max_distributed_serialization_chunk_size);

distributed::Data protoMsg;

// Send chunks
Expand All @@ -222,10 +220,10 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
newData.numRows = storedData.num_rows();
newData.numCols = storedData.num_cols();
newData.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorGRPC &>(*(dp->allocation)).updateDistributedData(newData);
dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->updateDistributedData(newData);
DataObjectFactory::destroy(slicedMat);
});
threads_vector.push_back(move(t));
threads_vector.push_back(std::move(t));
}
for (auto &thread : threads_vector)
thread.join();
Expand Down
Loading

0 comments on commit a4ffaa3

Please sign in to comment.