Skip to content

Commit

Permalink
update pd client
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 17, 2024
1 parent 06d7f4b commit cd1a3df
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 116 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/stretchr/testify v1.8.2
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31
github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.10
go.etcd.io/etcd/client/v3 v3.5.10
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 h1:ynwwqr0rLliSOJcx0wHMu4T/NiPXHlK48mk2DCrBKCI=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -112,8 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U=
github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7 h1:HR2WOtqRd+VDXWcKzLZlBvEHYpKDvonRmd35dbvB0Sc=
github.com/tikv/pd/client v0.0.0-20241211081727-5d62787565f7/go.mod h1:q2sGh0Lo0GRZuSBkFvljGsJMEMaSa6d/58A+B7bCoCE=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
14 changes: 8 additions & 6 deletions integration_tests/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
)

func TestSplit(t *testing.T) {
Expand Down Expand Up @@ -191,7 +193,7 @@ func (c *mockPDClient) GetLocalTSAsync(ctx context.Context, dcLocation string) p
return nil
}

func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
c.RLock()
defer c.RUnlock()

Expand All @@ -201,11 +203,11 @@ func (c *mockPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.Get
return c.client.GetRegion(ctx, key, opts...)
}

func (c *mockPDClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *mockPDClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*router.Region, error) {
return nil, nil
}

func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
c.RLock()
defer c.RUnlock()

Expand All @@ -215,7 +217,7 @@ func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd
return c.client.GetPrevRegion(ctx, key, opts...)
}

func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
c.RLock()
defer c.RUnlock()

Expand All @@ -225,7 +227,7 @@ func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts
return c.client.GetRegionByID(ctx, regionID, opts...)
}

func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
c.RLock()
defer c.RUnlock()

Expand All @@ -235,7 +237,7 @@ func (c *mockPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey
return c.client.ScanRegions(ctx, startKey, endKey, limit)
}

func (c *mockPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
func (c *mockPDClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
c.RLock()
defer c.RUnlock()

Expand Down
20 changes: 11 additions & 9 deletions internal/locate/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/apicodec"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
)

var _ pd.Client = &CodecPDClient{}
Expand Down Expand Up @@ -101,30 +103,30 @@ func (c *CodecPDClient) GetCodec() apicodec.Codec {

// GetRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *CodecPDClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
encodedKey := c.codec.EncodeRegionKey(key)
region, err := c.Client.GetRegion(ctx, encodedKey, opts...)
return c.processRegionResult(region, err)
}

// GetPrevRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *CodecPDClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
encodedKey := c.codec.EncodeRegionKey(key)
region, err := c.Client.GetPrevRegion(ctx, encodedKey, opts...)
return c.processRegionResult(region, err)
}

// GetRegionByID encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
func (c *CodecPDClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
region, err := c.Client.GetRegionByID(ctx, regionID, opts...)
return c.processRegionResult(region, err)
}

// ScanRegions encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
startKey, endKey = c.codec.EncodeRegionRange(startKey, endKey)
//nolint:staticcheck
regions, err := c.Client.ScanRegions(ctx, startKey, endKey, limit, opts...)
Expand All @@ -145,8 +147,8 @@ func (c *CodecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey
// BatchScanRegions encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
// if limit > 0, it limits the maximum number of returned regions, should check if the result regions fully contain the given key ranges.
func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
encodedRanges := make([]pd.KeyRange, len(keyRanges))
func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
encodedRanges := make([]router.KeyRange, len(keyRanges))
for i, keyRange := range keyRanges {
encodedRanges[i].StartKey, encodedRanges[i].EndKey = c.codec.EncodeRegionRange(keyRange.StartKey, keyRange.EndKey)
}
Expand All @@ -166,15 +168,15 @@ func (c *CodecPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.Key
}

// SplitRegions split regions by given split keys
func (c *CodecPDClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) {
func (c *CodecPDClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) {
var keys [][]byte
for i := range splitKeys {
keys = append(keys, c.codec.EncodeRegionKey(splitKeys[i]))
}
return c.Client.SplitRegions(ctx, keys, opts...)
}

func (c *CodecPDClient) processRegionResult(region *pd.Region, err error) (*pd.Region, error) {
func (c *CodecPDClient) processRegionResult(region *router.Region, err error) (*router.Region, error) {
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -188,7 +190,7 @@ func (c *CodecPDClient) processRegionResult(region *pd.Region, err error) (*pd.R
return region, nil
}

func (c *CodecPDClient) decodeRegionKeyInPlace(r *pd.Region) error {
func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error {
decodedStart, decodedEnd, err := c.codec.DecodeRegionRange(r.Meta.StartKey, r.Meta.EndKey)
if err != nil {
return err
Expand Down
Loading

0 comments on commit cd1a3df

Please sign in to comment.