Skip to content

Commit

Permalink
client: separate the metrics package (#8833)
Browse files Browse the repository at this point in the history
ref #8690

Separate the client metrics package.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Nov 22, 2024
1 parent be4a366 commit 20c4157
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 311 deletions.
55 changes: 28 additions & 27 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/tlsutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -555,7 +556,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
// GetAllMembers gets the members Info from PD.
func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -565,7 +566,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetMembers(ctx, req)
if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetMembers(), nil
Expand Down Expand Up @@ -683,7 +684,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
Expand All @@ -707,7 +708,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

if resp == nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
Expand All @@ -722,7 +723,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -749,7 +750,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -762,7 +763,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -789,7 +790,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
resp, err = protoClient.GetPrevRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -802,7 +803,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

Expand All @@ -829,7 +830,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
resp, err = protoClient.GetRegionByID(cctx, req)
}

if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
Expand All @@ -842,7 +843,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
Expand Down Expand Up @@ -879,7 +880,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
resp, err = protoClient.ScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

Expand All @@ -893,7 +894,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationBatchScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
Expand Down Expand Up @@ -933,7 +934,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
resp, err = protoClient.BatchScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationBatchScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -993,7 +994,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetStore.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1007,7 +1008,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
}
resp, err := protoClient.GetStore(ctx, req)

if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleStoreResponse(resp)
Expand Down Expand Up @@ -1037,7 +1038,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1051,7 +1052,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
}
resp, err := protoClient.GetAllStores(ctx, req)

if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetStores(), nil
Expand All @@ -1064,7 +1065,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1078,7 +1079,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
}
resp, err := protoClient.UpdateGCSafePoint(ctx, req)

if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
Expand All @@ -1095,7 +1096,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}

start := time.Now()
defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1111,7 +1112,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)

if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
Expand All @@ -1128,7 +1129,7 @@ func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {

func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, group string) error {
start := time.Now()
defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand Down Expand Up @@ -1167,7 +1168,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
options := &opt.RegionsOp{}
Expand Down Expand Up @@ -1195,7 +1196,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
Expand All @@ -1217,7 +1218,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
options := &opt.RegionsOp{}
Expand Down Expand Up @@ -1246,7 +1247,7 @@ func (c *client) requestHeader() *pdpb.RequestHeader {

func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
start := time.Now()
defer func() { cmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }()
options := &opt.RegionsOp{}
for _, opt := range opts {
opt(options)
Expand Down
9 changes: 5 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"go.uber.org/zap"
)

Expand All @@ -39,7 +40,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Expand All @@ -55,7 +56,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
Expand All @@ -68,7 +69,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Expand All @@ -85,7 +86,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
Expand Down
3 changes: 2 additions & 1 deletion client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (c *innerClient) close() {
func (c *innerClient) setup() error {
// Init the metrics.
if c.option.InitMetrics {
initAndRegisterMetrics(c.option.MetricsLabels)
metrics.InitAndRegisterMetrics(c.option.MetricsLabels)
}

// Init the client base.
Expand Down
19 changes: 10 additions & 9 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
)

// KeyspaceClient manages keyspace metadata.
Expand Down Expand Up @@ -51,7 +52,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.LoadKeyspaceRequest{
Header: c.requestHeader(),
Expand All @@ -66,13 +67,13 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
cancel()

if err != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Load keyspace %s failed: %s", name, resp.Header.GetError().String())
}

Expand All @@ -95,7 +96,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.UpdateKeyspaceStateRequest{
Header: c.requestHeader(),
Expand All @@ -111,13 +112,13 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp
cancel()

if err != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Update state for keyspace id %d failed: %s", id, resp.Header.GetError().String())
}

Expand All @@ -139,7 +140,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
defer func() { metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
req := &keyspacepb.GetAllKeyspacesRequest{
Header: c.requestHeader(),
Expand All @@ -155,13 +156,13 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint
cancel()

if err != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
}

Expand Down
Loading

0 comments on commit 20c4157

Please sign in to comment.