Skip to content

Commit

Permalink
pd: replace client with NewClientWithAPIContext
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 26, 2024
1 parent e641d2c commit 7597751
Show file tree
Hide file tree
Showing 27 changed files with 66 additions and 36 deletions.
2 changes: 1 addition & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func setPDConfigCommand() *cobra.Command {
return errors.Trace(err)
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg),
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.KeyspaceName, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg),
cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func checkStoresAlive(ctx context.Context,
func NewMgr(
ctx context.Context,
g glue.Glue,
keyspaceName string,
pdAddrs []string,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
Expand All @@ -167,7 +168,7 @@ func NewMgr(

log.Info("new mgr", zap.Strings("pdAddrs", pdAddrs))

controller, err := pdutil.NewPdController(ctx, pdAddrs, tlsConf, securityOption)
controller, err := pdutil.NewPdController(ctx, keyspaceName, pdAddrs, tlsConf, securityOption)
if err != nil {
log.Error("failed to create pd controller", zap.Error(err))
return nil, errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/pdutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//br/pkg/errors",
"//br/pkg/httputil",
"//pkg/keyspace",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/util/codec",
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
Expand Down Expand Up @@ -147,6 +148,7 @@ type PdController struct {
// NewPdController creates a new PdController.
func NewPdController(
ctx context.Context,
keyspaceName string,
pdAddrs []string,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
Expand All @@ -155,8 +157,8 @@ func NewPdController(
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
pdClient, err := pd.NewClientWithAPIContext(
ctx, keyspace.BuildAPIContext(keyspaceName), caller.Component("br-pd-controller"), pdAddrs, securityOption,
opt.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Domain loads all table info into memory. By skipping Domain, we save
// lots of memory (about 500MB for 40K 40 fields YCSB tables).
needDomain := !skipStats
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/backup_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
if err != nil {
return errors.Trace(err)
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
// Backup raw does not need domain.
needDomain := false
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/backup_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf
}
// Backup txn does not need domain.
needDomain := false
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (cfg *Config) OverrideDefaultForBackup() {

// NewMgr creates a new mgr at the given PD address.
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
g glue.Glue, keyspaceName string, pds []string,
tlsConfig TLSConfig,
keepalive keepalive.ClientParameters,
checkRequirements bool,
Expand Down Expand Up @@ -812,7 +812,7 @@ func NewMgr(ctx context.Context,

// Is it necessary to remove `StoreBehavior`?
return conn.NewMgr(
ctx, g, pds, tlsConf, securityOption, keepalive, util.SkipTiFlash,
ctx, g, keyspaceName, pds, tlsConf, securityOption, keepalive, util.SkipTiFlash,
checkRequirements, needDomain, versionCheckerType,
)
}
Expand Down Expand Up @@ -989,7 +989,6 @@ func progressFileWriterRoutine(ctx context.Context, progress glue.Progress, tota
case <-ctx.Done():
return
case <-time.After(500 * time.Millisecond):
break
}
cur := progress.GetCurrent()
p := float64(cur) / float64(total)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/operator/prepare_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
return nil, err
}
}
mgr, err := pdutil.NewPdController(ctx, cfg.PD, tc, cfg.TLS.ToPDSecurityOption())
mgr, err := pdutil.NewPdController(ctx, cfg.KeyspaceName, cfg.PD, tc, cfg.TLS.ToPDSecurityOption())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
})

// TODO: remove version checker from `NewMgr`
mgr, err := NewMgr(c, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker)
mgr, err := NewMgr(c, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
summary.CollectUint("resolve-ts", resolveTS)

keepaliveCfg := GetKeepalive(&cfg.Config)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (h *restoreEBSMetaHelper) preRestore(ctx context.Context) error {
}
}

controller, err := pdutil.NewPdController(ctx, h.cfg.PD, tlsConf, securityOption)
controller, err := pdutil.NewPdController(ctx, h.cfg.KeyspaceName, h.cfg.PD, tlsConf, securityOption)
if err != nil {
log.Error("fail to create pd controller", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR

// Restore raw does not need domain.
needDomain := false
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
defer cancel()

// Restore raw does not need domain.
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := NewMgr(ctx, g,cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ type streamMgr struct {
}

func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) {
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, false, conn.StreamVersionChecker)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -919,7 +919,7 @@ func RunStreamResume(
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
ctx, cancel := context.WithCancel(c)
defer cancel()
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, false, conn.StreamVersionChecker)
if err != nil {
return err
Expand Down Expand Up @@ -969,7 +969,7 @@ func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) (
} else {
printer = stream.PrintTaskWithJSON(console)
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
mgr, err := NewMgr(ctx, g, cfg.KeyspaceName, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, false, conn.StreamVersionChecker)
if err != nil {
return nil, err
Expand Down
9 changes: 6 additions & 3 deletions lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ type Controller struct {
tikvModeSwitcher local.TiKVModeSwitcher

keyspaceName string
apiContext pd.APIContext
resourceGroupName string
taskType string
}
Expand Down Expand Up @@ -349,6 +350,7 @@ func NewImportControllerWithPauser(
var backendObj backend.Backend
var pdCli pd.Client
var pdHTTPCli pdhttp.Client
apiContext := keyspace.BuildAPIContext(p.KeyspaceName)
switch cfg.TikvImporter.Backend {
case config.BackendTiDB:
encodingBuilder = tidb.NewEncodingBuilder()
Expand All @@ -366,7 +368,7 @@ func NewImportControllerWithPauser(
}

addrs := strings.Split(cfg.TiDB.PdAddr, ",")
pdCli, err = pd.NewClientWithContext(ctx, componentName, addrs, tls.ToPDSecurityOption())
pdCli, err = pd.NewClientWithAPIContext(ctx, apiContext, componentName, addrs, tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -526,6 +528,7 @@ func NewImportControllerWithPauser(
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls.TLSConfig(), pdHTTPCli, log.FromContext(ctx).Logger),

keyspaceName: p.KeyspaceName,
apiContext: apiContext,
resourceGroupName: p.ResourceGroupName,
taskType: p.TaskType,
}
Expand Down Expand Up @@ -1181,7 +1184,7 @@ const (
func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
addrs := strings.Split(rc.cfg.TiDB.PdAddr, ",")
pdCli, err := pd.NewClientWithContext(ctx, componentName, addrs, tlsOpt)
pdCli, err := pd.NewClientWithAPIContext(ctx, rc.apiContext, componentName, addrs, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1867,7 +1870,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if isLocalBackend(rc.cfg) {
pdAddrs := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
pdController, err := pdutil.NewPdController(
ctx, pdAddrs, rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption(),
ctx, rc.keyspaceName, pdAddrs, rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption(),
)
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import (
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/stringutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -170,11 +169,6 @@ func (t DataSourceType) String() string {
return string(t)
}

var (
// NewClientWithContext returns a kv.Client.
NewClientWithContext = pd.NewClientWithContext
)

// FieldMapping indicates the relationship between input field and table column or user variable
type FieldMapping struct {
Column *table.Column
Expand Down
4 changes: 3 additions & 1 deletion pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/caller"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
Expand Down Expand Up @@ -140,7 +141,8 @@ func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionS
}
tlsOpt := tls.ToPDSecurityOption()
addrs := strings.Split(tidbCfg.Path, ",")
pdCli, err := NewClientWithContext(ctx, caller.Component("tidb-table-importer"), addrs, tlsOpt)
apiContext := keyspace.BuildAPIContext(tidbCfg.KeyspaceName)
pdCli, err := pd.NewClientWithAPIContext(ctx, apiContext, caller.Component("tidb-table-importer"), addrs, tlsOpt)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/keyspace/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/config",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
11 changes: 11 additions & 0 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/config"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -69,3 +70,13 @@ func WrapZapcoreWithKeyspace() zap.Option {
return core
})
}

// BuildAPIContext is used to build APIContext.
func BuildAPIContext(keyspaceName string) (apiContext pd.APIContext) {
if len(keyspaceName) == 0 {
apiContext = pd.NewAPIContextV1()
} else {
apiContext = pd.NewAPIContextV2(keyspaceName)
}
return
}
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//br/pkg/version",
"//pkg/distsql",
"//pkg/infoschema",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/lightning/backend",
"//pkg/lightning/backend/encode",
Expand Down
6 changes: 4 additions & 2 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
Expand Down Expand Up @@ -595,8 +596,9 @@ func NewBackend(
} else {
pdAddrs = strings.Split(config.PDAddr, ",")
}
pdCli, err = pd.NewClientWithContext(
ctx, caller.Component("lightning-local-backend"), pdAddrs, tls.ToPDSecurityOption(),
apiContext := keyspace.BuildAPIContext(config.KeyspaceName)
pdCli, err = pd.NewClientWithAPIContext(
ctx, apiContext, caller.Component("lightning-local-backend"), pdAddrs, tls.ToPDSecurityOption(),
opt.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/store/driver",
visibility = ["//visibility:public"],
deps = [
"//pkg/keyspace",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv
}
}()

pdCli, err = pd.NewClient(caller.Component("tidb-tikv-driver"), etcdAddrs, pd.SecurityOption{
pdCli, err = pd.NewClientWithAPIContext(context.Background(), keyspace.BuildAPIContext(keyspaceName), caller.Component("tidb-tikv-driver"), etcdAddrs, pd.SecurityOption{
CAPath: d.security.ClusterSSLCA,
CertPath: d.security.ClusterSSLCert,
KeyPath: d.security.ClusterSSLKey,
Expand Down
Loading

0 comments on commit 7597751

Please sign in to comment.