Skip to content

Commit

Permalink
*: only enable new string serdes format in exchange when MppVersion >…
Browse files Browse the repository at this point in the history
…= MppVersionV3 (#9759)

close #9673

1. Pass mpp_version to CHBlockChunkCodec and CHBlockChunkCodecV1 for encoding: if mpp_version <= MppVersionV2, use the legacy string format.

2. When decoding, CHBlockChunkCodec and CHBlockChunkCodecV1 respect the type name written by the encoder.

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JinheLin and ti-chi-bot[bot] authored Jan 10, 2025
1 parent 265b329 commit b4e8068
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 94 deletions.
2 changes: 2 additions & 0 deletions dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ class DataTypeString final : public IDataType
};

inline static const String LegacyName{"String"}; // For compatibility of size-prefix format.
inline static const String NullableLegacyName{"Nullable(String)"};
inline static const String NameV2{"StringV2"}; // The separate size and chars format.
inline static const String NullableNameV2{"Nullable(StringV2)"};

// Both getDefaultName and getNullableDefaultName are unit-tests helpers.
static String getDefaultName();
Expand Down
32 changes: 18 additions & 14 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ size_t getExtraInfoSize(const Block & block)
return size;
}

void WriteColumnData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit)
void CHBlockChunkCodec::WriteColumnData(
const IDataType & type,
const ColumnPtr & column,
WriteBuffer & ostr,
size_t offset,
size_t limit)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
Expand Down Expand Up @@ -151,10 +156,12 @@ void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t e
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);

writeStringBinary(column.name, *output);
writeStringBinary(column.type->getName(), *output);
// Always convert StringV2 to String, because CHBlockChunkCodec will only be used in legacy MppVersions.
const auto & ser_type = CodecUtils::convertDataType(*column.type);
writeStringBinary(ser_type.getName(), *output);

if (rows)
WriteColumnData(*column.type, column.column, *output, 0, 0);
CHBlockChunkCodec::WriteColumnData(ser_type, column.column, *output, 0, 0);
}
}

Expand Down Expand Up @@ -223,34 +230,31 @@ void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTy
/// Type
String type_name;
readBinary(type_name, istr);
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
if (header)
{
CodecUtils::checkDataTypeName("CHBlockChunkCodec", i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
{
column.type = data_type_factory.get(type_name);
}
column.type = DataTypeFactory::instance().getOrSet(type_name); // Respect the type name from encoder
}

/// Decodes `str` using a temporary codec built from `schema`.
///
/// The decoded block keeps the type names written by the encoder, which may be the
/// legacy string type (named 'String'). When `header` is non-empty the decoded columns
/// are attached to it via `cloneWithColumns`, so the result uses the local types.
/// NOTE(review): `header` here is this object's member, not the header of the temporary
/// codec created from `schema` — confirm this is intended.
Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema)
{
    ReadBufferFromString read_buffer(str);
    auto block = CHBlockChunkCodec(schema).decodeImpl(read_buffer);
    // Without a header there is nothing to normalize against; hand back the decoded block as is.
    return header ? header.cloneWithColumns(block.getColumns()) : std::move(block);
}

/// Decodes `str` using a temporary codec built from `header`.
///
/// The codec may return the legacy string type (named 'String') because it respects the
/// type name written by the encoder. When `header` is non-empty the decoded columns are
/// attached to it via `cloneWithColumns`, so the result uses the local string type
/// (named 'StringV2'); otherwise the decoded block is returned unchanged.
Block CHBlockChunkCodec::decode(const String & str, const Block & header)
{
    ReadBufferFromString read_buffer(str);
    auto block = CHBlockChunkCodec(header).decodeImpl(read_buffer);
    return header ? header.cloneWithColumns(block.getColumns()) : std::move(block);
}

/// Decodes `str` with this codec instance.
///
/// The decoded block keeps the type names written by the encoder. When this codec's
/// `header` is non-empty the decoded columns are attached to it via `cloneWithColumns`;
/// otherwise the decoded block is returned unchanged.
Block CHBlockChunkCodec::decode(const String & str)
{
    ReadBufferFromString read_buffer(str);
    auto block = decodeImpl(read_buffer);
    return header ? header.cloneWithColumns(block.getColumns()) : std::move(block);
}

} // namespace DB
13 changes: 13 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
namespace DB
{
class CHBlockChunkDecodeAndSquash;
struct CHBlockChunkCodecV1Impl;
class CHBlockChunkCodecStream;

class CHBlockChunkCodec final : public ChunkCodec
{
Expand All @@ -28,16 +30,27 @@ class CHBlockChunkCodec final : public ChunkCodec
explicit CHBlockChunkCodec(const Block & header_);
explicit CHBlockChunkCodec(const DAGSchema & schema);

// Use in dbgFuncCoprocessorUtils.cpp and MPPTaskTestUtils.cpp
Block decode(const String &, const DAGSchema & schema) override;
// Use in CoprocessorReader
static Block decode(const String &, const Block & header);
// Use in unit-tests
Block decode(const String &);
std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;

private:
friend class CHBlockChunkDecodeAndSquash;
friend struct CHBlockChunkCodecV1Impl;
friend class CHBlockChunkCodecStream;
void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column);
void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const;
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows);
static void WriteColumnData(
const IDataType & type,
const ColumnPtr & column,
WriteBuffer & ostr,
size_t offset,
size_t limit);
/// 'reserve_size' used for Squash usage, and takes effect when 'reserve_size' > 0
Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0);

Expand Down
26 changes: 12 additions & 14 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <IO/Compression/CompressionCodecFactory.h>
Expand All @@ -26,7 +27,7 @@

namespace DB
{
size_t ApproxBlockHeaderBytes(const Block & block)
static size_t ApproxBlockHeaderBytes(const Block & block)
{
size_t size = 8 + 8; /// to hold some length of structures, such as column number, row number...
size_t columns = block.columns();
Expand All @@ -41,7 +42,7 @@ size_t ApproxBlockHeaderBytes(const Block & block)
return size;
}

void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows)
void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows, MPPDataPacketVersion packet_version)
{
size_t columns = header.columns();
writeVarUInt(columns, ostr);
Expand All @@ -51,7 +52,8 @@ void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
writeStringBinary(column.name, ostr);
writeStringBinary(column.type->getName(), ostr);
const auto & ser_type = CodecUtils::convertDataTypeByPacketVersion(*column.type, packet_version);
writeStringBinary(ser_type.getName(), ostr);
}
}

Expand Down Expand Up @@ -79,19 +81,12 @@ Block DecodeHeader(ReadBuffer & istr, const Block & header, size_t & total_rows)
String type_name;
readBinary(type_name, istr);
if (header)
{
CodecUtils::checkDataTypeName(
"CHBlockChunkCodecV1",
i,
header.getByPosition(i).type->getName(),
type_name);
column.type = header.getByPosition(i).type;
}
else
{
const auto & data_type_factory = DataTypeFactory::instance();
column.type = data_type_factory.get(type_name);
}
column.type = DataTypeFactory::instance().get(type_name); // Respect the type name from encoder
}
res.insert(std::move(column));
}
Expand Down Expand Up @@ -309,7 +304,9 @@ struct CHBlockChunkCodecV1Impl
{
auto && col_type_name = inner.header.getByPosition(col_index);
auto && column_ptr = toColumnPtr(std::forward<ColumnsHolder>(columns_holder), col_index);
WriteColumnData(*col_type_name.type, column_ptr, *ostr_ptr, 0, 0);
const auto & ser_type
= CodecUtils::convertDataTypeByPacketVersion(*col_type_name.type, inner.packet_version);
CHBlockChunkCodec::WriteColumnData(ser_type, column_ptr, *ostr_ptr, 0, 0);
}

inner.encoded_rows += rows;
Expand Down Expand Up @@ -413,7 +410,7 @@ struct CHBlockChunkCodecV1Impl
}

// Encode header
EncodeHeader(*ostr_ptr, inner.header, rows);
EncodeHeader(*ostr_ptr, inner.header, rows, inner.packet_version);
// Encode column data
encodeColumn(std::forward<VecColumns>(batch_columns), ostr_ptr);

Expand All @@ -433,9 +430,10 @@ struct CHBlockChunkCodecV1Impl
}
};

/// Builds a V1 codec bound to `header_` for the given data packet version.
///
/// `header` is stored as a reference, so `header_` must outlive this object.
/// `header_size` is computed once from the header via `ApproxBlockHeaderBytes`.
/// `packet_version_` is kept so that encoding can pick the serialized type per packet version.
CHBlockChunkCodecV1::CHBlockChunkCodecV1(const Block & header_, MPPDataPacketVersion packet_version_)
    : header(header_)
    , header_size(ApproxBlockHeaderBytes(header))
    , packet_version(packet_version_)
{}

static void checkSchema(const Block & header, const Block & block)
Expand Down
15 changes: 5 additions & 10 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,18 @@
#pragma once

#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Mpp/MppVersion.h>
#include <IO/Compression/CompressedReadBuffer.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <IO/Compression/CompressionMethod.h>

namespace DB
{
size_t ApproxBlockHeaderBytes(const Block & block);
using CompressedCHBlockChunkReadBuffer = CompressedReadBuffer<false>;
using CompressedCHBlockChunkWriteBuffer = CompressedWriteBuffer<false>;
void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows);
void DecodeColumns(ReadBuffer & istr, Block & res, size_t rows_to_read, size_t reserve_size = 0);
void DecodeColumns(ReadBuffer & istr, Block & res, size_t rows_to_read, size_t reserve_size);
Block DecodeHeader(ReadBuffer & istr, const Block & header, size_t & rows);
CompressionMethod ToInternalCompressionMethod(tipb::CompressionMode compression_mode);
extern void WriteColumnData(
const IDataType & type,
const ColumnPtr & column,
WriteBuffer & ostr,
size_t offset,
size_t limit);

struct CHBlockChunkCodecV1 : boost::noncopyable
{
Expand All @@ -42,13 +35,14 @@ struct CHBlockChunkCodecV1 : boost::noncopyable

const Block & header;
const size_t header_size;
const MPPDataPacketVersion packet_version;

size_t encoded_rows{};
size_t original_size{};
size_t compressed_size{};

void clear();
explicit CHBlockChunkCodecV1(const Block & header_);
CHBlockChunkCodecV1(const Block & header_, MPPDataPacketVersion packet_version_);
//
EncodeRes encode(const MutableColumns & columns, CompressionMethod compression_method);
EncodeRes encode(std::vector<MutableColumns> && columns, CompressionMethod compression_method);
Expand All @@ -61,6 +55,7 @@ struct CHBlockChunkCodecV1 : boost::noncopyable
EncodeRes encode(std::vector<Block> && blocks, CompressionMethod compression_method, bool check_schema = true);
//
static EncodeRes encode(std::string_view str, CompressionMethod compression_method);
// `decode` is only used for test
static Block decode(const Block & header, std::string_view str);
};

Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ CHBlockChunkDecodeAndSquash::CHBlockChunkDecodeAndSquash(const Block & header, s
{}

/// Decodes and squashes a V1-encoded chunk, then re-attaches the result to the codec header.
///
/// Returns an empty optional when `doDecodeAndSquashV1` produced no block.
std::optional<Block> CHBlockChunkDecodeAndSquash::decodeAndSquashV1(std::string_view sv)
{
    auto squashed = doDecodeAndSquashV1(sv);
    if (!squashed)
        return squashed;

    // The codec may return the legacy string type (named 'String'); attach the columns to the
    // codec header so the result uses the local string type (named 'StringV2').
    return codec.header.cloneWithColumns(squashed->getColumns());
}

std::optional<Block> CHBlockChunkDecodeAndSquash::doDecodeAndSquashV1(std::string_view sv)
{
if unlikely (sv.empty())
{
Expand Down Expand Up @@ -77,6 +86,15 @@ std::optional<Block> CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuff
}

/// Decodes and squashes an encoded chunk, then re-attaches the result to the codec header.
///
/// Returns an empty optional when `doDecodeAndSquash` produced no block.
std::optional<Block> CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & str)
{
    auto squashed = doDecodeAndSquash(str);
    if (!squashed)
        return squashed;

    // The codec may return the legacy string type (named 'String'); attach the columns to the
    // codec header so the result uses the local string type (named 'StringV2').
    return codec.header.cloneWithColumns(squashed->getColumns());
}

std::optional<Block> CHBlockChunkDecodeAndSquash::doDecodeAndSquash(const String & str)
{
std::optional<Block> res;
ReadBufferFromString istr(str);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class CHBlockChunkDecodeAndSquash
std::optional<Block> flush();

private:
std::optional<Block> doDecodeAndSquash(const String & str);

std::optional<Block> doDecodeAndSquashV1(std::string_view sv);
std::optional<Block> decodeAndSquashV1Impl(ReadBuffer & istr);

private:
Expand Down
48 changes: 40 additions & 8 deletions dbms/src/Flash/Coprocessor/CodecUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <Common/Exception.h>
#include <Common/TiFlashException.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <Flash/Coprocessor/DAGUtils.h>

Expand All @@ -37,14 +39,44 @@ void checkColumnSize(const String & identifier, size_t expected, size_t actual)

/// Checks that the type name read from an encoded chunk is acceptable for the local schema.
///
/// `identifier` (the calling codec) and `column_index` are only used in the error message.
/// `expected` is the local type name, `actual` is the name written by the encoder.
/// A mismatch is accepted when the local side expects 'StringV2' / 'Nullable(StringV2)' and
/// the encoder wrote the legacy 'String' / 'Nullable(String)' name.
///
/// Throws Exception with ErrorCodes::LOGICAL_ERROR on any other mismatch.
void checkDataTypeName(const String & identifier, size_t column_index, const String & expected, const String & actual)
{
    if (likely(expected == actual))
        return;
    // These compatibility checks must run before the throw, otherwise a legacy name is never accepted.
    if (expected == DataTypeString::NameV2 && actual == DataTypeString::LegacyName)
        return;
    if (expected == DataTypeString::NullableNameV2 && actual == DataTypeString::NullableLegacyName)
        return;

    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "{} schema mismatch at column {}, expected {}, actual {}",
        identifier,
        column_index,
        expected,
        actual);
}

/// Maps the V2 string types onto their legacy counterparts for serialization.
///
/// 'StringV2' becomes 'String' and 'Nullable(StringV2)' becomes 'Nullable(String)';
/// any other type is returned unchanged. Matching is done on the type's full name.
const IDataType & convertDataType(const IDataType & type)
{
    // Function-local statics: looked up once on first call, then reused for every later call.
    static const auto legacy_string = DataTypeFactory::instance().getOrSet(DataTypeString::LegacyName);
    static const auto legacy_nullable_string
        = DataTypeFactory::instance().getOrSet(DataTypeString::NullableLegacyName);

    const auto type_name = type.getName();
    if (type_name == DataTypeString::NullableNameV2)
        return *legacy_nullable_string;
    if (type_name == DataTypeString::NameV2)
        return *legacy_string;
    return type;
}

/// Picks the data type to serialize with for the given packet version.
///
/// Packet versions below MPPDataPacketV2 get the legacy string type via `convertDataType`;
/// MPPDataPacketV2 and above keep `type` as is.
const IDataType & convertDataTypeByPacketVersion(const IDataType & type, MPPDataPacketVersion packet_version)
{
    const bool needs_legacy_string = packet_version < MPPDataPacketVersion::MPPDataPacketV2;
    if (needs_legacy_string)
        return convertDataType(type);

    return type;
}

} // namespace DB::CodecUtils
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/CodecUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <DataTypes/IDataType.h>
#include <Flash/Mpp/MppVersion.h>

namespace DB
{
Expand All @@ -33,5 +34,7 @@ struct DataTypeWithTypeName

void checkColumnSize(const String & identifier, size_t expected, size_t actual);
void checkDataTypeName(const String & identifier, size_t column_index, const String & expected, const String & actual);
const IDataType & convertDataType(const IDataType & type);
const IDataType & convertDataTypeByPacketVersion(const IDataType & type, MPPDataPacketVersion packet_version);
} // namespace CodecUtils
} // namespace DB
Loading

0 comments on commit b4e8068

Please sign in to comment.