
Commit

remove contextPatcher
Signed-off-by: crazycs520 <[email protected]>
crazycs520 committed Apr 7, 2024
1 parent 83ebf95 commit 2c97bb2
Showing 5 changed files with 62 additions and 110 deletions.
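
For context, the removed helper buffered per-request overrides of three kvrpcpb.Context fields (ReplicaRead, StaleRead, BusyThresholdMs) on the RPCContext and applied them in SendReqCtx just before the request was sent. After this commit that deferred step is gone, and the updated tests below assert the fields directly on the request after replicaSelector.next, which suggests the selector now sets them in place. The following is a minimal, self-contained sketch of the old pattern next to that assumed new flow; requestContext is a local stand-in for kvrpcpb.Context, not the client-go API.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// requestContext is a local stand-in for kvrpcpb.Context, limited to the three
// fields the removed patcher touched.
type requestContext struct {
	ReplicaRead     bool
	StaleRead       bool
	BusyThresholdMs uint32
}

// contextPatcher mirrors the struct deleted from region_cache.go: every field
// is optional, and only non-nil fields override the request context.
type contextPatcher struct {
	replicaRead   *bool
	staleRead     *bool
	busyThreshold *time.Duration
}

// applyTo reproduces the removed override logic, including clamping the busy
// threshold to the uint32 range.
func (p *contextPatcher) applyTo(ctx *requestContext) {
	if p.replicaRead != nil {
		ctx.ReplicaRead = *p.replicaRead
	}
	if p.staleRead != nil {
		ctx.StaleRead = *p.staleRead
	}
	if p.busyThreshold != nil {
		millis := p.busyThreshold.Milliseconds()
		if millis > 0 && millis <= math.MaxUint32 {
			ctx.BusyThresholdMs = uint32(millis)
		} else {
			ctx.BusyThresholdMs = 0
		}
	}
}

func main() {
	// Old flow: the selector recorded overrides in the patcher and SendReqCtx
	// called applyTo right before handing the request to the RPC client.
	replicaRead := true
	threshold := 50 * time.Millisecond
	patcher := contextPatcher{replicaRead: &replicaRead, busyThreshold: &threshold}

	var ctx requestContext
	patcher.applyTo(&ctx)
	fmt.Printf("patched context: %+v\n", ctx)

	// New flow (an assumption inferred from the updated tests): the selector
	// writes the fields onto the request context while picking a replica, so
	// there is no deferred patch step left in the send path.
	direct := requestContext{ReplicaRead: true, BusyThresholdMs: 50}
	fmt.Printf("directly set context: %+v\n", direct)
}
```

Either way the request carries the same field values; the commit only removes the indirection, which is why the test changes below are largely limited to dropping the applyTo calls.
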
27 changes: 0 additions & 27 deletions internal/locate/region_cache.go
@@ -39,7 +39,6 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"math/rand"
"slices"
"sort"
@@ -53,7 +52,6 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
@@ -793,8 +791,6 @@ type RPCContext struct {
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
BucketVersion uint64

contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
}

func (c *RPCContext) String() string {
@@ -810,29 +806,6 @@ func (c *RPCContext) String() string {
return res
}

type contextPatcher struct {
replicaRead *bool
busyThreshold *time.Duration
staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.replicaRead != nil {
pbCtx.ReplicaRead = *patcher.replicaRead
}
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
if patcher.busyThreshold != nil {
millis := patcher.busyThreshold.Milliseconds()
if millis > 0 && millis <= math.MaxUint32 {
pbCtx.BusyThresholdMs = uint32(millis)
} else {
pbCtx.BusyThresholdMs = 0
}
}
}

type storeSelectorOp struct {
leaderOnly bool
preferLeader bool
52 changes: 7 additions & 45 deletions internal/locate/region_request.go
@@ -45,6 +45,10 @@ import (
"sync/atomic"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -60,9 +64,6 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pderr "github.com/tikv/pd/client/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// shuttingDown is a flag to indicate tidb-server is exiting (Ctrl+C signal
@@ -364,42 +365,6 @@ func (s *baseReplicaSelector) String() string {
return fmt.Sprintf("cacheRegionIsValid: %v, replicaStatus: %v", cacheRegionIsValid, replicaStatus)
}

func hasDeadlineExceededError(replicas []*replica) bool {
for _, replica := range replicas {
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) {
// when a deadline exceeded error is met, do a fast retry without invalidating the region cache.
return true
}
}
return false
}

func buildTiKVReplicas(region *Region) []*replica {
regionStore := region.getStore()
replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly))
for _, storeIdx := range regionStore.accessIndex[tiKVOnly] {
replicas = append(
replicas, &replica{
store: regionStore.stores[storeIdx],
peer: region.meta.Peers[storeIdx],
epoch: regionStore.storeEpochs[storeIdx],
attempts: 0,
},
)
}

if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
attemptedTime, err := time.ParseDuration(val.(string))
if err != nil {
panic(err)
}
for _, r := range replicas {
r.attemptedTime = attemptedTime
}
}
return replicas
}

const (
maxReplicaAttempt = 10
// The maximum time to allow retrying sending requests after RPC failure. In case an RPC request fails after
@@ -686,11 +651,9 @@ func (s *RegionRequestSender) SendReqCtx(
}

var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if s.replicaSelector.target != nil {
isLocalTraffic = s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels)
staleReadCollector.onReq(req, isLocalTraffic)
}
if staleReadCollector != nil && s.replicaSelector != nil && s.replicaSelector.target != nil {
isLocalTraffic = s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels)
staleReadCollector.onReq(req, isLocalTraffic)
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
@@ -704,7 +667,6 @@
}

req.Context.ClusterId = rpcCtx.ClusterID
rpcCtx.contextPatcher.applyTo(&req.Context)
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
40 changes: 10 additions & 30 deletions internal/locate/region_request3_test.go
@@ -459,7 +459,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Should only contain TiKV stores.
s.Equal(len(replicaSelector.replicas), regionStore.accessStoreNum(tiKVOnly))
s.Equal(len(replicaSelector.replicas), len(regionStore.stores)-1)
//s.IsType(&accessKnownLeader{}, replicaSelector.state)

// Verify that the store matches the peer and epoch.
for _, replica := range replicaSelector.replicas {
@@ -567,33 +566,24 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(replicaSelector.proxy.attempts, 1)

// When the current proxy node fails, it should try another one.
//lastProxy := replicaSelector.proxy
replicaSelector.onSendFailure(s.bo, nil)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.NotNil(rpcCtx)
s.Nil(err)
//state, ok = replicaSelector.state.(*tryNewProxy)
//s.True(ok)
//s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
//s.Equal(AccessIndex(2), replicaSelector.targetIdx)
//s.NotEqual(lastProxy, replicaSelector.proxyIdx)
s.Equal(replicaSelector.target.attempts, 2)
s.Equal(replicaSelector.proxy.attempts, 1)

// Test that the proxy store is saved when forwarding is enabled
replicaSelector.onSendSuccess(req)
regionStore = region.getStore()
//s.Equal(replicaSelector.proxyIdx, regionStore.proxyTiKVIdx)
s.Equal(replicaSelector.proxy.peer.Id, replicaSelector.replicas[regionStore.proxyTiKVIdx].peer.Id)

// Test initial state is accessByKnownProxy when proxyTiKVIdx is valid
// Test when proxyTiKVIdx is valid
refreshEpochs(regionStore)
cache.enableForwarding = true
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
//state2, ok := replicaSelector.state.(*accessByKnownProxy)
//s.True(ok)
//s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy)
@@ -780,19 +770,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 3)
getReplicaSelectorRegion := func() *Region {
return sender.replicaSelector.region
}
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

// The leader store is alive but can't provide service.
getReplicaSelectorRegionStores := func() []*Store {
return sender.replicaSelector.region.getStore().stores
}
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := getReplicaSelectorRegionStores()
stores := sender.replicaSelector.region.getStore().stores
return stores[0].getLivenessState() == reachable &&
stores[1].getLivenessState() == reachable &&
stores[2].getLivenessState() == reachable
@@ -804,7 +788,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
s.cluster.StartStore(s.storeIDs[0])

@@ -838,7 +822,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
}()
}
@@ -858,7 +842,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()

@@ -877,7 +861,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()

@@ -910,7 +894,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
}
@@ -926,7 +910,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.True(bo.GetTotalBackoffTimes() == 3)
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}
@@ -963,7 +947,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
lastPeerID := rpcCtx.Peer.Id
@@ -977,7 +960,6 @@
// Should choose a peer different from before
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))

@@ -991,7 +973,6 @@
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.False(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(0))
s.True(replicaSelector.region.isValid()) // don't invalidate region when can't find an idle replica.
@@ -1006,7 +987,6 @@
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
}

37 changes: 37 additions & 0 deletions internal/locate/replica_selector.go
@@ -26,6 +26,7 @@ import (
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

type replicaSelector struct {
@@ -67,6 +68,32 @@ func newReplicaSelector(
}, nil
}

func buildTiKVReplicas(region *Region) []*replica {
regionStore := region.getStore()
replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly))
for _, storeIdx := range regionStore.accessIndex[tiKVOnly] {
replicas = append(
replicas, &replica{
store: regionStore.stores[storeIdx],
peer: region.meta.Peers[storeIdx],
epoch: regionStore.storeEpochs[storeIdx],
attempts: 0,
},
)
}

if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
attemptedTime, err := time.ParseDuration(val.(string))
if err != nil {
panic(err)
}
for _, r := range replicas {
r.attemptedTime = attemptedTime
}
}
return replicas
}

func (s *replicaSelector) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) {
if !s.region.isValid() {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
@@ -266,6 +293,16 @@ func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica {
return nil
}

func hasDeadlineExceededError(replicas []*replica) bool {
for _, replica := range replicas {
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) {
// when a deadline exceeded error is met, do a fast retry without invalidating the region cache.
return true
}
}
return false
}

func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epochStale bool, liveness livenessState) bool {
if epochStale || liveness == unreachable {
// the replica is not available, skip it.
16 changes: 8 additions & 8 deletions internal/locate/replica_selector_test.go
@@ -2492,18 +2492,18 @@ func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) {
s.cache.InvalidateCachedRegion(loc.Region)
}

func (s *testReplicaSelectorSuite) runCaseAndCompare(ca1 replicaSelectorAccessPathCase) bool {
sender := ca1.run(s)
ca1.checkResult(s, sender)
return !ca1.accessErrInValid
func (s *testReplicaSelectorSuite) runCaseAndCompare(ca replicaSelectorAccessPathCase) bool {
sender := ca.run(s)
ca.checkResult(s, sender)
return !ca.accessErrInValid
}

func (s *testReplicaSelectorSuite) runMultiCaseAndCompare(cas []replicaSelectorAccessPathCase) bool {
valid := true
for _, ca1 := range cas {
sender := ca1.run(s)
ca1.checkResult(s, sender)
valid = valid && !ca1.accessErrInValid
for _, ca := range cas {
sender := ca.run(s)
ca.checkResult(s, sender)
valid = valid && !ca.accessErrInValid
}
return valid
}