Skip to content

Commit

Permalink
Storages: New serialization/deserialization for DataTypeString (#9608)
Browse files Browse the repository at this point in the history
close #9673

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
3 people authored Dec 12, 2024
1 parent 96ae6d2 commit 1060155
Show file tree
Hide file tree
Showing 38 changed files with 1,687 additions and 656 deletions.
10 changes: 7 additions & 3 deletions dbms/src/Core/tests/gtest_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ try
"Decimal(40,2)",
"MyDate",
"MyDateTime",
"String",
DataTypeString::getDefaultName(),
"FixedString(10)"};
for (auto & type_name : all_types)
{
Expand All @@ -80,7 +80,11 @@ try
ArenaPtr pool = std::make_shared<Arena>();
pool->alloc(1024 * 1024);
/// case 1, agg function not allocate memory in arena
std::vector<String> types{"Int64", "String", "Nullable(Int64)", "Nullable(String)"};
std::vector<String> types{
"Int64",
DataTypeString::getDefaultName(),
"Nullable(Int64)",
DataTypeString::getNullableDefaultName()};
std::vector<size_t> data_size{
16,
ColumnString::APPROX_STRING_SIZE * 2,
Expand Down Expand Up @@ -139,7 +143,7 @@ try
String long_str(ColumnString::APPROX_STRING_SIZE * 5, 'a');
String short_str(std::max(1, ColumnString::APPROX_STRING_SIZE / 10), 'a');
std::vector<String> string_values{short_str, long_str};
std::vector<String> types{"String", "Nullable(String)"};
std::vector<String> types{DataTypeString::getDefaultName(), DataTypeString::getNullableDefaultName()};
for (const auto & string_value : string_values)
{
for (const auto & type_string : types)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Core/tests/gtest_spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,8 @@ TEST_F(SpillerTest, SpillAndRestoreStringEnumData)
try
{
NamesAndTypes spiller_schema;
spiller_schema.emplace_back("col0", DataTypeFactory::instance().get("String"));
spiller_schema.emplace_back("col1", DataTypeFactory::instance().get("Nullable(String)"));
spiller_schema.emplace_back("col0", DataTypeFactory::instance().get(DataTypeString::getDefaultName()));
spiller_schema.emplace_back("col1", DataTypeFactory::instance().get(DataTypeString::getNullableDefaultName()));
spiller_schema.emplace_back("col2", DataTypeFactory::instance().get("Enum8('a' = 0,'b' = 1,'c' = 2)"));
spiller_schema.emplace_back("col3", DataTypeFactory::instance().get("Nullable(Enum8('a' = 0,'b' = 1,'c' = 2))"));
spiller_schema.emplace_back("col4", DataTypeFactory::instance().get("Enum16('a' = 0,'b' = 1,'c' = 2)"));
Expand Down Expand Up @@ -969,4 +969,4 @@ try
CATCH

} // namespace tests
} // namespace DB
} // namespace DB
211 changes: 209 additions & 2 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <IO/WriteHelpers.h>
#include <Storages/FormatVersion.h>

#if __SSE2__
#include <emmintrin.h>
Expand Down Expand Up @@ -292,9 +293,15 @@ bool DataTypeString::equals(const IDataType & rhs) const

void registerDataTypeString(DataTypeFactory & factory)
{
auto creator = static_cast<DataTypePtr (*)()>([] { return DataTypePtr(std::make_shared<DataTypeString>()); });
std::function<DataTypePtr()> legacy_creator = [] {
return std::make_shared<DataTypeString>(DataTypeString::SerdesFormat::SizePrefix);
};
factory.registerSimpleDataType(DataTypeString::LegacyName, legacy_creator);

factory.registerSimpleDataType("String", creator);
std::function<DataTypePtr()> creator = [] {
return std::make_shared<DataTypeString>(DataTypeString::SerdesFormat::SeparateSizeAndChars);
};
factory.registerSimpleDataType(DataTypeString::NameV2, creator);

/// These synonyms are added for compatibility.

Expand All @@ -310,4 +317,204 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
}

namespace
{

using Offset = ColumnString::Offset;

// Resolves the two substreams used by the separate-size-and-chars format.
//
// `getter` is invoked twice: first with `path` exactly as passed in (this
// yields the chars stream), then with `Substream::StringSizes` appended
// (this yields the sizes/offsets stream).
//
// The appended element is removed again before returning, so `path` is left
// exactly as the caller passed it. Without this, a caller that reuses the
// same path object would keep the extra `StringSizes` element and resolve
// the wrong substreams on its next lookup.
//
// Returns <offsets_stream, chars_stream>.
template <typename B, typename G>
std::pair<B *, B *> getStream(const G & getter, IDataType::SubstreamPath & path)
{
    auto * chars_stream = getter(path);
    path.emplace_back(IDataType::Substream::StringSizes);
    auto * offsets_stream = getter(path);
    // Undo the temporary extension of the caller-owned path.
    path.pop_back();
    return {offsets_stream, chars_stream};
}

// Converts the cumulative offsets in `chars_offsets[begin, end)` into
// per-string sizes. Element `i` of the result equals
// `chars_offsets[begin + i] - chars_offsets[begin + i - 1]`.
PaddedPODArray<Offset> offsetToStrSize(
    const ColumnString::Offsets & chars_offsets,
    const size_t begin,
    const size_t end)
{
    assert(!chars_offsets.empty());
    // When `begin` is 0, the first loop iteration reads chars_offsets[-1].
    // PODArrayBase keeps that slot well defined as 0 (see its `pad_left`
    // argument); the assert below checks this in debug builds.
    assert(chars_offsets[-1] == 0);

    PaddedPODArray<Offset> result(end - begin);
    const auto count = static_cast<ssize_t>(result.size());
    auto src = chars_offsets.begin() + begin;

    // clang-format off
    #pragma clang loop vectorize(enable)
    // clang-format on
    for (ssize_t idx = 0; idx < count; ++idx)
        result[idx] = src[idx] - src[idx - 1];

    return result;
}

// Appends one cumulative offset per entry of `str_sizes` to `chars_offsets`.
// Each new offset is the previous offset plus the corresponding string size,
// continuing from the last offset already stored in `chars_offsets`.
void strSizeToOffset(const PaddedPODArray<Offset> & str_sizes, ColumnString::Offsets & chars_offsets)
{
    assert(!str_sizes.empty());
    // If `chars_offsets` is empty on entry, the first iteration reads index -1.
    assert(chars_offsets[-1] == 0);

    const auto old_count = chars_offsets.size();
    const auto added = static_cast<ssize_t>(str_sizes.size());
    chars_offsets.resize(old_count + str_sizes.size());
    auto dst = chars_offsets.begin() + old_count;

    // No vectorize pragma here: dst[idx] depends on dst[idx - 1], so the
    // compiler cannot vectorize this loop.
    for (ssize_t idx = 0; idx < added; ++idx)
        dst[idx] = dst[idx - 1] + str_sizes[idx];
}

// Writes the sizes of the strings in rows [offset, offset + limit) to `ostr`
// as raw `Offset` values. A `limit` of 0, or a range reaching past the last
// row, means "up to the end of the column".
//
// Returns the byte range <chars_begin, chars_end> of the chars buffer that
// belongs to the serialized rows.
std::pair<size_t, size_t> serializeOffsetsBinary(
    const ColumnString::Offsets & chars_offsets,
    WriteBuffer & ostr,
    size_t offset,
    size_t limit)
{
    // Rows [first_row, last_row) of `chars_offsets` are serialized.
    const auto total_rows = chars_offsets.size();
    const auto first_row = offset;
    auto last_row = total_rows;
    if (limit != 0 && offset + limit < total_rows)
        last_row = offset + limit;

    PaddedPODArray<Offset> row_sizes = offsetToStrSize(chars_offsets, first_row, last_row);
    const auto * raw = reinterpret_cast<const char *>(row_sizes.data());
    ostr.write(raw, sizeof(Offset) * row_sizes.size());

    // Bytes [chars_begin, chars_end) of the chars buffer hold those rows.
    size_t chars_begin = 0;
    if (first_row != 0)
        chars_begin = chars_offsets[first_row - 1];
    const size_t chars_end = chars_offsets[last_row - 1];
    return {chars_begin, chars_end};
}

// Writes the bytes `chars[begin, end)` to `ostr` unchanged.
void serializeCharsBinary(const ColumnString::Chars_t & chars, WriteBuffer & ostr, size_t begin, size_t end)
{
    const auto * first_byte = reinterpret_cast<const char *>(&chars[begin]);
    const size_t length = end - begin;
    ostr.write(first_byte, length);
}

// Reads up to `limit` string sizes from `istr` and appends the matching
// cumulative offsets to `chars_offsets`.
//
// Returns the sum of the sizes that were read, i.e. the number of chars
// bytes belonging to the newly appended rows.
size_t deserializeOffsetsBinary(ColumnString::Offsets & chars_offsets, ReadBuffer & istr, size_t limit)
{
    PaddedPODArray<Offset> sizes(limit);
    auto * raw = reinterpret_cast<char *>(sizes.data());
    const auto bytes_read = istr.readBig(raw, sizeof(Offset) * limit);
    // Keep only the entries that were actually filled by the read.
    sizes.resize(bytes_read / sizeof(Offset));
    strSizeToOffset(sizes, chars_offsets);
    return std::accumulate(sizes.begin(), sizes.end(), size_t{0});
}

// Grows `chars` by `bytes` and fills the new tail with exactly `bytes` bytes
// read from `istr` via `readStrict`.
void deserializeCharsBinary(ColumnString::Chars_t & chars, ReadBuffer & istr, size_t bytes)
{
    const auto old_size = chars.size();
    chars.resize(old_size + bytes);
    auto * dst = reinterpret_cast<char *>(&chars[old_size]);
    istr.readStrict(dst, bytes);
}

// Serializes rows [offset, offset + limit) of a ColumnString in the
// separate-size-and-chars layout: string sizes go to `offsets_stream`, the
// string bytes go to `chars_stream`. Nothing is written for an empty column.
void serializeBinaryBulkV2(
    const IColumn & column,
    WriteBuffer & offsets_stream,
    WriteBuffer & chars_stream,
    size_t offset,
    size_t limit)
{
    if (column.empty())
        return;

    const auto & str_column = typeid_cast<const ColumnString &>(column);
    // Writing the sizes also tells us which byte range of the chars buffer to emit.
    const auto chars_range = serializeOffsetsBinary(str_column.getOffsets(), offsets_stream, offset, limit);
    serializeCharsBinary(str_column.getChars(), chars_stream, chars_range.first, chars_range.second);
}

// Appends up to `limit` rows to a ColumnString from the
// separate-size-and-chars layout: sizes are read from `offsets_stream`, then
// the matching number of bytes is read from `chars_stream`.
// A `limit` of 0 is a no-op.
void deserializeBinaryBulkV2(IColumn & column, ReadBuffer & offsets_stream, ReadBuffer & chars_stream, size_t limit)
{
    if (limit == 0)
        return;

    auto & str_column = typeid_cast<ColumnString &>(column);
    // The sizes determine how many chars bytes must be consumed next.
    const auto chars_bytes = deserializeOffsetsBinary(str_column.getOffsets(), offsets_stream, limit);
    deserializeCharsBinary(str_column.getChars(), chars_stream, chars_bytes);
}

} // namespace

void DataTypeString::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
if (serdes_fmt == SerdesFormat::SeparateSizeAndChars)
{
path.emplace_back(Substream::StringSizes);
callback(path);
}
}

/// Serializes rows [offset, offset + limit) of `column`.
///
/// For the `SeparateSizeAndChars` format the sizes and the chars are written
/// to two different streams obtained from `getter`; for any other format the
/// single stream at `path` receives the output of `serializeBinaryBulk`.
void DataTypeString::serializeBinaryBulkWithMultipleStreams(
    const IColumn & column,
    const OutputStreamGetter & getter,
    size_t offset,
    size_t limit,
    bool /*position_independent_encoding*/,
    SubstreamPath & path) const
{
    if (serdes_fmt != SerdesFormat::SeparateSizeAndChars)
    {
        serializeBinaryBulk(column, *getter(path), offset, limit);
        return;
    }

    // streams.first is the sizes stream, streams.second is the chars stream.
    const auto streams = getStream<WriteBuffer, IDataType::OutputStreamGetter>(getter, path);
    serializeBinaryBulkV2(column, *streams.first, *streams.second, offset, limit);
}

/// Appends up to `limit` rows to `column`.
///
/// For the `SeparateSizeAndChars` format the sizes and the chars are read
/// from two different streams obtained from `getter`. For any other format
/// the single stream at `path` is handed to `deserializeBinaryBulk`;
/// `avg_value_size_hint` is only used on that path.
void DataTypeString::deserializeBinaryBulkWithMultipleStreams(
    IColumn & column,
    const InputStreamGetter & getter,
    size_t limit,
    double avg_value_size_hint,
    bool /*position_independent_encoding*/,
    SubstreamPath & path) const
{
    if (serdes_fmt != SerdesFormat::SeparateSizeAndChars)
    {
        deserializeBinaryBulk(column, *getter(path), limit, avg_value_size_hint);
        return;
    }

    // streams.first is the sizes stream, streams.second is the chars stream.
    const auto streams = getStream<ReadBuffer, IDataType::InputStreamGetter>(getter, path);
    deserializeBinaryBulkV2(column, *streams.first, *streams.second, limit);
}

// Picks the string serialization format implied by a storage format version.
// Identifiers below 8, and identifiers in [100, 103), map to the legacy
// `SizePrefix` format; every other identifier maps to `SeparateSizeAndChars`.
static DataTypeString::SerdesFormat getDefaultByStorageFormat(StorageFormatVersion current)
{
    const auto id = current.identifier;
    const bool uses_legacy_format = id < 8 || (id >= 100 && id < 103);
    return uses_legacy_format ? DataTypeString::SerdesFormat::SizePrefix
                              : DataTypeString::SerdesFormat::SeparateSizeAndChars;
}

/// Builds a string type with the requested serialization format.
/// `SerdesFormat::None` means "derive the format from STORAGE_FORMAT_CURRENT".
DataTypeString::DataTypeString(SerdesFormat serdes_fmt_)
    : serdes_fmt(
        serdes_fmt_ == SerdesFormat::None ? getDefaultByStorageFormat(STORAGE_FORMAT_CURRENT) : serdes_fmt_)
{}

/// Returns the type name matching the serialization format that
/// STORAGE_FORMAT_CURRENT selects: `LegacyName` for `SizePrefix`, `NameV2`
/// otherwise.
///
/// The decision is delegated to getDefaultByStorageFormat() so that the
/// version thresholds live in exactly one place and the default name can
/// never disagree with the format a default-constructed DataTypeString uses.
String DataTypeString::getDefaultName()
{
    const auto default_format = getDefaultByStorageFormat(STORAGE_FORMAT_CURRENT);
    return default_format == SerdesFormat::SizePrefix ? LegacyName : NameV2;
}

/// Returns the default string type name wrapped as `Nullable(<name>)`.
String DataTypeString::getNullableDefaultName()
{
    const String nested_name = getDefaultName();
    return fmt::format("Nullable({})", nested_name);
}

} // namespace DB
40 changes: 39 additions & 1 deletion dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <DataTypes/IDataType.h>


namespace DB
{
class DataTypeString final : public IDataType
Expand All @@ -27,6 +26,8 @@ class DataTypeString final : public IDataType

const char * getFamilyName() const override { return "String"; }

String getName() const override { return serdes_fmt == SerdesFormat::SeparateSizeAndChars ? NameV2 : LegacyName; }

TypeIndex getTypeId() const override { return TypeIndex::String; }

void serializeBinary(const Field & field, WriteBuffer & ostr) const override;
Expand Down Expand Up @@ -64,6 +65,43 @@ class DataTypeString final : public IDataType
bool isString() const override { return true; }
bool isCategorial() const override { return true; }
bool canBeInsideNullable() const override { return true; }

void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;

void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath & path) const override;

void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;

enum class SerdesFormat
{
None = 0, // Decide by STORAGE_FORMAT_CURRENT
SizePrefix = 1, // Legacy format, corresponding to `LegacyName`
SeparateSizeAndChars = 2, // New format, corresponding to `NameV2`
};

inline static const String LegacyName{"String"}; // For compatibility of size-prefix format.
inline static const String NameV2{"StringV2"}; // The separate size and chars format.

// Both getDefaultName and getNullableDefaultName are unit-tests helpers.
static String getDefaultName();
static String getNullableDefaultName();

explicit DataTypeString(SerdesFormat serdes_fmt_ = SerdesFormat::None);

private:
const SerdesFormat serdes_fmt;
};

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ bool IDataType::isArraySizes(const SubstreamPath & path)
return false;
}

bool IDataType::isStringSizes(const SubstreamPath & path)
{
return std::any_of(path.cbegin(), path.cend(), [](const auto & elem) {
return elem.type == IDataType::Substream::StringSizes;
});
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand All @@ -127,6 +134,8 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
/// and name is encoded as a whole.
stream_name += "%2E" + escapeForFileName(elem.tuple_element_name);
}
else if (elem.type == Substream::StringSizes)
stream_name += ".size";
}
return stream_name;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class IDataType : private boost::noncopyable
NullMap,

TupleElement,

StringSizes,
};
Type type;

Expand Down Expand Up @@ -421,6 +423,7 @@ class IDataType : private boost::noncopyable

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
static bool isStringSizes(const SubstreamPath & path);
};


Expand Down
Loading

0 comments on commit 1060155

Please sign in to comment.