Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: remove ColumnFileTiny cache #9677

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>


namespace DB
{
namespace DM
namespace DB::DM
{

void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_count, Columns & result) const
{
if (result.size() >= col_count)
Expand Down Expand Up @@ -108,17 +107,11 @@ Block ColumnFileInMemory::readDataForFlush() const
return cache_block.cloneWithColumns(std::move(columns));
}


ColumnPtr ColumnFileInMemoryReader::getPKColumn()
{
memory_file.fillColumns(*col_defs, 1, cols_data_cache);
return cols_data_cache[0];
}

ColumnPtr ColumnFileInMemoryReader::getVersionColumn()
std::pair<ColumnPtr, ColumnPtr> ColumnFileInMemoryReader::getPKAndVersionColumns()
{
// fill the first 2 columns into `cols_data_cache`
memory_file.fillColumns(*col_defs, 2, cols_data_cache);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
return cols_data_cache[1];
return {cols_data_cache[0], cols_data_cache[1]};
}

std::pair<size_t, size_t> ColumnFileInMemoryReader::readRows(
Expand Down Expand Up @@ -162,5 +155,4 @@ ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefine
return std::make_shared<ColumnFileInMemoryReader>(memory_file, new_col_defs, cols_data_cache);
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
26 changes: 13 additions & 13 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>

namespace DB
{
namespace DM
namespace DB::DM
{

class ColumnFileInMemory;
using ColumnFileInMemoryPtr = std::shared_ptr<ColumnFileInMemory>;

Expand All @@ -44,6 +43,8 @@ class ColumnFileInMemory : public ColumnFile
CachePtr cache;

private:
// Ensure that the first `col_count` columns of `result` are filled with the data of this
// ColumnFileInMemory. The column ids and data types in `result` are defined by `col_defs`.
void fillColumns(const ColumnDefines & col_defs, size_t col_count, Columns & result) const;

const DataTypePtr & getDataType(ColId column_id) const { return schema->getDataType(column_id); }
Expand Down Expand Up @@ -91,12 +92,13 @@ class ColumnFileInMemory : public ColumnFile

String toString() const override
{
String s = "{in_memory_file,rows:" + DB::toString(rows) //
+ ",bytes:" + DB::toString(bytes) //
+ ",disable_append:" + DB::toString(disable_append) //
+ ",schema:" + (schema ? schema->toString() : "none") //
+ ",cache_block:" + (cache ? cache->block.dumpStructure() : "none") + "}";
return s;
return fmt::format(
"{{in_memory_file,rows:{},bytes:{},disable_append:{},schema:{},cache_block:{}}}",
rows,
bytes,
disable_append,
(schema ? schema->toString() : "none"),
(cache ? cache->block.dumpJsonStructure() : "none"));
}
};

Expand Down Expand Up @@ -126,8 +128,7 @@ class ColumnFileInMemoryReader : public ColumnFileReader
{}

/// This is an ugly hack to quickly return the PK & Version columns.
ColumnPtr getPKColumn();
ColumnPtr getVersionColumn();
std::pair<ColumnPtr, ColumnPtr> getPKAndVersionColumns();

std::pair<size_t, size_t> readRows(
MutableColumns & output_cols,
Expand All @@ -142,5 +143,4 @@ class ColumnFileInMemoryReader : public ColumnFileReader
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
};

} // namespace DM
} // namespace DB
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ bool ColumnFileSetReader::shouldPlace(
if (column_file->isInMemoryFile())
{
auto & dpb_reader = typeid_cast<ColumnFileInMemoryReader &>(*column_file_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();
auto [pk_column, version_column] = dpb_reader.getPKAndVersionColumns();

auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);
Expand All @@ -305,8 +304,7 @@ bool ColumnFileSetReader::shouldPlace(
else if (column_file->isTinyFile())
{
auto & dpb_reader = typeid_cast<ColumnFileTinyReader &>(*column_file_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();
auto [pk_column, version_column] = dpb_reader.getPKAndVersionColumns();

auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);
Expand Down
166 changes: 16 additions & 150 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>

#include <memory>
Expand All @@ -30,122 +28,6 @@
namespace DB::DM
{

// Build the columns described by `column_defines[col_start, col_end)` out of the cached block.
// Returns an empty `Columns` when this column file carries no cache; otherwise returns one
// column per requested definition, in the order of `column_defines`.
Columns ColumnFileTiny::readFromCache(const ColumnDefines & column_defines, size_t col_start, size_t col_end) const
{
    if (!cache)
        return {};

    const auto & offset_by_col_id = schema->getColIdToOffset();
    Columns collected;
    for (size_t pos = col_start; pos < col_end; ++pos)
    {
        const auto & define = column_defines[pos];
        const auto found = offset_by_col_id.find(define.id);
        if (found == offset_by_col_id.end())
        {
            // The requested column id is unknown to this file's schema: use default values
            ColumnPtr defaulted = createColumnWithDefaultValue(define, rows);
            collected.emplace_back(std::move(defaulted));
            continue;
        }

        // Copy the first `rows` rows of the cached column into a fresh column
        const auto position_in_block = found->second;
        const auto & stored_type = getDataType(define.id);
        auto copied = stored_type->createColumn();
        copied->insertRangeFrom(*cache->block.getByPosition(position_in_block).column, 0, rows);

        // Convert to the type requested by the column definition when it differs
        auto converted = convertColumnByColumnDefineIfNeed(stored_type, std::move(copied), define);
        collected.push_back(std::move(converted));
    }
    return collected;
}

// Read the columns described by `column_defines[col_start, col_end)` through `data_provider`.
//
// Columns whose id is missing from the schema's id-to-offset map are filled with default
// values. The remaining columns are deserialized from the page returned by
// `readTinyData(data_page_id, fields)` and then passed through
// `convertColumnByColumnDefineIfNeed`. The page data is decrypted in place first when
// encryption is enabled for `keyspace_id`.
//
// Returns `col_end - col_start` columns, ordered like `column_defines`.
Columns ColumnFileTiny::readFromDisk(
const IColumnFileDataProviderPtr & data_provider, //
const ColumnDefines & column_defines,
size_t col_start,
size_t col_end) const
{
const size_t num_columns_read = col_end - col_start;

Columns columns(num_columns_read); // allocate empty (null) slots, one per requested column

// Offsets (within the schema) of the fields that must actually be read from the page
std::vector<size_t> fields;
const auto & colid_to_offset = schema->getColIdToOffset();
for (size_t index = col_start; index < col_end; ++index)
{
const auto & cd = column_defines[index];
if (auto it = colid_to_offset.find(cd.id); it != colid_to_offset.end())
{
auto col_index = it->second;
fields.emplace_back(col_index);
}
else
{
// A column added by DDL afterwards does not exist in this CFTiny; fill it with the default value
columns[index - col_start] = createColumnWithDefaultValue(cd, rows);
}
}

// None of the columns to be read exists in this CFTiny and all of them have been filled
// with default values, so we can skip reading from disk
if (fields.empty())
return columns;

// Read the columns from disk and apply the DDL cast if needed
Page page = data_provider->readTinyData(data_page_id, fields);
// use `unlikely` to reduce the performance impact on keyspaces that do not enable encryption
if (unlikely(file_provider->isEncryptionEnabled(keyspace_id)))
{
// decrypt the page data in place
size_t data_size = page.data.size();
char * data = page.mem_holder.get();
file_provider->decryptPage(keyspace_id, data, data_size, data_page_id);
}

for (size_t index = col_start; index < col_end; ++index)
{
const size_t index_in_read_columns = index - col_start;
if (columns[index_in_read_columns] != nullptr)
{
// the column has already been filled with default values.
continue;
}
// Locate the field in the page by the column's offset in this file's schema
auto col_id = column_defines[index].id;
auto col_index = colid_to_offset.at(col_id);
auto data_buf = page.getFieldData(col_index);

const auto & cd = column_defines[index];
// Deserialize the column using the data type recorded in this file's schema
const auto & type = getDataType(cd.id);
auto col_data = type->createColumn();
deserializeColumn(*col_data, type, data_buf, rows);

columns[index_in_read_columns] = convertColumnByColumnDefineIfNeed(type, std::move(col_data), cd);
}

return columns;
}

void ColumnFileTiny::fillColumns(
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefines & col_defs,
size_t col_count,
Columns & result) const
{
if (result.size() >= col_count)
return;

size_t col_start = result.size();
size_t col_end = col_count;

Columns read_cols = readFromCache(col_defs, col_start, col_end);
if (read_cols.empty())
read_cols = readFromDisk(data_provider, col_defs, col_start, col_end);

result.insert(result.end(), read_cols.begin(), read_cols.end());
}

ColumnFileReaderPtr ColumnFileTiny::getReader(
const DMContext &,
const IColumnFileDataProviderPtr & data_provider,
Expand Down Expand Up @@ -367,51 +249,37 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin

Block ColumnFileTiny::readBlockForMinorCompaction(const PageReader & page_reader) const
{
if (cache)
{
std::scoped_lock lock(cache->mutex);
const auto & schema_ref = schema->getSchema();
auto page = page_reader.read(data_page_id);
auto columns = schema_ref.cloneEmptyColumns();

auto & cache_block = cache->block;
MutableColumns columns = cache_block.cloneEmptyColumns();
for (size_t i = 0; i < cache_block.columns(); ++i)
columns[i]->insertRangeFrom(*cache_block.getByPosition(i).column, 0, rows);
return cache_block.cloneWithColumns(std::move(columns));
}
else
{
const auto & schema_ref = schema->getSchema();
auto page = page_reader.read(data_page_id);
auto columns = schema_ref.cloneEmptyColumns();

if (unlikely(columns.size() != page.fieldSize()))
throw Exception("Column size and field size not the same");
if (unlikely(columns.size() != page.fieldSize()))
throw Exception("Column size and field size not the same");

for (size_t index = 0; index < schema_ref.columns(); ++index)
{
auto data_buf = page.getFieldData(index);
const auto & type = schema_ref.getByPosition(index).type;
auto & column = columns[index];
deserializeColumn(*column, type, data_buf, rows);
}

return schema_ref.cloneWithColumns(std::move(columns));
for (size_t index = 0; index < schema_ref.columns(); ++index)
{
auto data_buf = page.getFieldData(index);
const auto & type = schema_ref.getByPosition(index).type;
auto & column = columns[index];
deserializeColumn(*column, type, data_buf, rows);
}

return schema_ref.cloneWithColumns(std::move(columns));
}

ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(
const DMContext & context,
const Block & block,
size_t offset,
size_t limit,
WriteBatches & wbs,
const CachePtr & cache)
WriteBatches & wbs)
{
auto page_id = writeColumnFileData(context, block, offset, limit, wbs);

auto schema = getSharedBlockSchemas(context)->getOrCreate(block);

auto bytes = block.bytes(offset, limit);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context, nullptr, cache);
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, context);
}

PageIdU64 ColumnFileTiny::writeColumnFileData(
Expand Down Expand Up @@ -485,16 +353,14 @@ ColumnFileTiny::ColumnFileTiny(
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const IndexInfosPtr & index_infos_,
const CachePtr & cache_)
const IndexInfosPtr & index_infos_)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
, data_page_id(data_page_id_)
, index_infos(index_infos_)
, keyspace_id(dm_context.keyspace_id)
, file_provider(dm_context.global_context.getFileProvider())
, cache(cache_)
{}

} // namespace DB::DM
Loading