Skip to content

Commit

Permalink
[BUGFIX] LoadPartitioningDistributed crashed
Browse files Browse the repository at this point in the history
Due to the use of ptr to local var the distributed (GRPC_SYNC) mode crashed in test cases. This patch fixes this by using std::unique_ptr appropriately.
Furthermore, a check for nullptr is performed before getting distributed data to add a message indicating that execution failed here.
  • Loading branch information
corepointer committed Oct 18, 2024
1 parent 2e15f49 commit d9d1b59
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
13 changes: 9 additions & 4 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,14 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
while (partioner.HasNextChunk()) {
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
if (dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getDistributedData().isPlacedAtWorker)
continue;

if (auto grpc_alloc = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))) {
auto dist_data = grpc_alloc->getDistributedData();
// Skip if already placed at workers
if (dist_data.isPlacedAtWorker)
continue;
} else
throw std::runtime_error("dynamic_cast<AllocationDescriptorGRPC*>(alloc) failed (returned nullptr)");
std::vector<char> buffer;

auto workerAddr = dynamic_cast<AllocationDescriptorGRPC *>(dp->getAllocation(0))->getLocation();
Expand All @@ -199,7 +203,8 @@ template <class DT> struct Distribute<ALLOCATION_TYPE::DIST_GRPC_SYNC, DT> {
distributed::StoredData storedData;
grpc::ClientContext grpc_ctx;

auto slicedMat = mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
auto slicedMat =
mat->sliceRow(dp->getRange()->r_start, dp->getRange()->r_start + dp->getRange()->r_len);
auto serializer =
DaphneSerializerChunks<DT>(slicedMat, dctx->config.max_distributed_serialization_chunk_size);
distributed::Data protoMsg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
#pragma once

#include <runtime/local/context/DistributedContext.h>

#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/local/datastructures/AllocationDescriptorMPI.h>
#include <runtime/local/datastructures/Range.h>

#include <cstddef>
#include <stdexcept>
Expand Down Expand Up @@ -55,11 +52,12 @@ template <class DT, class ALLOCATOR> class LoadPartitioningDistributed {
// Here we provide the different implementations.
// Another solution would be to make sure that every constructor is similar
// so this would not be needed.
static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, DistributedData& data) {
static std::unique_ptr<ALLOCATOR> CreateAllocatorDescriptor(DaphneContext *ctx, const std::string &addr,
DistributedData &data) {
if constexpr (std::is_same_v<ALLOCATOR, AllocationDescriptorMPI>)
return AllocationDescriptorMPI(std::stoi(addr), ctx, data);
return std::make_unique<AllocationDescriptorMPI>(std::stoi(addr), ctx, data);
else if constexpr (std::is_same_v<ALLOCATOR, AllocationDescriptorGRPC>)
return AllocationDescriptorGRPC(ctx, addr, data);
return std::make_unique<AllocationDescriptorGRPC>(ctx, addr, data);
else
throw std::runtime_error("Unknown allocation type");
}
Expand Down Expand Up @@ -133,7 +131,7 @@ template <class DT, class ALLOCATOR> class LoadPartitioningDistributed {
data.ix = GetDistributedIndex();
auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data);
std::vector<std::unique_ptr<IAllocationDescriptor>> allocations;
allocations.emplace_back(&allocationDescriptor);
allocations.emplace_back(std::move(allocationDescriptor));
dp = mat->getMetaDataObject()->addDataPlacement(allocations, &range);
}
taskIndex++;
Expand Down Expand Up @@ -199,7 +197,7 @@ template <class DT, class ALLOCATOR> class LoadPartitioningDistributed {
} else { // else create new dp entry
auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data);
std::vector<std::unique_ptr<IAllocationDescriptor>> allocations;
allocations.emplace_back(&allocationDescriptor);
allocations.emplace_back(std::move(allocationDescriptor));
((*outputs[i]))->getMetaDataObject()->addDataPlacement(allocations, &range);
}
}
Expand Down

0 comments on commit d9d1b59

Please sign in to comment.