Merge branch 'master' into cgroup_mem
ti-chi-bot[bot] authored Jan 4, 2024
2 parents 4543df2 + 61b15c6 commit a4083da
Showing 87 changed files with 2,065 additions and 3,009 deletions.
24 changes: 10 additions & 14 deletions Makefile
@@ -108,23 +108,23 @@ pd-server-basic:
# Tools

pd-ctl:
GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ctl tools/pd-ctl/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ctl pd-ctl/main.go
pd-tso-bench:
cd tools/pd-tso-bench && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-tso-bench main.go
cd tools && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-tso-bench pd-tso-bench/main.go
pd-api-bench:
cd tools/pd-api-bench && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-api-bench main.go
cd tools && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-api-bench pd-api-bench/main.go
pd-recover:
GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-recover tools/pd-recover/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-recover pd-recover/main.go
pd-analysis:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-analysis tools/pd-analysis/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-analysis pd-analysis/main.go
pd-heartbeat-bench:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench tools/pd-heartbeat-bench/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench pd-heartbeat-bench/main.go
simulator:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator tools/pd-simulator/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
regions-dump:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump tools/regions-dump/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
stores-dump:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump tools/stores-dump/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump stores-dump/main.go

.PHONY: pd-ctl pd-tso-bench pd-recover pd-analysis pd-heartbeat-bench simulator regions-dump stores-dump pd-api-bench

@@ -240,11 +240,7 @@ basic-test: install-tools

ci-test-job: install-tools dashboard-ui
@$(FAILPOINT_ENABLE)
if [[ $(JOB_INDEX) -le 10 ]]; then \
CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=./... $(shell ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)); \
else \
for mod in $(shell ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)); do cd $$mod && $(MAKE) ci-test-job && cd $(ROOT_PATH) > /dev/null && cat $$mod/covprofile >> covprofile; done; \
fi
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
87 changes: 66 additions & 21 deletions client/client.go
@@ -298,7 +298,7 @@ func (k *serviceModeKeeper) close() {
type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
pdSvcDiscovery *pdServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
@@ -503,7 +503,7 @@ func newClientWithKeyspaceName(
return err
}
// c.keyspaceID is the source of truth for keyspace id.
c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID)
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)
return nil
}

@@ -733,6 +733,23 @@ func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, contex
return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
}

// getRegionAPIClientAndContext returns a ServiceClient that can serve region API requests, plus the context to use
// with it. When allowFollower is true, it prefers a healthy client of regionAPIKind (typically a follower); otherwise,
// or when none is available, it falls back to the default service client, whose context may carry forward information
// when the leader is unhealthy.
func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) {
var serviceClient ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
return nil, ctx
}
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
@@ -885,6 +902,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
@@ -895,13 +913,18 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
@@ -917,6 +940,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
start := time.Now()
defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
@@ -927,13 +951,18 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetPrevRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetPrevRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
@@ -949,6 +978,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
start := time.Now()
defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
@@ -959,13 +989,18 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegionByID(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegionByID(cctx, req)
}

if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
@@ -987,18 +1022,28 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Limit: int32(limit),
}
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScanRegions(scanCtx, req)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
if !serviceClient.IsConnectedToLeader() && err != nil || resp.Header.GetError() != nil {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.ScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
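
A minimal usage sketch of the follower-capable region path added above: it assumes the client package exposes a WithAllowFollowerHandle GetRegionOption and a PD endpoint at 127.0.0.1:2379 (both the option name and the address are assumptions for illustration, not taken from this diff).

// Sketch: issue a GetRegion call that is allowed to be served by a follower,
// falling back to the leader on a retryable error, as GetRegion above does.
package main

import (
	"context"
	"log"

	pd "github.com/tikv/pd/client"
)

func main() {
	// The address is a placeholder for a reachable PD endpoint.
	cli, err := pd.NewClient([]string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// WithAllowFollowerHandle is assumed to set options.allowFollowerHandle,
	// which, combined with the client-level getEnableFollowerHandle switch,
	// routes the request through getRegionAPIClientAndContext.
	region, err := cli.GetRegion(context.Background(), []byte("some-key"), pd.WithAllowFollowerHandle())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("region id: %d", region.Meta.GetId())
}
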
2 changes: 2 additions & 0 deletions client/http/api.go
@@ -73,6 +73,8 @@ const (
PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine"
// Others
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Cluster = "/pd/api/v1/cluster"
ClusterStatus = "/pd/api/v1/cluster/status"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
// Micro Service
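
The two new constants above expose the cluster info endpoints. A small sketch of querying the Cluster path with plain net/http (the PD address is a placeholder; no dedicated helper method from this commit is assumed):

// Sketch: fetch /pd/api/v1/cluster using the exported Cluster path constant.
package main

import (
	"fmt"
	"io"
	"net/http"

	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:2379" + pdhttp.Cluster)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
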
24 changes: 24 additions & 0 deletions client/http/client.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
@@ -359,6 +360,21 @@ func WithMetrics(
}
}

// WithLoggerRedirection configures the client with the given logger redirection.
func WithLoggerRedirection(logLevel, fileName string) ClientOption {
cfg := &log.Config{}
cfg.Level = logLevel
if fileName != "" {
f, _ := os.CreateTemp(".", fileName)
fname := f.Name()
f.Close()
cfg.File.Filename = fname
}
lg, p, _ := log.InitLogger(cfg)
log.ReplaceGlobals(lg, p)
return func(c *client) {}
}

// NewClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewClient(
source string,
@@ -425,3 +441,11 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
func (c *client) UpdateMembersInfo() {
c.inner.updateMembersInfo(c.inner.ctx)
}

// setLeaderAddrIdx sets the index of the leader address in the inner client.
// only used for testing.
func (c *client) setLeaderAddrIdx(idx int) {
c.inner.Lock()
defer c.inner.Unlock()
c.inner.leaderAddrIdx = idx
}
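
A short sketch of wiring the new WithLoggerRedirection option into a client; the PD address, log file prefix, and scheduler arguments are placeholders, and the CreateScheduler call mirrors the one in client_test.go below.

// Sketch: build a PD HTTP client whose logs are redirected to a temp file.
package main

import (
	"context"

	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	cli := pdhttp.NewClient(
		"example-source",
		[]string{"http://127.0.0.1:2379"},
		// Route client logs at "info" level to a temp file created in the
		// working directory, as WithLoggerRedirection does.
		pdhttp.WithLoggerRedirection("info", "pd-http-client-log"),
	)
	defer cli.Close()

	// Placeholder request, mirroring the usage in client_test.go.
	cli.CreateScheduler(context.Background(), "test", 0)
}
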
17 changes: 8 additions & 9 deletions client/http/client_test.go
@@ -124,6 +124,7 @@ func TestRedirectWithMetrics(t *testing.T) {
re.NoError(err)
failureCnt.Write(&out)
re.Equal(float64(3), out.Counter.GetValue())
c.Close()

// 2. Test the Leader success, just need to send to leader.
httpClient = newHTTPClientWithRequestChecker(func(req *http.Request) error {
@@ -133,15 +134,15 @@ func TestRedirectWithMetrics(t *testing.T) {
}
return nil
})
c.(*client).inner.cli = httpClient
// Force to update members info.
c.(*client).inner.leaderAddrIdx = 0
c.(*client).inner.pdAddrs = pdAddrs
c = NewClient("test-http-pd-redirect", pdAddrs, WithHTTPClient(httpClient), WithMetrics(metricCnt, nil))
// force to update members info.
c.(*client).setLeaderAddrIdx(0)
c.CreateScheduler(context.Background(), "test", 0)
successCnt, err := c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...)
re.NoError(err)
successCnt.Write(&out)
re.Equal(float64(1), out.Counter.GetValue())
c.Close()

// 3. Test when the leader fails, needs to be sent to the follower in order,
// and returns directly if one follower succeeds
@@ -152,10 +153,9 @@ func TestRedirectWithMetrics(t *testing.T) {
}
return nil
})
c.(*client).inner.cli = httpClient
// Force to update members info.
c.(*client).inner.leaderAddrIdx = 0
c.(*client).inner.pdAddrs = pdAddrs
c = NewClient("test-http-pd-redirect", pdAddrs, WithHTTPClient(httpClient), WithMetrics(metricCnt, nil))
// force to update members info.
c.(*client).setLeaderAddrIdx(0)
c.CreateScheduler(context.Background(), "test", 0)
successCnt, err = c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...)
re.NoError(err)
@@ -167,6 +167,5 @@ func TestRedirectWithMetrics(t *testing.T) {
failureCnt.Write(&out)
// leader failure
re.Equal(float64(4), out.Counter.GetValue())

c.Close()
}
(Diffs for the remaining 82 changed files are not shown here.)
