Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove api mode concept #8952

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@
c := &client{
callerComponent: adjustCallerComponent(callerComponent),
inner: &innerClient{
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
// Create a service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the service discovery for the following interactions.
keyspaceID: constants.NullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
Expand All @@ -384,7 +384,7 @@
}
c.inner.keyspaceID = keyspaceMeta.GetId()
// c.keyspaceID is the source of truth for keyspace id.
c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID)
c.inner.serviceDiscovery.SetKeyspaceID(c.inner.keyspaceID)
return nil
}

Expand Down Expand Up @@ -412,17 +412,17 @@

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(context.Context) uint64 {
return c.inner.pdSvcDiscovery.GetClusterID()
return c.inner.serviceDiscovery.GetClusterID()

Check warning on line 415 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L415

Added line #L415 was not covered by tests
}

// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.inner.pdSvcDiscovery.GetServingURL()
return c.inner.serviceDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.pdSvcDiscovery
return c.inner.serviceDiscovery
}

// UpdateOption updates the client option.
Expand All @@ -438,7 +438,7 @@
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
return errors.New("[pd] tso follower proxy is only supported in PD mode")
}
enable, ok := value.(bool)
if !ok {
Expand Down Expand Up @@ -485,7 +485,7 @@
// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.inner.pdSvcDiscovery.GetServiceClient()
serviceClient := c.inner.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand Down Expand Up @@ -526,7 +526,7 @@

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
// Handle compatibility issue in case of PD/PD service doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -598,7 +598,7 @@

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url)
conn, err := c.inner.serviceDiscovery.GetOrCreateGRPCConn(url)

Check warning on line 601 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L601

Added line #L601 was not covered by tests
if err != nil {
log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err))
continue
Expand All @@ -619,7 +619,7 @@

if resp == nil {
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 622 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L622

Added line #L622 was not covered by tests
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
Expand Down Expand Up @@ -1150,7 +1150,7 @@

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.inner.pdSvcDiscovery.GetClusterID(),
ClusterId: c.inner.serviceDiscovery.GetClusterID(),
CallerId: string(caller.GetCallerID()),
CallerComponent: string(c.callerComponent),
}
Expand Down Expand Up @@ -1334,7 +1334,7 @@
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand Down
2 changes: 1 addition & 1 deletion client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (c *Cli) DispatchRequest(request *Request) (bool, error) {
// Client is closed, no need to retry.
return false, request.clientCtx.Err()
case <-c.ctx.Done():
// tsoClient is closed due to the PD service mode switch, which is retryable.
// tsoClient is closed due to the service mode switch, which is retryable.
return true, c.ctx.Err()
default:
// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
Expand Down
2 changes: 1 addition & 1 deletion client/clients/tso/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *mockTSOServiceProvider) getOption() *opt.Option {
}

func (*mockTSOServiceProvider) getServiceDiscovery() sd.ServiceDiscovery {
return sd.NewMockPDServiceDiscovery([]string{mockStreamURL}, nil)
return sd.NewMockServiceDiscovery([]string{mockStreamURL}, nil)
}

func (m *mockTSOServiceProvider) getConnectionCtxMgr() *cctx.Manager[*tsoStream] {
Expand Down
10 changes: 5 additions & 5 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// PD service will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
Expand Down Expand Up @@ -304,7 +304,7 @@ func WithMetrics(
}
}

// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
// NewClientWithServiceDiscovery creates a PD HTTP client with the given service discovery.
func NewClientWithServiceDiscovery(
source string,
sd sd.ServiceDiscovery,
Expand Down Expand Up @@ -332,7 +332,7 @@ func NewClient(
for _, opt := range opts {
opt(c)
}
sd := sd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := sd.NewDefaultServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down Expand Up @@ -420,7 +420,7 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
}
}

// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock PD service discovery.
// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock service discovery.
func newClientWithMockServiceDiscovery(
source string,
pdAddrs []string,
Expand All @@ -432,7 +432,7 @@ func newClientWithMockServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := sd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
sd := sd.NewMockServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down
36 changes: 18 additions & 18 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
)

type innerClient struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher
keyspaceID uint32
svrUrls []string
serviceDiscovery sd.ServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
serviceModeKeeper
Expand All @@ -45,13 +45,13 @@
}

func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error {
c.pdSvcDiscovery = sd.NewPDServiceDiscovery(
c.serviceDiscovery = sd.NewServiceDiscovery(
c.ctx, c.cancel, &c.wg, c.setServiceMode,
updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
if c.pdSvcDiscovery != nil {
c.pdSvcDiscovery.Close()
if c.serviceDiscovery != nil {
c.serviceDiscovery.Close()
}
return err
}
Expand Down Expand Up @@ -92,10 +92,10 @@
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = tso.NewClient(c.ctx, c.option,
c.pdSvcDiscovery, &tso.PDStreamBuilderFactory{})
c.serviceDiscovery, &tso.PDStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = sd.NewTSOServiceDiscovery(
c.ctx, c, c.pdSvcDiscovery,
c.ctx, c, c.serviceDiscovery,
c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
Expand All @@ -119,12 +119,12 @@
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD mode and
// no tso microservice discovery is needed.
c.tsoSvcDiscovery = newTSOSvcDiscovery
// Close the old TSO service discovery safely after both the old client and service discovery are replaced.
if oldTSOSvcDiscovery != nil {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
// We are switching from PD service mode to PD mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@
c.wg.Wait()

c.serviceModeKeeper.close()
c.pdSvcDiscovery.Close()
c.serviceDiscovery.Close()

if c.tokenDispatcher != nil {
tokenErr := errors.WithStack(errs.ErrClosing)
Expand All @@ -169,12 +169,12 @@
}

// Init the client base.
if err := c.pdSvcDiscovery.Init(); err != nil {
if err := c.serviceDiscovery.Init(); err != nil {
return err
}

// Register callbacks
c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)
c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand All @@ -186,12 +186,12 @@
func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (sd.ServiceClient, context.Context) {
var serviceClient sd.ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.GetServiceClientByKind(sd.UniversalAPIKind)
serviceClient = c.serviceDiscovery.GetServiceClientByKind(sd.UniversalAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
serviceClient = c.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand All @@ -201,12 +201,12 @@
// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *innerClient) gRPCErrorHandler(err error) {
if errs.IsLeaderChange(err) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 204 in client/inner_client.go

View check run for this annotation

Codecov / codecov/patch

client/inner_client.go#L204

Added line #L204 was not covered by tests
}
}

func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.GetServingURL())
cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL())
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

// keyspaceClient returns the KeyspaceClient from current PD leader.
func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {
if client := c.inner.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
if client := c.inner.serviceDiscovery.GetServingEndpointClientConn(); client != nil {
return keyspacepb.NewKeyspaceClient(client)
}
return nil
Expand Down Expand Up @@ -70,7 +70,7 @@

if err != nil {
metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 73 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L73

Added line #L73 was not covered by tests
return nil, err
}

Expand Down Expand Up @@ -115,7 +115,7 @@

if err != nil {
metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 118 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L118

Added line #L118 was not covered by tests
return nil, err
}

Expand Down Expand Up @@ -159,7 +159,7 @@

if err != nil {
metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 162 in client/keyspace_client.go

View check run for this annotation

Codecov / codecov/patch

client/keyspace_client.go#L162

Added line #L162 was not covered by tests
return nil, err
}

Expand Down
8 changes: 4 additions & 4 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

// metaStorageClient gets the meta storage client from current PD leader.
func (c *innerClient) metaStorageClient() meta_storagepb.MetaStorageClient {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
if client := c.serviceDiscovery.GetServingEndpointClientConn(); client != nil {
return meta_storagepb.NewMetaStorageClient(client)
}
return nil
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
Lease: options.Lease,
PrevKv: options.PrevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL())
ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
Limit: options.Limit,
Revision: options.Revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL())
ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *innerClient) respForMetaStorageErr(observer prometheus.Observer, start
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand Down
4 changes: 2 additions & 2 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@
// If the stream is still nil, return an error.
if stream == nil {
firstRequest.done <- errors.Errorf("failed to get the stream connection")
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()

Check warning on line 334 in client/resource_manager_client.go

View check run for this annotation

Codecov / codecov/patch

client/resource_manager_client.go#L334

Added line #L334 was not covered by tests
connection.reset()
continue
}
Expand All @@ -343,7 +343,7 @@
default:
}
if err = c.processTokenRequests(stream, firstRequest); err != nil {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.serviceDiscovery.ScheduleCheckMemberChanged()
connection.reset()
log.Info("[resource_manager] token request error", zap.Error(err))
}
Expand Down
Loading
Loading