Skip to content

Commit

Permalink
storage: fix block rows not match when filter column is the first non…
Browse files Browse the repository at this point in the history
…-empty column in the block (pingcap#9483) (pingcap#9493)

ref pingcap#9472

storage: fix block row count mismatch when the filter column is the first non-empty column in the block

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: Lloyd-Pottiger <[email protected]>
Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
3 people authored Sep 30, 2024
1 parent 0ad1189 commit 8c8c97b
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 24 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,8 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
return {};

Block res = header.cloneEmpty();
size_t num_rows = blocks.front().rows();
for (const auto & block : blocks)
{
RUNTIME_CHECK_MSG(block.rows() == num_rows, "Cannot hstack blocks with different number of rows");
for (const auto & elem : block)
{
if (likely(res.has(elem.name)))
Expand All @@ -541,6 +539,7 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
}
}
}
res.checkNumberOfRows();

return res;
}
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,14 @@ using BucketBlocksListMap = std::map<Int32, BlocksList>;
/// Join blocks by columns
/// The schema of the output block is the same as the header block.
/// The columns not in the header block will be ignored.
/// For example:
/// header: (a UInt32, b UInt32, c UInt32, d UInt32)
/// block1: (a UInt32, b UInt32, c UInt32, e UInt32), rows: 3
/// block2: (d UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32, d UInt32), rows: 3
/// NOTE: Columns that are not in the header block may have different numbers of rows,
/// but all columns that appear in the header block must have the same number of rows;
/// otherwise, an exception will be thrown.
/// Example:
/// header: (a UInt32, b UInt32, c UInt32, d UInt32)
/// block1: (a UInt32, b UInt32, c UInt32, e UInt32), rows: 3
/// block2: (d UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32, d UInt32), rows: 3
Block hstackBlocks(Blocks && blocks, const Block & header);

/// Join blocks by rows
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/FilterTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <Common/typeid_cast.h>
#include <DataStreams/FilterTransformAction.h>

#include <algorithm>

namespace DB
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,32 @@
namespace DB::DM
{

namespace
{

/// Apply `filter` in place to the columns of `block` (the block that carries the filter column).
///
/// Every column other than the filter column is always filtered. The filter column itself
/// is filtered only when `header` contains a column named `filter_column_name`; otherwise
/// it is left untouched.
/// NOTE(review): presumably the untouched filter column is dropped later by hstackBlocks
/// because it is not part of the header -- confirm against the callers.
///
/// @param header              Output schema; used only to decide whether the filter column must be filtered.
/// @param block               Block whose columns are replaced by their filtered versions.
/// @param filter              Row filter handed to IColumn::filter for each column.
/// @param passed_count        Forwarded unchanged as the second argument of IColumn::filter.
/// @param filter_column_name  Name of the filter column inside `block`.
void filterFilterColumnBlock(
    const Block & header,
    Block & block,
    const IColumn::Filter & filter,
    size_t passed_count,
    const String & filter_column_name)
{
    // Decide once whether the filter column is part of the output schema.
    const bool keep_filter_column = header.has(filter_column_name);

    for (auto & col : block)
    {
        // Skip the filter column only when the header does not ask for it.
        if (!keep_filter_column && col.name == filter_column_name)
            continue;

        // Write the filtered column back into the block. The previous version stored the
        // filtered filter column in a local ColumnPtr, so the block kept the unfiltered
        // column, and it dereferenced a null pointer when the column was absent from `block`.
        col.column = col.column->filter(filter, passed_count);
    }
}

} // namespace

LateMaterializationBlockInputStream::LateMaterializationBlockInputStream(
const ColumnDefines & columns_to_read,
const String & filter_column_name_,
Expand Down Expand Up @@ -69,10 +95,7 @@ Block LateMaterializationBlockInputStream::readImpl()
{
col.column = col.column->filter(col_filter, passed_count);
}
for (auto & col : filter_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, col_filter, passed_count, filter_column_name);
}
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}
Expand Down Expand Up @@ -111,12 +134,7 @@ Block LateMaterializationBlockInputStream::readImpl()
// so only if the number of rows left after filtering out is large enough,
// we can skip some packs of the next block, call readWithFilter to get the next block.
rest_column_block = rest_column_stream->readWithFilter(*filter);
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, *filter, passed_count, filter_column_name);
}
else if (filter_out_count > 0)
{
Expand All @@ -127,12 +145,7 @@ Block LateMaterializationBlockInputStream::readImpl()
{
col.column = col.column->filter(*filter, passed_count);
}
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, *filter, passed_count, filter_column_name);
}
else
{
Expand Down

0 comments on commit 8c8c97b

Please sign in to comment.