diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 29a9c81001c..e44b05bbe2e 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -459,10 +459,13 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Total number of keys processed in some types of Raft commands", \ Counter, \ F(type_write_put, {"type", "write_put"}), \ + F(type_write_remove, {"type", "write_remove"}), \ F(type_lock_put, {"type", "lock_put"}), \ F(type_default_put, {"type", "default_put"}), \ F(type_write_del, {"type", "write_del"}), \ F(type_lock_del, {"type", "lock_del"}), \ + F(type_pessimistic_lock_put, {"type", "pessimistic_lock_put"}), \ + F(type_lock_replaced, {"type", "lock_replaced"}), \ F(type_default_del, {"type", "default_del"}), \ F(type_apply_snapshot, {"type", "apply_snapshot"}), \ F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \ diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 44b225d45b4..9f55148dd75 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -273,68 +273,62 @@ std::variantcreateCommittedScanner(true, need_data_value); + LockInfoPtr lock_info; - /// Some sanity checks for region meta. - { - /** - * special check: when source region is merging, read_index can not guarantee the behavior about target region. - * Reject all read request for safety. - * Only when region is Normal can continue read process. - */ - if (region->peerState() != raft_serverpb::PeerState::Normal) - return RegionException::RegionReadStatus::NOT_FOUND; - - const auto & meta_snap = region->dumpRegionMetaSnapshot(); - // No need to check conf_version if its peer state is normal - std::ignore = conf_version; - if (meta_snap.ver != region_version) - return RegionException::RegionReadStatus::EPOCH_NOT_MATCH; - - // todo check table id - TableID mapped_table_id; - if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) - || mapped_table_id != table_id) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Should not happen, region not belong to table, table_id={} expect_table_id={}", - mapped_table_id, - table_id); - } + auto scanner = region->createCommittedScanner(true, need_data_value); - /// Deal with locks. - if (resolve_locks) - { - /// Check if there are any lock should be resolved, if so, throw LockException. - lock_value - = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts}); - } + /// Some sanity checks for region meta. + { + /** + * special check: when source region is merging, read_index can not guarantee the behavior about target region. + * Reject all read request for safety. + * Only when region is Normal can continue read process. + */ + if (region->peerState() != raft_serverpb::PeerState::Normal) + return RegionException::RegionReadStatus::NOT_FOUND; + + const auto & meta_snap = region->dumpRegionMetaSnapshot(); + // No need to check conf_version if its peer state is normal + std::ignore = conf_version; + if (meta_snap.ver != region_version) + return RegionException::RegionReadStatus::EPOCH_NOT_MATCH; + + // todo check table id + TableID mapped_table_id; + if (!computeMappedTableID(*meta_snap.range->rawKeys().first, mapped_table_id) || mapped_table_id != table_id) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Should not happen, region not belong to table, table_id={} expect_table_id={}", + mapped_table_id, + table_id); + } - /// If there is no lock, leave scope of region scanner and raise LockException. - /// Read raw KVs from region cache. - if (!lock_value) - { - // Shortcut for empty region. - if (!scanner.hasNext()) - return data_list_read; - - // If worked with raftstore v2, the final size may not equal to here. - data_list_read.reserve(scanner.writeMapSize()); - - // Tiny optimization for queries that need only handle, tso, delmark. - do - { - data_list_read.emplace_back(scanner.next()); - } while (scanner.hasNext()); - } + /// Deal with locks. + if (resolve_locks) + { + /// Check if there are any lock should be resolved, if so, throw LockException. + /// It will iterate all locks with in the time range. + lock_info = scanner.getLockInfo(RegionLockReadQuery{.read_tso = start_ts, .bypass_lock_ts = bypass_lock_ts}); } - if (lock_value) - return lock_value->intoLockInfo(); + if (lock_info) + return lock_info; + + /// If there is no lock, leave scope of region scanner and raise LockException. + /// Read raw KVs from region cache. + RegionDataReadInfoList data_list_read; + // Shortcut for empty region. + if (!scanner.hasNext()) + return data_list_read; + + // If worked with raftstore v2, the final size may not equal to here. + data_list_read.reserve(scanner.writeMapSize()); + // Tiny optimization for queries that need only handle, tso, delmark. + do + { + data_list_read.emplace_back(scanner.next()); + } while (scanner.hasNext()); return data_list_read; } @@ -388,12 +382,16 @@ void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoL { /// Remove data in region. auto remover = region->createCommittedRemover(lock_region); + size_t remove_committed_count = 0; for (const auto & [handle, write_type, commit_ts, value] : data_list_read) { std::ignore = write_type; std::ignore = value; - remover.remove({handle, commit_ts}); + // Effectively calls `RegionData::removeDataByWriteIt`. + if (remover.remove({handle, commit_ts})) + remove_committed_count++; } + GET_METRIC(tiflash_raft_process_keys, type_write_remove).Increment(remove_committed_count); } // ParseTS parses the ts to (physical,logical). diff --git a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp index b34d863d88e..79884e34beb 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Persistence.cpp @@ -191,6 +191,7 @@ bool KVStore::canFlushRegionDataImpl( const auto current_applied_gap = index > last_compact_log_applied ? index - last_compact_log_applied : 0; // TODO We will use truncated_index once Proxy/TiKV supports. + // They are always 0 currently. // When a Region is newly created in TiFlash, last_compact_log_applied is 0, we don't trigger immediately. if (last_compact_log_applied == 0) { @@ -205,12 +206,15 @@ bool KVStore::canFlushRegionDataImpl( LOG_DEBUG( log, - "{} approx mem cache info: rows {}, bytes {}, gap {}/{}", + "{} approx mem cache info: rows {}, bytes {}, gap {}/{}, data_summary: {}, applied_(index): {}/{}", curr_region.toString(false), rows, size_bytes, current_applied_gap, - gap_threshold); + gap_threshold, + curr_region.getData().summary(), + curr_region.appliedIndex(), + index); if (can_flush && flush_if_possible) { @@ -265,4 +269,4 @@ bool KVStore::forceFlushRegionDataImpl( GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds()); return true; } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index c8b9fd89c86..a765e1f0066 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -434,7 +434,7 @@ std::pair Region::handleWriteRaftCmd( }; const auto handle_write_cmd_func = [&]() { - size_t cmd_write_cf_cnt = 0, cache_written_size = 0; + size_t cmd_write_cf_cnt = 0; auto ori_cache_size = dataSize(); for (UInt64 i = 0; i < cmds.len; ++i) { @@ -452,9 +452,12 @@ std::pair Region::handleWriteRaftCmd( handle_by_index_func(i); } } - cache_written_size = dataSize() - ori_cache_size; approx_mem_cache_rows += cmd_write_cf_cnt; - approx_mem_cache_bytes += cache_written_size; + auto current_size = dataSize(); + if (auto t = approx_mem_cache_rows + current_size; t > ori_cache_size) + approx_mem_cache_rows = t - ori_cache_size; + else + approx_mem_cache_rows = 0; }; DM::WriteResult write_result = std::nullopt; @@ -523,4 +526,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc std::ignore = lock; return static_cast(*this); } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 94de1762a41..38778e5ac17 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -43,8 +43,8 @@ const TiKVValue & RegionCFDataBase::getTiKVValue(const Value & val) template RegionDataRes RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) { - const auto & raw_key = RecordKVFormat::decodeTiKVKey(key); - auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value)); + auto raw_key = RecordKVFormat::decodeTiKVKey(key); + auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value)); if (!kv_pair) return 0; @@ -63,18 +63,19 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, Ti auto iter = data.find(kv_pair.first); if (iter != data.end()) { + // Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten. added_size -= calcTiKVKeyValueSize(iter->second); data.erase(iter); + + // In most cases, an optimistic lock replace a pessimistic lock. + GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); } - else + if unlikely (is_large_txn) { - if unlikely (is_large_txn) - { - GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); - } + GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); } } - // according to the process of pessimistic lock, just overwrite. + // According to the process of pessimistic lock, just overwrite. data.emplace(std::move(kv_pair.first), std::move(kv_pair.second)); return added_size; } diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h index 0363bbf6a76..a163ababbd3 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h index 8206abae492..0b57649622d 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h @@ -110,9 +110,8 @@ struct RegionLockCFDataTrait { auto key = std::make_shared(std::move(key_)); auto value = std::make_shared(std::move(value_)); - return { - {key, std::string_view(key->data(), key->dataSize())}, - Value{key, value, std::make_shared(key, value)}}; + auto lo = std::make_shared(key, value); + return {{key, std::string_view(key->data(), key->dataSize())}, Value{key, value, std::move(lo)}}; } }; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 66acb0bac07..7831dee1ab0 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -213,7 +214,7 @@ std::optional RegionData::readDataByWriteIt( return RegionDataReadInfo{pk, decoded_val.write_type, ts, decoded_val.short_value}; } -DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const +LockInfoPtr RegionData::getLockInfo(const RegionLockReadQuery & query) const { for (const auto & [pk, value] : lock_cf.getData()) { @@ -222,22 +223,12 @@ DecodedLockCFValuePtr RegionData::getLockInfo(const RegionLockReadQuery & query) const auto & [tikv_key, tikv_val, lock_info_ptr] = value; std::ignore = tikv_key; std::ignore = tikv_val; - const auto & lock_info = *lock_info_ptr; - - if (lock_info.lock_version > query.read_tso || lock_info.lock_type == kvrpcpb::Op::Lock - || lock_info.lock_type == kvrpcpb::Op::PessimisticLock) - continue; - if (lock_info.min_commit_ts > query.read_tso) - continue; - if (query.bypass_lock_ts) + const auto & lock_info_raw = *lock_info_ptr; + + if (auto t = lock_info_raw.getLockInfoPtr(query); t != nullptr) { - if (query.bypass_lock_ts->count(lock_info.lock_version)) - { - GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); - continue; - } + return t; } - return lock_info_ptr; } return nullptr; @@ -367,4 +358,9 @@ RegionData & RegionData::operator=(RegionData && rhs) return *this; } +String RegionData::summary() const +{ + return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize()); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h index 32f70d5d3f4..a066e149851 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h @@ -36,6 +36,7 @@ class RegionData public: using WriteCFIter = RegionWriteCFData::Map::iterator; using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator; + using LockInfoPtr = std::unique_ptr; static void reportAlloc(size_t delta); static void reportDealloc(size_t delta); @@ -53,7 +54,7 @@ class RegionData UInt64 applied, bool hard_error); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const; + LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const; std::shared_ptr getLockByKey(const TiKVKey & key) const; @@ -84,6 +85,7 @@ class RegionData RegionData(RegionData && data); RegionData & operator=(RegionData &&); + String summary() const; struct OrphanKeysInfo { // Protected by region task lock. @@ -127,7 +129,7 @@ class RegionData RegionLockCFData lock_cf; OrphanKeysInfo orphan_keys_info; - // Size of data cf & write cf, without lock cf. + // Size of data cf & write cf, with lock cf. std::atomic cf_data_size = 0; }; diff --git a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp index dd9721e5a1b..8b0918228ed 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp @@ -183,10 +183,11 @@ void WaitCheckRegionReadyImpl( need_retry = false; LOG_DEBUG( log, - "neglect error, region_id={} not_found={} epoch_not_match={}", + "neglect error, region_id={} not_found={} epoch_not_match={} region_error={}", region_id, region_error.has_region_not_found(), - region_error.has_epoch_not_match()); + region_error.has_epoch_not_match(), + region_error.DebugString()); } if (!need_retry) { diff --git a/dbms/src/Storages/KVStore/Region.cpp b/dbms/src/Storages/KVStore/Region.cpp index 8710dd01729..42d6d885885 100644 --- a/dbms/src/Storages/KVStore/Region.cpp +++ b/dbms/src/Storages/KVStore/Region.cpp @@ -51,7 +51,7 @@ std::optional Region::readDataByWriteIt( } } -DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const +LockInfoPtr Region::getLockInfo(const RegionLockReadQuery & query) const { return data.getLockInfo(query); } diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index f5afabc6d8a..db766a2c413 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -80,7 +80,7 @@ class Region : public std::enable_shared_from_this bool hasNext(); RegionDataReadInfo next(); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) { return region->getLockInfo(query); } + RegionData::LockInfoPtr getLockInfo(const RegionLockReadQuery & query) { return region->getLockInfo(query); } size_t writeMapSize() const { return write_map_size; } @@ -111,11 +111,15 @@ class Region : public std::enable_shared_from_this lock = std::unique_lock(region_->mutex); } - void remove(const RegionWriteCFData::Key & key) + bool remove(const RegionWriteCFData::Key & key) { auto & write_cf_data = region->data.writeCF().getDataMut(); if (auto it = write_cf_data.find(key); it != write_cf_data.end()) + { region->removeDataByWriteIt(it); + return true; + } + return false; } private: @@ -150,6 +154,7 @@ class Region : public std::enable_shared_from_this RegionMeta & mutMeta() { return meta; } const RegionMeta & getMeta() const { return meta; } + const RegionData & getData() const { return data; } bool isPendingRemove() const; void setPendingRemove(); @@ -284,7 +289,7 @@ class Region : public std::enable_shared_from_this bool hard_error); RegionData::WriteCFIter removeDataByWriteIt(const RegionData::WriteCFIter & write_it); - DecodedLockCFValuePtr getLockInfo(const RegionLockReadQuery & query) const; + RegionData::LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const; RegionPtr splitInto(RegionMeta && meta); void setPeerState(raft_serverpb::PeerState state); diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index f90fdf61299..fdb10662b65 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -23,9 +23,12 @@ namespace RecordKVFormat { // https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs -inline void decodeLockCfValue(DecodedLockCFValue & res) +[[nodiscard]] std::unique_ptr DecodedLockCFValue::decodeLockCfValue( + const DecodedLockCFValue & decoded) { - const TiKVValue & value = *res.val; + auto inner = std::make_unique(); + auto & res = *inner; + const TiKVValue & value = *decoded.val; const char * data = value.data(); size_t len = value.dataSize(); @@ -146,16 +149,55 @@ inline void decodeLockCfValue(DecodedLockCFValue & res) } if (len != 0) throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR); + return inner; } DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_) : key(std::move(key_)) , val(std::move(val_)) { - decodeLockCfValue(*this); + auto parsed = decodeLockCfValue(*this); + if (parsed->lock_type == kvrpcpb::Op::PessimisticLock) + { + GET_METRIC(tiflash_raft_process_keys, type_pessimistic_lock_put).Increment(1); + } + if (parsed->generation == 0) + { + // It is not a large txn, we cache the parsed lock. + inner = std::move(parsed); + } +} + +void DecodedLockCFValue::withInner(std::function && f) const +{ + if likely (inner != nullptr) + { + f(*inner); + return; + } + std::unique_ptr in = decodeLockCfValue(*this); + f(*in); } void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const +{ + withInner([&](const Inner & in) { in.intoLockInfo(key, res); }); +} + +std::unique_ptr DecodedLockCFValue::intoLockInfo() const +{ + auto res = std::make_unique(); + intoLockInfo(*res); + return res; +} + +bool DecodedLockCFValue::isLargeTxn() const +{ + // Because we do not cache the parsed result for large txn. + return inner == nullptr; +} + +void DecodedLockCFValue::Inner::intoLockInfo(const std::shared_ptr & key, kvrpcpb::LockInfo & res) const { res.set_lock_type(lock_type); res.set_primary_lock(primary_lock.data(), primary_lock.size()); @@ -167,7 +209,6 @@ void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const res.set_use_async_commit(use_async_commit); res.set_key(decodeTiKVKey(*key)); res.set_is_txn_file(is_txn_file); - if (use_async_commit) { const auto * data = secondaries.data(); @@ -180,16 +221,33 @@ void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const } } -std::unique_ptr DecodedLockCFValue::intoLockInfo() const +void DecodedLockCFValue::Inner::getLockInfoPtr( + const RegionLockReadQuery & query, + const std::shared_ptr & key, + LockInfoPtr & res) const { - auto res = std::make_unique(); - intoLockInfo(*res); - return res; + res = nullptr; + if (lock_version > query.read_tso || lock_type == kvrpcpb::Op::Lock || lock_type == kvrpcpb::Op::PessimisticLock) + return; + if (min_commit_ts > query.read_tso) + return; + if (query.bypass_lock_ts) + { + if (query.bypass_lock_ts->count(lock_version)) + { + GET_METRIC(tiflash_raft_read_index_events_count, type_bypass_lock).Increment(); + return; + } + } + res = std::make_unique(); + intoLockInfo(key, *res); } -bool DecodedLockCFValue::isLargeTxn() const +LockInfoPtr DecodedLockCFValue::getLockInfoPtr(const RegionLockReadQuery & query) const { - return generation > 0; + LockInfoPtr res = nullptr; + withInner([&](const Inner & in) { in.getLockInfoPtr(query, key, res); }); + return res; } } // namespace RecordKVFormat diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 56538e1854d..61991bd8613 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -17,31 +17,48 @@ #include #include #include +#include namespace DB::RecordKVFormat { struct DecodedLockCFValue : boost::noncopyable { + struct Inner + { + std::string_view secondaries; + std::string_view primary_lock; + UInt64 lock_version{0}; + UInt64 lock_ttl{0}; + UInt64 txn_size{0}; + UInt64 lock_for_update_ts{0}; + UInt64 min_commit_ts{0}; + UInt64 generation{0}; // For large txn, generation is not zero. + kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; + bool use_async_commit{false}; + bool is_txn_file{false}; + /// Set `res` if the `query` could be blocked by this lock. Otherwise `set` res to nullptr. + void getLockInfoPtr( + const RegionLockReadQuery & query, + const std::shared_ptr & key, + LockInfoPtr & res) const; + void intoLockInfo(const std::shared_ptr & key, kvrpcpb::LockInfo &) const; + }; DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_); std::unique_ptr intoLockInfo() const; void intoLockInfo(kvrpcpb::LockInfo &) const; bool isLargeTxn() const; + void withInner(std::function && f) const; + /// Return LockInfoPtr if the `query` could be blocked by this lock. Otherwise return nullptr. + LockInfoPtr getLockInfoPtr(const RegionLockReadQuery & query) const; std::shared_ptr key; std::shared_ptr val; - UInt64 lock_version{0}; - UInt64 lock_ttl{0}; - UInt64 txn_size{0}; - UInt64 lock_for_update_ts{0}; - kvrpcpb::Op lock_type{kvrpcpb::Op_MIN}; - bool use_async_commit{0}; - UInt64 min_commit_ts{0}; - std::string_view secondaries; - std::string_view primary_lock; - bool is_txn_file{0}; - // For large txn, generation is not zero. - UInt64 generation{0}; + +private: + static std::unique_ptr decodeLockCfValue(const DecodedLockCFValue & decoded); + // Avoid using shared_ptr to reduce space. + std::unique_ptr inner{nullptr}; }; } // namespace DB::RecordKVFormat diff --git a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp new file mode 100644 index 00000000000..63febc20ec1 --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp @@ -0,0 +1,133 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + +using namespace DB; + +namespace DB::tests +{ +using DB::RecordKVFormat::DecodedLockCFValue; + +DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & decoded); + +TiKVValue encode_lock_cf_value( + UInt8 lock_type, + const String & primary, + Timestamp ts, + UInt64 ttl, + const String * short_value, + Timestamp min_commit_ts, + Timestamp for_update_ts, + uint64_t txn_size, + const std::vector & async_commit, + const std::vector & rollback, + UInt64 generation = 0) +{ + auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); + WriteBufferFromOwnString res; + res.write(lock_value.getStr().data(), lock_value.getStr().size()); + { + res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX); + RecordKVFormat::encodeUInt64(min_commit_ts, res); + } + { + res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX); + RecordKVFormat::encodeUInt64(for_update_ts, res); + } + { + res.write(RecordKVFormat::TXN_SIZE_PREFIX); + RecordKVFormat::encodeUInt64(txn_size, res); + } + { + res.write(RecordKVFormat::ROLLBACK_TS_PREFIX); + TiKV::writeVarUInt(rollback.size(), res); + for (auto ts : rollback) + { + RecordKVFormat::encodeUInt64(ts, res); + } + } + { + res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX); + TiKV::writeVarUInt(async_commit.size(), res); + for (const auto & s : async_commit) + { + writeVarInt(s.size(), res); + res.write(s.data(), s.size()); + } + } + { + res.write(RecordKVFormat::LAST_CHANGE_PREFIX); + RecordKVFormat::encodeUInt64(12345678, res); + TiKV::writeVarUInt(87654321, res); + } + { + res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK); + TiKV::writeVarUInt(876543, res); + } + { + res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); + } + if (generation > 0) + { + // res.write(RecordKVFormat::GENERATION_PREFIX); + // RecordKVFormat::encodeUInt64(generation, res); + } + return TiKVValue(res.releaseStr()); +} + +void parseTest(benchmark::State & state) +{ + try + { + std::string shor_value = "value"; + auto lock_for_update_ts = 7777, txn_size = 1; + const std::vector & async_commit = {"s1", "s2"}; + const std::vector & rollback = {3, 4}; + auto lock_value2 = encode_lock_cf_value( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 1111); + + auto ori_key = std::make_shared(RecordKVFormat::genKey(1, 88888)); + for (auto _ : state) + { + auto lock2 = RecordKVFormat::DecodedLockCFValue( + ori_key, + std::make_shared(TiKVValue::copyFrom(lock_value2))); + benchmark::DoNotOptimize(lock2); + } + } + catch (...) + { + tryLogCurrentException(DB::Logger::get(), __PRETTY_FUNCTION__); + } +} + +BENCHMARK(parseTest); + +} // namespace DB::tests diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index d6e7e073b53..53d6e7a0e46 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -643,7 +643,7 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) region->insert( "lock", RecordKVFormat::genKey(table_id, 3), - RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20, nullptr, 5)); region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); region->insert( "write", @@ -656,8 +656,26 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) auto iter = region->createCommittedScanner(true, true); auto lock = iter.getLockInfo({100, nullptr}); ASSERT_NE(lock, nullptr); - auto k = lock->intoLockInfo(); - ASSERT_EQ(k->lock_version(), 3); + ASSERT_EQ(lock->lock_version(), 3); + } + { + // There is a lock, and could be bypassed. + std::unordered_set bypass = {3}; + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, &bypass}); + ASSERT_EQ(lock, nullptr); + } + { + // There is no lock. + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({2, nullptr}); + ASSERT_EQ(lock, nullptr); + } + { + // The read ts is smaller than min_commit_ts, so this txn is not visible. + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({4, nullptr}); + ASSERT_EQ(lock, nullptr); } { // The record is committed since there is a write record. @@ -675,6 +693,30 @@ TEST_F(RegionKVStoreOldTest, RegionReadWrite) } region->clearAllData(); } + { + region->insert( + "lock", + RecordKVFormat::genKey(table_id, 3), + RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Lock, "PK", 3, 20, nullptr, 5)); + { + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } + { + region->insert( + "lock", + RecordKVFormat::genKey(table_id, 3), + RecordKVFormat::encodeLockCfValue(RecordKVFormat::LockType::Pessimistic, "PK", 3, 20, nullptr, 5)); + { + auto iter = region->createCommittedScanner(true, true); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } { // Test duplicate and tryCompactionFilter region->insert( diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index 18282797c62..c43dc3adf29 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -33,7 +33,8 @@ TiKVValue encode_lock_cf_value( Timestamp for_update_ts, uint64_t txn_size, const std::vector & async_commit, - const std::vector & rollback) + const std::vector & rollback, + UInt64 generation = 0) { auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); WriteBufferFromOwnString res; @@ -79,6 +80,11 @@ TiKVValue encode_lock_cf_value( { res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); } + if (generation > 0) + { + res.write(RecordKVFormat::GENERATION_PREFIX); + RecordKVFormat::encodeUInt64(generation, res); + } return TiKVValue(res.releaseStr()); } @@ -149,16 +155,17 @@ TEST(TiKVKeyValueTest, PortedTests) auto lock = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value))); { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.lock_type); - ASSERT_TRUE("primary key" == lock_info.primary_lock); - ASSERT_TRUE(421321 == lock_info.lock_version); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.lock_ttl); - ASSERT_TRUE(66666 == lock_info.min_commit_ts); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.lock_for_update_ts); - ASSERT_EQ(txn_size, lock_info.txn_size); - - ASSERT_EQ(true, lock_info.use_async_commit); + lock_info.withInner([&](const auto & in) { + ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); + ASSERT_TRUE("primary key" == in.primary_lock); + ASSERT_TRUE(421321 == in.lock_version); + ASSERT_TRUE(std::numeric_limits::max() == in.lock_ttl); + ASSERT_TRUE(66666 == in.min_commit_ts); + ASSERT_EQ(lock_for_update_ts, in.lock_for_update_ts); + ASSERT_EQ(txn_size, in.txn_size); + ASSERT_EQ(true, in.use_async_commit); + }); } { auto lock_info = lock.intoLockInfo(); @@ -208,12 +215,12 @@ TEST(TiKVKeyValueTest, PortedTests) nullptr, 66666)); ASSERT_TRUE(d.getSize() == 2); - ASSERT_TRUE( - std::get<2>(d.getData() - .find(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}) - ->second) - ->lock_version - == 5678); + + std::get<2>(d.getData() + .find(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}) + ->second) + ->withInner([&](const auto & in) { ASSERT_EQ(in.lock_version, 5678); }); + d.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(k1.data(), k1.dataSize())}, true); ASSERT_TRUE(d.getSize() == 1); d.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(k2.data(), k2.dataSize())}, true); @@ -481,20 +488,40 @@ try // check the parsed result { auto & lock_info = lock; - ASSERT_TRUE(kvrpcpb::Op::Del == lock_info.lock_type); - ASSERT_TRUE("primary key" == lock_info.primary_lock); - ASSERT_TRUE(421321 == lock_info.lock_version); - ASSERT_TRUE(std::numeric_limits::max() == lock_info.lock_ttl); - ASSERT_TRUE(66666 == lock_info.min_commit_ts); ASSERT_TRUE(ori_key == lock_info.key); - ASSERT_EQ(lock_for_update_ts, lock_info.lock_for_update_ts); - ASSERT_EQ(txn_size, lock_info.txn_size); - - ASSERT_EQ(true, lock_info.use_async_commit); + ASSERT_FALSE(lock_info.isLargeTxn()); + + lock_info.withInner([&](const auto & in) { + ASSERT_TRUE(kvrpcpb::Op::Del == in.lock_type); + ASSERT_TRUE("primary key" == in.primary_lock); + ASSERT_TRUE(421321 == in.lock_version); + ASSERT_TRUE(std::numeric_limits::max() == in.lock_ttl); + ASSERT_TRUE(66666 == in.min_commit_ts); + ASSERT_EQ(lock_for_update_ts, in.lock_for_update_ts); + ASSERT_EQ(txn_size, in.txn_size); + ASSERT_EQ(true, in.use_async_commit); + }); } + + auto lock_value2 = encode_lock_cf_value( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 1111); + + auto lock2 = RecordKVFormat::DecodedLockCFValue(ori_key, std::make_shared(std::move(lock_value2))); + ASSERT_TRUE(lock2.isLargeTxn()); } CATCH + TEST(TiKVKeyValueTest, Redact) try { diff --git a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h index 5f007a24fc8..3c17243db43 100644 --- a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h +++ b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h @@ -62,7 +62,7 @@ inline void validateSSTGeneration( const LoggerPtr & log_) -> std::unique_ptr { auto parsed_kind = MockSSTGenerator::parseSSTViewKind(buffToStrView(snap.path)); auto reader = std::make_unique(proxy_helper, snap, range, log_); - assert(reader->sstFormatKind() == parsed_kind); + RUNTIME_CHECK(reader->sstFormatKind() == parsed_kind); return reader; }; MultiSSTReader reader{