Read/Write Daphne HDFS.
Distributed Read/Write
aristotelis96 authored and KostasBitsakos committed Jun 26, 2024
1 parent 643a6af commit 1edd51d
Showing 26 changed files with 1,221 additions and 234 deletions.
3 changes: 3 additions & 0 deletions UserConfig.json
@@ -27,6 +27,9 @@
"taskPartitioningScheme": "STATIC",
"numberOfThreads": -1,
"minimumTaskSize": 1,
"useHdfs": false,
"hdfsAddress": "",
"hdfsUsername": "",
"libdir": "{exedir}/../lib",
"daphnedsl_import_paths": {},
"force_cuda": false,
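This change adds three HDFS-related keys to UserConfig.json. As a minimal sketch of how such a fragment maps onto the new config fields, the following standalone C++ snippet parses an example fragment with nlohmann::json (as the project does elsewhere); the address and username values are made up for illustration.

#include <nlohmann/json.hpp>
#include <iostream>
#include <string>

int main() {
    // Hypothetical UserConfig.json fragment enabling HDFS (values are examples only).
    const char *cfg = R"({
        "useHdfs": true,
        "hdfsAddress": "192.168.1.10:9000",
        "hdfsUsername": "daphne"
    })";

    nlohmann::json jf = nlohmann::json::parse(cfg);

    // Mirrors the new DaphneUserConfig fields introduced by this commit.
    bool useHdfs = jf.value("useHdfs", false);
    std::string hdfsAddress = jf.value("hdfsAddress", std::string());
    std::string hdfsUsername = jf.value("hdfsUsername", std::string());

    std::cout << useHdfs << " " << hdfsAddress << " " << hdfsUsername << std::endl;
}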
7 changes: 6 additions & 1 deletion src/api/cli/DaphneUserConfig.h
@@ -86,7 +86,12 @@ struct DaphneUserConfig {
size_t max_distributed_serialization_chunk_size = std::numeric_limits<int>::max() - 1024; // 2GB (-1KB to make up for gRPC headers etc.) - which is the maximum size allowed by gRPC / MPI. TODO: Investigate what the optimal value might be.
int numberOfThreads = -1;
int minimumTaskSize = 1;


// hdfs
bool use_hdfs = false;
std::string hdfs_Address = "";
std::string hdfs_username = "";

// minimum considered log level (e.g., no logging below ERROR, essentially suppressing WARN, INFO, DEBUG, and TRACE)
spdlog::level::level_enum log_level_limit = spdlog::level::err;
std::vector<LogConfig> loggers;
35 changes: 34 additions & 1 deletion src/api/internal/daphne_internal.cpp
@@ -125,6 +125,7 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
static OptionCategory daphneOptions("DAPHNE Options");
static OptionCategory schedulingOptions("Advanced Scheduling Knobs");
static OptionCategory distributedBackEndSetupOptions("Distributed Backend Knobs");
static OptionCategory HDFSOptions("HDFS Knobs");


// Options ----------------------------------------------------------------
@@ -147,6 +148,22 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
init(std::numeric_limits<int>::max() - 1024)
);

// HDFS knobs
static opt<bool> use_hdfs(
"enable-hdfs", cat(HDFSOptions),
desc("Enable HDFS filesystem")
);
static opt<string> hdfs_Address(
"hdfs-ip", cat(HDFSOptions),
desc("IP of the HDFS filesystem (including port)."),
init("")
);
static opt<string> hdfs_username(
"hdfs-username", cat(HDFSOptions),
desc("Username of the HDFS filesystem."),
init("")
);


// Scheduling options

@@ -397,6 +414,7 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
visibleCategories.push_back(&daphneOptions);
visibleCategories.push_back(&schedulingOptions);
visibleCategories.push_back(&distributedBackEndSetupOptions);
visibleCategories.push_back(&HDFSOptions);

HideUnrelatedOptions(visibleCategories);

@@ -477,7 +495,22 @@ int startDAPHNE(int argc, const char** argv, DaphneLibResult* daphneLibRes, int
if(user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_MPI && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_SYNC && user_config.distributedBackEndSetup!=ALLOCATION_TYPE::DIST_GRPC_ASYNC)
spdlog::warn("No backend has been selected. Wiil use the default 'MPI'");
}
user_config.max_distributed_serialization_chunk_size = maxDistrChunkSize;
user_config.max_distributed_serialization_chunk_size = maxDistrChunkSize;

// only overwrite with non-defaults
if (use_hdfs) {
user_config.use_hdfs = use_hdfs;
}
if (hdfs_Address != "") {
user_config.hdfs_Address = hdfs_Address;
}
if (hdfs_username != "") {
user_config.hdfs_username = hdfs_username;
}
if (user_config.use_hdfs && (user_config.hdfs_Address == "" || user_config.hdfs_username == "")){
spdlog::warn("HDFS is enabled, but the HDFS IP address or username were not provided.");
}

for (auto explain : explainArgList) {
switch (explain) {
case kernels:
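In daphne_internal.cpp, the command-line HDFS knobs only overwrite the values coming from UserConfig.json when they were actually set, i.e., differ from their defaults. The following self-contained C++ sketch reproduces that precedence rule without the llvm::cl machinery; the struct and values are illustrative only, not DAPHNE's real types.

#include <iostream>
#include <string>

// Simplified stand-in for the HDFS part of DaphneUserConfig (illustrative only).
struct HdfsConfig {
    bool use_hdfs = false;
    std::string address;
    std::string username;
};

// Overwrite config-file values only with CLI values that were actually set,
// i.e., that differ from their defaults, mirroring the merge in startDAPHNE.
void mergeCliIntoConfig(HdfsConfig &cfg, bool cliUseHdfs,
                        const std::string &cliAddress, const std::string &cliUsername) {
    if (cliUseHdfs)
        cfg.use_hdfs = true;
    if (!cliAddress.empty())
        cfg.address = cliAddress;
    if (!cliUsername.empty())
        cfg.username = cliUsername;
    if (cfg.use_hdfs && (cfg.address.empty() || cfg.username.empty()))
        std::cerr << "warning: HDFS enabled, but address or username is missing\n";
}

int main() {
    HdfsConfig cfg;                    // values as if read from UserConfig.json
    cfg.address = "192.168.1.10:9000"; // hypothetical value from the config file
    mergeCliIntoConfig(cfg, /*cliUseHdfs=*/true, /*cliAddress=*/"", /*cliUsername=*/"daphne");
    std::cout << cfg.use_hdfs << " " << cfg.address << " " << cfg.username << std::endl;
}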
6 changes: 6 additions & 0 deletions src/parser/config/ConfigParser.cpp
@@ -111,6 +111,12 @@ void ConfigParser::readUserConfig(const std::string& filename, DaphneUserConfig&
config.numberOfThreads = jf.at(DaphneConfigJsonParams::NUMBER_OF_THREADS).get<int>();
if (keyExists(jf, DaphneConfigJsonParams::MINIMUM_TASK_SIZE))
config.minimumTaskSize = jf.at(DaphneConfigJsonParams::MINIMUM_TASK_SIZE).get<int>();
if (keyExists(jf, DaphneConfigJsonParams::USE_HDFS))
config.use_hdfs = jf.at(DaphneConfigJsonParams::USE_HDFS).get<bool>();
if (keyExists(jf, DaphneConfigJsonParams::HDFS_ADDRESS))
config.hdfs_Address = jf.at(DaphneConfigJsonParams::HDFS_ADDRESS).get<std::string>();
if (keyExists(jf, DaphneConfigJsonParams::HDFS_USERNAME))
config.hdfs_username = jf.at(DaphneConfigJsonParams::HDFS_USERNAME).get<std::string>();
#ifdef USE_CUDA
if (keyExists(jf, DaphneConfigJsonParams::CUDA_DEVICES))
config.cuda_devices = jf.at(DaphneConfigJsonParams::CUDA_DEVICES).get<std::vector<int>>();
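ConfigParser relies on a keyExists helper whose implementation is not part of this diff. A minimal sketch of what such a guard might look like with nlohmann::json, as an assumption rather than the project's actual code:

#include <nlohmann/json.hpp>
#include <iostream>
#include <string>

// Hypothetical stand-in for ConfigParser::keyExists: true iff the JSON object contains the key.
static bool keyExists(const nlohmann::json &j, const std::string &key) {
    return j.find(key) != j.end();
}

int main() {
    nlohmann::json jf = nlohmann::json::parse(R"({"useHdfs": true})");
    std::cout << keyExists(jf, "useHdfs") << " "          // 1
              << keyExists(jf, "hdfsAddress") << std::endl; // 0
}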
6 changes: 6 additions & 0 deletions src/parser/config/JsonParams.h
@@ -57,6 +57,9 @@ struct DaphneConfigJsonParams {
inline static const std::string TASK_PARTITIONING_SCHEME = "taskPartitioningScheme";
inline static const std::string NUMBER_OF_THREADS = "numberOfThreads";
inline static const std::string MINIMUM_TASK_SIZE = "minimumTaskSize";
inline static const std::string USE_HDFS = "useHdfs";
inline static const std::string HDFS_ADDRESS = "hdfsAddress";
inline static const std::string HDFS_USERNAME = "hdfsUsername";
inline static const std::string CUDA_DEVICES = "cuda_devices";
inline static const std::string LIB_DIR = "libdir";
inline static const std::string DAPHNEDSL_IMPORT_PATHS = "daphnedsl_import_paths";
@@ -97,6 +100,9 @@
TASK_PARTITIONING_SCHEME,
NUMBER_OF_THREADS,
MINIMUM_TASK_SIZE,
USE_HDFS,
HDFS_ADDRESS,
HDFS_USERNAME,
CUDA_DEVICES,
LIB_DIR,
DAPHNEDSL_IMPORT_PATHS,
79 changes: 45 additions & 34 deletions src/parser/metadata/MetaDataParser.cpp
@@ -19,14 +19,19 @@

#include <fstream>
#include <iostream>
#include <filesystem>

FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {
std::string metaFilename = filename_ + ".meta";
std::ifstream ifs(metaFilename, std::ios::in);
if (!ifs.good())
throw std::runtime_error("Could not open file '" + metaFilename + "' for reading meta data.");

nlohmann::json jf = nlohmann::json::parse(ifs);
std::stringstream buffer;
buffer << ifs.rdbuf();
return MetaDataParser::readMetaDataFromString(buffer.str());
}
FileMetaData MetaDataParser::readMetaDataFromString(const std::string& str) {
nlohmann::json jf = nlohmann::json::parse(str);

if (!keyExists(jf, JsonKeys::NUM_ROWS) || !keyExists(jf, JsonKeys::NUM_COLS)) {
throw std::invalid_argument("A meta data JSON file should always contain \"" + JsonKeys::NUM_ROWS + "\" and \""
@@ -85,44 +90,50 @@ FileMetaData MetaDataParser::readMetaData(const std::string& filename_) {
}
}

std::string MetaDataParser::writeMetaDataToString(const FileMetaData& metaData) {
nlohmann::json json;

json[JsonKeys::NUM_ROWS] = metaData.numRows;
json[JsonKeys::NUM_COLS] = metaData.numCols;

if (metaData.isSingleValueType) {
if (metaData.schema.size() != 1)
throw std::runtime_error("inappropriate meta data tried to be written to file");
json[JsonKeys::VALUE_TYPE] = metaData.schema[0];
}
else {
std::vector<SchemaColumn> schemaColumns;
// assume that the schema and labels have the same length
for (unsigned int i = 0; i < metaData.schema.size(); i++) {
SchemaColumn schemaColumn;
schemaColumn.setLabel(metaData.labels[i]);
schemaColumn.setValueType(metaData.schema[i]);
schemaColumns.emplace_back(schemaColumn);
}
json[JsonKeys::SCHEMA] = schemaColumns;
}

if (metaData.numNonZeros != -1)
json[JsonKeys::NUM_NON_ZEROS] = metaData.numNonZeros;

// HDFS
if (metaData.hdfs.isHDFS){
json[JsonKeys::HDFS][JsonKeys::HDFSKeys::isHDFS] = metaData.hdfs.isHDFS;
std::filesystem::path filePath(metaData.hdfs.HDFSFilename);
auto baseFileName = filePath.filename().string();

json[JsonKeys::HDFS][JsonKeys::HDFSKeys::HDFSFilename] = "/" + baseFileName;
}
return json.dump();
}
void MetaDataParser::writeMetaData(const std::string& filename_, const FileMetaData& metaData) {
std::string metaFilename = filename_ + ".meta";
std::string metaFilename = filename_ + ".meta";
std::ofstream ofs(metaFilename, std::ios::out);
if (!ofs.good())
throw std::runtime_error("could not open file '" + metaFilename + "' for writing meta data");

if(ofs.is_open()) {
nlohmann::json json;

json[JsonKeys::NUM_ROWS] = metaData.numRows;
json[JsonKeys::NUM_COLS] = metaData.numCols;

if (metaData.isSingleValueType) {
if (metaData.schema.size() != 1)
throw std::runtime_error("inappropriate meta data tried to be written to file");
json[JsonKeys::VALUE_TYPE] = metaData.schema[0];
}
else {
std::vector<SchemaColumn> schemaColumns;
// assume that the schema and labels are the same lengths
for (unsigned int i = 0; i < metaData.schema.size(); i++) {
SchemaColumn schemaColumn;
schemaColumn.setLabel(metaData.labels[i]);
schemaColumn.setValueType(metaData.schema[i]);
schemaColumns.emplace_back(schemaColumn);
}
json[JsonKeys::SCHEMA] = schemaColumns;
}

if (metaData.numNonZeros != -1)
json[JsonKeys::NUM_NON_ZEROS] = metaData.numNonZeros;

// HDFS
if (metaData.hdfs.isHDFS){
json[JsonKeys::HDFS][JsonKeys::HDFSKeys::isHDFS] = metaData.hdfs.isHDFS;
json[JsonKeys::HDFS][JsonKeys::HDFSKeys::HDFSFilename] = metaData.hdfs.HDFSFilename;
}
ofs << json.dump();
ofs << MetaDataParser::writeMetaDataToString(metaData);
}
else
throw std::runtime_error("could not open file '" + metaFilename + "' for writing meta data");
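Factoring the serialization into writeMetaDataToString lets the same representation be produced without touching the local filesystem, which is what the HDFS path needs. The snippet below builds the kind of JSON this produces for a dense matrix whose payload lives on HDFS; the concrete key strings ("numRows", "hdfs", "isHDFS", ...) are assumptions based on the JsonKeys constants, not verbatim from this diff.

#include <nlohmann/json.hpp>
#include <iostream>

int main() {
    // Hypothetical metadata for a 1000 x 10 matrix of f64 values whose payload lives on HDFS.
    nlohmann::json json;
    json["numRows"] = 1000;
    json["numCols"] = 10;
    json["valueType"] = "f64";                  // single value type, so no per-column schema
    json["hdfs"]["isHDFS"] = true;
    json["hdfs"]["HDFSFilename"] = "/data.csv"; // writeMetaDataToString keeps only the base name, prefixed with "/"
    std::cout << json.dump(2) << std::endl;
}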
3 changes: 2 additions & 1 deletion src/parser/metadata/MetaDataParser.h
@@ -67,7 +67,7 @@ class MetaDataParser {
* keys or if the file doesn't contain all the metadata.
*/
static FileMetaData readMetaData(const std::string& filename);

static FileMetaData readMetaDataFromString(const std::string& str);
/**
* @brief Saves the file meta data to the specified file.
*
@@ -76,6 +76,7 @@
* @throws std::runtime_error Thrown if the specified file could not be opened.
*/
static void writeMetaData(const std::string& filename, const FileMetaData& metaData);
static std::string writeMetaDataToString(const FileMetaData& metaData);

private:
/**
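The new string-based entry point allows metadata to be parsed when it comes from somewhere other than a local .meta file, e.g., from HDFS. A toy usage sketch follows; ToyMetaData and the key names are stand-ins for illustration, not the real FileMetaData API.

#include <nlohmann/json.hpp>
#include <iostream>
#include <string>

// Toy stand-in for FileMetaData / MetaDataParser::readMetaDataFromString, only to
// illustrate the call pattern; the real classes live in src/parser/metadata.
struct ToyMetaData {
    size_t numRows;
    size_t numCols;
};

static ToyMetaData readMetaDataFromString(const std::string &str) {
    nlohmann::json jf = nlohmann::json::parse(str);
    return {jf.at("numRows").get<size_t>(), jf.at("numCols").get<size_t>()};
}

int main() {
    // In DAPHNE this string could come from an HDFS read instead of a local ".meta" file.
    std::string meta = R"({"numRows": 1000, "numCols": 10, "valueType": "f64"})";
    ToyMetaData fmd = readMetaDataFromString(meta);
    std::cout << fmd.numRows << " x " << fmd.numCols << std::endl;
}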
136 changes: 124 additions & 12 deletions src/runtime/distributed/coordinator/kernels/DistributedRead.h
@@ -14,30 +14,142 @@
* limitations under the License.
*/

#ifndef SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDREAD_H
#define SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDREAD_H
#pragma once

#include <runtime/local/context/DaphneContext.h>
#include <runtime/local/context/DistributedContext.h>
#include <runtime/local/datastructures/DataObjectFactory.h>
#include <runtime/local/io/ReadCsv.h>
#include <runtime/local/io/File.h>
#include <parser/metadata/MetaDataParser.h>
#include <runtime/local/datastructures/DenseMatrix.h>

#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>
#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/worker/WorkerImpl.h>

#ifdef USE_MPI
#include <runtime/distributed/worker/MPIHelper.h>
#endif

#include <cassert>
#include <cstddef>

// ****************************************************************************
// Struct for partial template specialization
// ****************************************************************************

template<ALLOCATION_TYPE AT, class DTRes>
struct DistributedRead {
static void apply(DTRes *&res, const char *filename, DCTX(dctx)) = delete;
};

// ****************************************************************************
// Convenience function
// ****************************************************************************

template<class DT>
void distributedRead(Handle<DT> *&res, const char * filename, DCTX(ctx))
template<class DTRes>
void distributedRead(DTRes *&res, const char *filename, DCTX(dctx))
{
FileMetaData fmd = MetaDataParser::readMetaData(filename);

readCsv(res, filename, fmd.numRows, fmd.numCols, ',');
const auto allocation_type = dctx->getUserConfig().distributedBackEndSetup;
if (allocation_type == ALLOCATION_TYPE::DIST_MPI)
{
#ifdef USE_MPI
DistributedRead<ALLOCATION_TYPE::DIST_MPI, DTRes>::apply(res, filename, dctx);
#endif
}
else if (allocation_type == ALLOCATION_TYPE::DIST_GRPC_ASYNC)
{
DistributedRead<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DTRes>::apply(res, filename, dctx);
}
else if (allocation_type == ALLOCATION_TYPE::DIST_GRPC_SYNC)
{
DistributedRead<ALLOCATION_TYPE::DIST_GRPC_SYNC, DTRes>::apply(res, filename, dctx);
}
}


#endif //SRC_RUNTIME_DISTRIBUTED_COORDINATOR_KERNELS_DISTRIBUTEDREAD_H
// ****************************************************************************
// (Partial) template specializations for different distributed backends
// ****************************************************************************

#ifdef USE_MPI
// ----------------------------------------------------------------------------
// MPI
// ----------------------------------------------------------------------------
template<class DTRes>
struct DistributedRead<ALLOCATION_TYPE::DIST_MPI, DTRes>
{
static void apply(DTRes *&res, const char *filename, DCTX(dctx)) {
throw std::runtime_error("not implemented");
}
};
#endif

// ----------------------------------------------------------------------------
// Asynchronous GRPC
// ----------------------------------------------------------------------------

template<class DTRes>
struct DistributedRead<ALLOCATION_TYPE::DIST_GRPC_ASYNC, DTRes>
{
static void apply(DTRes *&res, const char *filename, DCTX(dctx)) {
throw std::runtime_error("not implemented");
}
};

// ----------------------------------------------------------------------------
// Synchronous GRPC
// ----------------------------------------------------------------------------

template<class DTRes>
struct DistributedRead<ALLOCATION_TYPE::DIST_GRPC_SYNC, DTRes>
{
static void apply(DTRes *&res, const char *filename, DCTX(dctx)) {
auto IpPort = HDFSUtils::parseIPAddress(dctx->getUserConfig().hdfs_Address);
hdfsFS fs = hdfsConnectAsUser(std::get<0>(IpPort).c_str(), std::get<1>(IpPort), dctx->getUserConfig().hdfs_username.c_str());
if (fs == NULL)
std::cout << "Error connecting to HDFS" << std::endl;
auto ctx = DistributedContext::get(dctx);
auto workers = ctx->getWorkers();

assert(res != nullptr);

// Generate metadata for the object based on MetaDataFile and
// when the worker needs the data it will read it automatically

std::vector<std::thread> threads_vector;
LoadPartitioningDistributed<DTRes, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, res, dctx);
while (partioner.HasNextChunk()){
auto hdfsFn = std::string(filename);
auto dp = partioner.GetNextChunk();

auto workerAddr = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation();
std::thread t([=, &res]()
{
auto stub = ctx->stubs[workerAddr].get();

distributed::HDFSFile fileData;
fileData.set_filename(hdfsFn);
fileData.set_start_row(dp->range->r_start);
fileData.set_num_rows(dp->range->r_len);
fileData.set_num_cols(dp->range->c_len);

grpc::ClientContext grpc_ctx;
distributed::StoredData response;

auto status = stub->ReadHDFS(&grpc_ctx, fileData, &response);
if (!status.ok())
throw std::runtime_error(status.error_message());

DistributedData newData;
newData.identifier = response.identifier();
newData.numRows = response.num_rows();
newData.numCols = response.num_cols();
newData.isPlacedAtWorker = true;
dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(newData);
});
threads_vector.push_back(move(t));
}

for (auto &thread : threads_vector)
thread.join();
}
};
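HDFSUtils::parseIPAddress is used in the synchronous gRPC specialization above but is not defined in this diff. Below is a minimal sketch of a "host:port" splitter with a tuple-style result, under the assumption that --hdfs-ip values look like "192.168.1.10:9000"; the default port fallback is an assumption, not DAPHNE's actual behavior.

#include <iostream>
#include <string>
#include <tuple>

// Hypothetical equivalent of HDFSUtils::parseIPAddress: splits "host:port" into its parts.
static std::tuple<std::string, int> parseIPAddress(const std::string &address) {
    auto pos = address.find(':');
    if (pos == std::string::npos)
        return {address, 9000}; // assumed fallback port when none is given
    return {address.substr(0, pos), std::stoi(address.substr(pos + 1))};
}

int main() {
    auto [host, port] = parseIPAddress("192.168.1.10:9000"); // example value only
    std::cout << host << " " << port << std::endl;           // 192.168.1.10 9000
}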