Skip to content

Commit

Permalink
Merge branch 'master' into fix_log
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Sep 7, 2023
2 parents 7bb26b1 + 5dd12b0 commit f1c8414
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 44 deletions.
91 changes: 79 additions & 12 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand All @@ -67,7 +69,7 @@ type apiTestSuite struct {
func (s *apiTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
s.Require().NoError(err)
rpcClient := tikv.NewRPCClient()
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
Expand Down Expand Up @@ -103,42 +105,107 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()

// Set DC label for store 1.
// Mock Cluster-level min resolved ts failed.
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
// Set DC label for store 1.
// Mock a PD server that does not support getting min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
}

func (s *apiTestSuite) TearDownTest() {
Expand Down
84 changes: 68 additions & 16 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(safeTSUpdateInterval)
if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil {
t.Reset(time.Millisecond * 100)
}
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
Expand All @@ -580,33 +583,45 @@ func (s *KVStore) safeTSUpdater() {
}

func (s *KVStore) updateSafeTS(ctx context.Context) {
// Try to get the cluster-level minimum resolved timestamp from PD first.
if s.updateGlobalTxnScopeTSFromPD(ctx) {
return
}

// When txn scope is not global, we need to get the minimum resolved timestamp of each store.
stores := s.regionCache.GetAllStores()
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
for _, store := range stores {
// Try to get the minimum resolved timestamp of the store from PD.
var (
err error
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
if s.pdHttpClient != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and fall back to getting it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for i, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
defer wg.Done()

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
}
var safeTS uint64
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
Expand All @@ -625,6 +640,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
} else {
safeTS = storeMinResolvedTSs[storeID]
}

_, preSafeTS := s.getSafeTS(storeID)
Expand All @@ -638,7 +655,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
safeTSTime := oracle.GetTimeFromTS(safeTS)
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr)
}(ctx, wg, storeID, storeAddr, storeIDs[i])
}

txnScopeMap := make(map[string][]uint64)
Expand All @@ -655,6 +672,41 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

// Metric handles bound to the "cluster" label value, resolved once at package
// initialization and used when the min safe ts is updated from the
// cluster-level min resolved ts.
var (
// Counts updates skipped because the stored min safe ts was already newer.
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
// Counts updates that stored a new cluster-level min safe ts.
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
// Gap in seconds between now and the time derived from the effective min safe ts.
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
)

// updateGlobalTxnScopeTSFromPD tries to refresh the min safe ts of the global
// txn scope from the cluster-level min resolved ts served by PD.
//
// It only does so when the configured txn scope is global and a PD HTTP client
// is available. It returns true when PD answered with a non-zero cluster-level
// timestamp (regardless of whether the stored value was advanced or kept), and
// false otherwise.
func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
	globalScope := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
	if s.pdHttpClient == nil || !globalScope {
		return false
	}

	// Passing nil store IDs asks PD for the cluster-level min resolved ts.
	newTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
	if err != nil {
		logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
		return false
	}
	if newTS == 0 {
		return false
	}

	// Never move the stored min safe ts backwards: keep the previous value and
	// report the gap against it instead.
	oldTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
	if oldTS > newTS {
		skipSafeTSUpdateCounter.Inc()
		clusterMinSafeTSGap.Set(time.Since(oracle.GetTimeFromTS(oldTS)).Seconds())
		return true
	}

	s.minSafeTS.Store(oracle.GlobalTxnScope, newTS)
	successSafeTSUpdateCounter.Inc()
	clusterMinSafeTSGap.Set(time.Since(oracle.GetTimeFromTS(newTS)).Seconds())
	return true
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
Expand Down
53 changes: 37 additions & 16 deletions util/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// pd request retry time when connection fail.
pdRequestRetryTime = 10

storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PDHTTPClient is an HTTP client of pd.
Expand Down Expand Up @@ -86,45 +86,66 @@ func NewPDHTTPClient(
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
// GetMinResolvedTSByStoresIDs gets the min-resolved-ts from PD by store IDs.
func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
// scope is an optional parameter, it can be `cluster` or specified store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
query := minResolvedTSPrefix
if len(storeIDs) != 0 {
query = fmt.Sprintf("%s?scope=%s", query, strings.Join(storeIDs, ","))
}
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
if e != nil {
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v)))
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err = json.Unmarshal(v, &d)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
if !d.IsRealTime {
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr)
logutil.BgLogger().Debug(message.Error())
return 0, errors.Trace(message)
return 0, nil, errors.Trace(message)
}
if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil {
// Need to make sure successfully get from real pd.
if d.MinResolvedTS != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.MinResolvedTS = uint64(tmp)
if d.StoresMinResolvedTS != nil {
for storeID, v := range d.StoresMinResolvedTS {
if v != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.StoresMinResolvedTS[storeID] = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp)))
}
}
}
} else if tmp, ok := val.(int); ok {
// Should be val.(uint64) but failpoint doesn't support that.
// ci's store id is 1, we can change it if we have more stores.
// but for pool ci it's no need to do that :(
d.MinResolvedTS = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp)))
}

}

return d.MinResolvedTS, nil
return d.MinResolvedTS, d.StoresMinResolvedTS, nil
}

return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

// pdRequest is a func to send an HTTP to pd and return the result bytes.
Expand Down

0 comments on commit f1c8414

Please sign in to comment.