Skip to content

Commit

Permalink
Fully integrated PD HTTP client into InfoSyncer
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 29, 2023
1 parent 129be4c commit 72f3c32
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 556 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7019,13 +7019,13 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "5232ba0bba677a6d4614ae2cc102554d59cd00d473d9138739508d6f25169f02",
strip_prefix = "github.com/tikv/pd/[email protected]20231127075044-9f4803d8bd05",
sha256 = "15430134936aabc977df9daaa542b93b8ff197e258d529a13fd48a23f2360f3b",
strip_prefix = "github.com/JmPotato/pd/[email protected]20231129080121-11c3b737f508",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/JmPotato/pd/client/com_github_jmpotato_pd_client-v0.0.0-20231129080121-11c3b737f508.zip",
"http://ats.apps.svc/gomod/github.com/JmPotato/pd/client/com_github_jmpotato_pd_client-v0.0.0-20231129080121-11c3b737f508.zip",
"https://cache.hawkingrei.com/gomod/github.com/JmPotato/pd/client/com_github_jmpotato_pd_client-v0.0.0-20231129080121-11c3b737f508.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/JmPotato/pd/client/com_github_jmpotato_pd_client-v0.0.0-20231129080121-11c3b737f508.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (p *PdController) getRegionCountWith(
}
var err error
for _, addr := range p.getAllPDAddrs() {
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end)), p.cli, http.MethodGet, nil)
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end), false), p.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,5 @@ replace (
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/tidb/pkg/parser => ./pkg/parser
github.com/tikv/pd/client => github.com/JmPotato/pd/client v0.0.0-20231129080121-11c3b737f508
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/JmPotato/pd/client v0.0.0-20231129080121-11c3b737f508 h1:QigQ4+n1KpG1g8tbJtyOvNuc/mwUNOrlVcI+FpX/TDE=
github.com/JmPotato/pd/client v0.0.0-20231129080121-11c3b737f508/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
Expand Down Expand Up @@ -853,8 +855,6 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05 h1:87NPUfzaVrO5MTBwVCPQ/FlJGpFnHi6WFYHDYD3n3Zc=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down
99 changes: 0 additions & 99 deletions pkg/ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
package placement

import (
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"strings"

"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client/http"
"gopkg.in/yaml.v2"
)
Expand All @@ -32,102 +29,6 @@ const (
attributeEvictLeader = "evict-leader"
)

// RuleGroupConfig defines basic config of rule group
type RuleGroupConfig struct {
ID string `json:"id"`
Index int `json:"index"`
Override bool `json:"override"`
}

var (
_ json.Marshaler = (*TiFlashRule)(nil)
_ json.Unmarshaler = (*TiFlashRule)(nil)
)

// TiFlashRule extends Rule with other necessary fields.
type TiFlashRule struct {
GroupID string
ID string
Index int
Override bool
Role pd.PeerRoleType
Count int
Constraints []pd.LabelConstraint
LocationLabels []string
IsolationLevel string
StartKey []byte
EndKey []byte
}

type tiFlashRule struct {
GroupID string `json:"group_id"`
ID string `json:"id"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
Role pd.PeerRoleType `json:"role"`
Count int `json:"count"`
Constraints []pd.LabelConstraint `json:"label_constraints,omitempty"`
LocationLabels []string `json:"location_labels,omitempty"`
IsolationLevel string `json:"isolation_level,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
}

// MarshalJSON implements json.Marshaler interface for TiFlashRule.
func (r *TiFlashRule) MarshalJSON() ([]byte, error) {
return json.Marshal(&tiFlashRule{
GroupID: r.GroupID,
ID: r.ID,
Index: r.Index,
Override: r.Override,
Role: r.Role,
Count: r.Count,
Constraints: r.Constraints,
LocationLabels: r.LocationLabels,
IsolationLevel: r.IsolationLevel,
StartKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.StartKey)),
EndKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.EndKey)),
})
}

// UnmarshalJSON implements json.Unmarshaler interface for TiFlashRule.
func (r *TiFlashRule) UnmarshalJSON(bytes []byte) error {
var rule tiFlashRule
if err := json.Unmarshal(bytes, &rule); err != nil {
return err
}
*r = TiFlashRule{
GroupID: rule.GroupID,
ID: rule.ID,
Index: rule.Index,
Override: rule.Override,
Role: rule.Role,
Count: rule.Count,
Constraints: rule.Constraints,
LocationLabels: rule.LocationLabels,
IsolationLevel: rule.IsolationLevel,
}

startKey, err := hex.DecodeString(rule.StartKeyHex)
if err != nil {
return err
}

endKey, err := hex.DecodeString(rule.EndKeyHex)
if err != nil {
return err
}

_, r.StartKey, err = codec.DecodeBytes(startKey, nil)
if err != nil {
return err
}

_, r.EndKey, err = codec.DecodeBytes(endKey, nil)

return err
}

// RuleBuilder is used to build the Rules from a constraint string.
type RuleBuilder struct {
role pd.PeerRoleType
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestInfo(t *testing.T) {
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
}

// NOTICE: this failpoint has been REMOVED, be aware of this if you want to reopen this test.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/infosync/FailPlacement", `return(true)`))

s, err := mockstore.NewMockStore()
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_test(
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_goleak//:goleak",
],
Expand Down
112 changes: 20 additions & 92 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type InfoSyncer struct {
// It must be used when the etcd path isn't needed to separate by keyspace.
// See keyspace RFC: https://github.com/pingcap/tidb/pull/39685
unprefixedEtcdCli *clientv3.Client
pdHTTPCli pdhttp.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
Expand Down Expand Up @@ -200,6 +201,7 @@ func GlobalInfoSyncerInit(
) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
pdHTTPCli: pdHTTPCli.WithRespHandler(pdResponseHandler),
unprefixedEtcdCli: unprefixedEtcdCli,
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
Expand All @@ -209,13 +211,10 @@ func GlobalInfoSyncerInit(
if err != nil {
return nil, err
}
if pdHTTPCli != nil {
pdHTTPCli = pdHTTPCli.WithRespHandler(pdResponseHandler)
}
is.labelRuleManager = initLabelRuleManager(pdHTTPCli)
is.placementManager = initPlacementManager(pdHTTPCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec)
is.labelRuleManager = initLabelRuleManager(is.pdHTTPCli)
is.placementManager = initPlacementManager(is.pdHTTPCli)
is.scheduleManager = initScheduleManager(is.pdHTTPCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(is.pdHTTPCli, codec)
is.resourceManagerClient = initResourceManagerClient(pdCli)
setGlobalInfoSyncer(is)
return is, nil
Expand Down Expand Up @@ -299,20 +298,20 @@ func initResourceManagerClient(pdCli pd.Client) (cli pd.ResourceManagerClient) {
return
}

func initTiFlashReplicaManager(etcdCli *clientv3.Client, codec tikv.Codec) TiFlashReplicaManager {
if etcdCli == nil {
func initTiFlashReplicaManager(pdHTTPCli pdhttp.Client, codec tikv.Codec) TiFlashReplicaManager {
if pdHTTPCli == nil {
m := mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)}
return &m
}
logutil.BgLogger().Warn("init TiFlashReplicaManager", zap.Strings("pd addrs", etcdCli.Endpoints()))
return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64), codec: codec}
logutil.BgLogger().Warn("init TiFlashReplicaManager")
return &TiFlashReplicaManagerCtx{pdHTTPCli: pdHTTPCli, tiflashProgressCache: make(map[int64]float64), codec: codec}
}

func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager {
if etcdCli == nil {
func initScheduleManager(pdHTTPCli pdhttp.Client) ScheduleManager {
if pdHTTPCli == nil {
return &mockScheduleManager{}
}
return &PDScheduleManager{etcdCli: etcdCli}
return &PDScheduleManager{pdHTTPCli}
}

// GetMockTiFlash can only be used in tests to get MockTiFlash
Expand Down Expand Up @@ -450,7 +449,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m
}

// pdResponseHandler will be injected into the PD HTTP client to handle the response,
// this is to maintain consistency with the logic in the `doRequest`.
// this is to maintain consistency with the original logic without the PD HTTP client.
func pdResponseHandler(resp *http.Response, res interface{}) error {
defer func() { terror.Log(resp.Body.Close()) }()
bodyBytes, err := io.ReadAll(resp.Body)
Expand All @@ -475,77 +474,6 @@ func pdResponseHandler(resp *http.Response, res interface{}) error {
return nil
}

// TODO: replace with the unified PD HTTP client.
func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) {
var (
err error
req *http.Request
res *http.Response
)
for idx, addr := range addrs {
url := util2.ComposeURL(addr, route)
req, err = http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
start := time.Now()
res, err = doRequestWithFailpoint(req)
if err == nil {
metrics.PDAPIExecutionHistogram.WithLabelValues(apiName).Observe(time.Since(start).Seconds())
metrics.PDAPIRequestCounter.WithLabelValues(apiName, res.Status).Inc()
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
terror.Log(res.Body.Close())
return nil, err
}
if res.StatusCode != http.StatusOK {
logutil.BgLogger().Warn("response not 200",
zap.String("method", method),
zap.String("hosts", addr),
zap.String("url", url),
zap.Int("http status", res.StatusCode),
zap.Int("address order", idx),
)
err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
err = nil
bodyBytes = nil
}
}
terror.Log(res.Body.Close())
return bodyBytes, err
}
metrics.PDAPIRequestCounter.WithLabelValues(apiName, "network error").Inc()
logutil.BgLogger().Warn("fail to doRequest",
zap.Error(err),
zap.Bool("retry next address", idx == len(addrs)-1),
zap.String("method", method),
zap.String("hosts", addr),
zap.String("url", url),
zap.Int("address order", idx),
)
}
return nil, err
}

func doRequestWithFailpoint(req *http.Request) (resp *http.Response, err error) {
fpEnabled := false
failpoint.Inject("FailPlacement", func(val failpoint.Value) {
if val.(bool) {
fpEnabled = true
resp = &http.Response{StatusCode: http.StatusNotFound, Body: http.NoBody}
err = nil
}
})
if fpEnabled {
return
}
return util2.InternalHTTPClient().Do(req)
}

// GetAllRuleBundles is used to get all rule bundles from PD It is used to load full rules from PD while fullload infoschema.
func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
is, err := getGlobalInfoSyncer()
Expand Down Expand Up @@ -1177,13 +1105,13 @@ func SetTiFlashGroupConfig(ctx context.Context) error {
// SetTiFlashPlacementRule is a helper function to set placement rule.
// It is discouraged to use SetTiFlashPlacementRule directly,
// use `ConfigureTiFlashPDForTable`/`ConfigureTiFlashPDForPartitions` instead.
func SetTiFlashPlacementRule(ctx context.Context, rule placement.TiFlashRule) error {
func SetTiFlashPlacementRule(ctx context.Context, rule pdhttp.Rule) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("SetTiFlashPlacementRule", zap.String("ruleID", rule.ID))
return is.tiflashReplicaManager.SetPlacementRule(ctx, rule)
return is.tiflashReplicaManager.SetPlacementRule(ctx, &rule)
}

// DeleteTiFlashPlacementRule is to delete placement rule for certain group.
Expand All @@ -1197,7 +1125,7 @@ func DeleteTiFlashPlacementRule(ctx context.Context, group string, ruleID string
}

// GetTiFlashGroupRules to get all placement rule in a certain group.
func GetTiFlashGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) {
func GetTiFlashGroupRules(ctx context.Context, group string) ([]*pdhttp.Rule, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1241,7 +1169,7 @@ func ConfigureTiFlashPDForTable(id int64, count uint64, locationLabels *[]string
ctx := context.Background()
logutil.BgLogger().Info("ConfigureTiFlashPDForTable", zap.Int64("tableID", id), zap.Uint64("count", count))
ruleNew := MakeNewRule(id, count, *locationLabels)
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil {
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, &ruleNew); e != nil {
return errors.Trace(e)
}
return nil
Expand All @@ -1254,12 +1182,12 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
return errors.Trace(err)
}
ctx := context.Background()
rules := make([]placement.TiFlashRule, 0, len(*definitions))
rules := make([]*pdhttp.Rule, 0, len(*definitions))
pids := make([]int64, 0, len(*definitions))
for _, p := range *definitions {
logutil.BgLogger().Info("ConfigureTiFlashPDForPartitions", zap.Int64("tableID", tableID), zap.Int64("partID", p.ID), zap.Bool("accel", accel), zap.Uint64("count", count))
ruleNew := MakeNewRule(p.ID, count, *locationLabels)
rules = append(rules, ruleNew)
rules = append(rules, &ruleNew)
pids = append(pids, p.ID)
}
if e := is.tiflashReplicaManager.SetPlacementRuleBatch(ctx, rules); e != nil {
Expand Down
Loading

0 comments on commit 72f3c32

Please sign in to comment.