Skip to content

Commit

Permalink
KVStore: Reduce the memory before and after a large txn is committed (#…
Browse files Browse the repository at this point in the history
…9707)

ref #9722

Avoid store the built DecodedLockCFValue

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: jinhelin <[email protected]>
  • Loading branch information
3 people authored Dec 30, 2024
1 parent c807718 commit 08abd71
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 149 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,13 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Total number of keys processed in some types of Raft commands", \
Counter, \
F(type_write_put, {"type", "write_put"}), \
F(type_write_remove, {"type", "write_remove"}), \
F(type_lock_put, {"type", "lock_put"}), \
F(type_default_put, {"type", "default_put"}), \
F(type_write_del, {"type", "write_del"}), \
F(type_lock_del, {"type", "lock_del"}), \
F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \
F(type_lock_replaced, {"type", "lock_replaced"}), \
F(type_default_del, {"type", "default_del"}), \
F(type_apply_snapshot, {"type", "apply_snapshot"}), \
F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \
Expand Down
112 changes: 55 additions & 57 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,68 +273,62 @@ std::variant<RegionDataReadInfoList, RegionException::RegionReadStatus, LockInfo
bool resolve_locks,
bool need_data_value)
{
RegionDataReadInfoList data_list_read;
DecodedLockCFValuePtr lock_value;
{
auto scanner = region->createCommittedScanner(true, need_data_value);
LockInfoPtr lock_info;

/// Some sanity checks for region meta.
{
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id)
|| mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}
auto scanner = region->createCommittedScanner(true, need_data_value);

/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
lock_value
= scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}
/// Some sanity checks for region meta.
{
/**
* special check: when source region is merging, read_index can not guarantee the behavior about target region.
* Reject all read request for safety.
* Only when region is Normal can continue read process.
*/
if (region->peerState() != raft_serverpb::PeerState::Normal)
return RegionException::RegionReadStatus::NOT_FOUND;

const auto & meta_snap = region->dumpRegionMetaSnapshot();
// No need to check conf_version if its peer state is normal
std::ignore = conf_version;
if (meta_snap.ver != region_version)
return RegionException::RegionReadStatus::EPOCH_NOT_MATCH;

// todo check table id
TableID mapped_table_id;
if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) || mapped_table_id != table_id)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Should not happen, region not belong to table, table_id={} expect_table_id={}",
mapped_table_id,
table_id);
}

/// If there is no lock, leave scope of region scanner and raise LockException.
/// Read raw KVs from region cache.
if (!lock_value)
{
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
}
/// Deal with locks.
if (resolve_locks)
{
/// Check if there are any lock should be resolved, if so, throw LockException.
/// It will iterate all locks with in the time range.
lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts});
}

if (lock_value)
return lock_value->intoLockInfo();
if (lock_info)
return lock_info;

/// If there is no lock, leave scope of region scanner and raise LockException.
/// Read raw KVs from region cache.
RegionDataReadInfoList data_list_read;
// Shortcut for empty region.
if (!scanner.hasNext())
return data_list_read;

// If worked with raftstore v2, the final size may not equal to here.
data_list_read.reserve(scanner.writeMapSize());

// Tiny optimization for queries that need only handle, tso, delmark.
do
{
data_list_read.emplace_back(scanner.next());
} while (scanner.hasNext());
return data_list_read;
}

Expand Down Expand Up @@ -388,12 +382,16 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL
{
/// Remove data in region.
auto remover = region->createCommittedRemover(lock_region);
size_t remove_committed_count = 0;
for (const auto & [handle, write_type, commit_ts, value] : data_list_read)
{
std::ignore = write_type;
std::ignore = value;
remover.remove({handle, commit_ts});
// Effectively calls `RegionData::removeDataByWriteIt`.
if (remover.remove({handle, commit_ts}))
remove_committed_count++;
}
GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count);
}

// ParseTS parses the ts to (physical,logical).
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl(
const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0;

// TODO We will use truncated_index once Proxy/TiKV supports.
// They are always 0 currently.
// When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately.
if (last_compact_log_applied == 0)
{
Expand All @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl(

LOG_DEBUG(
log,
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}",
"{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}",
curr_region.toString(false),
rows,
size_bytes,
current_applied_gap,
gap_threshold);
gap_threshold,
curr_region.getData().summary(),
curr_region.appliedIndex(),
index);

if (can_flush && flush_if_possible)
{
Expand Down Expand Up @@ -265,4 +269,4 @@ bool KVStore::forceFlushRegionDataImpl(
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
return true;
}
} // namespace DB
} // namespace DB
11 changes: 7 additions & 4 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
};

const auto handle_write_cmd_func = [&]() {
size_t cmd_write_cf_cnt = 0, cache_written_size = 0;
size_t cmd_write_cf_cnt = 0;
auto ori_cache_size = dataSize();
for (UInt64 i = 0; i < cmds.len; ++i)
{
Expand All @@ -452,9 +452,12 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
handle_by_index_func(i);
}
}
cache_written_size = dataSize() - ori_cache_size;
approx_mem_cache_rows += cmd_write_cf_cnt;
approx_mem_cache_bytes += cache_written_size;
auto current_size = dataSize();
if (auto t = approx_mem_cache_rows + current_size; t > ori_cache_size)
approx_mem_cache_rows = t - ori_cache_size;
else
approx_mem_cache_rows = 0;
};

DM::WriteResult write_result = std::nullopt;
Expand Down Expand Up @@ -523,4 +526,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc
std::ignore = lock;
return static_cast<RegionRaftCommandDelegate &>(*this);
}
} // namespace DB
} // namespace DB
17 changes: 9 additions & 8 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase<Trait>::getTiKVValue(const Value & val)
template <typename Trait>
RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value, DupCheck mode)
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));
auto raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value));
if (!kv_pair)
return 0;

Expand All @@ -63,18 +63,19 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
auto iter = data.find(kv_pair.first);
if (iter != data.end())
{
// Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten.
added_size -= calcTiKVKeyValueSize(iter->second);
data.erase(iter);

// In most cases, an optimistic lock replace a pessimistic lock.
GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1);
}
else
if unlikely (is_large_txn)
{
if unlikely (is_large_txn)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
}
// according to the process of pessimistic lock, just overwrite.
// According to the process of pessimistic lock, just overwrite.
data.emplace(std::move(kv_pair.first), std::move(kv_pair.second));
return added_size;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ struct RegionLockCFDataTrait
{
auto key = std::make_shared<const TiKVKey>(std::move(key_));
auto value = std::make_shared<const TiKVValue>(std::move(value_));
return {
{key, std::string_view(key->data(), key->dataSize())},
Value{key, value, std::make_shared<const DecodedLockCFValue>(key, value)}};
auto lo = std::make_shared<const DecodedLockCFValue>(key, value);
return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}};
}
};

Expand Down
26 changes: 11 additions & 15 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/KVStore/FFI/ColumnFamily.h>
#include <Storages/KVStore/MultiRaft/RegionData.h>
#include <Storages/KVStore/Read/RegionLockInfo.h>
#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>

namespace DB
{
Expand Down Expand Up @@ -213,7 +214,7 @@ std::optional<RegionDataReadInfo> RegionData::readDataByWriteIt(
return RegionDataReadInfo{pk, decoded_val.write_type, ts, decoded_val.short_value};
}

DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const
{
for (const auto & [pk, value] : lock_cf.getData())
{
Expand All @@ -222,22 +223,12 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query)
const auto & [tikv_key, tikv_val, lock_info_ptr] = value;
std::ignore = tikv_key;
std::ignore = tikv_val;
const auto & lock_info = *lock_info_ptr;

if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock
|| lock_info.lock_type == kvrpcpb::Op::PessimisticLock)
continue;
if (lock_info.min_commit_ts > query.read_tso)
continue;
if (query.bypass_lock_ts)
const auto & lock_info_raw = *lock_info_ptr;

if (auto t = lock_info_raw.getLockInfoPtr(query); t != nullptr)
{
if (query.bypass_lock_ts->count(lock_info.lock_version))
{
GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment();
continue;
}
return t;
}
return lock_info_ptr;
}

return nullptr;
Expand Down Expand Up @@ -367,4 +358,9 @@ RegionData & RegionData::operator=(RegionData && rhs)
return *this;
}

String RegionData::summary() const
{
return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize());
}

} // namespace DB
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class RegionData
public:
using WriteCFIter = RegionWriteCFData::Map::iterator;
using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator;
using LockInfoPtr = std::unique_ptr<kvrpcpb::LockInfo>;

static void reportAlloc(size_t delta);
static void reportDealloc(size_t delta);
Expand All @@ -53,7 +54,7 @@ class RegionData
UInt64 applied,
bool hard_error);

DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const;
LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const;

std::shared_ptr<const TiKVValue> getLockByKey(const TiKVKey & key) const;

Expand Down Expand Up @@ -84,6 +85,7 @@ class RegionData
RegionData(RegionData && data);
RegionData & operator=(RegionData &&);

String summary() const;
struct OrphanKeysInfo
{
// Protected by region task lock.
Expand Down Expand Up @@ -127,7 +129,7 @@ class RegionData
RegionLockCFData lock_cf;
OrphanKeysInfo orphan_keys_info;

// Size of data cf & write cf, without lock cf.
// Size of data cf & write cf, with lock cf.
std::atomic<size_t> cf_data_size = 0;
};

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ void WaitCheckRegionReadyImpl(
need_retry = false;
LOG_DEBUG(
log,
"neglect error, region_id={} not_found={} epoch_not_match={}",
"neglect error, region_id={} not_found={} epoch_not_match={} region_error={}",
region_id,
region_error.has_region_not_found(),
region_error.has_epoch_not_match());
region_error.has_epoch_not_match(),
region_error.DebugString());
}
if (!need_retry)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::optional<RegionDataReadInfo> Region::readDataByWriteIt(
}
}

DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const
LockInfoPtr Region::getLockInfo(const RegionLockReadQuery & query) const
{
return data.getLockInfo(query);
}
Expand Down
Loading

0 comments on commit 08abd71

Please sign in to comment.