From ac758c196df059bc515c50809813458456be56e6 Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Fri, 30 Aug 2024 17:32:14 +0300 Subject: [PATCH] refactor of the send-back-result-response. Signed-off-by: Gal Salomon --- example/s3select_example.cpp | 4 ++- include/s3select.h | 64 ++++++++++++++---------------------- 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 4147dadd..6e17e4f3 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -320,8 +320,10 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file) std::cout << "DEBUG: {" << msg << "}" << std::endl; }; + std::function fp_nop = [](const char* msg){}; + std::function fp_continue_nop = [](std::string& result){return 0;}; parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); - //parquet_processor.set_external_debug_system(fp_debug); + parquet_processor.set_external_system_functions(fp_continue_nop, fp_s3select_result_format, fp_s3select_header_format,fp_nop); std::string result; diff --git a/include/s3select.h b/include/s3select.h index 7f8a09e6..d23536e2 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -2474,6 +2474,8 @@ class base_s3object if(m_csv_defintion.output_json_format && projections_resuls.values.size()) { json_result_format(projections_resuls, result, output_delimiter); result.append(output_row_delimiter); + //TODO to add as needed + //TODO "flush" the result string return; } @@ -2524,9 +2526,30 @@ class base_s3object if(!m_aggr_flow) { result.append(output_row_delimiter); m_returned_bytes_size += output_delimiter.size(); - } + } + + +//TODO config / default-value +#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024) + if(m_fp_s3select_result_format && m_fp_s3select_header_format) + { + if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT) + {//there are systems that might reject the response due to its size. 
+ m_fp_s3select_result_format(result); + m_fp_s3select_header_format(result); + } + } } + void flush_sql_result(std::string& result) + {//purpose: flush the remaining data residing in the buffer + if(m_fp_s3select_result_format && m_fp_s3select_header_format) + { + m_fp_s3select_result_format(result); + m_fp_s3select_header_format(result); + } + } + Status getMatchRow( std::string& result) { multi_values projections_resuls; @@ -2670,8 +2693,6 @@ class base_s3object }; //base_s3object -//TODO config / default-value -#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (64 * 1024) class csv_object : public base_s3object { @@ -2984,16 +3005,6 @@ class csv_object : public base_s3object return -1; } } - - if(m_fp_s3select_result_format && m_fp_s3select_header_format) - { - if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT) - {//there are systems that might resject the response due to its size. - m_fp_s3select_result_format(result); - m_fp_s3select_header_format(result); - } - } - if (m_sql_processing_status == Status::END_OF_STREAM) { break; @@ -3009,12 +3020,7 @@ class csv_object : public base_s3object } while (true); - if(m_fp_s3select_result_format && m_fp_s3select_header_format) - { //note: it may produce empty response(more the once) - //upon empty result, it should return *only* upon last call. 
- m_fp_s3select_result_format(result); - m_fp_s3select_header_format(result); - } + flush_sql_result(result); return 0; } @@ -3127,26 +3133,6 @@ class parquet_object : public base_s3object } } -#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024) - if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT) - {//AWS-cli limits response size the following callbacks send response upon some threshold - if(m_fp_s3select_result_format) - m_fp_s3select_result_format(result); - - if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED)) - { - if(m_fp_s3select_header_format) - m_fp_s3select_header_format(result); - } - } - else - { - if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED)) - { - if(m_fp_s3select_result_format) - m_fp_s3select_result_format(result); - } - } //TODO is_end_of_stream() required? if (get_sql_processing_status() == Status::END_OF_STREAM || is_end_of_stream() || get_sql_processing_status() == Status::LIMIT_REACHED)