From 4347a5c9738b4b1ed3a913c5ada628b571484a0c Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 6 Dec 2024 17:39:48 +0800 Subject: [PATCH 01/19] add some debug Signed-off-by: Calvin Neo --- dbms/src/Common/TiFlashMetrics.h | 4 ++++ .../KVStore/Decode/PartitionStreams.cpp | 4 +++- .../KVStore/MultiRaft/RegionCFDataBase.cpp | 19 +++++++++++++------ .../KVStore/MultiRaft/RegionCFDataBase.h | 1 + .../KVStore/MultiRaft/RegionCFDataTrait.h | 6 +++++- dbms/src/Storages/KVStore/Region.h | 7 +++++-- 6 files changed, 31 insertions(+), 10 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 29a9c81001c..49fede89ef1 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -459,10 +459,14 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Total number of keys processed in some types of Raft commands", \ Counter, \ F(type_write_put, {"type", "write_put"}), \ + F(type_write_remove, {"type", "write_remove"}), \ F(type_lock_put, {"type", "lock_put"}), \ F(type_default_put, {"type", "default_put"}), \ F(type_write_del, {"type", "write_del"}), \ F(type_lock_del, {"type", "lock_del"}), \ + F(type_pessimistic_lock_del, {"type", "pessimistic_lock_del"}), \ + F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \ + F(type_pessimistic_lock_replaced, {"type", "pessimistic_lock_replaced"}), \ F(type_default_del, {"type", "default_del"}), \ F(type_apply_snapshot, {"type", "apply_snapshot"}), \ F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \ diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 0e2e7076cdf..7d3dafedac1 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -389,12 +389,14 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL { /// Remove data in region. auto remover = region->createCommittedRemover(lock_region); + size_t remove_committed_count = 0; for (const auto & [handle, write_type, commit_ts, value] : data_list_read) { std::ignore = write_type; std::ignore = value; - remover.remove({handle, commit_ts}); + if(remover.remove({handle, commit_ts})) remove_committed_count++; } + GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count); } // ParseTS parses the ts to (physical,logical). diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 94de1762a41..1b94a5f098e 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -63,18 +63,21 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, Ti auto iter = data.find(kv_pair.first); if (iter != data.end()) { + // Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten. added_size -= calcTiKVKeyValueSize(iter->second); data.erase(iter); + + if (decoded->lock_type == kvrpcpb::Op::PessimisticLock) { + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); + } + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_replaced).Increment(1); } - else + if unlikely (is_large_txn) { - if unlikely (is_large_txn) - { - GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); - } + GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); } } - // according to the process of pessimistic lock, just overwrite. + // According to the process of pessimistic lock, just overwrite. data.emplace(std::move(kv_pair.first), std::move(kv_pair.second)); return added_size; } @@ -179,6 +182,10 @@ size_t RegionCFDataBase::remove(const Key & key, bool quiet) { GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_del).Increment(1); } + + if (std::get<2>(value)->lock_type == kvrpcpb::Op::PessimisticLock) { + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); + } } map.erase(it); return size; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h index 0363bbf6a76..cc3fe3037ab 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h @@ -16,6 +16,7 @@ #include #include +#include #include diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h index 8206abae492..b7057f7fe07 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h @@ -110,9 +110,13 @@ struct RegionLockCFDataTrait { auto key = std::make_shared(std::move(key_)); auto value = std::make_shared(std::move(value_)); + auto lo = std::make_shared(key, value); + if (lo->lock_type == kvrpcpb::Op::PessimisticLock) { + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); + } return { {key, std::string_view(key->data(), key->dataSize())}, - Value{key, value, std::make_shared(key, value)}}; + Value{key, value, std::move(lo)}}; } }; diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index f5afabc6d8a..5bcd66b5985 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -111,11 +111,14 @@ class Region : public std::enable_shared_from_this lock = std::unique_lock(region_->mutex); } - void remove(const RegionWriteCFData::Key & key) + bool remove(const RegionWriteCFData::Key & key) { auto & write_cf_data = region->data.writeCF().getDataMut(); - if (auto it = write_cf_data.find(key); it != write_cf_data.end()) + if (auto it = write_cf_data.find(key); it != write_cf_data.end()) { region->removeDataByWriteIt(it); + return true; + } + return false; } private: From b5a7e1d93771cfd2c217a92b08b97fdab57490ad Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 10 Dec 2024 20:23:03 +0800 Subject: [PATCH 02/19] some debug funcs Signed-off-by: Calvin Neo --- dbms/src/Common/TiFlashMetrics.h | 2 +- .../Storages/KVStore/Decode/PartitionStreams.cpp | 1 + .../src/Storages/KVStore/MultiRaft/Persistence.cpp | 8 ++++++-- .../Storages/KVStore/MultiRaft/RaftCommands.cpp | 14 +++++++++++--- .../KVStore/MultiRaft/RegionCFDataBase.cpp | 6 +++--- dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp | 4 ++++ dbms/src/Storages/KVStore/MultiRaft/RegionData.h | 3 ++- dbms/src/Storages/KVStore/Read/ReadIndex.cpp | 5 +++-- dbms/src/Storages/KVStore/Region.h | 1 + .../Storages/KVStore/tests/gtest_new_kvstore.cpp | 4 ++++ 10 files changed, 36 insertions(+), 12 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 49fede89ef1..6858f5450e5 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -466,7 +466,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_lock_del, {"type", "lock_del"}), \ F(type_pessimistic_lock_del, {"type", "pessimistic_lock_del"}), \ F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \ - F(type_pessimistic_lock_replaced, {"type", "pessimistic_lock_replaced"}), \ + F(type_lock_replaced, {"type", "lock_replaced"}), \ F(type_default_del, {"type", "default_del"}), \ F(type_apply_snapshot, {"type", "apply_snapshot"}), \ F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \ diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 7d3dafedac1..4ab172c88a3 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -394,6 +394,7 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL { std::ignore = write_type; std::ignore = value; + // Effectively calls `RegionData::removeDataByWriteIt`. if(remover.remove({handle, commit_ts})) remove_committed_count++; } GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count); diff --git a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp index b34d863d88e..6e678de78a1 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl( const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0; // TODO We will use truncated_index once Proxy/TiKV supports. + // They are always 0 currently. // When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately. if (last_compact_log_applied == 0) { @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl( LOG_DEBUG( log, - "{} approx mem cache info: rows {}, bytes {}, gap {}/{}", + "{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}", curr_region.toString(false), rows, size_bytes, current_applied_gap, - gap_threshold); + gap_threshold, + curr_region.getData().summary(), + curr_region.appliedIndex(), + index); if (can_flush && flush_if_possible) { diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index c8b9fd89c86..d2c024e996f 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -434,7 +434,7 @@ std::pair Region::handleWriteRaftCmd( }; const auto handle_write_cmd_func = [&]() { - size_t cmd_write_cf_cnt = 0, cache_written_size = 0; + size_t cmd_write_cf_cnt = 0; auto ori_cache_size = dataSize(); for (UInt64 i = 0; i < cmds.len; ++i) { @@ -452,9 +452,17 @@ std::pair Region::handleWriteRaftCmd( handle_by_index_func(i); } } - cache_written_size = dataSize() - ori_cache_size; approx_mem_cache_rows += cmd_write_cf_cnt; - approx_mem_cache_bytes += cache_written_size; + auto current_size = dataSize(); + if (current_size > ori_cache_size) + approx_mem_cache_bytes += (current_size - ori_cache_size); + else + { + if (approx_mem_cache_bytes > ori_cache_size - current_size) + approx_mem_cache_bytes -= (ori_cache_size - current_size); + else + approx_mem_cache_bytes = 0; + } }; DM::WriteResult write_result = std::nullopt; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 1b94a5f098e..7bcde79577e 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase::getTiKVValue(const Value & val) template RegionDataRes RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) { - const auto & raw_key = RecordKVFormat::decodeTiKVKey(key); - auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value)); + auto raw_key = RecordKVFormat::decodeTiKVKey(key); + auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value)); if (!kv_pair) return 0; @@ -70,7 +70,7 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, Ti if (decoded->lock_type == kvrpcpb::Op::PessimisticLock) { GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); } - GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_replaced).Increment(1); + GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); } if unlikely (is_large_txn) { diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 66acb0bac07..5df84b739eb 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -367,4 +367,8 @@ RegionData & RegionData::operator=(RegionData && rhs) return *this; } +String RegionData::summary() const { + return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize()); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h index 32f70d5d3f4..753c4d96aa8 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h @@ -84,6 +84,7 @@ class RegionData RegionData(RegionData && data); RegionData & operator=(RegionData &&); + String summary() const; struct OrphanKeysInfo { // Protected by region task lock. @@ -127,7 +128,7 @@ class RegionData RegionLockCFData lock_cf; OrphanKeysInfo orphan_keys_info; - // Size of data cf & write cf, without lock cf. + // Size of data cf & write cf, with lock cf. std::atomic cf_data_size = 0; }; diff --git a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp index dd9721e5a1b..8b0918228ed 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp @@ -183,10 +183,11 @@ void WaitCheckRegionReadyImpl( need_retry = false; LOG_DEBUG( log, - "neglect error, region_id={} not_found={} epoch_not_match={}", + "neglect error, region_id={} not_found={} epoch_not_match={} region_error={}", region_id, region_error.has_region_not_found(), - region_error.has_epoch_not_match()); + region_error.has_epoch_not_match(), + region_error.DebugString()); } if (!need_retry) { diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index 5bcd66b5985..958f7654264 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -153,6 +153,7 @@ class Region : public std::enable_shared_from_this RegionMeta & mutMeta() { return meta; } const RegionMeta & getMeta() const { return meta; } + const RegionData & getData() const { return data; } bool isPendingRemove() const; void setPendingRemove(); diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index e9b126df194..f3f552ea2ce 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -1240,4 +1240,8 @@ try } CATCH +TEST(PrintSize, XXX) { + LOG_INFO(DB::Logger::get(), "!!!! F {}", std::to_string(sizeof(RecordKVFormat::DecodedLockCFValue))); +} + } // namespace DB::tests From a04ad2d1f954e3628a0281db41ac12ead466f7b1 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 10 Dec 2024 21:03:25 +0800 Subject: [PATCH 03/19] private some Signed-off-by: Calvin Neo --- .../KVStore/MultiRaft/RegionCFDataBase.cpp | 6 ++-- .../KVStore/MultiRaft/RegionCFDataTrait.h | 7 ++-- .../Storages/KVStore/MultiRaft/RegionData.cpp | 11 +++--- .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 18 ++++++++-- .../KVStore/tests/gtest_tikv_keyvalue.cpp | 34 +++++++++---------- 5 files changed, 46 insertions(+), 30 deletions(-) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 7bcde79577e..3a31d0de888 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -67,7 +67,8 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, Ti added_size -= calcTiKVKeyValueSize(iter->second); data.erase(iter); - if (decoded->lock_type == kvrpcpb::Op::PessimisticLock) { + if (decoded->getLockType() == kvrpcpb::Op::PessimisticLock) + { GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); } GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); @@ -183,7 +184,8 @@ size_t RegionCFDataBase::remove(const Key & key, bool quiet) GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_del).Increment(1); } - if (std::get<2>(value)->lock_type == kvrpcpb::Op::PessimisticLock) { + if (std::get<2>(value)->getLockType() == kvrpcpb::Op::PessimisticLock) + { GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); } } diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h index b7057f7fe07..ab7e2b5555b 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h @@ -111,12 +111,11 @@ struct RegionLockCFDataTrait auto key = std::make_shared(std::move(key_)); auto value = std::make_shared(std::move(value_)); auto lo = std::make_shared(key, value); - if (lo->lock_type == kvrpcpb::Op::PessimisticLock) { + if (lo->getLockType() == kvrpcpb::Op::PessimisticLock) + { GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); } - return { - {key, std::string_view(key->data(), key->dataSize())}, - Value{key, value, std::move(lo)}}; + return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}}; } }; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 5df84b739eb..6d7a1094741 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -224,14 +224,14 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) std::ignore = tikv_val; const auto & lock_info = *lock_info_ptr; - if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock - || lock_info.lock_type == kvrpcpb::Op::PessimisticLock) + if (lock_info.getLockVersion() > query.read_tso || lock_info.getLockType() == kvrpcpb::Op::Lock + || lock_info.getLockType() == kvrpcpb::Op::PessimisticLock) continue; - if (lock_info.min_commit_ts > query.read_tso) + if (lock_info.getMinCommitTs() > query.read_tso) continue; if (query.bypass_lock_ts) { - if (query.bypass_lock_ts->count(lock_info.lock_version)) + if (query.bypass_lock_ts->count(lock_info.getLockVersion())) { GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); continue; @@ -367,7 +367,8 @@ RegionData & RegionData::operator=(RegionData && rhs) return *this; } -String RegionData::summary() const { +String RegionData::summary() const +{ return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize()); } diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 56538e1854d..8c7bac5e2c9 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -27,19 +27,33 @@ struct DecodedLockCFValue : boost::noncopyable std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; + UInt64 getLockVersion() const { return lock_version; } + UInt64 getLockTtl() const { return lock_ttl; } + UInt64 getTxnSize() const { return txn_size; } + UInt64 getLockForUpdateTs() const { return lock_for_update_ts; } + kvrpcpb::Op getLockType() const { return lock_type; } + bool getUseAsyncCommit() const { return use_async_commit; } + UInt64 getMinCommitTs() const { return min_commit_ts; } + const std::string_view & getSecondaries() const { return secondaries; } + const std::string_view & getPrimaryLock() const { return primary_lock; } + bool getIsTxnFile() const { return is_txn_file; } + bool getGeneration() const { return generation; } std::shared_ptr key; std::shared_ptr val; + +private: + friend void decodeLockCfValue(DecodedLockCFValue & res); UInt64 lock_version{0}; UInt64 lock_ttl{0}; UInt64 txn_size{0}; UInt64 lock_for_update_ts{0}; kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; - bool use_async_commit{0}; + bool use_async_commit{false}; UInt64 min_commit_ts{0}; std::string_view secondaries; std::string_view primary_lock; - bool is_txn_file{0}; + bool is_txn_file{false}; // For large txn, generation is not zero. UInt64 generation{0}; }; diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index 18282797c62..76289ef1112 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -149,16 +149,16 @@ TEST(TiKVKeyValueTest, PortedTests) auto lock = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value))); { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.lock_type); - ASSERT_TRUE("primary key" == lock_info.primary_lock); - ASSERT_TRUE(421321 == lock_info.lock_version); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.lock_ttl); - ASSERT_TRUE(66666 == lock_info.min_commit_ts); + ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); + ASSERT_TRUE("primary key" == lock_info.getPrimaryLock()); + ASSERT_TRUE(421321 == lock_info.getLockVersion()); + ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); + ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.lock_for_update_ts); - ASSERT_EQ(txn_size, lock_info.txn_size); + ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); + ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.use_async_commit); + ASSERT_EQ(true, lock_info.getUseAsyncCommit()); } { auto lock_info = lock.intoLockInfo(); @@ -212,7 +212,7 @@ TEST(TiKVKeyValueTest, PortedTests) std::get<2>(d.getData() .find(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}) ->second) - ->lock_version + ->getLockVersion() == 5678); d.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(k1.data(), k1.dataSize())}, true); ASSERT_TRUE(d.getSize() == 1); @@ -481,16 +481,16 @@ try // check the parsed result { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.lock_type); - ASSERT_TRUE("primary key" == lock_info.primary_lock); - ASSERT_TRUE(421321 == lock_info.lock_version); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.lock_ttl); - ASSERT_TRUE(66666 == lock_info.min_commit_ts); + ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); + ASSERT_TRUE("primary key" == lock_info.getPrimaryLock()); + ASSERT_TRUE(421321 == lock_info.getLockVersion()); + ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); + ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.lock_for_update_ts); - ASSERT_EQ(txn_size, lock_info.txn_size); + ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); + ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.use_async_commit); + ASSERT_EQ(true, lock_info.getUseAsyncCommit()); } } CATCH From c8cffe4ca2f1d3a7ac7e38452eead65660bd438c Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 10 Dec 2024 22:35:55 +0800 Subject: [PATCH 04/19] improve performance Signed-off-by: Calvin Neo --- .../Storages/KVStore/MultiRaft/RegionData.cpp | 38 ++++-- .../TiKVHelpers/DecodedLockCFValue.cpp | 125 ++++++++++++++---- .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 53 ++++---- .../KVStore/tests/gtest_tikv_keyvalue.cpp | 25 +++- 4 files changed, 179 insertions(+), 62 deletions(-) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 6d7a1094741..cd70d0ef766 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -222,21 +223,36 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const auto & [tikv_key, tikv_val, lock_info_ptr] = value; std::ignore = tikv_key; std::ignore = tikv_val; - const auto & lock_info = *lock_info_ptr; + const auto & lock_info_raw = *lock_info_ptr; - if (lock_info.getLockVersion() > query.read_tso || lock_info.getLockType() == kvrpcpb::Op::Lock - || lock_info.getLockType() == kvrpcpb::Op::PessimisticLock) - continue; - if (lock_info.getMinCommitTs() > query.read_tso) - continue; - if (query.bypass_lock_ts) - { - if (query.bypass_lock_ts->count(lock_info.getLockVersion())) + bool should_continue = false; + lock_info_raw.withInner([&](const RecordKVFormat::DecodedLockCFValue::Inner & in) { + if (in.lock_version > query.read_tso || in.lock_type == kvrpcpb::Op::Lock + || in.lock_type == kvrpcpb::Op::PessimisticLock) + { + should_continue = true; + return; + } + if (in.min_commit_ts > query.read_tso) + { + should_continue = true; + return; + } + if (query.bypass_lock_ts) { - GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); - continue; + if (query.bypass_lock_ts->count(in.lock_version)) + { + GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); + should_continue = true; + } } + return; + }); + if (should_continue) + { + continue; } + return lock_info_ptr; } diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index f90fdf61299..053f4d483f0 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -23,9 +23,11 @@ namespace RecordKVFormat { // https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs -inline void decodeLockCfValue(DecodedLockCFValue & res) +[[nodiscard]] DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & decoded) { - const TiKVValue & value = *res.val; + auto * inner = new DecodedLockCFValue::Inner{}; + auto & res = *inner; + const TiKVValue & value = *decoded.val; const char * data = value.data(); size_t len = value.dataSize(); @@ -146,38 +148,54 @@ inline void decodeLockCfValue(DecodedLockCFValue & res) } if (len != 0) throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR); + return inner; } DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_) : key(std::move(key_)) , val(std::move(val_)) { - decodeLockCfValue(*this); + // TODO do not cache when meets large txn, otherwise, still cache + // decodeLockCfValue(*this); } -void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const +void DecodedLockCFValue::withInner(std::function f) const { - res.set_lock_type(lock_type); - res.set_primary_lock(primary_lock.data(), primary_lock.size()); - res.set_lock_version(lock_version); - res.set_lock_ttl(lock_ttl); - res.set_min_commit_ts(min_commit_ts); - res.set_lock_for_update_ts(lock_for_update_ts); - res.set_txn_size(txn_size); - res.set_use_async_commit(use_async_commit); - res.set_key(decodeTiKVKey(*key)); - res.set_is_txn_file(is_txn_file); - - if (use_async_commit) + if likely (inner != nullptr) { - const auto * data = secondaries.data(); - auto len = secondaries.size(); - UInt64 cnt = readVarUInt(data, len); - for (UInt64 i = 0; i < cnt; ++i) + f(*inner); + return; + } + // We own a raw pointer here, remember to release it! + DecodedLockCFValue::Inner * in = decodeLockCfValue(*this); + f(*in); + delete in; +} + +void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const +{ + withInner([&](const Inner & in) { + res.set_lock_type(in.lock_type); + res.set_primary_lock(in.primary_lock.data(), in.primary_lock.size()); + res.set_lock_version(in.lock_version); + res.set_lock_ttl(in.lock_ttl); + res.set_min_commit_ts(in.min_commit_ts); + res.set_lock_for_update_ts(in.lock_for_update_ts); + res.set_txn_size(in.txn_size); + res.set_use_async_commit(in.use_async_commit); + res.set_key(decodeTiKVKey(*key)); + res.set_is_txn_file(in.is_txn_file); + if (in.use_async_commit) { - res.add_secondaries(readVarString(data, len)); + const auto * data = in.secondaries.data(); + auto len = in.secondaries.size(); + UInt64 cnt = readVarUInt(data, len); + for (UInt64 i = 0; i < cnt; ++i) + { + res.add_secondaries(readVarString(data, len)); + } } - } + }); } std::unique_ptr DecodedLockCFValue::intoLockInfo() const @@ -189,7 +207,68 @@ std::unique_ptr DecodedLockCFValue::intoLockInfo() const bool DecodedLockCFValue::isLargeTxn() const { - return generation > 0; + auto g = 0; + withInner([&](const Inner & in) { g = in.generation; }); + return g > 0; +} + +UInt64 DecodedLockCFValue::getLockVersion() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.lock_version; }); + return x; +} +UInt64 DecodedLockCFValue::getLockTtl() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.lock_ttl; }); + return x; +} +UInt64 DecodedLockCFValue::getTxnSize() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.txn_size; }); + return x; +} +UInt64 DecodedLockCFValue::getLockForUpdateTs() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.lock_for_update_ts; }); + return x; +} +kvrpcpb::Op DecodedLockCFValue::getLockType() const +{ + kvrpcpb::Op x; + withInner([&](const Inner & in) { x = in.lock_type; }); + return x; +} + +bool DecodedLockCFValue::getUseAsyncCommit() const +{ + auto x = false; + withInner([&](const Inner & in) { x = in.use_async_commit; }); + return x; +} + +UInt64 DecodedLockCFValue::getMinCommitTs() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.min_commit_ts; }); + return x; +} + +bool DecodedLockCFValue::getIsTxnFile() const +{ + auto x = false; + withInner([&](const Inner & in) { x = in.is_txn_file; }); + return x; +} + +UInt64 DecodedLockCFValue::getGeneration() const +{ + auto x = 0; + withInner([&](const Inner & in) { x = in.generation; }); + return x; } } // namespace RecordKVFormat diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 8c7bac5e2c9..dba6a8e33ab 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -23,39 +23,44 @@ namespace DB::RecordKVFormat struct DecodedLockCFValue : boost::noncopyable { + struct Inner + { + UInt64 lock_version{0}; + UInt64 lock_ttl{0}; + UInt64 txn_size{0}; + UInt64 lock_for_update_ts{0}; + kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; + bool use_async_commit{false}; + UInt64 min_commit_ts{0}; + std::string_view secondaries; + std::string_view primary_lock; + bool is_txn_file{false}; + // For large txn, generation is not zero. + UInt64 generation{0}; + }; DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_); std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; - UInt64 getLockVersion() const { return lock_version; } - UInt64 getLockTtl() const { return lock_ttl; } - UInt64 getTxnSize() const { return txn_size; } - UInt64 getLockForUpdateTs() const { return lock_for_update_ts; } - kvrpcpb::Op getLockType() const { return lock_type; } - bool getUseAsyncCommit() const { return use_async_commit; } - UInt64 getMinCommitTs() const { return min_commit_ts; } - const std::string_view & getSecondaries() const { return secondaries; } - const std::string_view & getPrimaryLock() const { return primary_lock; } - bool getIsTxnFile() const { return is_txn_file; } - bool getGeneration() const { return generation; } + UInt64 getLockVersion() const; + UInt64 getLockTtl() const; + UInt64 getTxnSize() const; + UInt64 getLockForUpdateTs() const; + kvrpcpb::Op getLockType() const; + bool getUseAsyncCommit() const; + UInt64 getMinCommitTs() const; + bool getIsTxnFile() const; + UInt64 getGeneration() const; + + void withInner(std::function f) const; std::shared_ptr key; std::shared_ptr val; private: - friend void decodeLockCfValue(DecodedLockCFValue & res); - UInt64 lock_version{0}; - UInt64 lock_ttl{0}; - UInt64 txn_size{0}; - UInt64 lock_for_update_ts{0}; - kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; - bool use_async_commit{false}; - UInt64 min_commit_ts{0}; - std::string_view secondaries; - std::string_view primary_lock; - bool is_txn_file{false}; - // For large txn, generation is not zero. - UInt64 generation{0}; + friend DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & res); + // Avoid using shared_ptr to reduce space. + Inner * inner{nullptr}; }; } // namespace DB::RecordKVFormat diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index 76289ef1112..0a5872a2355 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -150,15 +150,23 @@ TEST(TiKVKeyValueTest, PortedTests) { auto & lock_info = lock; ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); - ASSERT_TRUE("primary key" == lock_info.getPrimaryLock()); ASSERT_TRUE(421321 == lock_info.getLockVersion()); ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.getUseAsyncCommit()); + lock_info.withInner([&](const auto & in) { + ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); + ASSERT_TRUE("primary key" == in.primary_lock); + ASSERT_TRUE(421321 == in.lock_version); + ASSERT_TRUE(std::numeric_limits::max() == in.lock_ttl); + ASSERT_TRUE(66666 == in.min_commit_ts); + ASSERT_EQ(lock_for_update_ts, in.lock_for_update_ts); + ASSERT_EQ(txn_size, in.txn_size); + ASSERT_EQ(true, in.use_async_commit); + }); } { auto lock_info = lock.intoLockInfo(); @@ -482,15 +490,24 @@ try { auto & lock_info = lock; ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); - ASSERT_TRUE("primary key" == lock_info.getPrimaryLock()); ASSERT_TRUE(421321 == lock_info.getLockVersion()); ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.getUseAsyncCommit()); + + lock_info.withInner([&](const auto & in) { + ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); + ASSERT_TRUE("primary key" == in.primary_lock); + ASSERT_TRUE(421321 == in.lock_version); + ASSERT_TRUE(std::numeric_limits::max() == in.lock_ttl); + ASSERT_TRUE(66666 == in.min_commit_ts); + ASSERT_EQ(lock_for_update_ts, in.lock_for_update_ts); + ASSERT_EQ(txn_size, in.txn_size); + ASSERT_EQ(true, in.use_async_commit); + }); } } CATCH From fdbfa1fb2609fc0d6b078beea3d96b17af074a4d Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 12:39:08 +0800 Subject: [PATCH 05/19] fix pessi Signed-off-by: Calvin Neo --- dbms/src/Common/TiFlashMetrics.h | 1 - .../KVStore/Decode/PartitionStreams.cpp | 110 +++++++++--------- .../KVStore/MultiRaft/Persistence.cpp | 2 +- .../KVStore/MultiRaft/RaftCommands.cpp | 2 +- .../KVStore/MultiRaft/RegionCFDataBase.cpp | 10 +- .../KVStore/MultiRaft/RegionCFDataBase.h | 2 +- .../KVStore/MultiRaft/RegionCFDataTrait.h | 4 - .../Storages/KVStore/MultiRaft/RegionData.cpp | 4 +- .../Storages/KVStore/MultiRaft/RegionData.h | 3 +- dbms/src/Storages/KVStore/Region.cpp | 2 +- dbms/src/Storages/KVStore/Region.h | 7 +- .../TiKVHelpers/DecodedLockCFValue.cpp | 69 +++-------- .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 10 +- .../Storages/KVStore/tests/gtest_kvstore.cpp | 3 +- .../KVStore/tests/gtest_new_kvstore.cpp | 4 - .../KVStore/tests/gtest_tikv_keyvalue.cpp | 37 +++--- 16 files changed, 111 insertions(+), 159 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 6858f5450e5..e44b05bbe2e 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -464,7 +464,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_default_put, {"type", "default_put"}), \ F(type_write_del, {"type", "write_del"}), \ F(type_lock_del, {"type", "lock_del"}), \ - F(type_pessimistic_lock_del, {"type", "pessimistic_lock_del"}), \ F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \ F(type_lock_replaced, {"type", "lock_replaced"}), \ F(type_default_del, {"type", "default_del"}), \ diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 4ab172c88a3..02d23458a05 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -275,66 +275,65 @@ std::variantcreateCommittedScanner(true, need_data_value); + + /// Some sanity checks for region meta. { - auto scanner = region->createCommittedScanner(true, need_data_value); + /** + * special check: when source region is merging, read_index can not guarantee the behavior about target region. + * Reject all read request for safety. + * Only when region is Normal can continue read process. + */ + if (region->peerState() != raft_serverpb::PeerState::Normal) + return RegionException::RegionReadStatus::NOT_FOUND; + + const auto & meta_snap = region->dumpRegionMetaSnapshot(); + // No need to check conf_version if its peer state is normal + std::ignore = conf_version; + if (meta_snap.ver != region_version) + return RegionException::RegionReadStatus::EPOCH_NOT_MATCH; + + // todo check table id + TableID mapped_table_id; + if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) || mapped_table_id != table_id) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Should not happen, region not belong to table, table_id={} expect_table_id={}", + mapped_table_id, + table_id); + } - /// Some sanity checks for region meta. - { - /** - * special check: when source region is merging, read_index can not guarantee the behavior about target region. - * Reject all read request for safety. - * Only when region is Normal can continue read process. - */ - if (region->peerState() != raft_serverpb::PeerState::Normal) - return RegionException::RegionReadStatus::NOT_FOUND; - - const auto & meta_snap = region->dumpRegionMetaSnapshot(); - // No need to check conf_version if its peer state is normal - std::ignore = conf_version; - if (meta_snap.ver != region_version) - return RegionException::RegionReadStatus::EPOCH_NOT_MATCH; - - // todo check table id - TableID mapped_table_id; - if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) - || mapped_table_id != table_id) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Should not happen, region not belong to table, table_id={} expect_table_id={}", - mapped_table_id, - table_id); - } + /// Deal with locks. + if (resolve_locks) + { + /// Check if there are any lock should be resolved, if so, throw LockException. + /// It will iterate all locks with in the time range. + lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts}); + } - /// Deal with locks. - if (resolve_locks) - { - /// Check if there are any lock should be resolved, if so, throw LockException. - lock_value - = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts}); - } + /// If there is no lock, leave scope of region scanner and raise LockException. + /// Read raw KVs from region cache. + if (!lock_info) + { + // Shortcut for empty region. + if (!scanner.hasNext()) + return data_list_read; + + // If worked with raftstore v2, the final size may not equal to here. + data_list_read.reserve(scanner.writeMapSize()); - /// If there is no lock, leave scope of region scanner and raise LockException. - /// Read raw KVs from region cache. - if (!lock_value) + // Tiny optimization for queries that need only handle, tso, delmark. + do { - // Shortcut for empty region. - if (!scanner.hasNext()) - return data_list_read; - - // If worked with raftstore v2, the final size may not equal to here. - data_list_read.reserve(scanner.writeMapSize()); - - // Tiny optimization for queries that need only handle, tso, delmark. - do - { - data_list_read.emplace_back(scanner.next()); - } while (scanner.hasNext()); - } + data_list_read.emplace_back(scanner.next()); + } while (scanner.hasNext()); + } + else + { + return lock_info; } - - if (lock_value) - return lock_value->intoLockInfo(); return data_list_read; } @@ -395,7 +394,8 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL std::ignore = write_type; std::ignore = value; // Effectively calls `RegionData::removeDataByWriteIt`. - if(remover.remove({handle, commit_ts})) remove_committed_count++; + if (remover.remove({handle, commit_ts})) + remove_committed_count++; } GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count); } diff --git a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp index 6e678de78a1..79884e34beb 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp @@ -269,4 +269,4 @@ bool KVStore::forceFlushRegionDataImpl( GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds()); return true; } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index d2c024e996f..b6cf806f99b 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -531,4 +531,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc std::ignore = lock; return static_cast(*this); } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 3a31d0de888..38778e5ac17 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -67,10 +67,7 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, Ti added_size -= calcTiKVKeyValueSize(iter->second); data.erase(iter); - if (decoded->getLockType() == kvrpcpb::Op::PessimisticLock) - { - GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); - } + // In most cases, an optimistic lock replace a pessimistic lock. GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); } if unlikely (is_large_txn) @@ -183,11 +180,6 @@ size_t RegionCFDataBase::remove(const Key & key, bool quiet) { GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_del).Increment(1); } - - if (std::get<2>(value)->getLockType() == kvrpcpb::Op::PessimisticLock) - { - GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_del).Increment(1); - } } map.erase(it); return size; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h index cc3fe3037ab..a163ababbd3 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h @@ -14,9 +14,9 @@ #pragma once +#include #include #include -#include #include diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h index ab7e2b5555b..0b57649622d 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h @@ -111,10 +111,6 @@ struct RegionLockCFDataTrait auto key = std::make_shared(std::move(key_)); auto value = std::make_shared(std::move(value_)); auto lo = std::make_shared(key, value); - if (lo->getLockType() == kvrpcpb::Op::PessimisticLock) - { - GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); - } return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}}; } }; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index cd70d0ef766..ccd0ae35633 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -214,7 +214,7 @@ std::optional RegionData::readDataByWriteIt( return RegionDataReadInfo{pk, decoded_val.write_type, ts, decoded_val.short_value}; } -DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const +LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const { for (const auto & [pk, value] : lock_cf.getData()) { @@ -253,7 +253,7 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) continue; } - return lock_info_ptr; + return lock_info_ptr->intoLockInfo(); } return nullptr; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h index 753c4d96aa8..a066e149851 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h @@ -36,6 +36,7 @@ class RegionData public: using WriteCFIter = RegionWriteCFData::Map::iterator; using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator; + using LockInfoPtr = std::unique_ptr; static void reportAlloc(size_t delta); static void reportDealloc(size_t delta); @@ -53,7 +54,7 @@ class RegionData UInt64 applied, bool hard_error); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const; + LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const; std::shared_ptr getLockByKey(const TiKVKey & key) const; diff --git a/dbms/src/Storages/KVStore/Region.cpp b/dbms/src/Storages/KVStore/Region.cpp index 8710dd01729..42d6d885885 100644 --- a/dbms/src/Storages/KVStore/Region.cpp +++ b/dbms/src/Storages/KVStore/Region.cpp @@ -51,7 +51,7 @@ std::optional Region::readDataByWriteIt( } } -DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const +LockInfoPtr Region::getLockInfo(const RegionLockReadQuery & query) const { return data.getLockInfo(query); } diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index 958f7654264..db766a2c413 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -80,7 +80,7 @@ class Region : public std::enable_shared_from_this bool hasNext(); RegionDataReadInfo next(); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) { return region->getLockInfo(query); } + RegionData::LockInfoPtr getLockInfo(const RegionLockReadQuery & query) { return region->getLockInfo(query); } size_t writeMapSize() const { return write_map_size; } @@ -114,7 +114,8 @@ class Region : public std::enable_shared_from_this bool remove(const RegionWriteCFData::Key & key) { auto & write_cf_data = region->data.writeCF().getDataMut(); - if (auto it = write_cf_data.find(key); it != write_cf_data.end()) { + if (auto it = write_cf_data.find(key); it != write_cf_data.end()) + { region->removeDataByWriteIt(it); return true; } @@ -288,7 +289,7 @@ class Region : public std::enable_shared_from_this bool hard_error); RegionData::WriteCFIter removeDataByWriteIt(const RegionData::WriteCFIter & write_it); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const; + RegionData::LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const; RegionPtr splitInto(RegionMeta && meta); void setPeerState(raft_serverpb::PeerState state); diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 053f4d483f0..469b298b322 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -155,8 +155,19 @@ DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std: : key(std::move(key_)) , val(std::move(val_)) { - // TODO do not cache when meets large txn, otherwise, still cache - // decodeLockCfValue(*this); + auto * parsed = decodeLockCfValue(*this); + if (parsed->generation > 0) + { + delete parsed; + } + else + { + inner = parsed; + } + if (parsed->lock_type == kvrpcpb::Op::PessimisticLock) + { + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); + } } void DecodedLockCFValue::withInner(std::function f) const @@ -207,35 +218,10 @@ std::unique_ptr DecodedLockCFValue::intoLockInfo() const bool DecodedLockCFValue::isLargeTxn() const { - auto g = 0; - withInner([&](const Inner & in) { g = in.generation; }); - return g > 0; + // Because we do not cache the parsed result for large txn. + return inner == nullptr; } -UInt64 DecodedLockCFValue::getLockVersion() const -{ - auto x = 0; - withInner([&](const Inner & in) { x = in.lock_version; }); - return x; -} -UInt64 DecodedLockCFValue::getLockTtl() const -{ - auto x = 0; - withInner([&](const Inner & in) { x = in.lock_ttl; }); - return x; -} -UInt64 DecodedLockCFValue::getTxnSize() const -{ - auto x = 0; - withInner([&](const Inner & in) { x = in.txn_size; }); - return x; -} -UInt64 DecodedLockCFValue::getLockForUpdateTs() const -{ - auto x = 0; - withInner([&](const Inner & in) { x = in.lock_for_update_ts; }); - return x; -} kvrpcpb::Op DecodedLockCFValue::getLockType() const { kvrpcpb::Op x; @@ -243,31 +229,10 @@ kvrpcpb::Op DecodedLockCFValue::getLockType() const return x; } -bool DecodedLockCFValue::getUseAsyncCommit() const -{ - auto x = false; - withInner([&](const Inner & in) { x = in.use_async_commit; }); - return x; -} - -UInt64 DecodedLockCFValue::getMinCommitTs() const -{ - auto x = 0; - withInner([&](const Inner & in) { x = in.min_commit_ts; }); - return x; -} - -bool DecodedLockCFValue::getIsTxnFile() const -{ - auto x = false; - withInner([&](const Inner & in) { x = in.is_txn_file; }); - return x; -} - -UInt64 DecodedLockCFValue::getGeneration() const +UInt64 DecodedLockCFValue::getLockVersion() const { auto x = 0; - withInner([&](const Inner & in) { x = in.generation; }); + withInner([&](const Inner & in) { x = in.lock_version; }); return x; } diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index dba6a8e33ab..a65f16d497a 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -42,16 +42,8 @@ struct DecodedLockCFValue : boost::noncopyable std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; - UInt64 getLockVersion() const; - UInt64 getLockTtl() const; - UInt64 getTxnSize() const; - UInt64 getLockForUpdateTs() const; kvrpcpb::Op getLockType() const; - bool getUseAsyncCommit() const; - UInt64 getMinCommitTs() const; - bool getIsTxnFile() const; - UInt64 getGeneration() const; - + UInt64 getLockVersion() const; void withInner(std::function f) const; std::shared_ptr key; diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index d6e7e073b53..60f1ccd13bd 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -656,8 +656,7 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) auto iter = region->createCommittedScanner(true, true); auto lock = iter.getLockInfo({100, nullptr}); ASSERT_NE(lock, nullptr); - auto k = lock->intoLockInfo(); - ASSERT_EQ(k->lock_version(), 3); + ASSERT_EQ(lock->lock_version(), 3); } { // The record is committed since there is a write record. diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index f3f552ea2ce..e9b126df194 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -1240,8 +1240,4 @@ try } CATCH -TEST(PrintSize, XXX) { - LOG_INFO(DB::Logger::get(), "!!!! F {}", std::to_string(sizeof(RecordKVFormat::DecodedLockCFValue))); -} - } // namespace DB::tests diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index 0a5872a2355..f5c55f67e8f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -33,7 +33,8 @@ TiKVValue encode_lock_cf_value( Timestamp for_update_ts, uint64_t txn_size, const std::vector & async_commit, - const std::vector & rollback) + const std::vector & rollback, + UInt64 generation = 0) { auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); WriteBufferFromOwnString res; @@ -79,6 +80,11 @@ TiKVValue encode_lock_cf_value( { res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); } + if (generation > 0) + { + res.write(RecordKVFormat::GENERATION_PREFIX); + RecordKVFormat::encodeUInt64(generation, res); + } return TiKVValue(res.releaseStr()); } @@ -150,13 +156,7 @@ TEST(TiKVKeyValueTest, PortedTests) { auto & lock_info = lock; ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); - ASSERT_TRUE(421321 == lock_info.getLockVersion()); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); - ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); - ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.getUseAsyncCommit()); lock_info.withInner([&](const auto & in) { ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); ASSERT_TRUE("primary key" == in.primary_lock); @@ -490,13 +490,8 @@ try { auto & lock_info = lock; ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); - ASSERT_TRUE(421321 == lock_info.getLockVersion()); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.getLockTtl()); - ASSERT_TRUE(66666 == lock_info.getMinCommitTs()); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.getLockForUpdateTs()); - ASSERT_EQ(txn_size, lock_info.getTxnSize()); - ASSERT_EQ(true, lock_info.getUseAsyncCommit()); + ASSERT_FALSE(lock_info.isLargeTxn()); lock_info.withInner([&](const auto & in) { ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); @@ -509,6 +504,22 @@ try ASSERT_EQ(true, in.use_async_commit); }); } + + auto lock_value2 = encode_lock_cf_value( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 1111); + + auto lock2 = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value2))); + ASSERT_TRUE(lock2.isLargeTxn()); } CATCH From f667ba6ddb8b83aa59ae09030cd44768e02c54e5 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 13:39:30 +0800 Subject: [PATCH 06/19] Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 469b298b322..65904df7a71 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -231,7 +231,7 @@ kvrpcpb::Op DecodedLockCFValue::getLockType() const UInt64 DecodedLockCFValue::getLockVersion() const { - auto x = 0; + UInt64 x = 0; withInner([&](const Inner & in) { x = in.lock_version; }); return x; } From a17701ce1759f89fb32251c0b36064850a7d15bf Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 17:13:59 +0800 Subject: [PATCH 07/19] Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h Co-authored-by: jinhelin --- .../Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index a65f16d497a..cf86b517522 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -25,18 +25,17 @@ struct DecodedLockCFValue : boost::noncopyable { struct Inner { + std::string_view secondaries; + std::string_view primary_lock; UInt64 lock_version{0}; UInt64 lock_ttl{0}; UInt64 txn_size{0}; UInt64 lock_for_update_ts{0}; + UInt64 min_commit_ts{0}; + UInt64 generation{0}; // For large txn, generation is not zero. kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; bool use_async_commit{false}; - UInt64 min_commit_ts{0}; - std::string_view secondaries; - std::string_view primary_lock; bool is_txn_file{false}; - // For large txn, generation is not zero. - UInt64 generation{0}; }; DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_); std::unique_ptr intoLockInfo() const; From a847489a890b1dc11ecb72de308c0240db82643f Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 17:27:25 +0800 Subject: [PATCH 08/19] Update dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp Co-authored-by: jinhelin --- dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index b6cf806f99b..a765e1f0066 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -454,15 +454,10 @@ std::pair Region::handleWriteRaftCmd( } approx_mem_cache_rows += cmd_write_cf_cnt; auto current_size = dataSize(); - if (current_size > ori_cache_size) - approx_mem_cache_bytes += (current_size - ori_cache_size); + if (auto t = approx_mem_cache_rows + current_size; t > ori_cache_size) + approx_mem_cache_rows = t - ori_cache_size; else - { - if (approx_mem_cache_bytes > ori_cache_size - current_size) - approx_mem_cache_bytes -= (ori_cache_size - current_size); - else - approx_mem_cache_bytes = 0; - } + approx_mem_cache_rows = 0; }; DM::WriteResult write_result = std::nullopt; From 0b200fc171a45796d670c5f9bafd14f3bf2c8c6e Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 22:51:48 +0800 Subject: [PATCH 09/19] address some comments Signed-off-by: Calvin Neo --- .../TiKVHelpers/DecodedLockCFValue.cpp | 23 ++-- .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 4 +- .../KVStore/tests/bench_parse_lock.cpp | 127 ++++++++++++++++++ .../KVStore/tests/gtest_tikv_keyvalue.cpp | 1 + .../KVStore/tests/region_kvstore_test.h | 2 +- 5 files changed, 140 insertions(+), 17 deletions(-) create mode 100644 dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 469b298b322..90958fb017b 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -23,9 +23,9 @@ namespace RecordKVFormat { // https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs -[[nodiscard]] DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & decoded) +[[nodiscard]] std::unique_ptr DecodedLockCFValue::decodeLockCfValue(const DecodedLockCFValue & decoded) const { - auto * inner = new DecodedLockCFValue::Inner{}; + auto inner = std::make_unique(); auto & res = *inner; const TiKVValue & value = *decoded.val; const char * data = value.data(); @@ -155,19 +155,16 @@ DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std: : key(std::move(key_)) , val(std::move(val_)) { - auto * parsed = decodeLockCfValue(*this); - if (parsed->generation > 0) - { - delete parsed; - } - else - { - inner = parsed; - } + auto parsed = decodeLockCfValue(*this); if (parsed->lock_type == kvrpcpb::Op::PessimisticLock) { GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); } + if (parsed->generation == 0) + { + // It is not a large txn, we cache the parsed lock. + inner = std::move(parsed); + } } void DecodedLockCFValue::withInner(std::function f) const @@ -177,10 +174,8 @@ void DecodedLockCFValue::withInner(std::function in = decodeLockCfValue(*this); f(*in); - delete in; } void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index a65f16d497a..47bbe524807 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -50,9 +50,9 @@ struct DecodedLockCFValue : boost::noncopyable std::shared_ptr val; private: - friend DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & res); + std::unique_ptr decodeLockCfValue(const DecodedLockCFValue & res) const; // Avoid using shared_ptr to reduce space. - Inner * inner{nullptr}; + std::unique_ptr inner{nullptr}; }; } // namespace DB::RecordKVFormat diff --git a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp new file mode 100644 index 00000000000..bf2773c6223 --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp @@ -0,0 +1,127 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + +using namespace DB; + +namespace DB::tests +{ +using DB::RecordKVFormat::DecodedLockCFValue; + +DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & decoded); + +TiKVValue encode_lock_cf_value( + UInt8 lock_type, + const String & primary, + Timestamp ts, + UInt64 ttl, + const String * short_value, + Timestamp min_commit_ts, + Timestamp for_update_ts, + uint64_t txn_size, + const std::vector & async_commit, + const std::vector & rollback, + UInt64 generation = 0) +{ + auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); + WriteBufferFromOwnString res; + res.write(lock_value.getStr().data(), lock_value.getStr().size()); + { + res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX); + RecordKVFormat::encodeUInt64(min_commit_ts, res); + } + { + res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX); + RecordKVFormat::encodeUInt64(for_update_ts, res); + } + { + res.write(RecordKVFormat::TXN_SIZE_PREFIX); + RecordKVFormat::encodeUInt64(txn_size, res); + } + { + res.write(RecordKVFormat::ROLLBACK_TS_PREFIX); + TiKV::writeVarUInt(rollback.size(), res); + for (auto ts : rollback) + { + RecordKVFormat::encodeUInt64(ts, res); + } + } + { + res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX); + TiKV::writeVarUInt(async_commit.size(), res); + for (const auto & s : async_commit) + { + writeVarInt(s.size(), res); + res.write(s.data(), s.size()); + } + } + { + res.write(RecordKVFormat::LAST_CHANGE_PREFIX); + RecordKVFormat::encodeUInt64(12345678, res); + TiKV::writeVarUInt(87654321, res); + } + { + res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK); + TiKV::writeVarUInt(876543, res); + } + { + res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); + } + if (generation > 0) + { + res.write(RecordKVFormat::GENERATION_PREFIX); + RecordKVFormat::encodeUInt64(generation, res); + } + return TiKVValue(res.releaseStr()); +} + +void parseTest(benchmark::State& state) { + try { + std::string shor_value = "value"; + auto lock_for_update_ts = 7777, txn_size = 1; + const std::vector & async_commit = {"s1", "s2"}; + const std::vector & rollback = {3, 4}; + auto lock_value2 = encode_lock_cf_value( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 111); + auto ori_key = std::make_shared(RecordKVFormat::genKey(1, 88888)); + for (auto _ : state) + { + auto x = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value2))); + benchmark::DoNotOptimize(x); + } + } catch (...) { + tryLogCurrentException(DB::Logger::get(), __PRETTY_FUNCTION__); + } +} + +BENCHMARK(parseTest); + +} // namespace + diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index f5c55f67e8f..3c92c245358 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -523,6 +523,7 @@ try } CATCH + TEST(TiKVKeyValueTest, Redact) try { diff --git a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h index 5f007a24fc8..3c17243db43 100644 --- a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h +++ b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h @@ -62,7 +62,7 @@ inline void validateSSTGeneration( const LoggerPtr & log_) -> std::unique_ptr { auto parsed_kind = MockSSTGenerator::parseSSTViewKind(buffToStrView(snap.path)); auto reader = std::make_unique(proxy_helper, snap, range, log_); - assert(reader->sstFormatKind() == parsed_kind); + RUNTIME_CHECK(reader->sstFormatKind() == parsed_kind); return reader; }; MultiSSTReader reader{ From bb4a4dc3c63eaf5cc39c70818cd0bc70b4d6b9ee Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 25 Dec 2024 23:34:21 +0800 Subject: [PATCH 10/19] format the code Signed-off-by: Calvin Neo --- .../TiKVHelpers/DecodedLockCFValue.cpp | 17 ++-------- .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 2 -- .../KVStore/tests/bench_parse_lock.cpp | 32 +++++++++++-------- .../KVStore/tests/gtest_tikv_keyvalue.cpp | 14 ++++---- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 49c461b6218..f731d4c1506 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -23,7 +23,8 @@ namespace RecordKVFormat { // https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs -[[nodiscard]] std::unique_ptr DecodedLockCFValue::decodeLockCfValue(const DecodedLockCFValue & decoded) const +[[nodiscard]] std::unique_ptr DecodedLockCFValue::decodeLockCfValue( + const DecodedLockCFValue & decoded) const { auto inner = std::make_unique(); auto & res = *inner; @@ -217,19 +218,5 @@ bool DecodedLockCFValue::isLargeTxn() const return inner == nullptr; } -kvrpcpb::Op DecodedLockCFValue::getLockType() const -{ - kvrpcpb::Op x; - withInner([&](const Inner & in) { x = in.lock_type; }); - return x; -} - -UInt64 DecodedLockCFValue::getLockVersion() const -{ - UInt64 x = 0; - withInner([&](const Inner & in) { x = in.lock_version; }); - return x; -} - } // namespace RecordKVFormat } // namespace DB diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 306ddec8f5e..cc594667208 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -41,8 +41,6 @@ struct DecodedLockCFValue : boost::noncopyable std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; - kvrpcpb::Op getLockType() const; - UInt64 getLockVersion() const; void withInner(std::function f) const; std::shared_ptr key; diff --git a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp index bf2773c6223..63febc20ec1 100644 --- a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp +++ b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include +#include +#include #include #include @@ -86,14 +86,16 @@ TiKVValue encode_lock_cf_value( } if (generation > 0) { - res.write(RecordKVFormat::GENERATION_PREFIX); - RecordKVFormat::encodeUInt64(generation, res); + // res.write(RecordKVFormat::GENERATION_PREFIX); + // RecordKVFormat::encodeUInt64(generation, res); } return TiKVValue(res.releaseStr()); } -void parseTest(benchmark::State& state) { - try { +void parseTest(benchmark::State & state) +{ + try + { std::string shor_value = "value"; auto lock_for_update_ts = 7777, txn_size = 1; const std::vector & async_commit = {"s1", "s2"}; @@ -109,19 +111,23 @@ void parseTest(benchmark::State& state) { txn_size, async_commit, rollback, - 111); + 1111); + auto ori_key = std::make_shared(RecordKVFormat::genKey(1, 88888)); for (auto _ : state) { - auto x = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value2))); - benchmark::DoNotOptimize(x); + auto lock2 = RecordKVFormat::DecodedLockCFValue( + ori_key, + std::make_shared(TiKVValue::copyFrom(lock_value2))); + benchmark::DoNotOptimize(lock2); } - } catch (...) { + } + catch (...) + { tryLogCurrentException(DB::Logger::get(), __PRETTY_FUNCTION__); } } - -BENCHMARK(parseTest); -} // namespace +BENCHMARK(parseTest); +} // namespace DB::tests diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index 3c92c245358..c43dc3adf29 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -155,7 +155,6 @@ TEST(TiKVKeyValueTest, PortedTests) auto lock = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value))); { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); ASSERT_TRUE(ori_key == lock_info.key); lock_info.withInner([&](const auto & in) { ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); @@ -216,12 +215,12 @@ TEST(TiKVKeyValueTest, PortedTests) nullptr, 66666)); ASSERT_TRUE(d.getSize() == 2); - ASSERT_TRUE( - std::get<2>(d.getData() - .find(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}) - ->second) - ->getLockVersion() - == 5678); + + std::get<2>(d.getData() + .find(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}) + ->second) + ->withInner([&](const auto & in) { ASSERT_EQ(in.lock_version, 5678); }); + d.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(k1.data(), k1.dataSize())}, true); ASSERT_TRUE(d.getSize() == 1); d.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}, true); @@ -489,7 +488,6 @@ try // check the parsed result { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.getLockType()); ASSERT_TRUE(ori_key == lock_info.key); ASSERT_FALSE(lock_info.isLargeTxn()); From 37bedddb7f21206912e87baf04b26015379ba9e3 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 27 Dec 2024 14:40:55 +0800 Subject: [PATCH 11/19] Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 02d23458a05..fc460a8a263 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -330,10 +330,7 @@ std::variant Date: Fri, 27 Dec 2024 17:51:04 +0800 Subject: [PATCH 12/19] add tests and reorg Signed-off-by: Calvin Neo --- .../KVStore/Decode/PartitionStreams.cpp | 7 +- .../Storages/KVStore/MultiRaft/RegionData.cpp | 29 +------ .../TiKVHelpers/DecodedLockCFValue.cpp | 76 +++++++++++++------ .../KVStore/TiKVHelpers/DecodedLockCFValue.h | 9 +++ .../Storages/KVStore/tests/gtest_kvstore.cpp | 45 ++++++++++- 5 files changed, 111 insertions(+), 55 deletions(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 02d23458a05..8a56cdda29a 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -274,7 +274,6 @@ std::variantcreateCommittedScanner(true, need_data_value); @@ -317,6 +316,7 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index ccd0ae35633..7831dee1ab0 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -225,35 +225,10 @@ LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const std::ignore = tikv_val; const auto & lock_info_raw = *lock_info_ptr; - bool should_continue = false; - lock_info_raw.withInner([&](const RecordKVFormat::DecodedLockCFValue::Inner & in) { - if (in.lock_version > query.read_tso || in.lock_type == kvrpcpb::Op::Lock - || in.lock_type == kvrpcpb::Op::PessimisticLock) - { - should_continue = true; - return; - } - if (in.min_commit_ts > query.read_tso) - { - should_continue = true; - return; - } - if (query.bypass_lock_ts) - { - if (query.bypass_lock_ts->count(in.lock_version)) - { - GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); - should_continue = true; - } - } - return; - }); - if (should_continue) + if (auto t = lock_info_raw.getLockInfoPtr(query); t != nullptr) { - continue; + return t; } - - return lock_info_ptr->intoLockInfo(); } return nullptr; diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index f731d4c1506..3d69857a2ce 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -181,28 +181,7 @@ void DecodedLockCFValue::withInner(std::function(data, len)); - } - } - }); + withInner([&](const Inner & in) { in.intoLockInfo(key, res); }); } std::unique_ptr DecodedLockCFValue::intoLockInfo() const @@ -218,5 +197,58 @@ bool DecodedLockCFValue::isLargeTxn() const return inner == nullptr; } +void DecodedLockCFValue::Inner::intoLockInfo(const std::shared_ptr & key, kvrpcpb::LockInfo & res) const +{ + res.set_lock_type(lock_type); + res.set_primary_lock(primary_lock.data(), primary_lock.size()); + res.set_lock_version(lock_version); + res.set_lock_ttl(lock_ttl); + res.set_min_commit_ts(min_commit_ts); + res.set_lock_for_update_ts(lock_for_update_ts); + res.set_txn_size(txn_size); + res.set_use_async_commit(use_async_commit); + res.set_key(decodeTiKVKey(*key)); + res.set_is_txn_file(is_txn_file); + if (use_async_commit) + { + const auto * data = secondaries.data(); + auto len = secondaries.size(); + UInt64 cnt = readVarUInt(data, len); + for (UInt64 i = 0; i < cnt; ++i) + { + res.add_secondaries(readVarString(data, len)); + } + } +} + +void DecodedLockCFValue::Inner::getLockInfoPtr( + const RegionLockReadQuery & query, + const std::shared_ptr & key, + LockInfoPtr & res) const +{ + res = nullptr; + if (lock_version > query.read_tso || lock_type == kvrpcpb::Op::Lock || lock_type == kvrpcpb::Op::PessimisticLock) + return; + if (min_commit_ts > query.read_tso) + return; + if (query.bypass_lock_ts) + { + if (query.bypass_lock_ts->count(lock_version)) + { + GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); + return; + } + } + res = std::make_unique(); + intoLockInfo(key, *res); +} + +LockInfoPtr DecodedLockCFValue::getLockInfoPtr(const RegionLockReadQuery & query) const +{ + LockInfoPtr res = nullptr; + withInner([&](const Inner & in) { in.getLockInfoPtr(query, key, res); }); + return res; +} + } // namespace RecordKVFormat } // namespace DB diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index cc594667208..42a35fa42df 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB::RecordKVFormat { @@ -36,12 +37,20 @@ struct DecodedLockCFValue : boost::noncopyable kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; bool use_async_commit{false}; bool is_txn_file{false}; + /// Set `res` if the `query` could be blocked by this lock. Otherwise `set` res to nullptr. + void getLockInfoPtr( + const RegionLockReadQuery & query, + const std::shared_ptr & key, + LockInfoPtr & res) const; + void intoLockInfo(const std::shared_ptr & key, kvrpcpb::LockInfo &) const; }; DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_); std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; void withInner(std::function f) const; + /// Return LockInfoPtr if the `query` could be blocked by this lock. Otherwise return nullptr. + LockInfoPtr getLockInfoPtr(const RegionLockReadQuery & query) const; std::shared_ptr key; std::shared_ptr val; diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 60f1ccd13bd..53d6e7a0e46 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -643,7 +643,7 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) region->insert( "lock", RecordKVFormat::genKey(table_id, 3), - RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20, nullptr, 5)); region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); region->insert( "write", @@ -658,6 +658,25 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) ASSERT_NE(lock, nullptr); ASSERT_EQ(lock->lock_version(), 3); } + { + // There is a lock, and could be bypassed. + std::unordered_set bypass = {3}; + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, &bypass}); + ASSERT_EQ(lock, nullptr); + } + { + // There is no lock. + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({2, nullptr}); + ASSERT_EQ(lock, nullptr); + } + { + // The read ts is smaller than min_commit_ts, so this txn is not visible. + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({4, nullptr}); + ASSERT_EQ(lock, nullptr); + } { // The record is committed since there is a write record. std::optional data_list_read = ReadRegionCommitCache(region, true); @@ -674,6 +693,30 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } region->clearAllData(); } + { + region->insert( + "lock", + RecordKVFormat::genKey(table_id, 3), + RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Lock, "PK", 3, 20, nullptr, 5)); + { + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } + { + region->insert( + "lock", + RecordKVFormat::genKey(table_id, 3), + RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Pessimistic, "PK", 3, 20, nullptr, 5)); + { + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } { // Test duplicate and tryCompactionFilter region->insert( From 132211c709ad61cf3ffd7a3275c9ba578189a4b5 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 27 Dec 2024 18:06:40 +0800 Subject: [PATCH 13/19] Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 8a56cdda29a..74eb2330606 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -331,8 +331,7 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region) From 0539605f953311afb52cc699fa3e53be49083b3b Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 27 Dec 2024 18:09:17 +0800 Subject: [PATCH 14/19] short Signed-off-by: Calvin Neo --- .../KVStore/Decode/PartitionStreams.cpp | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 74eb2330606..08d6f69089e 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -312,26 +312,24 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region) From b41f8a0c8adce480229205dcc4d593352ac93656 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Sat, 28 Dec 2024 01:33:57 +0800 Subject: [PATCH 15/19] Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp Co-authored-by: JaySon --- dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 08d6f69089e..8d3449026ee 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -314,7 +314,7 @@ std::variant Date: Sat, 28 Dec 2024 22:07:05 +0800 Subject: [PATCH 16/19] format Signed-off-by: Calvin Neo --- dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 08d6f69089e..0fed7f70a93 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -312,8 +312,9 @@ std::variant Date: Mon, 30 Dec 2024 10:31:03 +0800 Subject: [PATCH 17/19] fix clang tidy Signed-off-by: Calvin Neo --- dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp | 2 +- dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 3d69857a2ce..3bdb872f345 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -24,7 +24,7 @@ namespace RecordKVFormat // https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs [[nodiscard]] std::unique_ptr DecodedLockCFValue::decodeLockCfValue( - const DecodedLockCFValue & decoded) const + const DecodedLockCFValue & decoded) { auto inner = std::make_unique(); auto & res = *inner; diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 42a35fa42df..6042062a68f 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -56,7 +56,7 @@ struct DecodedLockCFValue : boost::noncopyable std::shared_ptr val; private: - std::unique_ptr decodeLockCfValue(const DecodedLockCFValue & res) const; + static std::unique_ptr decodeLockCfValue(const DecodedLockCFValue & decoded); // Avoid using shared_ptr to reduce space. std::unique_ptr inner{nullptr}; }; From f455012a998e2b570042a83ef2b1ba7fabe6b6c8 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Mon, 30 Dec 2024 11:46:21 +0800 Subject: [PATCH 18/19] Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h Co-authored-by: jinhelin --- dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 6042062a68f..61991bd8613 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -48,7 +48,7 @@ struct DecodedLockCFValue : boost::noncopyable std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; - void withInner(std::function f) const; + void withInner(std::function && f) const; /// Return LockInfoPtr if the `query` could be blocked by this lock. Otherwise return nullptr. LockInfoPtr getLockInfoPtr(const RegionLockReadQuery & query) const; From 917ca1cb6be43ffa809ab69797b1d452a076407f Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Mon, 30 Dec 2024 11:47:26 +0800 Subject: [PATCH 19/19] Update dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp Co-authored-by: jinhelin --- dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index 3bdb872f345..fdb10662b65 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -168,7 +168,7 @@ DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std: } } -void DecodedLockCFValue::withInner(std::function f) const +void DecodedLockCFValue::withInner(std::function && f) const { if likely (inner != nullptr) {