diff --git a/cpp/include/milvus-storage/common/profile.h b/cpp/include/milvus-storage/common/profile.h new file mode 100644 index 0000000..f8840ce --- /dev/null +++ b/cpp/include/milvus-storage/common/profile.h @@ -0,0 +1,111 @@ +/* + * Author: David Robert Nadeau + * Site: http://NadeauSoftware.com/ + * License: Creative Commons Attribution 3.0 Unported License + * http://creativecommons.org/licenses/by/3.0/deed.en_US + */ + +#if defined(_WIN32) +#include +#include + +#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__)) +#include +#include + +#if defined(__APPLE__) && defined(__MACH__) +#include + +#elif (defined(_AIX) || defined(__TOS__AIX__)) || \ + (defined(__sun__) || defined(__sun) || defined(sun) && (defined(__SVR4) || defined(__svr4__))) +#include +#include + +#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__) +#include + +#endif + +#else +#error "Cannot define getPeakRSS( ) or getCurrentRSS( ) for an unknown OS." +#endif + +/** + * Returns the peak (maximum so far) resident set size (physical + * memory use) measured in bytes, or zero if the value cannot be + * determined on this OS. + */ +inline size_t getPeakRSS() { +#if defined(_WIN32) + /* Windows -------------------------------------------------- */ + PROCESS_MEMORY_COUNTERS info; + GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info)); + return (size_t)info.PeakWorkingSetSize; + +#elif (defined(_AIX) || defined(__TOS__AIX__)) || \ + (defined(__sun__) || defined(__sun) || defined(sun) && (defined(__SVR4) || defined(__svr4__))) + /* AIX and Solaris ------------------------------------------ */ + struct psinfo psinfo; + int fd = -1; + if ((fd = open("/proc/self/psinfo", O_RDONLY)) == -1) + return (size_t)0L; /* Can't open? */ + if (read(fd, &psinfo, sizeof(psinfo)) != sizeof(psinfo)) { + close(fd); + return (size_t)0L; /* Can't read? */ + } + close(fd); + return (size_t)(psinfo.pr_rssize * 1024L); + +#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__)) + /* BSD, Linux, and OSX -------------------------------------- */ + struct rusage rusage; + getrusage(RUSAGE_SELF, &rusage); +#if defined(__APPLE__) && defined(__MACH__) + return (size_t)rusage.ru_maxrss; +#else + return (size_t)(rusage.ru_maxrss * 1024L); +#endif + +#else + /* Unknown OS ----------------------------------------------- */ + return (size_t)0L; /* Unsupported. */ +#endif +} + +/** + * Returns the current resident set size (physical memory use) measured + * in bytes, or zero if the value cannot be determined on this OS. + */ +inline size_t getCurrentRSS() { +#if defined(_WIN32) + /* Windows -------------------------------------------------- */ + PROCESS_MEMORY_COUNTERS info; + GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info)); + return (size_t)info.WorkingSetSize; + +#elif defined(__APPLE__) && defined(__MACH__) + /* OSX ------------------------------------------------------ */ + struct mach_task_basic_info info; + mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT; + if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS) + return (size_t)0L; /* Can't access? */ + return (size_t)info.resident_size; + +#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__) + /* Linux ---------------------------------------------------- */ + long rss = 0L; + FILE* fp = NULL; + if ((fp = fopen("/proc/self/statm", "r")) == NULL) + return (size_t)0L; /* Can't open? */ + if (fscanf(fp, "%*s%ld", &rss) != 1) { + fclose(fp); + return (size_t)0L; /* Can't read? */ + } + fclose(fp); + return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE); + +#else + /* AIX, BSD, Solaris, and Unknown OS ------------------------ */ + return (size_t)0L; /* Unsupported. */ +#endif +} \ No newline at end of file diff --git a/cpp/src/common/fs_util.cpp b/cpp/src/common/fs_util.cpp index acee746..0870e35 100644 --- a/cpp/src/common/fs_util.cpp +++ b/cpp/src/common/fs_util.cpp @@ -52,6 +52,9 @@ Result> BuildFileSystem(const std::string uri_parser.password(); options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY")); *out_path = std::getenv("FILE_PATH"); + if (std::getenv("REGION") != nullptr) { + options.region = std::getenv("REGION"); + } ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options)); return std::shared_ptr(fs); diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index c0e2086..6da81d7 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -21,6 +21,7 @@ #include "common/arrow_util.h" #include "common/log.h" #include "packed/chunk_manager.h" +#include "common/profile.h" namespace milvus_storage { @@ -138,6 +139,10 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() { } arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr* out) { + size_t currentSize = getCurrentRSS() / 1024 / 1024; + size_t peakSize = getPeakRSS() / 1024 / 1024; + LOG_STORAGE_DEBUG_ << "Packed Reader Current memory usage: " << currentSize << " MB, Peak memory usage: " << peakSize + << " MB"; if (absolute_row_position_ >= row_limit_) { RETURN_NOT_OK(advanceBuffer()); if (absolute_row_position_ >= row_limit_) { diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index ee3894f..5c01d24 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -21,6 +21,7 @@ #include "packed/splitter/indices_based_splitter.h" #include "packed/splitter/size_based_splitter.h" #include "common/fs_util.h" +#include "common/profile.h" namespace milvus_storage { @@ -78,6 +79,10 @@ Status PackedRecordBatchWriter::Init(const std::shared_ptr& } Status PackedRecordBatchWriter::Write(const std::shared_ptr& record) { + size_t currentSize = getCurrentRSS() / 1024 / 1024; + size_t peakSize = getPeakRSS() / 1024 / 1024; + LOG_STORAGE_DEBUG_ << "Packed Writer Current memory usage: " << currentSize << " MB, Peak memory usage: " << peakSize + << " MB"; std::vector column_groups = splitter_.Split(record); // Calculate the total memory usage of the new column groups diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 0f54080..485478d 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -20,8 +20,10 @@ #include #include #include -#include "test_util.h" #include "arrow/table.h" +#include "common/log.h" +#include +#include using namespace std; @@ -30,41 +32,60 @@ namespace milvus_storage { class PackedTestBase : public ::testing::Test { protected: void SetUpCommonData() { + record_batch_ = randomRecordBatch(); + table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie(); + schema_ = table_->schema(); + } + + protected: + std::shared_ptr randomRecordBatch() { arrow::Int32Builder int_builder; arrow::Int64Builder int64_builder; arrow::StringBuilder str_builder; - ASSERT_STATUS_OK(int_builder.AppendValues(int32_values)); - ASSERT_STATUS_OK(int64_builder.AppendValues(int64_values)); - ASSERT_STATUS_OK(str_builder.AppendValues(str_values)); + int32_values = {rand() % 10000, rand() % 10000, rand() % 10000}; + int64_values = {rand() % 10000000, rand() % 10000000, rand() % 10000000}; + str_values = {random_string(10000), random_string(10000), random_string(10000)}; + + int_builder.AppendValues(int32_values); + int64_builder.AppendValues(int64_values); + str_builder.AppendValues(str_values); std::shared_ptr int_array; std::shared_ptr int64_array; std::shared_ptr str_array; - ASSERT_STATUS_OK(int_builder.Finish(&int_array)); - ASSERT_STATUS_OK(int64_builder.Finish(&int64_array)); - ASSERT_STATUS_OK(str_builder.Finish(&str_array)); + int_builder.Finish(&int_array); + int64_builder.Finish(&int64_array); + str_builder.Finish(&str_array); - std::vector> fields = {arrow::field("int32", arrow::int32()), - arrow::field("int64", arrow::int64()), - arrow::field("str", arrow::utf8())}; std::vector> arrays = {int_array, int64_array, str_array}; + auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), + arrow::field("str", arrow::utf8())}); + return arrow::RecordBatch::Make(schema, 3, arrays); + } - schema_ = arrow::schema(fields); - record_batch_ = arrow::RecordBatch::Make(schema_, 3, arrays); - - table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie(); + std::string random_string(size_t length) { + auto randchar = []() -> char { + const char charset[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + const size_t max_index = (sizeof(charset) - 1); + return charset[rand() % max_index]; + }; + std::string str(length, 0); + std::generate_n(str.begin(), length, randchar); + return str; } std::shared_ptr schema_; std::shared_ptr record_batch_; std::shared_ptr table_; - const std::vector int32_values = {1, 2, 3}; - const std::vector int64_values = {4, 5, 6}; - const std::vector> str_values = {std::string(10000, 'a'), std::string(10000, 'b'), - std::string(10000, 'c')}; + std::vector int32_values; + std::vector int64_values; + std::vector> str_values; }; } // namespace milvus_storage \ No newline at end of file