Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Reduce the memory before and after a large txn is committed #9707

Merged
merged 24 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4347a5c
add some debug
CalvinNeo Dec 6, 2024
b5a7e1d
some debug funcs
CalvinNeo Dec 10, 2024
a04ad2d
private some
CalvinNeo Dec 10, 2024
c8cffe4
improve performance
CalvinNeo Dec 10, 2024
fdbfa1f
fix pessi
CalvinNeo Dec 25, 2024
f667ba6
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 25, 2024
a17701c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 25, 2024
a847489
Update dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
CalvinNeo Dec 25, 2024
0b200fc
address some comments
CalvinNeo Dec 25, 2024
11aed2c
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 25, 2024
bb4a4dc
format the code
CalvinNeo Dec 25, 2024
37beddd
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
bc7c71d
add tests and reorg
CalvinNeo Dec 27, 2024
c003dff
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 27, 2024
132211c
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
0539605
short
CalvinNeo Dec 27, 2024
b41f8a0
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Dec 27, 2024
b4ea20c
format
CalvinNeo Dec 28, 2024
8af6d3a
Merge branch 'try-fix-oom' of github.com:CalvinNeo/tics into try-fix-oom
CalvinNeo Dec 28, 2024
36066b3
fix clang tidy
CalvinNeo Dec 30, 2024
f455012
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
CalvinNeo Dec 30, 2024
917ca1c
Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp
CalvinNeo Dec 30, 2024
e388256
Merge branch 'master' into try-fix-oom
CalvinNeo Dec 30, 2024
f36faba
Merge branch 'master' into try-fix-oom
ti-chi-bot[bot] Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,14 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Total number of keys processed in some types of Raft commands", \
Counter, \
F(type_write_put, {"type", "write_put"}), \
F(type_write_remove, {"type", "write_remove"}), \
F(type_lock_put, {"type", "lock_put"}), \
F(type_default_put, {"type", "default_put"}), \
F(type_write_del, {"type", "write_del"}), \
F(type_lock_del, {"type", "lock_del"}), \
F(type_pessimistic_lock_del, {"type", "pessimistic_lock_del"}), \
F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \
F(type_lock_replaced, {"type", "lock_replaced"}), \
F(type_default_del, {"type", "default_del"}), \
F(type_apply_snapshot, {"type", "apply_snapshot"}), \
F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,15 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL
{
/// Remove data in region.
auto remover = region->createCommittedRemover(lock_region);
size_t remove_committed_count = 0;
for (const auto & [handle, write_type, commit_ts, value] : data_list_read)
{
std::ignore = write_type;
std::ignore = value;
remover.remove({handle, commit_ts});
// Effectively calls `RegionData::removeDataByWriteIt`.
if(remover.remove({handle, commit_ts})) remove_committed_count++;
}
GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count);
}

// ParseTS parses the ts to (physical,logical).
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl(
const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0;

// TODO We will use truncated_index once Proxy/TiKV supports.
// They are always 0 currently.
// When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately.
if (last_compact_log_applied == 0)
{
Expand All @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl(

LOG_DEBUG(
log,
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}",
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}",
curr_region.toString(false),
rows,
size_bytes,
current_applied_gap,
gap_threshold);
gap_threshold,
curr_region.getData().summary(),
curr_region.appliedIndex(),
index);

if (can_flush && flush_if_possible)
{
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
};

const auto handle_write_cmd_func = [&]() {
size_t cmd_write_cf_cnt = 0, cache_written_size = 0;
size_t cmd_write_cf_cnt = 0;
auto ori_cache_size = dataSize();
for (UInt64 i = 0; i < cmds.len; ++i)
{
Expand All @@ -452,9 +452,17 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
handle_by_index_func(i);
}
}
cache_written_size = dataSize() - ori_cache_size;
approx_mem_cache_rows += cmd_write_cf_cnt;
approx_mem_cache_bytes += cache_written_size;
auto current_size = dataSize();
if (current_size > ori_cache_size)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
approx_mem_cache_bytes += (current_size - ori_cache_size);
else
{
if (approx_mem_cache_bytes > ori_cache_size - current_size)
approx_mem_cache_bytes -= (ori_cache_size - current_size);
else
approx_mem_cache_bytes = 0;
}
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
};

DM::WriteResult write_result = std::nullopt;
Expand Down
25 changes: 17 additions & 8 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase<Trait>::getTiKVValue(const Value & val)
template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));
auto raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value));
if (!kv_pair)
return 0;

Expand All @@ -63,18 +63,22 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
auto iter = data.find(kv_pair.first);
if (iter != data.end())
{
// Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten.
added_size -= calcTiKVKeyValueSize(iter->second);
data.erase(iter);
}
else
{
if unlikely (is_large_txn)

if (decoded->getLockType() == kvrpcpb::Op::PessimisticLock)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1);
}
GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1);
}
if unlikely (is_large_txn)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
}
// according to the process of pessimistic lock, just overwrite.
// According to the process of pessimistic lock, just overwrite.
data.emplace(std::move(kv_pair.first), std::move(kv_pair.second));
return added_size;
}
Expand Down Expand Up @@ -179,6 +183,11 @@ size_t RegionCFDataBase<Trait>::remove(const Key & key, bool quiet)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_del).Increment(1);
}

if (std::get<2>(value)->getLockType() == kvrpcpb::Op::PessimisticLock)
{
GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1);
}
}
map.erase(it);
return size;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>
#include <Common/TiFlashMetrics.h>

#include <map>

Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ struct RegionLockCFDataTrait
{
auto key = std::make_shared<const TiKVKey>(std::move(key_));
auto value = std::make_shared<const TiKVValue>(std::move(value_));
return {
{key, std::string_view(key->data(), key->dataSize())},
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};
auto lo = std::make_shared<const DecodedLockCFValue>(key, value);
if (lo->getLockType() == kvrpcpb::Op::PessimisticLock)
{
GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1);
}
return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}};
}
};

Expand Down
43 changes: 32 additions & 11 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/KVStore/FFI/ColumnFamily.h>
#include <Storages/KVStore/MultiRaft/RegionData.h>
#include <Storages/KVStore/Read/RegionLockInfo.h>
#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>

namespace DB
{
Expand Down Expand Up @@ -222,21 +223,36 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query)
const auto & [tikv_key, tikv_val, lock_info_ptr] = value;
std::ignore = tikv_key;
std::ignore = tikv_val;
const auto & lock_info = *lock_info_ptr;
const auto & lock_info_raw = *lock_info_ptr;

if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock
|| lock_info.lock_type == kvrpcpb::Op::PessimisticLock)
continue;
if (lock_info.min_commit_ts > query.read_tso)
continue;
if (query.bypass_lock_ts)
{
if (query.bypass_lock_ts->count(lock_info.lock_version))
bool should_continue = false;
lock_info_raw.withInner([&](const RecordKVFormat::DecodedLockCFValue::Inner & in) {
if (in.lock_version > query.read_tso || in.lock_type == kvrpcpb::Op::Lock
|| in.lock_type == kvrpcpb::Op::PessimisticLock)
{
should_continue = true;
return;
}
if (in.min_commit_ts > query.read_tso)
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
continue;
should_continue = true;
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool withInner(std::function<bool(const Inner &)> f) const; may be better?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar, I can move it into the DecodedLockCfValue

}
if (query.bypass_lock_ts)
{
if (query.bypass_lock_ts->count(in.lock_version))
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
should_continue = true;
}
}
return;
});
if (should_continue)
{
continue;
}

return lock_info_ptr;
}

Expand Down Expand Up @@ -367,4 +383,9 @@ RegionData & RegionData::operator=(RegionData && rhs)
return *this;
}

String RegionData::summary() const
{
return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize());
}

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class RegionData
RegionData(RegionData && data);
RegionData & operator=(RegionData &&);

String summary() const;
struct OrphanKeysInfo
{
// Protected by region task lock.
Expand Down Expand Up @@ -127,7 +128,7 @@ class RegionData
RegionLockCFData lock_cf;
OrphanKeysInfo orphan_keys_info;

// Size of data cf & write cf, without lock cf.
// Size of data cf & write cf, with lock cf.
std::atomic<size_t> cf_data_size = 0;
};

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ void WaitCheckRegionReadyImpl(
need_retry = false;
LOG_DEBUG(
log,
"neglect error, region_id={} not_found={} epoch_not_match={}",
"neglect error, region_id={} not_found={} epoch_not_match={} region_error={}",
region_id,
region_error.has_region_not_found(),
region_error.has_epoch_not_match());
region_error.has_epoch_not_match(),
region_error.DebugString());
}
if (!need_retry)
{
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ class Region : public std::enable_shared_from_this<Region>
lock = std::unique_lock<std::shared_mutex>(region_->mutex);
}

void remove(const RegionWriteCFData::Key & key)
bool remove(const RegionWriteCFData::Key & key)
{
auto & write_cf_data = region->data.writeCF().getDataMut();
if (auto it = write_cf_data.find(key); it != write_cf_data.end())
if (auto it = write_cf_data.find(key); it != write_cf_data.end()) {
region->removeDataByWriteIt(it);
return true;
}
return false;
}

private:
Expand Down Expand Up @@ -150,6 +153,7 @@ class Region : public std::enable_shared_from_this<Region>

RegionMeta & mutMeta() { return meta; }
const RegionMeta & getMeta() const { return meta; }
const RegionData & getData() const { return data; }

bool isPendingRemove() const;
void setPendingRemove();
Expand Down
Loading