Skip to content

Commit

Permalink
Try to validate read ts for all RPC requests (#1513) (#1548)
Browse files Browse the repository at this point in the history
 

Signed-off-by: MyonKeminta <[email protected]>
Signed-off-by: you06 <[email protected]>

Co-authored-by: MyonKeminta <[email protected]>
  • Loading branch information
you06 and MyonKeminta authored Jan 15, 2025
1 parent 0dc4129 commit c1b98e6
Show file tree
Hide file tree
Showing 24 changed files with 324 additions and 99 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ go 1.21
require (
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/docker/go-units v0.5.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/btree v1.1.2
github.com/google/uuid v1.3.1
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/ninedraft/israce v0.0.3
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -58,6 +60,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/ninedraft/israce v0.0.3 h1:F/Y1u6OlvgE75Syv1WbBatyg3CjGCdxLojLE7ydv2yE=
github.com/ninedraft/israce v0.0.3/go.mod h1:4L1ITFl340650ZmexVbUcBwG18ozlWiMe47pltZAmn4=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down Expand Up @@ -112,8 +116,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52 h1:wucAo/ks8INgayRVfbrzZ+BSWEwRLETj0XfngDcrZ4k=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20240202093025-6978558f4e97 h1:LlsMm3/IIRRXrip19M0yYb4bZ7BvdQgzF77csNnGK1Y=
github.com/tikv/pd/client v0.0.0-20240202093025-6978558f4e97/go.mod h1:AwjTSpM7CgAynYwB6qTG5R5fVC9/eXlQXiTO6zDL1HI=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudfoundry/gosigar v1.3.6 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
Expand Down
8 changes: 4 additions & 4 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1P
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/pb/v3 v3.0.8 h1:bC8oemdChbke2FHIIGy9mn4DPJ2caZYQnfbRqwmdCoA=
github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand Down Expand Up @@ -118,8 +118,8 @@ github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUn
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
Expand Down
5 changes: 3 additions & 2 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
)

Expand Down Expand Up @@ -1055,7 +1056,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() {
s.Equal(ctxTiFlash.Peer.Id, s.peer1)
ctxTiFlash.Peer.Role = metapb.PeerRole_Learner
r := ctxTiFlash.Meta
reqSend := NewRegionRequestSender(s.cache, nil)
reqSend := NewRegionRequestSender(s.cache, nil, oracle.NoopReadTSValidator{})
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)

Expand Down Expand Up @@ -1691,7 +1692,7 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
ctx, err := s.cache.GetTiKVRPCContext(retry.NewBackofferWithVars(context.Background(), 100, nil), loc.Region, kv.ReplicaReadLeader, 0)
s.NotNil(ctx)
s.NoError(err)
reqSend := NewRegionRequestSender(s.cache, nil)
reqSend := NewRegionRequestSender(s.cache, nil, oracle.NoopReadTSValidator{})
shouldRetry, err := reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackInProgress: &errorpb.FlashbackInProgress{}})
s.Error(err)
s.False(shouldRetry)
Expand Down
54 changes: 50 additions & 4 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sync/atomic"
"time"

"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -105,6 +106,7 @@ type RegionRequestSender struct {
regionCache *RegionCache
apiVersion kvrpcpb.APIVersion
client client.Client
readTSValidator oracle.ReadTSValidator
storeAddr string
rpcError error
replicaSelector *replicaSelector
Expand Down Expand Up @@ -372,11 +374,12 @@ func (s *ReplicaAccessStats) String() string {
}

// NewRegionRequestSender creates a new sender.
func NewRegionRequestSender(regionCache *RegionCache, client client.Client) *RegionRequestSender {
func NewRegionRequestSender(regionCache *RegionCache, client client.Client, readTSValidator oracle.ReadTSValidator) *RegionRequestSender {
return &RegionRequestSender{
regionCache: regionCache,
apiVersion: regionCache.codec.GetAPIVersion(),
client: client,
regionCache: regionCache,
apiVersion: regionCache.codec.GetAPIVersion(),
client: client,
readTSValidator: readTSValidator,
}
}

Expand Down Expand Up @@ -1558,6 +1561,11 @@ func (s *RegionRequestSender) SendReqCtx(
}
}

if err = s.validateReadTS(bo.GetCtx(), req); err != nil {
logutil.Logger(bo.GetCtx()).Error("validate read ts failed for request", zap.Stringer("reqType", req.Type), zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("context", &req.Context), zap.Stack("stack"), zap.Error(err))
return nil, nil, 0, err
}

// If the MaxExecutionDurationMs is not set yet, we set it to be the RPC timeout duration
// so TiKV can give up the requests whose response TiDB cannot receive due to timeout.
if req.Context.MaxExecutionDurationMs == 0 {
Expand Down Expand Up @@ -2535,6 +2543,44 @@ func (s *RegionRequestSender) onRegionError(
return false, nil
}

func (s *RegionRequestSender) validateReadTS(ctx context.Context, req *tikvrpc.Request) error {
if req.StoreTp == tikvrpc.TiDB {
// Skip the checking if the store type is TiDB.
return nil
}

var readTS uint64
switch req.Type {
case tikvrpc.CmdGet, tikvrpc.CmdScan, tikvrpc.CmdBatchGet, tikvrpc.CmdCop, tikvrpc.CmdCopStream, tikvrpc.CmdBatchCop, tikvrpc.CmdScanLock:
readTS = req.GetStartTS()

// TODO: Check transactional write requests that has implicit read.
// case tikvrpc.CmdPessimisticLock:
// readTS = req.PessimisticLock().GetForUpdateTs()
// case tikvrpc.CmdPrewrite:
// inner := req.Prewrite()
// readTS = inner.GetForUpdateTs()
// if readTS == 0 {
// readTS = inner.GetStartVersion()
// }
// case tikvrpc.CmdCheckTxnStatus:
// inner := req.CheckTxnStatus()
// // TiKV uses the greater one of these three fields to update the max_ts.
// readTS = inner.GetLockTs()
// if inner.GetCurrentTs() != math.MaxUint64 && inner.GetCurrentTs() > readTS {
// readTS = inner.GetCurrentTs()
// }
// if inner.GetCallerStartTs() != math.MaxUint64 && inner.GetCallerStartTs() > readTS {
// readTS = inner.GetCallerStartTs()
// }
// case tikvrpc.CmdCheckSecondaryLocks, tikvrpc.CmdCleanup, tikvrpc.CmdBatchRollback:
// readTS = req.GetStartTS()
default:
return nil
}
return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{TxnScope: req.TxnScope})
}

type staleReadMetricsCollector struct {
}

Expand Down
9 changes: 5 additions & 4 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() {
s.cache = NewRegionCache(pdCli)
s.bo = retry.NewNoopBackoff(context.Background())
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
s.regionRequestSender = NewRegionRequestSender(s.cache, client)
s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{})
}

func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
Expand Down Expand Up @@ -150,7 +150,8 @@ func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, s
}

func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
s.regionRequestSender.regionCache.enableForwarding = true
sender := NewRegionRequestSender(s.cache, s.regionRequestSender.client, oracle.NoopReadTSValidator{})
sender.regionCache.enableForwarding = true

// First get the leader's addr from region cache
leaderStore, leaderAddr := s.loadAndGetLeaderStore()
Expand Down Expand Up @@ -1240,7 +1241,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
}
resetStats := func() {
reqTargetAddrs = make(map[string]struct{})
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient, oracle.NoopReadTSValidator{})
s.regionRequestSender.Stats = NewRegionRequestRuntimeStats()
}

Expand Down Expand Up @@ -1564,7 +1565,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
}}
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient, oracle.NoopReadTSValidator{})
s.regionRequestSender.Stats = NewRegionRequestRuntimeStats()
getLocFn := func() *KeyLocation {
loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
Expand Down
3 changes: 2 additions & 1 deletion internal/locate/region_request_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() {
s.cache = NewRegionCache(pdCli)
s.bo = retry.NewNoopBackoff(context.Background())
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
s.regionRequestSender = NewRegionRequestSender(s.cache, client)
s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{})
s.setClient()
s.injection = testRegionCacheFSMSuiteInjection{
unavailableStoreIDs: make(map[uint64]struct{}),
Expand Down
15 changes: 9 additions & 6 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ import (
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
pderr "github.com/tikv/pd/client/errs"
"google.golang.org/grpc"
)
Expand All @@ -77,6 +79,7 @@ type testRegionRequestToSingleStoreSuite struct {
store uint64
peer uint64
region uint64
pdCli pd.Client
cache *RegionCache
bo *retry.Backoffer
regionRequestSender *RegionRequestSender
Expand All @@ -87,11 +90,11 @@ func (s *testRegionRequestToSingleStoreSuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
s.store, s.peer, s.region = mocktikv.BootstrapWithSingleStore(s.cluster)
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
s.pdCli = &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(s.pdCli)
s.bo = retry.NewNoopBackoff(context.Background())
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
s.regionRequestSender = NewRegionRequestSender(s.cache, client)
s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{})
}

func (s *testRegionRequestToSingleStoreSuite) TearDownTest() {
Expand Down Expand Up @@ -573,7 +576,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa
}()

cli := client.NewRPCClient()
sender := NewRegionRequestSender(s.cache, cli)
sender := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{})
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
Expand All @@ -592,7 +595,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa
Client: client.NewRPCClient(),
redirectAddr: addr,
}
sender = NewRegionRequestSender(s.cache, client1)
sender = NewRegionRequestSender(s.cache, client1, oracle.NoopReadTSValidator{})
sender.SendReq(s.bo, req, region.Region, 3*time.Second)

// cleanup
Expand Down Expand Up @@ -812,7 +815,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
cancel()
}()
req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
regionRequestSender := NewRegionRequestSender(s.cache, fnClient)
regionRequestSender := NewRegionRequestSender(s.cache, fnClient, oracle.NoopReadTSValidator{})
regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
}
Expand Down
38 changes: 34 additions & 4 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package oracle

import (
"context"
"fmt"
"time"
)

Expand Down Expand Up @@ -64,12 +65,17 @@ type Oracle interface {
GetExternalTimestamp(ctx context.Context) (uint64, error)
SetExternalTimestamp(ctx context.Context, ts uint64) error

// ValidateSnapshotReadTS verifies whether it can be guaranteed that the given readTS doesn't exceed the maximum ts
// that has been allocated by the oracle, so that it's safe to use this ts to perform snapshot read, stale read,
// etc.
ReadTSValidator
}

// ReadTSValidator is the interface for providing the ability for verifying whether a timestamp is safe to be used
// for readings, as part of the `Oracle` interface.
type ReadTSValidator interface {
// ValidateReadTS verifies whether it can be guaranteed that the given readTS doesn't exceed the maximum ts
// that has been allocated by the oracle, so that it's safe to use this ts to perform read operations.
// Note that this method only checks the ts from the oracle's perspective. It doesn't check whether the snapshot
// has been GCed.
ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *Option) error
ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *Option) error
}

// Future is a future which promises to return a timestamp.
Expand Down Expand Up @@ -121,3 +127,27 @@ func GoTimeToTS(t time.Time) uint64 {
func GoTimeToLowerLimitStartTS(now time.Time, maxTxnTimeUse int64) uint64 {
return GoTimeToTS(now.Add(-time.Duration(maxTxnTimeUse) * time.Millisecond))
}

// NoopReadTSValidator is a dummy implementation of ReadTSValidator that always let the validation pass.
// Only use this when using RPCs that are not related to ts (e.g. rawkv), or in tests where `Oracle` is not available
// and the validation is not necessary.
type NoopReadTSValidator struct{}

func (NoopReadTSValidator) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *Option) error {
return nil
}

type ErrFutureTSRead struct {
ReadTS uint64
CurrentTS uint64
}

func (e ErrFutureTSRead) Error() string {
return fmt.Sprintf("cannot set read timestamp to a future time, readTS: %d, currentTS: %d", e.ReadTS, e.CurrentTS)
}

type ErrLatestStaleRead struct{}

func (ErrLatestStaleRead) Error() string {
return "cannot set read ts to max uint64 for stale read"
}
2 changes: 1 addition & 1 deletion oracle/oracles/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) {
case *pdOracle:
lastTSInterface, _ := o.lastTSMap.LoadOrStore(oracle.GlobalTxnScope, &atomic.Pointer[lastTSO]{})
lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO])
lastTSPointer.Store(&lastTSO{tso: ts, arrival: ts})
lastTSPointer.Store(&lastTSO{tso: ts, arrival: oracle.GetTimeFromTS(ts)})
}
}
Loading

0 comments on commit c1b98e6

Please sign in to comment.