diff --git a/go.mod b/go.mod index d2ff6fc5e..5dad70f74 100644 --- a/go.mod +++ b/go.mod @@ -15,14 +15,14 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 github.com/stretchr/testify v1.8.2 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 + github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7 github.com/twmb/murmur3 v1.1.3 go.etcd.io/etcd/api/v3 v3.5.10 go.etcd.io/etcd/client/v3 v3.5.10 diff --git a/go.sum b/go.sum index 60814ed10..c202ec557 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 h1:ynwwqr0rLliSOJcx0wHMu4T/NiPXHlK48mk2DCrBKCI= -github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -112,8 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk= -github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= +github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7 h1:HR2WOtqRd+VDXWcKzLZlBvEHYpKDvonRmd35dbvB0Sc= +github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7/go.mod h1:q2sGh0Lo0GRZuSBkFvljGsJMEMaSa6d/58A+B7bCoCE= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/split_test.go b/integration_tests/split_test.go index a06d87b81..afae4d251 100644 --- a/integration_tests/split_test.go +++ b/integration_tests/split_test.go @@ 
-50,6 +50,8 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) func TestSplit(t *testing.T) { @@ -191,7 +193,7 @@ func (c *mockPDClient) GetLocalTSAsync(ctx context.Context, dcLocation string) p return nil } -func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { c.RLock() defer c.RUnlock() @@ -201,11 +203,11 @@ func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.Get return c.client.GetRegion(ctx, key, opts...) } -func (c *mockPDClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *mockPDClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*router.Region, error) { return nil, nil } -func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { c.RLock() defer c.RUnlock() @@ -215,7 +217,7 @@ func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd return c.client.GetPrevRegion(ctx, key, opts...) } -func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { c.RLock() defer c.RUnlock() @@ -225,7 +227,7 @@ func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts return c.client.GetRegionByID(ctx, regionID, opts...) } -func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { c.RLock() defer c.RUnlock() @@ -235,7 +237,7 @@ func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey return c.client.ScanRegions(ctx, startKey, endKey, limit) } -func (c *mockPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *mockPDClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { c.RLock() defer c.RUnlock() diff --git a/internal/locate/pd_codec.go b/internal/locate/pd_codec.go index 8a68e2af3..6041ad527 100644 --- a/internal/locate/pd_codec.go +++ b/internal/locate/pd_codec.go @@ -42,6 +42,8 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/internal/apicodec" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) var _ pd.Client = &CodecPDClient{} @@ -101,7 +103,7 @@ func (c *CodecPDClient) GetCodec() apicodec.Codec { // GetRegion encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. 
-func (c *CodecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *CodecPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { encodedKey := c.codec.EncodeRegionKey(key) region, err := c.Client.GetRegion(ctx, encodedKey, opts...) return c.processRegionResult(region, err) @@ -109,7 +111,7 @@ func (c *CodecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.Ge // GetPrevRegion encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. -func (c *CodecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *CodecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { encodedKey := c.codec.EncodeRegionKey(key) region, err := c.Client.GetPrevRegion(ctx, encodedKey, opts...) return c.processRegionResult(region, err) @@ -117,14 +119,14 @@ func (c *CodecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...p // GetRegionByID encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. -func (c *CodecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *CodecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { region, err := c.Client.GetRegionByID(ctx, regionID, opts...) return c.processRegionResult(region, err) } // ScanRegions encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. -func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { startKey, endKey = c.codec.EncodeRegionRange(startKey, endKey) //nolint:staticcheck regions, err := c.Client.ScanRegions(ctx, startKey, endKey, limit, opts...) @@ -145,8 +147,8 @@ func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey // BatchScanRegions encodes the key before send requests to pd-server and decodes the // returned StartKey && EndKey from pd-server. // if limit > 0, it limits the maximum number of returned regions, should check if the result regions fully contain the given key ranges. 
-func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { - encodedRanges := make([]pd.KeyRange, len(keyRanges)) +func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { + encodedRanges := make([]router.KeyRange, len(keyRanges)) for i, keyRange := range keyRanges { encodedRanges[i].StartKey, encodedRanges[i].EndKey = c.codec.EncodeRegionRange(keyRange.StartKey, keyRange.EndKey) } @@ -166,7 +168,7 @@ func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.Key } // SplitRegions split regions by given split keys -func (c *CodecPDClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) { +func (c *CodecPDClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) { var keys [][]byte for i := range splitKeys { keys = append(keys, c.codec.EncodeRegionKey(splitKeys[i])) @@ -174,7 +176,7 @@ func (c *CodecPDClient) SplitRegions(ctx context.Context, splitKeys [][]byte, op return c.Client.SplitRegions(ctx, keys, opts...) } -func (c *CodecPDClient) processRegionResult(region *pd.Region, err error) (*pd.Region, error) { +func (c *CodecPDClient) processRegionResult(region *router.Region, err error) (*router.Region, error) { if err != nil { return nil, errors.WithStack(err) } @@ -188,7 +190,7 @@ func (c *CodecPDClient) processRegionResult(region *pd.Region, err error) (*pd.R return region, nil } -func (c *CodecPDClient) decodeRegionKeyInPlace(r *pd.Region) error { +func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error { decodedStart, decodedEnd, err := c.codec.DecodeRegionRange(r.Meta.StartKey, r.Meta.EndKey) if err != nil { return err diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index c70e137a4..a839869d9 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -67,6 +67,8 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -306,7 +308,7 @@ func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.healthStatus.IsSlow())) } -func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) { +func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *router.Region) (*Region, error) { r := &Region{meta: pdRegion.Meta} // regionStore pull used store from global store map // to avoid acquire storeMu in later access. @@ -1216,7 +1218,7 @@ func (c *RegionCache) LocateKeyRange(bo *retry.Backoffer, startKey, endKey []byt startKey = r.EndKey() } // 2. 
load remaining regions from pd client - batchRegions, err := c.BatchLoadRegionsWithKeyRanges(bo, []pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, defaultRegionsPerBatch, WithNeedRegionHasLeaderPeer()) + batchRegions, err := c.BatchLoadRegionsWithKeyRanges(bo, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, defaultRegionsPerBatch, WithNeedRegionHasLeaderPeer()) if err != nil { return nil, err } @@ -1266,7 +1268,7 @@ func WithNeedRegionHasLeaderPeer() BatchLocateKeyRangesOpt { // BatchLocateKeyRanges lists regions in the given ranges. func (c *RegionCache) BatchLocateKeyRanges(bo *retry.Backoffer, keyRanges []kv.KeyRange, opts ...BatchLocateKeyRangesOpt) ([]*KeyLocation, error) { - uncachedRanges := make([]pd.KeyRange, 0, len(keyRanges)) + uncachedRanges := make([]router.KeyRange, 0, len(keyRanges)) cachedRegions := make([]*Region, 0, len(keyRanges)) // 1. find regions from cache var lastRegion *Region @@ -1285,7 +1287,7 @@ func (c *RegionCache) BatchLocateKeyRanges(bo *retry.Backoffer, keyRanges []kv.K lastRegion = r if r == nil { // region cache miss, add the cut range to uncachedRanges, load from PD later. - uncachedRanges = append(uncachedRanges, pd.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey}) + uncachedRanges = append(uncachedRanges, router.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey}) continue } // region cache hit, add the region to cachedRegions. @@ -1321,7 +1323,7 @@ func (c *RegionCache) BatchLocateKeyRanges(bo *retry.Backoffer, keyRanges []kv.K } } if !containsAll { - uncachedRanges = append(uncachedRanges, pd.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey}) + uncachedRanges = append(uncachedRanges, router.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey}) } } @@ -1421,7 +1423,7 @@ func (m *batchLocateRangesMerger) build() []*KeyLocation { // rangesAfterKey split the key ranges and return the rest ranges after splitKey. // the returned ranges are referenced to the input keyRanges, and the key range may be changed in place, // the input keyRanges should not be used after calling this function. -func rangesAfterKey(keyRanges []pd.KeyRange, splitKey []byte) []pd.KeyRange { +func rangesAfterKey(keyRanges []router.KeyRange, splitKey []byte) []router.KeyRange { if len(keyRanges) == 0 { return nil } @@ -1494,7 +1496,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey if r == nil || expired { // load region when it is not exists or expired. observeLoadRegion(tag, r, expired, 0) - lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle()) + lr, err := c.loadRegion(bo, key, isEndKey, opt.WithAllowFollowerHandle()) if err != nil { // no region data, return error if failure. return nil, err @@ -1799,7 +1801,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey // BatchLoadRegionsWithKeyRanges loads at most given numbers of regions to the RegionCache, // within the given key range from the key ranges. Returns the loaded regions. 
-func (c *RegionCache) BatchLoadRegionsWithKeyRanges(bo *retry.Backoffer, keyRanges []pd.KeyRange, count int, opts ...BatchLocateKeyRangesOpt) (regions []*Region, err error) { +func (c *RegionCache) BatchLoadRegionsWithKeyRanges(bo *retry.Backoffer, keyRanges []router.KeyRange, count int, opts ...BatchLocateKeyRangesOpt) (regions []*Region, err error) { if len(keyRanges) == 0 { return nil, nil } @@ -2046,7 +2048,7 @@ func observeLoadRegion(tag string, region *Region, expired bool, reloadFlags int // loadRegion loads region from pd client, and picks the first peer as leader. // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful // when processing in reverse order. -func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) { +func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...opt.GetRegionOption) (*Region, error) { ctx := bo.GetCtx() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context())) @@ -2056,7 +2058,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, var backoffErr error searchPrev := false - opts = append(opts, pd.WithBuckets()) + opts = append(opts, opt.WithBuckets()) for { if backoffErr != nil { err := bo.Backoff(retry.BoPDRPC, backoffErr) @@ -2065,7 +2067,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, } } start := time.Now() - var reg *pd.Region + var reg *router.Region var err error if searchPrev { reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...) @@ -2118,7 +2120,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg } } start := time.Now() - reg, err := c.pdClient.GetRegionByID(ctx, regionID, pd.WithBuckets()) + reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets()) metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds()) if err != nil { metrics.RegionCacheCounterWithGetRegionByIDError.Inc() @@ -2198,7 +2200,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, } start := time.Now() //nolint:staticcheck - regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, pd.WithAllowFollowerHandle()) + regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle()) metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds()) if err != nil { if apicodec.IsDecodeError(err) { @@ -2219,7 +2221,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, backoffErr = errors.Errorf("PD returned no region, limit: %d", limit) continue } - if regionsHaveGapInRanges([]pd.KeyRange{{StartKey: startKey, EndKey: endKey}}, regionsInfo, limit) { + if regionsHaveGapInRanges([]router.KeyRange{{StartKey: startKey, EndKey: endKey}}, regionsInfo, limit) { backoffErr = errors.Errorf("PD returned regions have gaps, limit: %d", limit) continue } @@ -2239,7 +2241,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, } // batchScanRegions scans at most `limit` regions from PD, starts from the region containing `startKey` and in key order. 
-func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) { +func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []router.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) { if limit == 0 || len(keyRanges) == 0 { return nil, nil } @@ -2249,9 +2251,9 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - var opt batchLocateKeyRangesOption + var batchOpt batchLocateKeyRangesOption for _, op := range opts { - op(&opt) + op(&batchOpt) } // TODO: return start key and end key after redact is introduced. var backoffErr error @@ -2263,9 +2265,9 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa } } start := time.Now() - pdOpts := []pd.GetRegionOption{pd.WithAllowFollowerHandle()} - if opt.needBuckets { - pdOpts = append(pdOpts, pd.WithBuckets()) + pdOpts := []opt.GetRegionOption{opt.WithAllowFollowerHandle()} + if batchOpt.needBuckets { + pdOpts = append(pdOpts, opt.WithBuckets()) } regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...) metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds()) @@ -2301,7 +2303,7 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa ) continue } - validRegions, err := c.handleRegionInfos(bo, regionsInfo, opt.needRegionHasLeaderPeer) + validRegions, err := c.handleRegionInfos(bo, regionsInfo, batchOpt.needRegionHasLeaderPeer) if err != nil { return nil, err } @@ -2319,7 +2321,7 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa // regionsHaveGapInRanges checks if the loaded regions can fully cover the key ranges. // If there are any gaps between the regions, it returns true, then the requests might be retried. // TODO: remove this function after PD client supports gap detection and handling it. -func regionsHaveGapInRanges(ranges []pd.KeyRange, regionsInfo []*pd.Region, limit int) bool { +func regionsHaveGapInRanges(ranges []router.KeyRange, regionsInfo []*router.Region, limit int) bool { if len(ranges) == 0 { return false } @@ -2371,7 +2373,7 @@ func regionsHaveGapInRanges(ranges []pd.KeyRange, regionsInfo []*pd.Region, limi return bytes.Compare(checkKey, ranges[checkIdx].EndKey) < 0 } -func (c *RegionCache) batchScanRegionsFallback(bo *retry.Backoffer, keyRanges []pd.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) { +func (c *RegionCache) batchScanRegionsFallback(bo *retry.Backoffer, keyRanges []router.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) { logutil.BgLogger().Warn("batch scan regions fallback to scan regions", zap.Int("range-num", len(keyRanges))) res := make([]*Region, 0, len(keyRanges)) var lastRegion *Region @@ -2405,7 +2407,7 @@ func (c *RegionCache) batchScanRegionsFallback(bo *retry.Backoffer, keyRanges [] return res, nil } -func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*pd.Region, needLeader bool) ([]*Region, error) { +func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*router.Region, needLeader bool) ([]*Region, error) { regions := make([]*Region, 0, len(regionsInfo)) for _, r := range regionsInfo { // Leader id = 0 indicates no leader. 
@@ -2573,7 +2575,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext for _, meta := range currentRegions { // TODO(youjiali1995): new regions inherit old region's buckets now. Maybe we should make EpochNotMatch error // carry buckets information. Can it bring much overhead? - region, err := newRegion(bo, c, &pd.Region{Meta: meta, Buckets: buckets}) + region, err := newRegion(bo, c, &router.Region{Meta: meta, Buckets: buckets}) if err != nil { return false, err } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 37a17d14c..0cb8ca5e6 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -62,23 +62,25 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" uatomic "go.uber.org/atomic" ) type inspectedPDClient struct { pd.Client - getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) - batchScanRegions func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) + getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) + batchScanRegions func(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) } -func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { if c.getRegion != nil { return c.getRegion(ctx, c.Client, key, opts...) } return c.Client.GetRegion(ctx, key, opts...) } -func (c *inspectedPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *inspectedPDClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { if c.batchScanRegions != nil { return c.batchScanRegions(ctx, keyRanges, limit, opts...) } @@ -470,7 +472,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { func (s *testRegionCacheSuite) TestReturnRegionWithNoLeader() { region := s.getRegion([]byte("x")) - NoLeaderRegion := &pd.Region{ + NoLeaderRegion := &router.Region{ Meta: region.meta, Leader: nil, } @@ -480,10 +482,10 @@ func (s *testRegionCacheSuite) TestReturnRegionWithNoLeader() { batchScanCnt := 0 s.cache.pdClient = &inspectedPDClient{ Client: s.cache.pdClient, - batchScanRegions: func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { + batchScanRegions: func(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { if batchScanCnt == 0 { batchScanCnt++ - return []*pd.Region{NoLeaderRegion}, nil + return []*router.Region{NoLeaderRegion}, nil } else { return originalBatchScanRegions(ctx, keyRanges, limit, opts...) 
} @@ -496,7 +498,7 @@ func (s *testRegionCacheSuite) TestReturnRegionWithNoLeader() { s.Equal(len(returnedRegions), 1) s.Equal(returnedRegions[0].meta.GetId(), region.GetID()) - returnedRegions, err = s.cache.batchScanRegions(bo, []pd.KeyRange{{StartKey: nil, EndKey: nil}}, 100, WithNeedRegionHasLeaderPeer()) + returnedRegions, err = s.cache.batchScanRegions(bo, []router.KeyRange{{StartKey: nil, EndKey: nil}}, 100, WithNeedRegionHasLeaderPeer()) s.Nil(err) s.Equal(len(returnedRegions), 1) s.Equal(returnedRegions[0].meta.GetId(), region.GetID()) @@ -509,7 +511,7 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() { cntGetRegion := 0 s.cache.pdClient = &inspectedPDClient{ Client: s.cache.pdClient, - getRegion: func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { + getRegion: func(ctx context.Context, cli pd.Client, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { cntGetRegion++ return cli.GetRegion(ctx, key, opts...) }, @@ -1581,7 +1583,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { Peers: make([]*metapb.Peer, len(ctx.Meta.Peers)), } copy(cpMeta.Peers, ctx.Meta.Peers) - cpRegion := &pd.Region{ + cpRegion := &router.Region{ Meta: cpMeta, DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}}, } @@ -1620,7 +1622,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() { peer.IsWitness = true } } - cpRegion := &pd.Region{Meta: cpMeta} + cpRegion := &router.Region{Meta: cpMeta} region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) s.cache.insertRegionToCache(region, true, true) @@ -2650,9 +2652,9 @@ func (s *testRegionCacheSuite) TestBatchScanRegionsMerger() { func (s *testRegionCacheSuite) TestSplitKeyRanges() { check := func(keyRangeKeys []string, splitKey string, expects []string) { - keyRanges := make([]pd.KeyRange, 0, len(keyRangeKeys)/2) + keyRanges := make([]router.KeyRange, 0, len(keyRangeKeys)/2) for i := 0; i < len(keyRangeKeys); i += 2 { - keyRanges = append(keyRanges, pd.KeyRange{StartKey: []byte(keyRangeKeys[i]), EndKey: []byte(keyRangeKeys[i+1])}) + keyRanges = append(keyRanges, router.KeyRange{StartKey: []byte(keyRangeKeys[i]), EndKey: []byte(keyRangeKeys[i+1])}) } splitKeyRanges := rangesAfterKey(keyRanges, []byte(splitKey)) splitKeys := make([]string, 0, 2*len(splitKeyRanges)) @@ -2775,13 +2777,13 @@ func (s *testRegionCacheSuite) testBatchScanRegions() { func (s *testRegionCacheSuite) TestRangesAreCoveredCheck() { check := func(ranges []string, regions []string, limit int, expect bool) { - rs := make([]pd.KeyRange, 0, len(ranges)/2) + rs := make([]router.KeyRange, 0, len(ranges)/2) for i := 0; i < len(ranges); i += 2 { - rs = append(rs, pd.KeyRange{StartKey: []byte(ranges[i]), EndKey: []byte(ranges[i+1])}) + rs = append(rs, router.KeyRange{StartKey: []byte(ranges[i]), EndKey: []byte(ranges[i+1])}) } - rgs := make([]*pd.Region, 0, len(regions)) + rgs := make([]*router.Region, 0, len(regions)) for i := 0; i < len(regions); i += 2 { - rgs = append(rgs, &pd.Region{Meta: &metapb.Region{ + rgs = append(rgs, &router.Region{Meta: &metapb.Region{ StartKey: []byte(regions[i]), EndKey: []byte(regions[i+1]), }}) @@ -2912,7 +2914,7 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() { s.Equal(scanRegionRes, regions) batchScanRegionRes := getRegionIDsWithInject(func() ([]*Region, error) { - return s.cache.BatchLoadRegionsWithKeyRanges(s.bo, []pd.KeyRange{{StartKey: []byte{}, EndKey: []byte{}}}, 10) + return s.cache.BatchLoadRegionsWithKeyRanges(s.bo, 
[]router.KeyRange{{StartKey: []byte{}, EndKey: []byte{}}}, 10) }) s.Equal(batchScanRegionRes, regions) } diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 748900abd..72a0875ab 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -48,7 +48,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/client-go/v2/internal/mockstore/cluster" "github.com/tikv/client-go/v2/util" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/opt" ) var _ cluster.Cluster = &Cluster{} @@ -351,7 +352,7 @@ func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, } // ScanRegions returns at most `limit` regions from given `key` and their leaders. -func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...pd.GetRegionOption) []*pd.Region { +func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...opt.GetRegionOption) []*router.Region { c.RLock() defer c.RUnlock() @@ -392,7 +393,7 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...pd.Get regions = regions[:limit] } - result := make([]*pd.Region, 0, len(regions)) + result := make([]*router.Region, 0, len(regions)) for _, region := range regions { leader := region.leaderPeer() if leader == nil { @@ -401,7 +402,7 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int, opts ...pd.Get leader = proto.Clone(leader).(*metapb.Peer) } - r := &pd.Region{ + r := &router.Region{ Meta: proto.Clone(region.Meta).(*metapb.Region), Leader: leader, DownPeers: c.getDownPeers(region), diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go index 0a1998d01..d267cbf1a 100644 --- a/internal/mockstore/mocktikv/pd.go +++ b/internal/mockstore/mocktikv/pd.go @@ -51,6 +51,11 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/clients/tso" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" + "github.com/tikv/pd/client/servicediscovery" "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -89,6 +94,8 @@ type pdClient struct { groups map[string]*rmpb.ResourceGroup delay *atomic.Bool + + callerComponent caller.Component } // NewPDClient creates a mock pd.Client that uses local timestamp and meta data @@ -170,11 +177,11 @@ func (c *pdClient) GetLocalTS(ctx context.Context, dcLocation string) (int64, in return c.GetTS(ctx) } -func (c *pdClient) GetTSAsync(ctx context.Context) pd.TSFuture { +func (c *pdClient) GetTSAsync(ctx context.Context) tso.TSFuture { return &mockTSFuture{c, ctx, false} } -func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) pd.TSFuture { +func (c *pdClient) GetLocalTSAsync(ctx context.Context, dcLocation string) tso.TSFuture { return c.GetTSAsync(ctx) } @@ -220,7 +227,7 @@ func (m *mockTSFuture) Wait() (int64, int64, error) { return m.pdc.GetTS(m.ctx) } -func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key) if len(opts) == 0 { buckets = nil @@ -231,37 +238,37 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi case 
<-time.After(200 * time.Millisecond): } } - return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil + return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } -func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...pd.GetRegionOption) (*pd.Region, error) { - return &pd.Region{}, nil +func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*router.Region, error) { + return &router.Region{}, nil } -func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key) if len(opts) == 0 { buckets = nil } - return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil + return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } -func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { region, peer, buckets, downPeers := c.cluster.GetRegionByID(regionID) - return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil + return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } -func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { regions := c.cluster.ScanRegions(startKey, endKey, limit, opts...) 
return regions, nil } -func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { if _, err := util.EvalFailpoint("mockBatchScanRegionsUnimplemented"); err == nil { return nil, status.Errorf(codes.Unimplemented, "mock BatchScanRegions is not implemented") } - regions := make([]*pd.Region, 0, len(keyRanges)) - var lastRegion *pd.Region + regions := make([]*router.Region, 0, len(keyRanges)) + var lastRegion *router.Region for _, keyRange := range keyRanges { if lastRegion != nil && lastRegion.Meta != nil { if lastRegion.Meta.EndKey == nil || bytes.Compare(lastRegion.Meta.EndKey, keyRange.EndKey) >= 0 { @@ -301,7 +308,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, return store, nil } -func (c *pdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { +func (c *pdClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { return c.cluster.GetAllStores(), nil } @@ -355,11 +362,11 @@ func (c *pdClient) ScatterRegion(ctx context.Context, regionID uint64) error { return nil } -func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { return nil, nil } -func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) { +func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) { return nil, nil } @@ -367,7 +374,7 @@ func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetO return &pdpb.GetOperatorResponse{Status: pdpb.OperatorStatus_SUCCESS}, nil } -func (c *pdClient) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { +func (c *pdClient) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { return nil, nil } @@ -377,7 +384,7 @@ func (c *pdClient) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { func (c *pdClient) GetLeaderURL() string { return "mockpd" } -func (c *pdClient) UpdateOption(option pd.DynamicOption, value interface{}) error { +func (c *pdClient) UpdateOption(option opt.DynamicOption, value interface{}) error { return nil } @@ -429,7 +436,7 @@ func (c *pdClient) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) ( return 0, 0, nil } -func (c *pdClient) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) pd.TSFuture { +func (c *pdClient) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) tso.TSFuture { return nil } @@ -437,19 +444,19 @@ func (c *pdClient) GetLocalTSWithinKeyspace(ctx context.Context, dcLocation stri return 0, 0, nil } -func (c *pdClient) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) pd.TSFuture { +func (c *pdClient) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) tso.TSFuture { return nil } -func (c *pdClient) Watch(ctx context.Context, 
key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { +func (c *pdClient) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { return nil, nil } -func (c *pdClient) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { +func (c *pdClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { return nil, nil } -func (c *pdClient) Put(ctx context.Context, key []byte, value []byte, opts ...pd.OpOption) (*meta_storagepb.PutResponse, error) { +func (c *pdClient) Put(ctx context.Context, key []byte, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { return nil, nil } @@ -457,4 +464,11 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou return nil, 0, nil } -func (m *pdClient) GetServiceDiscovery() pd.ServiceDiscovery { return nil } +func (m *pdClient) GetServiceDiscovery() servicediscovery.ServiceDiscovery { return nil } + +// WithCallerComponent implements the RPCClient interface. +func (m *pdClient) WithCallerComponent(callerComponent caller.Component) pd.RPCClient { + newClient := *m + newClient.callerComponent = callerComponent + return &newClient +} diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index b5b0c5f65..0be10fa05 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -48,6 +48,7 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/tso" "go.uber.org/zap" "golang.org/x/sync/singleflight" ) @@ -226,7 +227,7 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err } type tsFuture struct { - pd.TSFuture + tso.TSFuture o *pdOracle txnScope string } @@ -245,7 +246,7 @@ func (f *tsFuture) Wait() (uint64, error) { } func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - var ts pd.TSFuture + var ts tso.TSFuture if opt.TxnScope == oracle.GlobalTxnScope || opt.TxnScope == "" { ts = o.c.GetTSAsync(ctx) } else { diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 87ed2f84c..dd88c1ff2 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -52,6 +52,8 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" "google.golang.org/grpc" ) @@ -135,15 +137,15 @@ type option struct { apiVersion kvrpcpb.APIVersion security config.Security gRPCDialOptions []grpc.DialOption - pdOptions []pd.ClientOption + pdOptions []opt.ClientOption keyspace string } // ClientOpt is factory to set the client options. type ClientOpt func(*option) -// WithPDOptions is used to set the pd.ClientOption -func WithPDOptions(opts ...pd.ClientOption) ClientOpt { +// WithPDOptions is used to set the opt.ClientOption +func WithPDOptions(opts ...opt.ClientOption) ClientOpt { return func(o *option) { o.pdOptions = append(o.pdOptions, opts...) } @@ -190,7 +192,7 @@ func (c *Client) SetColumnFamily(columnFamily string) *Client { } // NewClient creates a client with PD cluster addrs. 
-func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) { +func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...opt.ClientOption) (*Client, error) { return NewClientWithOpts(ctx, pdAddrs, WithSecurity(security), WithPDOptions(opts...)) } @@ -202,7 +204,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) } // Use an unwrapped PDClient to obtain keyspace meta. - pdCli, err := pd.NewClientWithContext(ctx, pdAddrs, pd.SecurityOption{ + pdCli, err := pd.NewClientWithContext(ctx, caller.Component("client-go/rawkv"), pdAddrs, pd.SecurityOption{ CAPath: opt.security.ClusterSSLCA, CertPath: opt.security.ClusterSSLCert, KeyPath: opt.security.ClusterSSLKey, diff --git a/tikv/kv.go b/tikv/kv.go index f9bc0488a..f2de3c9e5 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -68,6 +68,8 @@ import ( "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/caller" resourceControlClient "github.com/tikv/pd/client/resource_group/controller" clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" @@ -301,12 +303,13 @@ func NewPDClient(pdAddrs []string) (pd.Client, error) { cfg := config.GetGlobalConfig() // init pd-client pdCli, err := pd.NewClient( + caller.Component("client-go/kv"), pdAddrs, pd.SecurityOption{ CAPath: cfg.Security.ClusterSSLCA, CertPath: cfg.Security.ClusterSSLCert, KeyPath: cfg.Security.ClusterSSLKey, }, - pd.WithGRPCDialOptions( + opt.WithGRPCDialOptions( grpc.WithKeepaliveParams( keepalive.ClientParameters{ Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, @@ -314,8 +317,8 @@ func NewPDClient(pdAddrs []string) (pd.Client, error) { }, ), ), - pd.WithCustomTimeoutOption(time.Duration(cfg.PDClient.PDServerTimeout)*time.Second), - pd.WithForwardingOption(config.GetGlobalConfig().EnableForwarding), + opt.WithCustomTimeoutOption(time.Duration(cfg.PDClient.PDServerTimeout)*time.Second), + opt.WithForwardingOption(config.GetGlobalConfig().EnableForwarding), ) if err != nil { return nil, errors.WithStack(err) @@ -889,10 +892,11 @@ var _ = NewLockResolver // It is exported for other pkg to use. For instance, binlog service needs // to determine a transaction's commit state. 
// TODO(iosmanthus): support api v2 -func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.ClientOption) ( +func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.ClientOption) ( *txnlock.LockResolver, error, ) { pdCli, err := pd.NewClient( + caller.Component("client-go/kv"), etcdAddrs, pd.SecurityOption{ CAPath: security.ClusterSSLCA, CertPath: security.ClusterSSLCert, diff --git a/tikv/split_region.go b/tikv/split_region.go index e7aaba022..3742a00b0 100644 --- a/tikv/split_region.go +++ b/tikv/split_region.go @@ -54,7 +54,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/rangetask" "github.com/tikv/client-go/v2/util" - pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" ) @@ -242,9 +242,9 @@ func (s *KVStore) scatterRegion(bo *Backoffer, regionID uint64, tableID *int64) logutil.BgLogger().Info("start scatter region", zap.Uint64("regionID", regionID)) for { - opts := make([]pd.RegionsOption, 0, 1) + opts := make([]opt.RegionsOption, 0, 1) if tableID != nil { - opts = append(opts, pd.WithGroup(fmt.Sprintf("%v", *tableID))) + opts = append(opts, opt.WithGroup(fmt.Sprintf("%v", *tableID))) } _, err := s.pdClient.ScatterRegions(bo.GetCtx(), []uint64{regionID}, opts...) diff --git a/util/dns.go b/util/dns.go index 03b2d4234..a15397ebc 100644 --- a/util/dns.go +++ b/util/dns.go @@ -32,7 +32,7 @@ func wrapWithDomain(target, domain string) (string, error) { // // context.TODO(), // []string{"pd0.pd:2379"}, -// rawkv.WithPDOptions(pd.WithGRPCDialOptions(dialer)), +// rawkv.WithPDOptions(opt.WithGRPCDialOptions(dialer)), // rawkv.WithGRPCDialOptions(dialer), // // ) diff --git a/util/pd_interceptor.go b/util/pd_interceptor.go index 6f319c6a9..41eaa661f 100644 --- a/util/pd_interceptor.go +++ b/util/pd_interceptor.go @@ -41,11 +41,14 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/clients/router" + "github.com/tikv/pd/client/clients/tso" + "github.com/tikv/pd/client/opt" ) var ( - _ pd.Client = &InterceptedPDClient{} - _ pd.TSFuture = &interceptedTsFuture{} + _ pd.Client = &InterceptedPDClient{} + _ tso.TSFuture = &interceptedTsFuture{} ) func recordPDWaitTime(ctx context.Context, start time.Time) { @@ -63,7 +66,7 @@ type InterceptedPDClient struct { // interceptedTsFuture is a PD's wrapper future to record stmt detail. type interceptedTsFuture struct { - pd.TSFuture + tso.TSFuture ctx context.Context } @@ -84,7 +87,7 @@ func (m InterceptedPDClient) GetTS(ctx context.Context) (int64, int64, error) { } // GetTSAsync implements pd.Client#GetTSAsync. -func (m InterceptedPDClient) GetTSAsync(ctx context.Context) pd.TSFuture { +func (m InterceptedPDClient) GetTSAsync(ctx context.Context) tso.TSFuture { start := time.Now() f := m.Client.GetTSAsync(ctx) recordPDWaitTime(ctx, start) @@ -95,7 +98,7 @@ func (m InterceptedPDClient) GetTSAsync(ctx context.Context) pd.TSFuture { } // GetRegion implements pd.Client#GetRegion. -func (m InterceptedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (m InterceptedPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { start := time.Now() r, err := m.Client.GetRegion(ctx, key, opts...) recordPDWaitTime(ctx, start) @@ -103,7 +106,7 @@ func (m InterceptedPDClient) GetRegion(ctx context.Context, key []byte, opts ... } // GetPrevRegion implements pd.Client#GetPrevRegion. 
-func (m InterceptedPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (m InterceptedPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { start := time.Now() r, err := m.Client.GetPrevRegion(ctx, key, opts...) recordPDWaitTime(ctx, start) @@ -111,7 +114,7 @@ func (m InterceptedPDClient) GetPrevRegion(ctx context.Context, key []byte, opts } // GetRegionByID implements pd.Client#GetRegionByID. -func (m InterceptedPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) { +func (m InterceptedPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { start := time.Now() r, err := m.Client.GetRegionByID(ctx, regionID, opts...) recordPDWaitTime(ctx, start) @@ -119,7 +122,7 @@ func (m InterceptedPDClient) GetRegionByID(ctx context.Context, regionID uint64, } // ScanRegions implements pd.Client#ScanRegions. -func (m InterceptedPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { +func (m InterceptedPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { start := time.Now() //nolint:staticcheck r, err := m.Client.ScanRegions(ctx, key, endKey, limit, opts...)
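
For reference, a minimal sketch of what a call site looks like after this migration, assuming a PD endpoint at 127.0.0.1:2379 and a made-up component name; this example is illustrative only and not part of the patch:

package main

import (
	"context"
	"fmt"
	"time"

	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/clients/router"
	"github.com/tikv/pd/client/opt"
	"github.com/tikv/pd/client/pkg/caller"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// pd.NewClient now takes a caller.Component as its first argument,
	// mirroring the NewPDClient change in tikv/kv.go above.
	cli, err := pd.NewClient(
		caller.Component("client-go/example"), // hypothetical component name
		[]string{"127.0.0.1:2379"},            // assumed PD address
		pd.SecurityOption{},
		opt.WithCustomTimeoutOption(10*time.Second), // moved from pd.* to opt.*
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Region lookups take opt.GetRegionOption values and return
	// *router.Region instead of the old pd.* types.
	region, err := cli.GetRegion(ctx, []byte("k"), opt.WithBuckets())
	if err != nil {
		panic(err)
	}
	fmt.Println(region.Meta.GetId())

	// Key ranges for batch scans also come from the router package now.
	ranges := []router.KeyRange{{StartKey: []byte("a"), EndKey: []byte("z")}}
	regions, err := cli.BatchScanRegions(ctx, ranges, 16, opt.WithAllowFollowerHandle())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(regions))
}

The same renames apply mechanically throughout the patch: pd.Region and pd.KeyRange move to the router package; pd.GetRegionOption, pd.RegionsOption, pd.ClientOption, and pd.DynamicOption move to the opt package; pd.TSFuture becomes tso.TSFuture; and pd.OpOption becomes opt.MetaStorageOption.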