Skip to content

Commit

Permalink
refine test
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Oct 8, 2023
1 parent aa81be7 commit b5f94e5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 63 deletions.
86 changes: 38 additions & 48 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
Expand Down Expand Up @@ -86,16 +66,28 @@ func (s *apiTestSuite) storeAddr(id uint64) string {
type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
kvSafeTS uint64
}

func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient {
return storeSafeTsMockClient{
Client: client,
kvSafeTS: 150, // Set a default value.
}
}

func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) {
c.kvSafeTS = ts
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
return &tikvrpc.Response{
Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS},
}, nil
}

func (c *storeSafeTsMockClient) Close() error {
Expand Down Expand Up @@ -132,9 +124,7 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
Expand All @@ -158,9 +148,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
Expand Down Expand Up @@ -216,20 +204,26 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
// Mock safeTS is not initialized.
s.store.SetStoreSafeTS(uint64(1), 0)
s.store.SetMinSafeTS(oracle.GlobalTxnScope, 0)
require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope))

mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
require.NotEqual(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
Expand All @@ -239,15 +233,11 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from TiKV.
// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
// Mock safeTS is not initialized.
s.store.SetStoreSafeTS(uint64(1), 0)
s.store.SetMinSafeTS(oracle.GlobalTxnScope, 0)
require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope))

mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
require.NotEqual(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
Expand Down
20 changes: 5 additions & 15 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,6 @@ func (s *KVStore) GetTiKVClient() (client Client) {
// GetMinSafeTS return the minimal safeTS of the storage with given txnScope.
func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
if val, ok := s.minSafeTS.Load(txnScope); ok {
if val.(uint64) == uint64(math.MaxUint64) {
return 0
}
return val.(uint64)
}
return 0
Expand Down Expand Up @@ -693,6 +690,11 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
} else if clusterMinSafeTS != 0 {
// Update ts and metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
// If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized.
// related to https://github.com/tikv/client-go/issues/991
if preClusterMinSafeTS == math.MaxUint64 {
preClusterMinSafeTS = 0
}
if preClusterMinSafeTS > clusterMinSafeTS {
skipSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
Expand Down Expand Up @@ -853,15 +855,3 @@ type SchemaVer = transaction.SchemaVer
// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
// We use it to abort the transaction to guarantee GC worker will not influence it.
const MaxTxnTimeUse = transaction.MaxTxnTimeUse

// SetMinSafeTS set the minimal safeTS of the storage with given txnScope.
// Note: it is only used for test.
func (s *KVStore) SetMinSafeTS(txnScope string, safeTS uint64) {
s.minSafeTS.Store(txnScope, safeTS)
}

// SetStoreSafeTS set the safeTS of the store with given storeID.
// Note: it is only used for test.
func (s *KVStore) SetStoreSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}

0 comments on commit b5f94e5

Please sign in to comment.