Skip to content

Commit

Permalink
pick #1250 to tidb-7.5 (#1325)
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Apr 29, 2024
1 parent 2de9b7d commit 6aedd99
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 44 deletions.
2 changes: 1 addition & 1 deletion integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
retryCount++
}
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
Expand Down
9 changes: 9 additions & 0 deletions internal/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package logutil

import (
"context"
"testing"

"github.com/pingcap/log"
"go.uber.org/zap"
Expand All @@ -60,3 +61,11 @@ type ctxLogKeyType struct{}
// CtxLogKey is the key to retrieve logger from context.
// It can be assigned to another value.
var CtxLogKey interface{} = ctxLogKeyType{}

// AssertWarn reports an unexpected-but-tolerable condition.
//
// When the binary is running under "go test" (testing.Testing reports true), the message is
// emitted through logger.Panic so that the violated assumption fails loudly. Otherwise, it is
// only logged at warn level and execution continues.
func AssertWarn(logger *zap.Logger, msg string, fields ...zap.Field) {
	underTest := testing.Testing()
	if underTest {
		logger.Panic(msg, fields...)
	}
	// Deliberately not in an else branch: this statement runs whenever control gets here.
	logger.Warn(msg, fields...)
}
64 changes: 52 additions & 12 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ type KVStore struct {
wg sync.WaitGroup
close atomicutil.Bool
gP Pool

testingKnobs struct {
mockGetMinResolvedTSByStoresIDs atomic.Pointer[func(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error)]
}
}

// setGetMinResolvedTSByStoresIDs installs f in the testing knobs so that
// getMinResolvedTSByStoresIDs calls f instead of the PD HTTP client.
//
// Passing a nil f removes any installed mock. Storing the address of a nil func would leave a
// non-nil pointer in the knob, so every `Load() != nil` check would still treat the mock as
// installed and invoking it would panic.
func (s *KVStore) setGetMinResolvedTSByStoresIDs(f func(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error)) {
	if f == nil {
		s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Store(nil)
		return
	}
	s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Store(&f)
}

// getMinResolvedTSByStoresIDs forwards the request to the PD HTTP client, unless a mock has been
// installed in the testing knobs, in which case the mock is invoked with the same arguments and
// its results are returned unchanged.
func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error) {
	mock := s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load()
	if mock == nil {
		// No mock installed: use the PD HTTP client.
		return s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, ids)
	}
	return (*mock)(ctx, ids)
}

// Go run the function in a separate goroutine.
Expand Down Expand Up @@ -497,6 +512,15 @@ func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
return 0
}

// setMinSafeTS stores safeTS as the min safe ts of txnScope in s.minSafeTS.
//
// math.MaxUint64 is never stored: such a call is reported through logutil.AssertWarn and the
// previously stored value (if any) is left untouched.
func (s *KVStore) setMinSafeTS(txnScope string, safeTS uint64) {
	if safeTS != math.MaxUint64 {
		s.minSafeTS.Store(txnScope, safeTS)
		return
	}
	// Refuse max uint64 and record where the call came from.
	logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.String("txnScope", txnScope), zap.Stack("stack"))
}

// Ctx returns ctx.
func (s *KVStore) Ctx() context.Context {
return s.ctx
Expand Down Expand Up @@ -532,14 +556,22 @@ func (s *KVStore) getSafeTS(storeID uint64) (bool, uint64) {

// setSafeTS records safeTS as the safe ts of the store identified by storeID in s.safeTSMap.
//
// math.MaxUint64 is never stored: such a call is reported through logutil.AssertWarn and the
// map is left unchanged.
func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
	switch safeTS {
	case math.MaxUint64:
		// Refuse max uint64 and record where the call came from.
		logutil.AssertWarn(logutil.BgLogger(), "skip setting safe-ts to max uint64", zap.Uint64("storeID", storeID), zap.Stack("stack"))
	default:
		s.safeTSMap.Store(storeID, safeTS)
	}
}

func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
minSafeTS := uint64(math.MaxUint64)
// when there is no store, return 0 in order to let minStartTS become startTS directly
// actually storeIDs won't be empty since updateMinSafeTS is only called by updateSafeTS and updateSafeTS builds
// txnScopeMap with non-empty values. here we check it to make the logic more robust.
if len(storeIDs) < 1 {
s.minSafeTS.Store(txnScope, 0)
s.setMinSafeTS(txnScope, 0)
return
}
for _, store := range storeIDs {
ok, safeTS := s.getSafeTS(store)
Expand All @@ -551,7 +583,11 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
minSafeTS = 0
}
}
s.minSafeTS.Store(txnScope, minSafeTS)
// if minSafeTS is still math.MaxUint64, that means all store safe ts are 0, then we set minSafeTS to 0.
if minSafeTS == math.MaxUint64 {
minSafeTS = 0
}
s.setMinSafeTS(txnScope, minSafeTS)
}

func (s *KVStore) safeTSUpdater() {
Expand Down Expand Up @@ -591,11 +627,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
if s.pdHttpClient != nil {
if s.pdHttpClient != nil || s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load() != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
Expand All @@ -612,8 +648,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
defer wg.Done()

var safeTS uint64
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
// If getting the minimum resolved timestamp from PD failed or returned 0/MaxUint64, try to get it from TiKV.
if storeMinResolvedTSs == nil || !isValidSafeTS(storeMinResolvedTSs[storeID]) || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
Expand Down Expand Up @@ -675,21 +711,21 @@ var (
func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
if (s.pdHttpClient != nil || s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load() != nil) && isGlobal {
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
} else if isValidSafeTS(clusterMinSafeTS) {
// Update ts and metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
// If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized.
// preClusterMinSafeTS is guaranteed to be less than math.MaxUint64 (by this method and setMinSafeTS)
// related to https://github.com/tikv/client-go/issues/991
if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS {
if preClusterMinSafeTS > clusterMinSafeTS {
skipSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
} else {
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
s.setMinSafeTS(oracle.GlobalTxnScope, clusterMinSafeTS)
successSafeTSUpdateCounter.Inc()
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
Expand All @@ -701,6 +737,10 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
return false
}

// isValidSafeTS reports whether ts is a usable safe ts, i.e. it is neither 0 nor math.MaxUint64.
func isValidSafeTS(ts uint64) bool {
	switch ts {
	case 0, math.MaxUint64:
		return false
	default:
		return true
	}
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
Expand Down
Loading

0 comments on commit 6aedd99

Please sign in to comment.