Skip to content

Commit

Permalink
enable using an external-system for sending continue message upon lon…
Browse files Browse the repository at this point in the history
…g processing.

refactoring the current external system functionality setup.

Signed-off-by: Gal Salomon <[email protected]>
  • Loading branch information
galsalomon66 committed Mar 13, 2024
1 parent 232ac70 commit 2a6de4e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 35 deletions.
2 changes: 1 addition & 1 deletion example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
try
{
status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
status = parquet_processor.run_s3select_on_object(result);
}
catch (base_s3select_exception &e)
{
Expand Down
85 changes: 51 additions & 34 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2188,8 +2188,11 @@ class base_s3object
unsigned long m_limit;
unsigned long m_processed_rows;
size_t m_returned_bytes_size;
std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system
std::function<void(const char*)> m_fp_ext_debug_mesg=nullptr;//dispache debug message into external system
std::vector<std::string> m_projection_keys{};
std::function<int(std::string&)> m_fp_s3select_continue=nullptr;
std::function<int(std::string&)> m_fp_s3select_result_format=nullptr;
std::function<int(std::string&)> m_fp_s3select_header_format=nullptr;

public:
s3select_csv_definitions m_csv_defintion;//TODO add method for modify
Expand Down Expand Up @@ -2314,6 +2317,18 @@ class base_s3object

base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){}

void set_external_system_functions(std::function<int(std::string&)>& continue_message_fp,
std::function<int(std::string&)>& result_message_fp,
std::function<int(std::string&)>& header_message_fp,
std::function<void(const char*)>& ext_debug_mesg_fp)
{
m_fp_s3select_continue = continue_message_fp;
m_fp_s3select_result_format = result_message_fp;
m_fp_s3select_header_format = header_message_fp;
m_fp_ext_debug_mesg = ext_debug_mesg_fp;
}


explicit base_s3object(s3select* m):base_s3object()
{
if(m)
Expand All @@ -2330,11 +2345,6 @@ class base_s3object
// for the case were the rows are not fetched, but "pushed" by the data-source parser (JSON)
virtual bool multiple_row_processing(){return true;}

void set_external_debug_system(std::function<void(const char*)> fp_external)
{
fp_ext_debug_mesg = fp_external;
}

size_t get_return_result_size()
{
return m_returned_bytes_size;
Expand Down Expand Up @@ -2397,8 +2407,8 @@ class base_s3object
}


if(fp_ext_debug_mesg)
fp_ext_debug_mesg(column_result.data());
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(column_result.data());

if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
Expand Down Expand Up @@ -2461,6 +2471,15 @@ class base_s3object
}

m_processed_rows++;
if(m_processed_rows % 10000 == 0)
{//TODO number of rows sould replace by time
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows);
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(debug_mesg.c_str());
}

if ((*m_projections.begin())->is_set_last_call())
{
//should validate while query execution , no update upon nodes are marked with set_last_call
Expand Down Expand Up @@ -2489,7 +2508,7 @@ class base_s3object
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
projections_resuls.push_value( &(i->eval()) );
}
}
result_values_to_string(projections_resuls,result);
return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED);
}
Expand All @@ -2510,6 +2529,14 @@ class base_s3object
}

m_processed_rows++;
if(m_processed_rows % 10000 == 0)
{//TODO number of rows sould replace by time
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
std::string debug_mesg = "s3select : send continue message "+std::to_string(m_processed_rows);
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(debug_mesg.c_str());
}
row_update_data();
for (auto& a : *m_s3_select->get_aliases()->get())
{
Expand Down Expand Up @@ -2543,7 +2570,7 @@ class base_s3object
for (auto& i : m_projections)
{
projections_resuls.push_value( &(i->eval()) );
}
}
result_values_to_string(projections_resuls,result);
if(m_sql_processing_status == Status::SQL_ERROR)
{
Expand Down Expand Up @@ -2627,15 +2654,6 @@ class csv_object : public base_s3object
int64_t m_number_of_tokens;
size_t m_skip_x_first_bytes=0;

std::function<int(std::string&)> fp_s3select_result_format=nullptr;
std::function<int(std::string&)> fp_s3select_header_format=nullptr;
public:
void set_result_formatters( std::function<int(std::string&)>& result_format,
std::function<int(std::string&)>& header_format)
{
fp_s3select_result_format = result_format;
fp_s3select_header_format = header_format;
}
private:
int getNextRow()
{
Expand Down Expand Up @@ -2786,10 +2804,10 @@ class csv_object : public base_s3object

if(*p_obj_chunk != m_csv_defintion.row_delimiter)
{// previous row can not be completed with current chunk
if(fp_ext_debug_mesg)
if(m_fp_ext_debug_mesg)
{
std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **";
fp_ext_debug_mesg(err_mesg.c_str());
m_fp_ext_debug_mesg(err_mesg.c_str());
}
//copy the part to be processed later
tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
Expand Down Expand Up @@ -2886,12 +2904,12 @@ class csv_object : public base_s3object
}
}

if(fp_s3select_result_format && fp_s3select_header_format)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

Expand All @@ -2910,11 +2928,11 @@ class csv_object : public base_s3object

} while (true);

if(fp_s3select_result_format && fp_s3select_header_format)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}

return 0;
Expand Down Expand Up @@ -3002,11 +3020,10 @@ class parquet_object : public base_s3object
}


int run_s3select_on_object(std::string &result,
std::function<int(std::string&)> fp_s3select_result_format,
std::function<int(std::string&)> fp_s3select_header_format)
int run_s3select_on_object(std::string &result)
{
m_sql_processing_status = Status::INITIAL_STAT;

m_sql_processing_status = Status::INITIAL_STAT;
do
{
try
Expand Down Expand Up @@ -3035,18 +3052,18 @@ class parquet_object : public base_s3object
#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
{//AWS-cli limits response size the following callbacks send response upon some threshold
fp_s3select_result_format(result);
m_fp_s3select_result_format(result);

if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
{
fp_s3select_header_format(result);
m_fp_s3select_header_format(result);
}
}
else
{
if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
{
fp_s3select_result_format(result);
m_fp_s3select_result_format(result);
}
}

Expand Down

0 comments on commit 2a6de4e

Please sign in to comment.