Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#8951
Browse files Browse the repository at this point in the history
close tikv#8950

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
okJiang authored and ti-chi-bot committed Jan 3, 2025
1 parent a5ca5e7 commit 9883913
Show file tree
Hide file tree
Showing 12 changed files with 974 additions and 8 deletions.
587 changes: 587 additions & 0 deletions client/clients/tso/client.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
<<<<<<< HEAD
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
=======
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
29 changes: 28 additions & 1 deletion client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,19 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
<<<<<<< HEAD
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a h1:PWkMSJSDaOuLNKCV84K3tQ9stZuZPN8E148jRPD9TcA=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
=======
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg=
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -93,6 +99,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
<<<<<<< HEAD
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand All @@ -115,6 +122,18 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
=======
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -231,15 +250,23 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
<<<<<<< HEAD
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
=======
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
<<<<<<< HEAD
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
=======
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ require (
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
<<<<<<< HEAD
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
=======
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20241108022316-afb197c94697
Expand Down
6 changes: 5 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,16 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ
github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg=
github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTmyFqUwr+jcCvpVkK7sumiz+ko5H9eq4=
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
<<<<<<< HEAD
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
=======
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a h1:PWkMSJSDaOuLNKCV84K3tQ9stZuZPN8E148jRPD9TcA=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
Expand Down
195 changes: 195 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,170 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR
}, nil
}

<<<<<<< HEAD
=======
// GetMinTS implements gRPC PDServer. In PD service mode, it simply returns a timestamp.
// In API service mode, it queries all tso servers and gets the minimum timestamp across
// all keyspace groups.
func (s *GrpcServer) GetMinTS(
ctx context.Context, request *pdpb.GetMinTSRequest,
) (*pdpb.GetMinTSResponse, error) {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return nil, errs.ErrGRPCRateLimitExceeded(err)
}
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).GetMinTS(ctx, request)
}
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err
} else if rsp != nil {
return rsp.(*pdpb.GetMinTSResponse), nil
}

var (
minTS *pdpb.Timestamp
err error
)
if s.IsServiceIndependent(constant.TSOServiceName) {
minTS, err = s.GetMinTSFromTSOService()
} else {
start := time.Now()
ts, internalErr := s.tsoAllocatorManager.HandleRequest(ctx, 1)
if internalErr == nil {
tsoHandleDuration.Observe(time.Since(start).Seconds())
}
minTS = &ts
}
if err != nil {
return &pdpb.GetMinTSResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
Timestamp: minTS,
}, nil
}

return &pdpb.GetMinTSResponse{
Header: wrapHeader(),
Timestamp: minTS,
}, nil
}

// GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across
// all keyspace groups.
func (s *GrpcServer) GetMinTSFromTSOService() (*pdpb.Timestamp, error) {
if s.IsClosed() {
return nil, errs.ErrNotStarted
}
addrs := s.keyspaceGroupManager.GetTSOServiceAddrs()
if len(addrs) == 0 {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered")
}

// Get the minimal timestamp from the TSO servers/pods
var mutex syncutil.Mutex
resps := make([]*tsopb.GetMinTSResponse, 0)
wg := sync.WaitGroup{}
wg.Add(len(addrs))
for _, addr := range addrs {
go func(addr string) {
defer wg.Done()
resp, err := s.getMinTSFromSingleServer(s.ctx, addr)
if err != nil || resp == nil {
log.Warn("failed to get min ts from tso server",
zap.String("address", addr), zap.Error(err))
return
}
mutex.Lock()
defer mutex.Unlock()
resps = append(resps, resp)
}(addr)
}
wg.Wait()

// Check the results. The returned minimal timestamp is valid if all the conditions are met:
// 1. The number of responses is equal to the number of TSO servers/pods.
// 2. The number of keyspace groups asked is equal to the number of TSO servers/pods.
// 3. The minimal timestamp is not zero.
var (
minTS *pdpb.Timestamp
keyspaceGroupsAsked uint32
)
if len(resps) == 0 {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("none of tso server/pod responded")
}
emptyTS := &pdpb.Timestamp{}
keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal
for _, resp := range resps {
if resp.KeyspaceGroupsTotal == 0 {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("the tso service has no keyspace group")
}
if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs(
"the tso service has inconsistent keyspace group total count")
}
keyspaceGroupsAsked += resp.KeyspaceGroupsServing
if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) {
minTS = resp.Timestamp
}
}

if keyspaceGroupsAsked != keyspaceGroupsTotal {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs(
fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d",
keyspaceGroupsAsked, keyspaceGroupsTotal))
}

if minTS == nil {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("the tso service is not ready")
}

return minTS, nil
}

func (s *GrpcServer) getMinTSFromSingleServer(
ctx context.Context, tsoSrvAddr string,
) (*tsopb.GetMinTSResponse, error) {
cc, err := s.getDelegateClient(s.ctx, tsoSrvAddr)
if err != nil {
return nil, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr))
}

cctx, cancel := context.WithTimeout(ctx, getMinTSFromTSOServerTimeout)
defer cancel()

resp, err := tsopb.NewTSOClient(cc).GetMinTS(
cctx, &tsopb.GetMinTSRequest{
Header: &tsopb.RequestHeader{
ClusterId: keypath.ClusterID(),
},
})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp == nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
"no min ts info collected", cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}
if resp.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
}

return resp, nil
}

>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
Expand Down Expand Up @@ -203,9 +367,19 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}

<<<<<<< HEAD
if forwardedHost, err := s.getForwardedHost(ctx, stream.Context()); err != nil {
return err
} else if len(forwardedHost) > 0 {
=======
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return errs.ErrNotStarted
}

forwardedHost := grpcutil.GetForwardedHost(stream.Context())
if !s.isLocalRequest(forwardedHost) {
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
Expand All @@ -230,12 +404,17 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

start := time.Now()
<<<<<<< HEAD
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
}
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
=======
if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId())
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleRequest(request.GetDcLocation(), count)
Expand Down Expand Up @@ -344,7 +523,23 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest)
}

// IsSnapshotRecovering implements gRPC PDServer.
<<<<<<< HEAD
func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, request *pdpb.IsSnapshotRecoveringRequest) (*pdpb.IsSnapshotRecoveringResponse, error) {
=======
func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapshotRecoveringRequest) (*pdpb.IsSnapshotRecoveringResponse, error) {
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
if done, err := limiter.Allow(fName); err == nil {
defer done()
} else {
return nil, errs.ErrGRPCRateLimitExceeded(err)
}
}
if s.IsClosed() {
return nil, errs.ErrNotStarted
}
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
// recovering mark is stored in etcd directly, there's no need to forward.
marked, err := s.Server.IsSnapshotRecovering(ctx)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,18 @@ func (s *Server) startServer(ctx context.Context) error {
cb()
}

<<<<<<< HEAD
=======
// to init all rate limiter and metrics
for service := range s.serviceLabels {
s.serviceRateLimiter.Update(service, ratelimit.InitLimiter())
}
for service := range s.grpcServiceLabels {
s.grpcServiceRateLimiter.Update(service, ratelimit.InitLimiter())
}

failpoint.InjectCall("delayStartServer")
>>>>>>> 7a30ebc97 (server: advance ServerStart check (#8951))
// Server has started.
atomic.StoreInt64(&s.isRunning, 1)
serverMaxProcs.Set(float64(runtime.GOMAXPROCS(0)))
Expand Down
Loading

0 comments on commit 9883913

Please sign in to comment.