Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Reduce the memory before and after a large txn is committed #9707

Merged
merged 24 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4347a5c
add some debug
CalvinNeo Dec 6, 2024
b5a7e1d
some debug funcs
CalvinNeo Dec 10, 2024
a04ad2d
private some
CalvinNeo Dec 10, 2024
c8cffe4
improve performance
CalvinNeo Dec 10, 2024
fdbfa1f
fix pessi
CalvinNeo Dec 25, 2024
f667ba6
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 25, 2024
a17701c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 25, 2024
a847489
Update dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
CalvinNeo Dec 25, 2024
0b200fc
address some comments
CalvinNeo Dec 25, 2024
11aed2c
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 25, 2024
bb4a4dc
format the code
CalvinNeo Dec 25, 2024
37beddd
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
bc7c71d
add tests and reorg
CalvinNeo Dec 27, 2024
c003dff
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 27, 2024
132211c
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
0539605
short
CalvinNeo Dec 27, 2024
b41f8a0
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
b4ea20c
format
CalvinNeo Dec 28, 2024
8af6d3a
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 28, 2024
36066b3
fix clang tidy
CalvinNeo Dec 30, 2024
f455012
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 30, 2024
917ca1c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 30, 2024
e388256
Merge branch 'master' into try-fix-oom
CalvinNeo Dec 30, 2024
f36faba
Merge branch 'master' into try-fix-oom
ti-chi-bot[bot] Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,13 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Total number of keys processed in some types of Raft commands", \
Counter, \
F(type_write_put, {"type", "write_put"}), \
F(type_write_remove, {"type", "write_remove"}), \
F(type_lock_put, {"type", "lock_put"}), \
F(type_default_put, {"type", "default_put"}), \
F(type_write_del, {"type", "write_del"}), \
F(type_lock_del, {"type", "lock_del"}), \
F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \
F(type_lock_replaced, {"type", "lock_replaced"}), \
F(type_default_del, {"type", "default_del"}), \
F(type_apply_snapshot, {"type", "apply_snapshot"}), \
F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \
Expand Down
113 changes: 58 additions & 55 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,66 +275,65 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
bool need_data_value)
{
RegionDataReadInfoList data_list_read;
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
DecodedLockCFValuePtr lock_value;
LockInfoPtr lock_info;

auto scanner = region->createCommittedScanner(true, need_data_value);

/// Some sanity checks for region meta.
{
auto scanner = region->createCommittedScanner(true, need_data_value);
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) || mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}

/// Some sanity checks for region meta.
{
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id)
|| mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}
/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
/// It will iterate all locks with in the time range.
lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}

/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
lock_value
= scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}
/// If there is no lock, leave scope of region scanner and raise LockException.
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
/// Read raw KVs from region cache.
if (!lock_info)
{
// Shortcut for empty region.
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

/// If there is no lock, leave scope of region scanner and raise LockException.
/// Read raw KVs from region cache.
if (!lock_value)
// Tiny optimization for queries that need only handle, tso, delmark.
do
{
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
}
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
}
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
else
{
return lock_info;
}
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved

if (lock_value)
return lock_value->intoLockInfo();

return data_list_read;
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -389,12 +388,16 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL
{
/// Remove data in region.
auto remover = region->createCommittedRemover(lock_region);
size_t remove_committed_count = 0;
for (const auto & [handle, write_type, commit_ts, value] : data_list_read)
{
std::ignore = write_type;
std::ignore = value;
remover.remove({handle, commit_ts});
// Effectively calls `RegionData::removeDataByWriteIt`.
if (remover.remove({handle, commit_ts}))
remove_committed_count++;
}
GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count);
}

// ParseTS parses the ts to (physical,logical).
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl(
const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0;

// TODO We will use truncated_index once Proxy/TiKV supports.
// They are always 0 currently.
// When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately.
if (last_compact_log_applied == 0)
{
Expand All @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl(

LOG_DEBUG(
log,
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}",
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}",
curr_region.toString(false),
rows,
size_bytes,
current_applied_gap,
gap_threshold);
gap_threshold,
curr_region.getData().summary(),
curr_region.appliedIndex(),
index);

if (can_flush && flush_if_possible)
{
Expand Down Expand Up @@ -265,4 +269,4 @@ bool KVStore::forceFlushRegionDataImpl(
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
return true;
}
} // namespace DB
} // namespace DB
16 changes: 12 additions & 4 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
};

const auto handle_write_cmd_func = [&]() {
size_t cmd_write_cf_cnt = 0, cache_written_size = 0;
size_t cmd_write_cf_cnt = 0;
auto ori_cache_size = dataSize();
for (UInt64 i = 0; i < cmds.len; ++i)
{
Expand All @@ -452,9 +452,17 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
handle_by_index_func(i);
}
}
cache_written_size = dataSize() - ori_cache_size;
approx_mem_cache_rows += cmd_write_cf_cnt;
approx_mem_cache_bytes += cache_written_size;
auto current_size = dataSize();
if (current_size > ori_cache_size)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
approx_mem_cache_bytes += (current_size - ori_cache_size);
else
{
if (approx_mem_cache_bytes > ori_cache_size - current_size)
approx_mem_cache_bytes -= (ori_cache_size - current_size);
else
approx_mem_cache_bytes = 0;
}
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
};

DM::WriteResult write_result = std::nullopt;
Expand Down Expand Up @@ -523,4 +531,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc
std::ignore = lock;
return static_cast<RegionRaftCommandDelegate &>(*this);
}
} // namespace DB
} // namespace DB
17 changes: 9 additions & 8 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase<Trait>::getTiKVValue(const Value & val)
template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));
auto raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value));
if (!kv_pair)
return 0;

Expand All @@ -63,18 +63,19 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
auto iter = data.find(kv_pair.first);
if (iter != data.end())
{
// Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten.
added_size -= calcTiKVKeyValueSize(iter->second);
data.erase(iter);

// In most cases, an optimistic lock replace a pessimistic lock.
GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1);
}
else
if unlikely (is_large_txn)
{
if unlikely (is_large_txn)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
}
// according to the process of pessimistic lock, just overwrite.
// According to the process of pessimistic lock, just overwrite.
data.emplace(std::move(kv_pair.first), std::move(kv_pair.second));
return added_size;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ struct RegionLockCFDataTrait
{
auto key = std::make_shared<const TiKVKey>(std::move(key_));
auto value = std::make_shared<const TiKVValue>(std::move(value_));
return {
{key, std::string_view(key->data(), key->dataSize())},
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};
auto lo = std::make_shared<const DecodedLockCFValue>(key, value);
return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}};
}
};

Expand Down
47 changes: 34 additions & 13 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/KVStore/FFI/ColumnFamily.h>
#include <Storages/KVStore/MultiRaft/RegionData.h>
#include <Storages/KVStore/Read/RegionLockInfo.h>
#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>

namespace DB
{
Expand Down Expand Up @@ -213,7 +214,7 @@ std::optional<RegionDataReadInfo> RegionData::readDataByWriteIt(
return RegionDataReadInfo{pk, decoded_val.write_type, ts, decoded_val.short_value};
}

DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
{
for (const auto & [pk, value] : lock_cf.getData())
{
Expand All @@ -222,22 +223,37 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query)
const auto & [tikv_key, tikv_val, lock_info_ptr] = value;
std::ignore = tikv_key;
std::ignore = tikv_val;
const auto & lock_info = *lock_info_ptr;
const auto & lock_info_raw = *lock_info_ptr;

if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock
|| lock_info.lock_type == kvrpcpb::Op::PessimisticLock)
continue;
if (lock_info.min_commit_ts > query.read_tso)
continue;
if (query.bypass_lock_ts)
{
if (query.bypass_lock_ts->count(lock_info.lock_version))
bool should_continue = false;
lock_info_raw.withInner([&](const RecordKVFormat::DecodedLockCFValue::Inner & in) {
if (in.lock_version > query.read_tso || in.lock_type == kvrpcpb::Op::Lock
|| in.lock_type == kvrpcpb::Op::PessimisticLock)
{
should_continue = true;
return;
}
if (in.min_commit_ts > query.read_tso)
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
continue;
should_continue = true;
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool withInner(std::function<bool(const Inner &)> f) const; may be better?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar, I can move it into the DecodedLockCfValue

}
if (query.bypass_lock_ts)
{
if (query.bypass_lock_ts->count(in.lock_version))
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
should_continue = true;
}
}
return;
});
if (should_continue)
{
continue;
}
return lock_info_ptr;

return lock_info_ptr->intoLockInfo();
}

return nullptr;
Expand Down Expand Up @@ -367,4 +383,9 @@ RegionData & RegionData::operator=(RegionData && rhs)
return *this;
}

String RegionData::summary() const
{
return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize());
}

} // namespace DB
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class RegionData
public:
using WriteCFIter = RegionWriteCFData::Map::iterator;
using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator;
using LockInfoPtr = std::unique_ptr<kvrpcpb::LockInfo>;

static void reportAlloc(size_t delta);
static void reportDealloc(size_t delta);
Expand All @@ -53,7 +54,7 @@ class RegionData
UInt64 applied,
bool hard_error);

DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const;
LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const;

std::shared_ptr<const TiKVValue> getLockByKey(const TiKVKey & key) const;

Expand Down Expand Up @@ -84,6 +85,7 @@ class RegionData
RegionData(RegionData && data);
RegionData & operator=(RegionData &&);

String summary() const;
struct OrphanKeysInfo
{
// Protected by region task lock.
Expand Down Expand Up @@ -127,7 +129,7 @@ class RegionData
RegionLockCFData lock_cf;
OrphanKeysInfo orphan_keys_info;

// Size of data cf & write cf, without lock cf.
// Size of data cf & write cf, with lock cf.
std::atomic<size_t> cf_data_size = 0;
};

Expand Down
Loading