Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Reduce the memory before and after a large txn is committed #9707

Merged
merged 24 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4347a5c
add some debug
CalvinNeo Dec 6, 2024
b5a7e1d
some debug funcs
CalvinNeo Dec 10, 2024
a04ad2d
private some
CalvinNeo Dec 10, 2024
c8cffe4
improve performance
CalvinNeo Dec 10, 2024
fdbfa1f
fix pessi
CalvinNeo Dec 25, 2024
f667ba6
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 25, 2024
a17701c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 25, 2024
a847489
Update dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
CalvinNeo Dec 25, 2024
0b200fc
address some comments
CalvinNeo Dec 25, 2024
11aed2c
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 25, 2024
bb4a4dc
format the code
CalvinNeo Dec 25, 2024
37beddd
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
bc7c71d
add tests and reorg
CalvinNeo Dec 27, 2024
c003dff
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 27, 2024
132211c
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
0539605
short
CalvinNeo Dec 27, 2024
b41f8a0
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
b4ea20c
format
CalvinNeo Dec 28, 2024
8af6d3a
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 28, 2024
36066b3
fix clang tidy
CalvinNeo Dec 30, 2024
f455012
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 30, 2024
917ca1c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 30, 2024
e388256
Merge branch 'master' into try-fix-oom
CalvinNeo Dec 30, 2024
f36faba
Merge branch 'master' into try-fix-oom
ti-chi-bot[bot] Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,13 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Total number of keys processed in some types of Raft commands", \
Counter, \
F(type_write_put, {"type", "write_put"}), \
F(type_write_remove, {"type", "write_remove"}), \
F(type_lock_put, {"type", "lock_put"}), \
F(type_default_put, {"type", "default_put"}), \
F(type_write_del, {"type", "write_del"}), \
F(type_lock_del, {"type", "lock_del"}), \
F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \
F(type_lock_replaced, {"type", "lock_replaced"}), \
F(type_default_del, {"type", "default_del"}), \
F(type_apply_snapshot, {"type", "apply_snapshot"}), \
F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \
Expand Down
112 changes: 55 additions & 57 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,68 +273,62 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
bool resolve_locks,
bool need_data_value)
{
RegionDataReadInfoList data_list_read;
DecodedLockCFValuePtr lock_value;
{
auto scanner = region->createCommittedScanner(true, need_data_value);
LockInfoPtr lock_info;

/// Some sanity checks for region meta.
{
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id)
|| mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}
auto scanner = region->createCommittedScanner(true, need_data_value);

/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
lock_value
= scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}
/// Some sanity checks for region meta.
{
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) || mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}

/// If there is no lock, leave scope of region scanner and raise LockException.
/// Read raw KVs from region cache.
if (!lock_value)
{
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
}
/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
/// It will iterate all locks with in the time range.
lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}

if (lock_value)
return lock_value->intoLockInfo();
if (lock_info)
return lock_info;

/// If there is no lock, leave scope of region scanner and raise LockException.
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
/// Read raw KVs from region cache.
RegionDataReadInfoList data_list_read;
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

Expand Down Expand Up @@ -388,12 +382,16 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL
{
/// Remove data in region.
auto remover = region->createCommittedRemover(lock_region);
size_t remove_committed_count = 0;
for (const auto & [handle, write_type, commit_ts, value] : data_list_read)
{
std::ignore = write_type;
std::ignore = value;
remover.remove({handle, commit_ts});
// Effectively calls `RegionData::removeDataByWriteIt`.
if (remover.remove({handle, commit_ts}))
remove_committed_count++;
}
GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count);
}

// ParseTS parses the ts to (physical,logical).
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl(
const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0;

// TODO We will use truncated_index once Proxy/TiKV supports.
// They are always 0 currently.
// When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately.
if (last_compact_log_applied == 0)
{
Expand All @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl(

LOG_DEBUG(
log,
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}",
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}",
curr_region.toString(false),
rows,
size_bytes,
current_applied_gap,
gap_threshold);
gap_threshold,
curr_region.getData().summary(),
curr_region.appliedIndex(),
index);

if (can_flush && flush_if_possible)
{
Expand Down Expand Up @@ -265,4 +269,4 @@ bool KVStore::forceFlushRegionDataImpl(
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
return true;
}
} // namespace DB
} // namespace DB
11 changes: 7 additions & 4 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
};

const auto handle_write_cmd_func = [&]() {
size_t cmd_write_cf_cnt = 0, cache_written_size = 0;
size_t cmd_write_cf_cnt = 0;
auto ori_cache_size = dataSize();
for (UInt64 i = 0; i < cmds.len; ++i)
{
Expand All @@ -452,9 +452,12 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
handle_by_index_func(i);
}
}
cache_written_size = dataSize() - ori_cache_size;
approx_mem_cache_rows += cmd_write_cf_cnt;
approx_mem_cache_bytes += cache_written_size;
auto current_size = dataSize();
if (auto t = approx_mem_cache_rows + current_size; t > ori_cache_size)
approx_mem_cache_rows = t - ori_cache_size;
else
approx_mem_cache_rows = 0;
};

DM::WriteResult write_result = std::nullopt;
Expand Down Expand Up @@ -523,4 +526,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc
std::ignore = lock;
return static_cast<RegionRaftCommandDelegate &>(*this);
}
} // namespace DB
} // namespace DB
17 changes: 9 additions & 8 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase<Trait>::getTiKVValue(const Value & val)
template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));
auto raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value));
if (!kv_pair)
return 0;

Expand All @@ -63,18 +63,19 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
auto iter = data.find(kv_pair.first);
if (iter != data.end())
{
// Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten.
added_size -= calcTiKVKeyValueSize(iter->second);
data.erase(iter);

// In most cases, an optimistic lock replace a pessimistic lock.
GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1);
}
else
if unlikely (is_large_txn)
{
if unlikely (is_large_txn)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
}
// according to the process of pessimistic lock, just overwrite.
// According to the process of pessimistic lock, just overwrite.
data.emplace(std::move(kv_pair.first), std::move(kv_pair.second));
return added_size;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ struct RegionLockCFDataTrait
{
auto key = std::make_shared<const TiKVKey>(std::move(key_));
auto value = std::make_shared<const TiKVValue>(std::move(value_));
return {
{key, std::string_view(key->data(), key->dataSize())},
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};
auto lo = std::make_shared<const DecodedLockCFValue>(key, value);
return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}};
}
};

Expand Down
26 changes: 11 additions & 15 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/KVStore/FFI/ColumnFamily.h>
#include <Storages/KVStore/MultiRaft/RegionData.h>
#include <Storages/KVStore/Read/RegionLockInfo.h>
#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>

namespace DB
{
Expand Down Expand Up @@ -213,7 +214,7 @@ std::optional<RegionDataReadInfo> RegionData::readDataByWriteIt(
return RegionDataReadInfo{pk, decoded_val.write_type, ts, decoded_val.short_value};
}

DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
{
for (const auto & [pk, value] : lock_cf.getData())
{
Expand All @@ -222,22 +223,12 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query)
const auto & [tikv_key, tikv_val, lock_info_ptr] = value;
std::ignore = tikv_key;
std::ignore = tikv_val;
const auto & lock_info = *lock_info_ptr;

if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock
|| lock_info.lock_type == kvrpcpb::Op::PessimisticLock)
continue;
if (lock_info.min_commit_ts > query.read_tso)
continue;
if (query.bypass_lock_ts)
const auto & lock_info_raw = *lock_info_ptr;

if (auto t = lock_info_raw.getLockInfoPtr(query); t != nullptr)
{
if (query.bypass_lock_ts->count(lock_info.lock_version))
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
continue;
}
return t;
}
return lock_info_ptr;
}

return nullptr;
Expand Down Expand Up @@ -367,4 +358,9 @@ RegionData & RegionData::operator=(RegionData && rhs)
return *this;
}

String RegionData::summary() const
{
return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize());
}

} // namespace DB
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class RegionData
public:
using WriteCFIter = RegionWriteCFData::Map::iterator;
using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator;
using LockInfoPtr = std::unique_ptr<kvrpcpb::LockInfo>;

static void reportAlloc(size_t delta);
static void reportDealloc(size_t delta);
Expand All @@ -53,7 +54,7 @@ class RegionData
UInt64 applied,
bool hard_error);

DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const;
LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const;

std::shared_ptr<const TiKVValue> getLockByKey(const TiKVKey & key) const;

Expand Down Expand Up @@ -84,6 +85,7 @@ class RegionData
RegionData(RegionData && data);
RegionData & operator=(RegionData &&);

String summary() const;
struct OrphanKeysInfo
{
// Protected by region task lock.
Expand Down Expand Up @@ -127,7 +129,7 @@ class RegionData
RegionLockCFData lock_cf;
OrphanKeysInfo orphan_keys_info;

// Size of data cf & write cf, without lock cf.
// Size of data cf & write cf, with lock cf.
std::atomic<size_t> cf_data_size = 0;
};

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ void WaitCheckRegionReadyImpl(
need_retry = false;
LOG_DEBUG(
log,
"neglect error, region_id={} not_found={} epoch_not_match={}",
"neglect error, region_id={} not_found={} epoch_not_match={} region_error={}",
region_id,
region_error.has_region_not_found(),
region_error.has_epoch_not_match());
region_error.has_epoch_not_match(),
region_error.DebugString());
}
if (!need_retry)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::optional<RegionDataReadInfo> Region::readDataByWriteIt(
}
}

DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const
LockInfoPtr Region::getLockInfo(const RegionLockReadQuery & query) const
{
return data.getLockInfo(query);
}
Expand Down
Loading