Skip to content

Commit

Permalink
Merge branch 'master' into dev1
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 21, 2024
2 parents dded6c8 + ce1d0e8 commit 7965d10
Show file tree
Hide file tree
Showing 244 changed files with 2,896 additions and 2,133 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ coverage
go.work*
embedded_assets_handler.go
*.log
*.bin
11 changes: 11 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ linters:
- gofmt
- revive
- errcheck
- exportloopref
- goimports
- depguard
linters-settings:
gocritic:
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
Expand Down Expand Up @@ -205,6 +208,14 @@ linters-settings:
- (net/http.ResponseWriter).Write
- github.com/pingcap/log.Sync
- (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask
depguard:
rules:
denied-deps:
deny:
- pkg: go.uber.org/atomic
desc: "Use 'sync/atomic' instead of 'go.uber.org/atomic'"
- pkg: github.com/pkg/errors
desc: "Use 'github.com/pingcap/errors' instead of 'github.com/pkg/errors'"
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
Expand Down
52 changes: 33 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ func (c *client) setup() error {
return nil
}

// Close closes the client.
func (c *client) Close() {
c.cancel()
c.wg.Wait()
Expand Down Expand Up @@ -802,19 +803,19 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return nil
}

// GetAllMembers gets the members Info from PD.
func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetMembers(ctx, req)
cancel()
if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -848,10 +849,12 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -902,16 +905,19 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str
return req
}

// GetTS implements the TSOClient interface.
func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
resp := c.GetTSAsync(ctx)
return resp.Wait()
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
}

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
serviceMode := c.getServiceMode()
Expand All @@ -927,17 +933,16 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
// Call GetMinTS API to get the minimal TS from the API leader.
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, 0, errs.ErrClientGetProtoClient
}

resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{
Header: c.requestHeader(),
})
cancel()
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we fallback to GetTS.
Expand Down Expand Up @@ -975,6 +980,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
return r
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1013,6 +1019,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1051,6 +1058,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
return handleRegionResponse(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1089,6 +1097,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
return handleRegionResponse(resp), nil
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1127,6 +1136,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
return handleRegionResponse(resp), nil
}

// ScanRegions implements the RPCClient interface.
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1176,6 +1186,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
return handleRegionsResponse(resp), nil
}

// BatchScanRegions implements the RPCClient interface.
func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1274,6 +1285,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
return regions
}

// GetStore implements the RPCClient interface.
func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
Expand All @@ -1283,17 +1295,16 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.GetStoreRequest{
Header: c.requestHeader(),
StoreId: storeID,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetStore(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -1312,6 +1323,7 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) {
return store, nil
}

// GetAllStores implements the RPCClient interface.
func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) {
// Applies options
options := &GetStoreOp{}
Expand All @@ -1327,24 +1339,24 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.GetAllStoresRequest{
Header: c.requestHeader(),
ExcludeTombstoneStores: options.excludeTombstone,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetAllStores(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetStores(), nil
}

// UpdateGCSafePoint implements the RPCClient interface.
func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
Expand All @@ -1354,17 +1366,16 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.UpdateGCSafePointRequest{
Header: c.requestHeader(),
SafePoint: safePoint,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePoint(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
Expand All @@ -1386,6 +1397,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.UpdateServiceGCSafePointRequest{
Header: c.requestHeader(),
ServiceId: []byte(serviceID),
Expand All @@ -1394,18 +1406,17 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}

// ScatterRegion implements the RPCClient interface.
func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
Expand All @@ -1419,18 +1430,17 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
RegionId: regionID,
Group: group,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScatterRegion(ctx, req)
cancel()
if err != nil {
return err
}
Expand All @@ -1440,6 +1450,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
return nil
}

// ScatterRegions implements the RPCClient interface.
func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
Expand All @@ -1448,6 +1459,7 @@ func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ..
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

// SplitAndScatterRegions implements the RPCClient interface.
func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
Expand All @@ -1470,12 +1482,12 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,

protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
return protoClient.SplitAndScatterRegions(ctx, req)
}

// GetOperator implements the RPCClient interface.
func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
Expand All @@ -1492,7 +1504,6 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
return protoClient.GetOperator(ctx, req)
Expand All @@ -1519,7 +1530,6 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
return protoClient.SplitRegions(ctx, req)
Expand All @@ -1539,6 +1549,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
opt(options)
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
req := &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
Group: options.group,
Expand All @@ -1549,11 +1560,9 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint

protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScatterRegion(ctx, req)
cancel()

if err != nil {
return nil, err
Expand All @@ -1575,6 +1584,7 @@ func trimHTTPPrefix(str string) string {
return str
}

// LoadGlobalConfig implements the RPCClient interface.
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand Down Expand Up @@ -1602,6 +1612,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat
return res, resp.GetRevision(), nil
}

// StoreGlobalConfig implements the RPCClient interface.
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
Expand All @@ -1620,6 +1631,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
return nil
}

// WatchGlobalConfig implements the RPCClient interface.
func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
// TODO: Add retry mechanism
// register watch components there
Expand Down Expand Up @@ -1671,6 +1683,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
return globalConfigWatcherCh, err
}

// GetExternalTimestamp implements the RPCClient interface.
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand All @@ -1691,6 +1704,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return resp.GetTimestamp(), nil
}

// SetExternalTimestamp implements the RPCClient interface.
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand Down
Loading

0 comments on commit 7965d10

Please sign in to comment.