Skip to content

Commit

Permalink
add memory profiler and random data for ut
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Aug 30, 2024
1 parent c330061 commit 7095f9c
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 18 deletions.
111 changes: 111 additions & 0 deletions cpp/include/milvus-storage/common/profile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Author: David Robert Nadeau
* Site: http://NadeauSoftware.com/
* License: Creative Commons Attribution 3.0 Unported License
* http://creativecommons.org/licenses/by/3.0/deed.en_US
*/

#if defined(_WIN32)
#include <windows.h>
#include <psapi.h>

#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__))
#include <unistd.h>
#include <sys/resource.h>

#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>

#elif (defined(_AIX) || defined(__TOS__AIX__)) || \
(defined(__sun__) || defined(__sun) || defined(sun) && (defined(__SVR4) || defined(__svr4__)))
#include <fcntl.h>
#include <procfs.h>

#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__)
#include <stdio.h>

#endif

#else
#error "Cannot define getPeakRSS( ) or getCurrentRSS( ) for an unknown OS."
#endif

/**
* Returns the peak (maximum so far) resident set size (physical
* memory use) measured in bytes, or zero if the value cannot be
* determined on this OS.
*/
inline size_t getPeakRSS() {
#if defined(_WIN32)
/* Windows -------------------------------------------------- */
PROCESS_MEMORY_COUNTERS info;
GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info));
return (size_t)info.PeakWorkingSetSize;

#elif (defined(_AIX) || defined(__TOS__AIX__)) || \
(defined(__sun__) || defined(__sun) || defined(sun) && (defined(__SVR4) || defined(__svr4__)))
/* AIX and Solaris ------------------------------------------ */
struct psinfo psinfo;
int fd = -1;
if ((fd = open("/proc/self/psinfo", O_RDONLY)) == -1)
return (size_t)0L; /* Can't open? */
if (read(fd, &psinfo, sizeof(psinfo)) != sizeof(psinfo)) {
close(fd);
return (size_t)0L; /* Can't read? */
}
close(fd);
return (size_t)(psinfo.pr_rssize * 1024L);

#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__))
/* BSD, Linux, and OSX -------------------------------------- */
struct rusage rusage;
getrusage(RUSAGE_SELF, &rusage);
#if defined(__APPLE__) && defined(__MACH__)
return (size_t)rusage.ru_maxrss;
#else
return (size_t)(rusage.ru_maxrss * 1024L);
#endif

#else
/* Unknown OS ----------------------------------------------- */
return (size_t)0L; /* Unsupported. */
#endif
}

/**
* Returns the current resident set size (physical memory use) measured
* in bytes, or zero if the value cannot be determined on this OS.
*/
inline size_t getCurrentRSS() {
#if defined(_WIN32)
/* Windows -------------------------------------------------- */
PROCESS_MEMORY_COUNTERS info;
GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info));
return (size_t)info.WorkingSetSize;

#elif defined(__APPLE__) && defined(__MACH__)
/* OSX ------------------------------------------------------ */
struct mach_task_basic_info info;
mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT;
if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS)
return (size_t)0L; /* Can't access? */
return (size_t)info.resident_size;

#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__)
/* Linux ---------------------------------------------------- */
long rss = 0L;
FILE* fp = NULL;
if ((fp = fopen("/proc/self/statm", "r")) == NULL)
return (size_t)0L; /* Can't open? */
if (fscanf(fp, "%*s%ld", &rss) != 1) {
fclose(fp);
return (size_t)0L; /* Can't read? */
}
fclose(fp);
return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE);

#else
/* AIX, BSD, Solaris, and Unknown OS ------------------------ */
return (size_t)0L; /* Unsupported. */
#endif
}
3 changes: 3 additions & 0 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
uri_parser.password();
options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY"));
*out_path = std::getenv("FILE_PATH");
if (std::getenv("REGION") != nullptr) {
options.region = std::getenv("REGION");
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options));

return std::shared_ptr<arrow::fs::FileSystem>(fs);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "common/arrow_util.h"
#include "common/log.h"
#include "packed/chunk_manager.h"
#include "common/profile.h"

namespace milvus_storage {

Expand Down Expand Up @@ -138,6 +139,10 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
}

arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
size_t currentSize = getCurrentRSS() / 1024 / 1024;
size_t peakSize = getPeakRSS() / 1024 / 1024;
LOG_STORAGE_DEBUG_ << "Packed Reader Current memory usage: " << currentSize << " MB, Peak memory usage: " << peakSize
<< " MB";
if (absolute_row_position_ >= row_limit_) {
RETURN_NOT_OK(advanceBuffer());
if (absolute_row_position_ >= row_limit_) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "packed/splitter/indices_based_splitter.h"
#include "packed/splitter/size_based_splitter.h"
#include "common/fs_util.h"
#include "common/profile.h"

namespace milvus_storage {

Expand Down Expand Up @@ -78,6 +79,10 @@ Status PackedRecordBatchWriter::Init(const std::shared_ptr<arrow::RecordBatch>&
}

Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
size_t currentSize = getCurrentRSS() / 1024 / 1024;
size_t peakSize = getPeakRSS() / 1024 / 1024;
LOG_STORAGE_DEBUG_ << "Packed Writer Current memory usage: " << currentSize << " MB, Peak memory usage: " << peakSize
<< " MB";
std::vector<ColumnGroup> column_groups = splitter_.Split(record);

// Calculate the total memory usage of the new column groups
Expand Down
57 changes: 39 additions & 18 deletions cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <arrow/record_batch.h>
#include <arrow/builder.h>
#include <memory>
#include "test_util.h"
#include "arrow/table.h"
#include "common/log.h"
#include <vector>
#include <string>

using namespace std;

Expand All @@ -30,41 +32,60 @@ namespace milvus_storage {
class PackedTestBase : public ::testing::Test {
protected:
void SetUpCommonData() {
record_batch_ = randomRecordBatch();
table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie();
schema_ = table_->schema();
}

protected:
std::shared_ptr<arrow::RecordBatch> randomRecordBatch() {
arrow::Int32Builder int_builder;
arrow::Int64Builder int64_builder;
arrow::StringBuilder str_builder;

ASSERT_STATUS_OK(int_builder.AppendValues(int32_values));
ASSERT_STATUS_OK(int64_builder.AppendValues(int64_values));
ASSERT_STATUS_OK(str_builder.AppendValues(str_values));
int32_values = {rand() % 10000, rand() % 10000, rand() % 10000};
int64_values = {rand() % 10000000, rand() % 10000000, rand() % 10000000};
str_values = {random_string(10000), random_string(10000), random_string(10000)};

int_builder.AppendValues(int32_values);
int64_builder.AppendValues(int64_values);
str_builder.AppendValues(str_values);

std::shared_ptr<arrow::Array> int_array;
std::shared_ptr<arrow::Array> int64_array;
std::shared_ptr<arrow::Array> str_array;

ASSERT_STATUS_OK(int_builder.Finish(&int_array));
ASSERT_STATUS_OK(int64_builder.Finish(&int64_array));
ASSERT_STATUS_OK(str_builder.Finish(&str_array));
int_builder.Finish(&int_array);
int64_builder.Finish(&int64_array);
str_builder.Finish(&str_array);

std::vector<std::shared_ptr<arrow::Field>> fields = {arrow::field("int32", arrow::int32()),
arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())};
std::vector<std::shared_ptr<arrow::Array>> arrays = {int_array, int64_array, str_array};
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())});
return arrow::RecordBatch::Make(schema, 3, arrays);
}

schema_ = arrow::schema(fields);
record_batch_ = arrow::RecordBatch::Make(schema_, 3, arrays);

table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie();
std::string random_string(size_t length) {
auto randchar = []() -> char {
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}

std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::RecordBatch> record_batch_;
std::shared_ptr<arrow::Table> table_;

const std::vector<int32_t> int32_values = {1, 2, 3};
const std::vector<int64_t> int64_values = {4, 5, 6};
const std::vector<basic_string<char>> str_values = {std::string(10000, 'a'), std::string(10000, 'b'),
std::string(10000, 'c')};
std::vector<int32_t> int32_values;
std::vector<int64_t> int64_values;
std::vector<basic_string<char>> str_values;
};

} // namespace milvus_storage

0 comments on commit 7095f9c

Please sign in to comment.