Skip to content

Commit

Permalink
Using reader-properties to set the buffer size when reading column chunks.
Browse files Browse the repository at this point in the history
It is useful for very large row-groups (huge number of values per column).
refactor of the send-back-result-response.
add external configuration per parquet read-buffer. fix for result printing
remove debug.

Signed-off-by: Gal Salomon <[email protected]>
  • Loading branch information
galsalomon66 committed Sep 10, 2024
1 parent a616839 commit 4b126a7
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 51 deletions.
4 changes: 3 additions & 1 deletion example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,10 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
std::cout << "DEBUG: {" << msg << "}" << std::endl;
};

std::function<void(const char*)> fp_nop = [](const char* msg){};
std::function<int(std::string&)> fp_continue_nop = [](std::string& result){return 0;};
parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
//parquet_processor.set_external_debug_system(fp_debug);
parquet_processor.set_external_system_functions(fp_continue_nop, fp_s3select_result_format, fp_s3select_header_format,fp_nop);

std::string result;

Expand Down
72 changes: 28 additions & 44 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ class base_s3object
}

result.append(res->to_string());
//TODO: strlen() should be replaced with the string's size (performance perspective)
m_returned_bytes_size += strlen(res->to_string());
++j;
}
Expand All @@ -2474,6 +2475,8 @@ class base_s3object
if(m_csv_defintion.output_json_format && projections_resuls.values.size()) {
json_result_format(projections_resuls, result, output_delimiter);
result.append(output_row_delimiter);
//TODO: add as needed
//TODO "flush" the result string
return;
}

Expand All @@ -2493,10 +2496,6 @@ class base_s3object
set_processing_time_error();
}


if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(column_result.data());

if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
quoted_result << std::quoted(column_result,m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
Expand Down Expand Up @@ -2524,9 +2523,30 @@ class base_s3object
if(!m_aggr_flow) {
result.append(output_row_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}


//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might reject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}
}

// Flush whatever data still resides in the result buffer through the
// externally installed callbacks. Emits the result first, then the header.
// Does nothing unless BOTH callbacks were set via set_external_system_functions.
void flush_sql_result(std::string& result)
{
  const bool callbacks_installed =
      m_fp_s3select_result_format && m_fp_s3select_header_format;
  if (!callbacks_installed)
  {
    return; // no external system functions; nothing to emit
  }
  m_fp_s3select_result_format(result);
  m_fp_s3select_header_format(result);
}

Status getMatchRow( std::string& result)
{
multi_values projections_resuls;
Expand Down Expand Up @@ -2670,8 +2690,6 @@ class base_s3object

}; //base_s3object

//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024)
class csv_object : public base_s3object
{

Expand Down Expand Up @@ -2984,16 +3002,6 @@ class csv_object : public base_s3object
return -1;
}
}

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might reject the response due to its size.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

if (m_sql_processing_status == Status::END_OF_STREAM)
{
break;
Expand All @@ -3009,12 +3017,7 @@ class csv_object : public base_s3object

} while (true);

if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
flush_sql_result(result);

return 0;
}
Expand Down Expand Up @@ -3050,7 +3053,7 @@ class parquet_object : public base_s3object
parquet_query_setting(nullptr);
}

parquet_object() : base_s3object(nullptr)
parquet_object() : base_s3object(nullptr), not_to_increase_first_time(true)
{}

void parquet_query_setting(s3select *s3_query)
Expand Down Expand Up @@ -3127,26 +3130,6 @@ class parquet_object : public base_s3object
}
}

#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
{//AWS-cli limits response size the following callbacks send response upon some threshold
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);

if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
{
if(m_fp_s3select_header_format)
m_fp_s3select_header_format(result);
}
}
else
{
if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
{
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);
}
}

//TODO is_end_of_stream() required?
if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)
Expand All @@ -3156,6 +3139,7 @@ class parquet_object : public base_s3object

} while (1);

flush_sql_result(result);
return 0;
}

Expand Down
36 changes: 30 additions & 6 deletions include/s3select_parquet_intrf.h
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,30 @@ class PARQUET_EXPORT RowGroupReader {
std::unique_ptr<Contents> contents_;
};

//TODO config / default-value
// Default Parquet read-buffer size: 1 GiB. Parenthesized so the macro
// expands safely inside larger expressions (1024*1024*1024 unparenthesized
// would interact with surrounding operators on expansion).
#define RGW_default_buffer_size (1024*1024*1024)

// Process-wide (Meyers) singleton holding the s3select Parquet reader
// configuration. Currently it carries a single knob: the buffered-stream
// read size (in bytes) consumed by s3select_reader_properties().
class S3select_Config {
public:
  // Returns the lazily-constructed singleton instance.
  // Initialization of the function-local static is thread-safe since C++11.
  static S3select_Config& getInstance() {
    static S3select_Config instance;
    return instance;
  }

  // Sets the buffer size (in bytes) used when reading Parquet column chunks.
  void set_s3select_reader_properties(uint64_t value) { this->m_reader_properties = value; }
  // Returns the configured Parquet read-buffer size in bytes.
  uint64_t get_s3select_reader_properties() const { return m_reader_properties; }

  // A singleton must be neither copyable nor assignable.
  S3select_Config(const S3select_Config&) = delete;
  S3select_Config& operator=(const S3select_Config&) = delete;

private:
  uint64_t m_reader_properties;
  S3select_Config() : m_reader_properties(RGW_default_buffer_size) {} // Private constructor
};

// Builds the ReaderProperties used by the s3select Parquet readers:
// buffered-stream reading with a buffer size taken from S3select_Config,
// re-read on every call so runtime reconfiguration takes effect.
//
// 'inline' is required: this function is defined in a header, and a
// non-inline definition would produce multiple-definition (ODR) link
// errors once the header is included from more than one translation unit.
inline ReaderProperties s3select_reader_properties() {

  // Build a fresh value each call. Mutating a shared function-local static
  // here (as before) was redundant — the result is returned by value — and
  // a data race when called concurrently from several reader threads.
  ReaderProperties props;
  props.enable_buffered_stream();
  props.set_buffer_size(S3select_Config::getInstance().get_s3select_reader_properties());
  return props;
}

class PARQUET_EXPORT ParquetFileReader {
public:
// Declare a virtual class 'Contents' to aid dependency injection and more
Expand All @@ -755,7 +779,7 @@ class PARQUET_EXPORT ParquetFileReader {
struct PARQUET_EXPORT Contents {
static std::unique_ptr<Contents> Open(
std::shared_ptr<::arrow::io::RandomAccessFile> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

virtual ~Contents() = default;
Expand All @@ -776,21 +800,21 @@ class PARQUET_EXPORT ParquetFileReader {
ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version")
static std::unique_ptr<ParquetFileReader> Open(
std::unique_ptr<RandomAccessSource> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

// Create a file reader instance from an Arrow file object. Thread-safety is
// the responsibility of the file implementation
static std::unique_ptr<ParquetFileReader> Open(
std::shared_ptr<::arrow::io::RandomAccessFile> source,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);

// API Convenience to open a serialized Parquet file on disk, using Arrow IO
// interfaces.
static std::unique_ptr<ParquetFileReader> OpenFile(
const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true,
const ReaderProperties& props = default_reader_properties(),
const ReaderProperties& props = s3select_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR
);

Expand Down Expand Up @@ -1034,7 +1058,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
class SerializedFile : public ParquetFileReader::Contents {
public:
SerializedFile(std::shared_ptr<ArrowInputFile> source,
const ReaderProperties& props = default_reader_properties())
const ReaderProperties& props = s3select_reader_properties())
: source_(std::move(source)), properties_(props) {
PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
}
Expand Down Expand Up @@ -1241,7 +1265,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(

#if ARROW_VERSION_MAJOR > 9
file_metadata_ =
FileMetaData::Make(metadata_buffer->data(), &metadata_len, default_reader_properties(), file_decryptor_);
FileMetaData::Make(metadata_buffer->data(), &metadata_len, s3select_reader_properties(), file_decryptor_);
#else
file_metadata_ =
FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
Expand Down

0 comments on commit 4b126a7

Please sign in to comment.