From ae3cb66f258e23645d1f0399bd082051f13efbb2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 6 Jan 2025 11:39:23 +0800 Subject: [PATCH] rename client Signed-off-by: Ryan Leung --- client/client.go | 24 +-- client/clients/tso/client.go | 2 +- client/clients/tso/dispatcher_test.go | 2 +- client/http/client.go | 8 +- client/inner_client.go | 28 +-- client/keyspace_client.go | 8 +- client/meta_storage_client.go | 8 +- client/resource_manager_client.go | 4 +- ...discovery.go => mock_service_discovery.go} | 50 ++--- ...vice_discovery.go => service_discovery.go} | 204 +++++++++--------- ...very_test.go => service_discovery_test.go} | 20 +- .../servicediscovery/tso_service_discovery.go | 26 +-- server/grpc_service.go | 2 +- tests/cluster.go | 4 +- tests/integrations/client/client_test.go | 2 +- .../mcs/discovery/register_test.go | 2 +- .../mcs/keyspace/tso_keyspace_group_test.go | 2 +- tests/integrations/mcs/members/member_test.go | 2 +- .../mcs/scheduling/config_test.go | 2 +- .../integrations/mcs/scheduling/meta_test.go | 2 +- .../integrations/mcs/scheduling/rule_test.go | 2 +- .../mcs/scheduling/server_test.go | 4 +- tests/integrations/mcs/tso/api_test.go | 6 +- .../mcs/tso/keyspace_group_manager_test.go | 6 +- tests/integrations/mcs/tso/proxy_test.go | 2 +- tests/integrations/mcs/tso/server_test.go | 10 +- tests/integrations/tso/client_test.go | 4 +- tests/integrations/tso/consistency_test.go | 2 +- tests/integrations/tso/server_test.go | 2 +- .../apiv2/handlers/tso_keyspace_group_test.go | 2 +- tests/testutil.go | 2 +- .../tests/keyspace/keyspace_group_test.go | 14 +- tools/pd-ctl/tests/keyspace/keyspace_test.go | 4 +- 33 files changed, 231 insertions(+), 231 deletions(-) rename client/servicediscovery/{mock_pd_service_discovery.go => mock_service_discovery.go} (57%) rename client/servicediscovery/{pd_service_discovery.go => service_discovery.go} (82%) rename client/servicediscovery/{pd_service_discovery_test.go => service_discovery_test.go} (96%) diff --git a/client/client.go b/client/client.go index 4291b0afb4f..566a851a678 100644 --- a/client/client.go +++ b/client/client.go @@ -363,8 +363,8 @@ func newClientWithKeyspaceName( c := &client{ callerComponent: adjustCallerComponent(callerComponent), inner: &innerClient{ - // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, - // finally update the keyspace id to the PD service discovery for the following interactions. + // Create a service discovery with null keyspace id, then query the real id with the keyspace name, + // finally update the keyspace id to the service discovery for the following interactions. keyspaceID: constants.NullKeyspaceID, updateTokenConnectionCh: make(chan struct{}, 1), ctx: clientCtx, @@ -387,7 +387,7 @@ func newClientWithKeyspaceName( } c.inner.keyspaceID = keyspaceMeta.GetId() // c.keyspaceID is the source of truth for keyspace id. - c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID) + c.inner.serviceDiscovery.SetKeyspaceID(c.inner.keyspaceID) return nil } @@ -415,17 +415,17 @@ func (c *client) ResetTSOClient() { // GetClusterID returns the ClusterID. func (c *client) GetClusterID(context.Context) uint64 { - return c.inner.pdSvcDiscovery.GetClusterID() + return c.inner.serviceDiscovery.GetClusterID() } // GetLeaderURL returns the leader URL. 
func (c *client) GetLeaderURL() string { - return c.inner.pdSvcDiscovery.GetServingURL() + return c.inner.serviceDiscovery.GetServingURL() } // GetServiceDiscovery returns the client-side service discovery object func (c *client) GetServiceDiscovery() sd.ServiceDiscovery { - return c.inner.pdSvcDiscovery + return c.inner.serviceDiscovery } // UpdateOption updates the client option. @@ -441,7 +441,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error { } case opt.EnableTSOFollowerProxy: if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { - return errors.New("[pd] tso follower proxy is only supported in PD service mode") + return errors.New("[pd] tso follower proxy is only supported in PD mode") } enable, ok := value.(bool) if !ok { @@ -494,7 +494,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { // getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns // follower pd client and the context which holds forward information. func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) { - serviceClient := c.inner.pdSvcDiscovery.GetServiceClient() + serviceClient := c.inner.serviceDiscovery.GetServiceClient() if serviceClient == nil || serviceClient.GetClientConn() == nil { return nil, ctx } @@ -607,7 +607,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs var resp *pdpb.GetRegionResponse for _, url := range memberURLs { - conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url) + conn, err := c.inner.serviceDiscovery.GetOrCreateGRPCConn(url) if err != nil { log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err)) continue @@ -628,7 +628,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs if resp == nil { metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds()) - c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.serviceDiscovery.ScheduleCheckMemberChanged() errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs) return nil, errors.WithStack(errors.New(errorMsg)) } @@ -1170,7 +1170,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o func (c *client) requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ - ClusterId: c.inner.pdSvcDiscovery.GetClusterID(), + ClusterId: c.inner.serviceDiscovery.GetClusterID(), CallerId: string(caller.GetCallerID()), CallerComponent: string(c.callerComponent), } @@ -1354,7 +1354,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { - c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.serviceDiscovery.ScheduleCheckMemberChanged() return errors.WithStack(err) } return errors.WithStack(errors.New(header.GetError().String())) diff --git a/client/clients/tso/client.go b/client/clients/tso/client.go index c26dd25f2ad..5c4e2ed1d4b 100644 --- a/client/clients/tso/client.go +++ b/client/clients/tso/client.go @@ -531,7 +531,7 @@ func (c *Cli) DispatchRequest(request *Request) (bool, error) { // Client is closed, no need to retry. return false, request.clientCtx.Err() case <-c.ctx.Done(): - // tsoClient is closed due to the PD service mode switch, which is retryable. + // tsoClient is closed due to the service mode switch, which is retryable. 
return true, c.ctx.Err() default: // This failpoint will increase the possibility that the request is sent to a closed dispatcher. diff --git a/client/clients/tso/dispatcher_test.go b/client/clients/tso/dispatcher_test.go index cefc53f3944..d4cb7bdfcaa 100644 --- a/client/clients/tso/dispatcher_test.go +++ b/client/clients/tso/dispatcher_test.go @@ -51,7 +51,7 @@ func (m *mockTSOServiceProvider) getOption() *opt.Option { } func (*mockTSOServiceProvider) getServiceDiscovery() sd.ServiceDiscovery { - return sd.NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) + return sd.NewMockServiceDiscovery([]string{mockStreamURL}, nil) } func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { diff --git a/client/http/client.go b/client/http/client.go index 87746e3bcea..b7109166a30 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -304,7 +304,7 @@ func WithMetrics( } } -// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery. +// NewClientWithServiceDiscovery creates a PD HTTP client with the given service discovery. func NewClientWithServiceDiscovery( source string, sd sd.ServiceDiscovery, @@ -332,7 +332,7 @@ func NewClient( for _, opt := range opts { opt(c) } - sd := sd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf) + sd := sd.NewDefaultServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf) if err := sd.Init(); err != nil { log.Error("[pd] init service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err)) @@ -420,7 +420,7 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client { } } -// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock PD service discovery. +// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock service discovery. 
func newClientWithMockServiceDiscovery( source string, pdAddrs []string, @@ -432,7 +432,7 @@ func newClientWithMockServiceDiscovery( for _, opt := range opts { opt(c) } - sd := sd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf) + sd := sd.NewMockServiceDiscovery(pdAddrs, c.inner.tlsConf) if err := sd.Init(); err != nil { log.Error("[pd] init mock service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err)) diff --git a/client/inner_client.go b/client/inner_client.go index d898c5de157..ece21e93563 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -31,7 +31,7 @@ const ( type innerClient struct { keyspaceID uint32 svrUrls []string - pdSvcDiscovery sd.ServiceDiscovery + serviceDiscovery sd.ServiceDiscovery tokenDispatcher *tokenDispatcher regionMetaCircuitBreaker *cb.CircuitBreaker[*pdpb.GetRegionResponse] @@ -49,13 +49,13 @@ type innerClient struct { } func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error { - c.pdSvcDiscovery = sd.NewPDServiceDiscovery( + c.serviceDiscovery = sd.NewServiceDiscovery( c.ctx, c.cancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() - if c.pdSvcDiscovery != nil { - c.pdSvcDiscovery.Close() + if c.serviceDiscovery != nil { + c.serviceDiscovery.Close() } return err } @@ -97,10 +97,10 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { switch mode { case pdpb.ServiceMode_PD_SVC_MODE: newTSOCli = tso.NewClient(c.ctx, c.option, - c.pdSvcDiscovery, &tso.PDStreamBuilderFactory{}) + c.serviceDiscovery, &tso.PDStreamBuilderFactory{}) case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = sd.NewTSOServiceDiscovery( - c.ctx, c, c.pdSvcDiscovery, + c.ctx, c, c.serviceDiscovery, c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. @@ -124,7 +124,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { oldTSOClient.Close() // Replace the old TSO service discovery if needed. oldTSOSvcDiscovery := c.tsoSvcDiscovery - // If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and + // If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD mode and // no tso microservice discovery is needed. c.tsoSvcDiscovery = newTSOSvcDiscovery // Close the old TSO service discovery safely after both the old client and service discovery are replaced. @@ -158,7 +158,7 @@ func (c *innerClient) close() { c.wg.Wait() c.serviceModeKeeper.close() - c.pdSvcDiscovery.Close() + c.serviceDiscovery.Close() if c.tokenDispatcher != nil { tokenErr := errors.WithStack(errs.ErrClosing) @@ -174,12 +174,12 @@ func (c *innerClient) setup() error { } // Init the client base. 
- if err := c.pdSvcDiscovery.Init(); err != nil { + if err := c.serviceDiscovery.Init(); err != nil { return err } // Register callbacks - c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) + c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) // Create dispatchers c.createTokenDispatcher() @@ -191,12 +191,12 @@ func (c *innerClient) setup() error { func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (sd.ServiceClient, context.Context) { var serviceClient sd.ServiceClient if allowFollower { - serviceClient = c.pdSvcDiscovery.GetServiceClientByKind(sd.UniversalAPIKind) + serviceClient = c.serviceDiscovery.GetServiceClientByKind(sd.UniversalAPIKind) if serviceClient != nil { return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) } } - serviceClient = c.pdSvcDiscovery.GetServiceClient() + serviceClient = c.serviceDiscovery.GetServiceClient() if serviceClient == nil || serviceClient.GetClientConn() == nil { return nil, ctx } @@ -206,12 +206,12 @@ func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFol // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *innerClient) gRPCErrorHandler(err error) { if errs.IsLeaderChange(err) { - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.serviceDiscovery.ScheduleCheckMemberChanged() } } func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) { - cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.GetServingURL()) + cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL()) if err != nil { return nil, err } diff --git a/client/keyspace_client.go b/client/keyspace_client.go index 84bc29054eb..507279e906c 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -41,7 +41,7 @@ type KeyspaceClient interface { // keyspaceClient returns the KeyspaceClient from current PD leader. func (c *client) keyspaceClient() keyspacepb.KeyspaceClient { - if client := c.inner.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { + if client := c.inner.serviceDiscovery.GetServingEndpointClientConn(); client != nil { return keyspacepb.NewKeyspaceClient(client) } return nil @@ -70,7 +70,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key if err != nil { metrics.CmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds()) - c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.serviceDiscovery.ScheduleCheckMemberChanged() return nil, err } @@ -115,7 +115,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp if err != nil { metrics.CmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) - c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.serviceDiscovery.ScheduleCheckMemberChanged() return nil, err } @@ -159,7 +159,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint if err != nil { metrics.CmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) - c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.serviceDiscovery.ScheduleCheckMemberChanged() return nil, err } diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index fbabd60debd..7652884720d 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -33,7 +33,7 @@ import ( // metaStorageClient gets the meta storage client from current PD leader. 
func (c *innerClient) metaStorageClient() meta_storagepb.MetaStorageClient { - if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { + if client := c.serviceDiscovery.GetServingEndpointClientConn(); client != nil { return meta_storagepb.NewMetaStorageClient(client) } return nil @@ -74,7 +74,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me Lease: options.Lease, PrevKv: options.PrevKv, } - ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL()) + ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL()) cli := c.metaStorageClient() if cli == nil { cancel() @@ -113,7 +113,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora Limit: options.Limit, Revision: options.Revision, } - ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL()) + ctx = grpcutil.BuildForwardContext(ctx, c.serviceDiscovery.GetServingURL()) cli := c.metaStorageClient() if cli == nil { cancel() @@ -179,7 +179,7 @@ func (c *innerClient) respForMetaStorageErr(observer prometheus.Observer, start if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.serviceDiscovery.ScheduleCheckMemberChanged() return errors.WithStack(err) } return errors.WithStack(errors.New(header.GetError().String())) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 0c481631b93..3e4cd1a3cc8 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -331,7 +331,7 @@ func (c *innerClient) handleResourceTokenDispatcher(dispatcherCtx context.Contex // If the stream is still nil, return an error. if stream == nil { firstRequest.done <- errors.Errorf("failed to get the stream connection") - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.serviceDiscovery.ScheduleCheckMemberChanged() connection.reset() continue } @@ -343,7 +343,7 @@ func (c *innerClient) handleResourceTokenDispatcher(dispatcherCtx context.Contex default: } if err = c.processTokenRequests(stream, firstRequest); err != nil { - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.serviceDiscovery.ScheduleCheckMemberChanged() connection.reset() log.Info("[resource_manager] token request error", zap.Error(err)) } diff --git a/client/servicediscovery/mock_pd_service_discovery.go b/client/servicediscovery/mock_service_discovery.go similarity index 57% rename from client/servicediscovery/mock_pd_service_discovery.go rename to client/servicediscovery/mock_service_discovery.go index 87b74ae2136..6ca649f4575 100644 --- a/client/servicediscovery/mock_pd_service_discovery.go +++ b/client/servicediscovery/mock_service_discovery.go @@ -21,24 +21,24 @@ import ( "google.golang.org/grpc" ) -var _ ServiceDiscovery = (*mockPDServiceDiscovery)(nil) +var _ ServiceDiscovery = (*mockServiceDiscovery)(nil) -type mockPDServiceDiscovery struct { +type mockServiceDiscovery struct { urls []string tlsCfg *tls.Config clients []ServiceClient } -// NewMockPDServiceDiscovery creates a mock PD service discovery. -func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDServiceDiscovery { - return &mockPDServiceDiscovery{ +// NewMockServiceDiscovery creates a mock service discovery. +func NewMockServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockServiceDiscovery { + return &mockServiceDiscovery{ urls: urls, tlsCfg: tlsCfg, } } // Init directly creates the service clients with the given URLs. 
-func (m *mockPDServiceDiscovery) Init() error {
+func (m *mockServiceDiscovery) Init() error {
 	m.clients = make([]ServiceClient, 0, len(m.urls))
 	for _, url := range m.urls {
 		m.clients = append(m.clients, newPDServiceClient(url, m.urls[0], nil, false))
@@ -47,61 +47,61 @@ func (m *mockPDServiceDiscovery) Init() error {
 }
 
 // Close clears the service clients.
-func (m *mockPDServiceDiscovery) Close() {
+func (m *mockServiceDiscovery) Close() {
 	clear(m.clients)
 }
 
-// GetAllServiceClients returns all service clients init in the mock PD service discovery.
-func (m *mockPDServiceDiscovery) GetAllServiceClients() []ServiceClient {
+// GetAllServiceClients returns all service clients initialized in the mock service discovery.
+func (m *mockServiceDiscovery) GetAllServiceClients() []ServiceClient {
 	return m.clients
 }
 
 // GetClusterID implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetClusterID() uint64 { return 0 }
+func (*mockServiceDiscovery) GetClusterID() uint64 { return 0 }
 
 // GetKeyspaceID implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 }
+func (*mockServiceDiscovery) GetKeyspaceID() uint32 { return 0 }
 
 // SetKeyspaceID implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) SetKeyspaceID(uint32) {}
+func (*mockServiceDiscovery) SetKeyspaceID(uint32) {}
 
 // GetKeyspaceGroupID implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }
+func (*mockServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }
 
 // GetServiceURLs implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetServiceURLs() []string { return nil }
+func (*mockServiceDiscovery) GetServiceURLs() []string { return nil }
 
 // GetServingEndpointClientConn implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }
+func (*mockServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }
 
 // GetClientConns implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil }
+func (*mockServiceDiscovery) GetClientConns() *sync.Map { return nil }
 
 // GetServingURL implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetServingURL() string { return "" }
+func (*mockServiceDiscovery) GetServingURL() string { return "" }
 
 // GetBackupURLs implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetBackupURLs() []string { return nil }
+func (*mockServiceDiscovery) GetBackupURLs() []string { return nil }
 
 // GetServiceClient implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil }
+func (*mockServiceDiscovery) GetServiceClient() ServiceClient { return nil }
 
 // GetServiceClientByKind implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetServiceClientByKind(APIKind) ServiceClient { return nil }
+func (*mockServiceDiscovery) GetServiceClientByKind(APIKind) ServiceClient { return nil }
 
 // GetOrCreateGRPCConn implements the ServiceDiscovery interface.
-func (*mockPDServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, error) {
+func (*mockServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, error) {
 	return nil, nil
 }
 
 // ScheduleCheckMemberChanged implements the ServiceDiscovery interface. 
-func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {} +func (*mockServiceDiscovery) ScheduleCheckMemberChanged() {} // CheckMemberChanged implements the ServiceDiscovery interface. -func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil } +func (*mockServiceDiscovery) CheckMemberChanged() error { return nil } // AddServingURLSwitchedCallback implements the ServiceDiscovery interface. -func (*mockPDServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} +func (*mockServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} // AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface. -func (*mockPDServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} +func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} diff --git a/client/servicediscovery/pd_service_discovery.go b/client/servicediscovery/service_discovery.go similarity index 82% rename from client/servicediscovery/pd_service_discovery.go rename to client/servicediscovery/service_discovery.go index a95b57f9469..dd3e6b27c1c 100644 --- a/client/servicediscovery/pd_service_discovery.go +++ b/client/servicediscovery/service_discovery.go @@ -157,11 +157,11 @@ type ServiceClient interface { } var ( - _ ServiceClient = (*pdServiceClient)(nil) - _ ServiceClient = (*pdServiceAPIClient)(nil) + _ ServiceClient = (*serviceClient)(nil) + _ ServiceClient = (*serviceAPIClient)(nil) ) -type pdServiceClient struct { +type serviceClient struct { url string conn *grpc.ClientConn isLeader bool @@ -171,10 +171,10 @@ type pdServiceClient struct { } // NOTE: In the current implementation, the URL passed in is bound to have a scheme, -// because it is processed in `newPDServiceDiscovery`, and the url returned by etcd member owns the scheme. +// because it is processed in `newServiceDiscovery`, and the url returned by etcd member owns the scheme. // When testing, the URL is also bound to have a scheme. func newPDServiceClient(url, leaderURL string, conn *grpc.ClientConn, isLeader bool) ServiceClient { - cli := &pdServiceClient{ + cli := &serviceClient{ url: url, conn: conn, isLeader: isLeader, @@ -187,7 +187,7 @@ func newPDServiceClient(url, leaderURL string, conn *grpc.ClientConn, isLeader b } // GetURL implements ServiceClient. -func (c *pdServiceClient) GetURL() string { +func (c *serviceClient) GetURL() string { if c == nil { return "" } @@ -195,7 +195,7 @@ func (c *pdServiceClient) GetURL() string { } // BuildGRPCTargetContext implements ServiceClient. -func (c *pdServiceClient) BuildGRPCTargetContext(ctx context.Context, toLeader bool) context.Context { +func (c *serviceClient) BuildGRPCTargetContext(ctx context.Context, toLeader bool) context.Context { if c == nil || c.isLeader { return ctx } @@ -206,7 +206,7 @@ func (c *pdServiceClient) BuildGRPCTargetContext(ctx context.Context, toLeader b } // IsConnectedToLeader implements ServiceClient. -func (c *pdServiceClient) IsConnectedToLeader() bool { +func (c *serviceClient) IsConnectedToLeader() bool { if c == nil { return false } @@ -214,14 +214,14 @@ func (c *pdServiceClient) IsConnectedToLeader() bool { } // Available implements ServiceClient. 
-func (c *pdServiceClient) Available() bool { +func (c *serviceClient) Available() bool { if c == nil { return false } return !c.networkFailure.Load() } -func (c *pdServiceClient) checkNetworkAvailable(ctx context.Context) { +func (c *serviceClient) checkNetworkAvailable(ctx context.Context) { if c == nil || c.conn == nil { return } @@ -242,7 +242,7 @@ func (c *pdServiceClient) checkNetworkAvailable(ctx context.Context) { } // GetClientConn implements ServiceClient. -func (c *pdServiceClient) GetClientConn() *grpc.ClientConn { +func (c *serviceClient) GetClientConn() *grpc.ClientConn { if c == nil { return nil } @@ -250,7 +250,7 @@ func (c *pdServiceClient) GetClientConn() *grpc.ClientConn { } // NeedRetry implements ServiceClient. -func (c *pdServiceClient) NeedRetry(pdErr *pdpb.Error, err error) bool { +func (c *serviceClient) NeedRetry(pdErr *pdpb.Error, err error) bool { if c.IsConnectedToLeader() { return false } @@ -267,9 +267,9 @@ func regionAPIErrorFn(pdErr *pdpb.Error) bool { return pdErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND } -// pdServiceAPIClient is a specific API client for PD service. -// It extends the pdServiceClient and adds additional fields for managing availability -type pdServiceAPIClient struct { +// serviceAPIClient is a specific API client for service. +// It extends the serviceClient and adds additional fields for managing availability +type serviceAPIClient struct { ServiceClient fn errFn @@ -278,19 +278,19 @@ type pdServiceAPIClient struct { } func newPDServiceAPIClient(client ServiceClient, f errFn) ServiceClient { - return &pdServiceAPIClient{ + return &serviceAPIClient{ ServiceClient: client, fn: f, } } // Available implements ServiceClient. -func (c *pdServiceAPIClient) Available() bool { +func (c *serviceAPIClient) Available() bool { return c.ServiceClient.Available() && !c.unavailable.Load() } // markAsAvailable is used to try to mark the client as available if unavailable status is expired. -func (c *pdServiceAPIClient) markAsAvailable() { +func (c *serviceAPIClient) markAsAvailable() { if !c.unavailable.Load() { return } @@ -301,7 +301,7 @@ func (c *pdServiceAPIClient) markAsAvailable() { } // NeedRetry implements ServiceClient. -func (c *pdServiceAPIClient) NeedRetry(pdErr *pdpb.Error, err error) bool { +func (c *serviceAPIClient) NeedRetry(pdErr *pdpb.Error, err error) bool { if c.IsConnectedToLeader() { return false } @@ -317,43 +317,43 @@ func (c *pdServiceAPIClient) NeedRetry(pdErr *pdpb.Error, err error) bool { return true } -// pdServiceBalancerNode is a balancer node for PD service. -// It extends the pdServiceClient and adds additional fields for the next polling client in the chain. -type pdServiceBalancerNode struct { - *pdServiceAPIClient - next *pdServiceBalancerNode +// serviceBalancerNode is a balancer node for PD. +// It extends the serviceClient and adds additional fields for the next polling client in the chain. +type serviceBalancerNode struct { + *serviceAPIClient + next *serviceBalancerNode } -// pdServiceBalancer is a load balancer for PD service clients. -// It is used to balance the request to all servers and manage the connections to multiple PD service nodes. -type pdServiceBalancer struct { +// serviceBalancer is a load balancer for clients. +// It is used to balance the request to all servers and manage the connections to multiple nodes. 
+type serviceBalancer struct { mu sync.Mutex - now *pdServiceBalancerNode + now *serviceBalancerNode totalNode int errFn errFn } -func newPDServiceBalancer(fn errFn) *pdServiceBalancer { - return &pdServiceBalancer{ +func newServiceBalancer(fn errFn) *serviceBalancer { + return &serviceBalancer{ errFn: fn, } } -func (c *pdServiceBalancer) set(clients []ServiceClient) { +func (c *serviceBalancer) set(clients []ServiceClient) { c.mu.Lock() defer c.mu.Unlock() if len(clients) == 0 { return } c.totalNode = len(clients) - head := &pdServiceBalancerNode{ - pdServiceAPIClient: newPDServiceAPIClient(clients[c.totalNode-1], c.errFn).(*pdServiceAPIClient), + head := &serviceBalancerNode{ + serviceAPIClient: newPDServiceAPIClient(clients[c.totalNode-1], c.errFn).(*serviceAPIClient), } head.next = head last := head for i := c.totalNode - 2; i >= 0; i-- { - next := &pdServiceBalancerNode{ - pdServiceAPIClient: newPDServiceAPIClient(clients[i], c.errFn).(*pdServiceAPIClient), - next: head, + next := &serviceBalancerNode{ + serviceAPIClient: newPDServiceAPIClient(clients[i], c.errFn).(*serviceAPIClient), + next: head, } head = next last.next = head @@ -361,7 +361,7 @@ func (c *pdServiceBalancer) set(clients []ServiceClient) { c.now = head } -func (c *pdServiceBalancer) check() { +func (c *serviceBalancer) check() { c.mu.Lock() defer c.mu.Unlock() for range c.totalNode { @@ -370,11 +370,11 @@ func (c *pdServiceBalancer) check() { } } -func (c *pdServiceBalancer) next() { +func (c *serviceBalancer) next() { c.now = c.now.next } -func (c *pdServiceBalancer) get() (ret ServiceClient) { +func (c *serviceBalancer) get() (ret ServiceClient) { c.mu.Lock() defer c.mu.Unlock() i := 0 @@ -403,22 +403,22 @@ type TSOEventSource interface { } var ( - _ ServiceDiscovery = (*pdServiceDiscovery)(nil) - _ TSOEventSource = (*pdServiceDiscovery)(nil) + _ ServiceDiscovery = (*serviceDiscovery)(nil) + _ TSOEventSource = (*serviceDiscovery)(nil) ) -// pdServiceDiscovery is the service discovery client of PD/PD service which is quorum based -type pdServiceDiscovery struct { +// serviceDiscovery is the service discovery client of PD/PD service which is quorum based +type serviceDiscovery struct { isInitialized bool urls atomic.Value // Store as []string // PD leader - leader atomic.Value // Store as pdServiceClient + leader atomic.Value // Store as serviceClient // PD follower - followers sync.Map // Store as map[string]pdServiceClient + followers sync.Map // Store as map[string]serviceClient // PD leader and PD followers - all atomic.Value // Store as []pdServiceClient - apiCandidateNodes [apiKindCount]*pdServiceBalancer + all atomic.Value // Store as []serviceClient + apiCandidateNodes [apiKindCount]*serviceBalancer // PD follower URLs. Only for tso. followerURLs atomic.Value // Store as []string @@ -450,17 +450,17 @@ type pdServiceDiscovery struct { option *opt.Option } -// NewDefaultPDServiceDiscovery returns a new default PD service discovery-based client. -func NewDefaultPDServiceDiscovery( +// NewDefaultServiceDiscovery returns a new default service discovery-based client. 
+func NewDefaultServiceDiscovery( ctx context.Context, cancel context.CancelFunc, urls []string, tlsCfg *tls.Config, ) ServiceDiscovery { var wg sync.WaitGroup - return NewPDServiceDiscovery(ctx, cancel, &wg, nil, nil, constants.DefaultKeyspaceID, urls, tlsCfg, opt.NewOption()) + return NewServiceDiscovery(ctx, cancel, &wg, nil, nil, constants.DefaultKeyspaceID, urls, tlsCfg, opt.NewOption()) } -// NewPDServiceDiscovery returns a new PD service discovery-based client. -func NewPDServiceDiscovery( +// NewServiceDiscovery returns a new service discovery-based client. +func NewServiceDiscovery( ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, serviceModeUpdateCb func(pdpb.ServiceMode), @@ -468,12 +468,12 @@ func NewPDServiceDiscovery( keyspaceID uint32, urls []string, tlsCfg *tls.Config, option *opt.Option, ) ServiceDiscovery { - pdsd := &pdServiceDiscovery{ + pdsd := &serviceDiscovery{ checkMembershipCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel, wg: wg, - apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, + apiCandidateNodes: [apiKindCount]*serviceBalancer{newServiceBalancer(emptyErrorFn), newServiceBalancer(regionAPIErrorFn)}, serviceModeUpdateCb: serviceModeUpdateCb, updateKeyspaceIDFunc: updateKeyspaceIDFunc, keyspaceID: keyspaceID, @@ -485,8 +485,8 @@ func NewPDServiceDiscovery( return pdsd } -// Init initializes the PD service discovery. -func (c *pdServiceDiscovery) Init() error { +// Init initializes the service discovery. +func (c *serviceDiscovery) Init() error { if c.isInitialized { return nil } @@ -522,7 +522,7 @@ func (c *pdServiceDiscovery) Init() error { return nil } -func (c *pdServiceDiscovery) initRetry(f func() error) error { +func (c *serviceDiscovery) initRetry(f func() error) error { var err error ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -539,7 +539,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { return errors.WithStack(err) } -func (c *pdServiceDiscovery) updateMemberLoop() { +func (c *serviceDiscovery) updateMemberLoop() { defer c.wg.Done() ctx, cancel := context.WithCancel(c.ctx) @@ -563,7 +563,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() { } } -func (c *pdServiceDiscovery) updateServiceModeLoop() { +func (c *serviceDiscovery) updateServiceModeLoop() { defer c.wg.Done() failpoint.Inject("skipUpdateServiceMode", func() { failpoint.Return() @@ -595,7 +595,7 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() { } } -func (c *pdServiceDiscovery) memberHealthCheckLoop() { +func (c *serviceDiscovery) memberHealthCheckLoop() { defer c.wg.Done() memberCheckLoopCtx, memberCheckLoopCancel := context.WithCancel(c.ctx) @@ -615,19 +615,19 @@ func (c *pdServiceDiscovery) memberHealthCheckLoop() { } } -func (c *pdServiceDiscovery) checkLeaderHealth(ctx context.Context) { +func (c *serviceDiscovery) checkLeaderHealth(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() leader := c.getLeaderServiceClient() leader.checkNetworkAvailable(ctx) } -func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) { +func (c *serviceDiscovery) checkFollowerHealth(ctx context.Context) { c.followers.Range(func(_, value any) bool { // To ensure that the leader's healthy check is not delayed, shorten the duration. 
ctx, cancel := context.WithTimeout(ctx, MemberHealthCheckInterval/3) defer cancel() - serviceClient := value.(*pdServiceClient) + serviceClient := value.(*serviceClient) serviceClient.checkNetworkAvailable(ctx) return true }) @@ -637,12 +637,12 @@ func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) { } // Close releases all resources. -func (c *pdServiceDiscovery) Close() { +func (c *serviceDiscovery) Close() { if c == nil { return } c.closeOnce.Do(func() { - log.Info("[pd] close pd service discovery client") + log.Info("[pd] close service discovery client") c.clientConns.Range(func(key, cc any) bool { if err := cc.(*grpc.ClientConn).Close(); err != nil { log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) @@ -654,28 +654,28 @@ func (c *pdServiceDiscovery) Close() { } // GetClusterID returns the ClusterID. -func (c *pdServiceDiscovery) GetClusterID() uint64 { +func (c *serviceDiscovery) GetClusterID() uint64 { return c.clusterID } // GetKeyspaceID returns the ID of the keyspace -func (c *pdServiceDiscovery) GetKeyspaceID() uint32 { +func (c *serviceDiscovery) GetKeyspaceID() uint32 { return c.keyspaceID } // SetKeyspaceID sets the ID of the keyspace -func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) { +func (c *serviceDiscovery) SetKeyspaceID(keyspaceID uint32) { c.keyspaceID = keyspaceID } // GetKeyspaceGroupID returns the ID of the keyspace group -func (*pdServiceDiscovery) GetKeyspaceGroupID() uint32 { - // PD/PD service only supports the default keyspace group +func (*serviceDiscovery) GetKeyspaceGroupID() uint32 { + // PD only supports the default keyspace group return constants.DefaultKeyspaceGroupID } // DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. -func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) { +func (c *serviceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) { switch svcType { case apiService: urls = c.GetServiceURLs() @@ -702,14 +702,14 @@ func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []s // GetServiceURLs returns the URLs of the servers. // For testing use. It should only be called when the client is closed. -func (c *pdServiceDiscovery) GetServiceURLs() []string { +func (c *serviceDiscovery) GetServiceURLs() []string { return c.urls.Load().([]string) } // GetServingEndpointClientConn returns the grpc client connection of the serving endpoint // which is the leader in a quorum-based cluster or the primary in a primary/secondary // configured cluster. -func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { +func (c *serviceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { if cc, ok := c.clientConns.Load(c.getLeaderURL()); ok { return cc.(*grpc.ClientConn) } @@ -717,32 +717,32 @@ func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { } // GetClientConns returns the mapping {URL -> a gRPC connection} -func (c *pdServiceDiscovery) GetClientConns() *sync.Map { +func (c *serviceDiscovery) GetClientConns() *sync.Map { return &c.clientConns } // GetServingURL returns the leader url -func (c *pdServiceDiscovery) GetServingURL() string { +func (c *serviceDiscovery) GetServingURL() string { return c.getLeaderURL() } // GetBackupURLs gets the URLs of the current reachable followers // in a quorum-based cluster. Used for tso currently. 
-func (c *pdServiceDiscovery) GetBackupURLs() []string { +func (c *serviceDiscovery) GetBackupURLs() []string { return c.getFollowerURLs() } // getLeaderServiceClient returns the leader ServiceClient. -func (c *pdServiceDiscovery) getLeaderServiceClient() *pdServiceClient { +func (c *serviceDiscovery) getLeaderServiceClient() *serviceClient { leader := c.leader.Load() if leader == nil { return nil } - return leader.(*pdServiceClient) + return leader.(*serviceClient) } // GetServiceClientByKind returns ServiceClient of the specific kind. -func (c *pdServiceDiscovery) GetServiceClientByKind(kind APIKind) ServiceClient { +func (c *serviceDiscovery) GetServiceClientByKind(kind APIKind) ServiceClient { client := c.apiCandidateNodes[kind].get() if client == nil { return nil @@ -751,7 +751,7 @@ func (c *pdServiceDiscovery) GetServiceClientByKind(kind APIKind) ServiceClient } // GetServiceClient returns the leader/primary ServiceClient if it is healthy. -func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { +func (c *serviceDiscovery) GetServiceClient() ServiceClient { leaderClient := c.getLeaderServiceClient() if c.option.EnableForwarding && !leaderClient.Available() { if followerClient := c.GetServiceClientByKind(ForwardAPIKind); followerClient != nil { @@ -766,7 +766,7 @@ func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { } // GetAllServiceClients implements ServiceDiscovery -func (c *pdServiceDiscovery) GetAllServiceClients() []ServiceClient { +func (c *serviceDiscovery) GetAllServiceClients() []ServiceClient { all := c.all.Load() if all == nil { return nil @@ -777,7 +777,7 @@ func (c *pdServiceDiscovery) GetAllServiceClients() []ServiceClient { // ScheduleCheckMemberChanged is used to check if there is any membership // change among the leader and the followers. -func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() { +func (c *serviceDiscovery) ScheduleCheckMemberChanged() { select { case c.checkMembershipCh <- struct{}{}: default: @@ -786,24 +786,24 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() { // CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a // quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. -func (c *pdServiceDiscovery) CheckMemberChanged() error { +func (c *serviceDiscovery) CheckMemberChanged() error { return c.updateMember() } // AddServingURLSwitchedCallback adds callbacks which will be called // when the leader is switched. -func (c *pdServiceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { +func (c *serviceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) } // AddServiceURLsSwitchedCallback adds callbacks which will be called when // any leader/follower is changed. -func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { +func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) } // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. 
-func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { +func (c *serviceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { url := c.getLeaderURL() if len(url) > 0 { if err := callback(url); err != nil { @@ -814,12 +814,12 @@ func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderUR } // getLeaderURL returns the leader URL. -func (c *pdServiceDiscovery) getLeaderURL() string { +func (c *serviceDiscovery) getLeaderURL() string { return c.getLeaderServiceClient().GetURL() } // getFollowerURLs returns the follower URLs. -func (c *pdServiceDiscovery) getFollowerURLs() []string { +func (c *serviceDiscovery) getFollowerURLs() []string { followerURLs := c.followerURLs.Load() if followerURLs == nil { return []string{} @@ -827,7 +827,7 @@ func (c *pdServiceDiscovery) getFollowerURLs() []string { return followerURLs.([]string) } -func (c *pdServiceDiscovery) initClusterID() error { +func (c *serviceDiscovery) initClusterID() error { ctx, cancel := context.WithCancel(c.ctx) defer cancel() clusterID := uint64(0) @@ -854,7 +854,7 @@ func (c *pdServiceDiscovery) initClusterID() error { return nil } -func (c *pdServiceDiscovery) checkServiceModeChanged() error { +func (c *serviceDiscovery) checkServiceModeChanged() error { leaderURL := c.getLeaderURL() if len(leaderURL) == 0 { return errors.New("no leader found") @@ -882,7 +882,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error { return nil } -func (c *pdServiceDiscovery) updateMember() error { +func (c *serviceDiscovery) updateMember() error { for _, url := range c.GetServiceURLs() { members, err := c.getMembers(c.ctx, url, UpdateMemberTimeout) // Check the cluster ID. @@ -915,7 +915,7 @@ func (c *pdServiceDiscovery) updateMember() error { return errs.ErrClientGetMember.FastGenByArgs() } -func (c *pdServiceDiscovery) getClusterInfo(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetClusterInfoResponse, error) { +func (c *serviceDiscovery) getClusterInfo(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetClusterInfoResponse, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cc, err := c.GetOrCreateGRPCConn(url) @@ -934,7 +934,7 @@ func (c *pdServiceDiscovery) getClusterInfo(ctx context.Context, url string, tim return clusterInfo, nil } -func (c *pdServiceDiscovery) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { +func (c *serviceDiscovery) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cc, err := c.GetOrCreateGRPCConn(url) @@ -953,7 +953,7 @@ func (c *pdServiceDiscovery) getMembers(ctx context.Context, url string, timeout return members, nil } -func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { +func (c *serviceDiscovery) updateURLs(members []*pdpb.Member) { urls := make([]string, 0, len(members)) for _, m := range members { urls = append(urls, m.GetClientUrls()...) 
@@ -976,7 +976,7 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } -func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) { +func (c *serviceDiscovery) switchLeader(url string) (bool, error) { oldLeader := c.getLeaderServiceClient() if url == oldLeader.GetURL() && oldLeader.GetClientConn() != nil { return false, nil @@ -1001,10 +1001,10 @@ func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) { return true, err } -func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leaderID uint64, leaderURL string) (changed bool) { - followers := make(map[string]*pdServiceClient) +func (c *serviceDiscovery) updateFollowers(members []*pdpb.Member, leaderID uint64, leaderURL string) (changed bool) { + followers := make(map[string]*serviceClient) c.followers.Range(func(key, value any) bool { - followers[key.(string)] = value.(*pdServiceClient) + followers[key.(string)] = value.(*serviceClient) return true }) var followerURLs []string @@ -1017,7 +1017,7 @@ func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leaderID ui // FIXME: How to safely compare urls(also for leader)? For now, only allows one client url. url := tlsutil.PickMatchedURL(member.GetClientUrls(), c.tlsCfg) if client, ok := c.followers.Load(url); ok { - if client.(*pdServiceClient).GetClientConn() == nil { + if client.(*serviceClient).GetClientConn() == nil { conn, err := c.GetOrCreateGRPCConn(url) if err != nil || conn == nil { log.Warn("[pd] failed to connect follower", zap.String("follower", url), errs.ZapError(err)) @@ -1050,7 +1050,7 @@ func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leaderID ui return } -func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader *pdpb.Member) error { +func (c *serviceDiscovery) updateServiceClient(members []*pdpb.Member, leader *pdpb.Member) error { // FIXME: How to safely compare leader urls? For now, only allows one client url. leaderURL := tlsutil.PickMatchedURL(leader.GetClientUrls(), c.tlsCfg) leaderChanged, err := c.switchLeader(leaderURL) @@ -1066,7 +1066,7 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader clients = append(clients, leaderClient) } c.followers.Range(func(_, value any) bool { - clients = append(clients, value.(*pdServiceClient)) + clients = append(clients, value.(*serviceClient)) return true }) c.all.Store(clients) @@ -1078,6 +1078,6 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader } // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. -func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { +func (c *serviceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.GRPCDialOptions...) 
} diff --git a/client/servicediscovery/pd_service_discovery_test.go b/client/servicediscovery/service_discovery_test.go similarity index 96% rename from client/servicediscovery/pd_service_discovery_test.go rename to client/servicediscovery/service_discovery_test.go index dc0a0bd4511..0a678718fdc 100644 --- a/client/servicediscovery/pd_service_discovery_test.go +++ b/client/servicediscovery/service_discovery_test.go @@ -193,14 +193,14 @@ func (suite *serviceClientTestSuite) TestServiceClient() { re.True(leader.IsConnectedToLeader()) re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/unreachableNetwork1", "return(true)")) - follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx) - leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx) + follower.(*serviceClient).checkNetworkAvailable(suite.ctx) + leader.(*serviceClient).checkNetworkAvailable(suite.ctx) re.False(follower.Available()) re.False(leader.Available()) re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/unreachableNetwork1")) - follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx) - leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx) + follower.(*serviceClient).checkNetworkAvailable(suite.ctx) + leader.(*serviceClient).checkNetworkAvailable(suite.ctx) re.True(follower.Available()) re.True(leader.Available()) @@ -259,11 +259,11 @@ func (suite *serviceClientTestSuite) TestServiceClient() { re.False(leaderAPIClient.NeedRetry(pdErr2, nil)) re.False(followerAPIClient.Available()) re.True(leaderAPIClient.Available()) - followerAPIClient.(*pdServiceAPIClient).markAsAvailable() - leaderAPIClient.(*pdServiceAPIClient).markAsAvailable() + followerAPIClient.(*serviceAPIClient).markAsAvailable() + leaderAPIClient.(*serviceAPIClient).markAsAvailable() re.False(followerAPIClient.Available()) time.Sleep(time.Millisecond * 100) - followerAPIClient.(*pdServiceAPIClient).markAsAvailable() + followerAPIClient.(*serviceAPIClient).markAsAvailable() re.True(followerAPIClient.Available()) re.True(followerAPIClient.NeedRetry(nil, err)) @@ -278,7 +278,7 @@ func (suite *serviceClientTestSuite) TestServiceClientBalancer() { re := suite.Require() follower := suite.followerClient leader := suite.leaderClient - b := &pdServiceBalancer{} + b := &serviceBalancer{} b.set([]ServiceClient{leader, follower}) re.Equal(2, b.totalNode) @@ -400,7 +400,7 @@ func TestUpdateURLs(t *testing.T) { } return } - cli := &pdServiceDiscovery{option: opt.NewOption()} + cli := &serviceDiscovery{option: opt.NewOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) @@ -421,7 +421,7 @@ func TestGRPCDialOption(t *testing.T) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond) defer cancel() - cli := &pdServiceDiscovery{ + cli := &serviceDiscovery{ checkMembershipCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel, diff --git a/client/servicediscovery/tso_service_discovery.go b/client/servicediscovery/tso_service_discovery.go index 1d2130db804..7734fd23107 100644 --- a/client/servicediscovery/tso_service_discovery.go +++ b/client/servicediscovery/tso_service_discovery.go @@ -126,10 +126,10 @@ func (t *tsoServerDiscovery) resetFailure() { // tsoServiceDiscovery is the service discovery client of the independent TSO service type tsoServiceDiscovery struct { - metacli metastorage.Client - apiSvcDiscovery ServiceDiscovery - clusterID uint64 - keyspaceID atomic.Uint32 + metacli 
metastorage.Client + serviceDiscovery ServiceDiscovery + clusterID uint64 + keyspaceID atomic.Uint32 // defaultDiscoveryKey is the etcd path used for discovering the serving endpoints of // the default keyspace group @@ -161,7 +161,7 @@ type tsoServiceDiscovery struct { // NewTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func NewTSOServiceDiscovery( - ctx context.Context, metacli metastorage.Client, apiSvcDiscovery ServiceDiscovery, + ctx context.Context, metacli metastorage.Client, serviceDiscovery ServiceDiscovery, keyspaceID uint32, tlsCfg *tls.Config, option *opt.Option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) @@ -169,8 +169,8 @@ func NewTSOServiceDiscovery( ctx: ctx, cancel: cancel, metacli: metacli, - apiSvcDiscovery: apiSvcDiscovery, - clusterID: apiSvcDiscovery.GetClusterID(), + serviceDiscovery: serviceDiscovery, + clusterID: serviceDiscovery.GetClusterID(), tlsCfg: tlsCfg, option: option, checkMembershipCh: make(chan struct{}, 1), @@ -351,7 +351,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() { // CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in // a primary/secondary configured cluster. func (c *tsoServiceDiscovery) CheckMemberChanged() error { - if err := c.apiSvcDiscovery.CheckMemberChanged(); err != nil { + if err := c.serviceDiscovery.CheckMemberChanged(); err != nil { log.Warn("[tso] failed to check member changed", errs.ZapError(err)) } if err := c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember); err != nil { @@ -382,17 +382,17 @@ func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderU // GetServiceClient implements ServiceDiscovery func (c *tsoServiceDiscovery) GetServiceClient() ServiceClient { - return c.apiSvcDiscovery.GetServiceClient() + return c.serviceDiscovery.GetServiceClient() } // GetServiceClientByKind implements ServiceDiscovery func (c *tsoServiceDiscovery) GetServiceClientByKind(kind APIKind) ServiceClient { - return c.apiSvcDiscovery.GetServiceClientByKind(kind) + return c.serviceDiscovery.GetServiceClientByKind(kind) } // GetAllServiceClients implements ServiceDiscovery func (c *tsoServiceDiscovery) GetAllServiceClients() []ServiceClient { - return c.apiSvcDiscovery.GetAllServiceClients() + return c.serviceDiscovery.GetAllServiceClients() } // getPrimaryURL returns the primary URL. @@ -425,7 +425,7 @@ func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string func (c *tsoServiceDiscovery) updateMember() error { // The keyspace membership or the primary serving URL of the keyspace group, to which this // keyspace belongs, might have been changed. We need to query tso servers to get the latest info. 
- tsoServerURL, err := c.getTSOServer(c.apiSvcDiscovery) + tsoServerURL, err := c.getTSOServer(c.serviceDiscovery) if err != nil { log.Error("[tso] failed to get tso server", errs.ZapError(err)) return err @@ -589,7 +589,7 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) ) t := c.tsoServerDiscovery if len(t.urls) == 0 || t.failureCount == len(t.urls) { - urls, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) + urls, err = sd.(*serviceDiscovery).discoverMicroservice(tsoService) if err != nil { return "", err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 9f80c0bd849..804c08d95c0 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -271,7 +271,7 @@ func (s *GrpcServer) GetClusterInfo(context.Context, *pdpb.GetClusterInfoRequest }, nil } -// GetMinTS implements gRPC PDServer. In PD service mode, it simply returns a timestamp. +// GetMinTS implements gRPC PDServer. In PD mode, it simply returns a timestamp. // In PD service mode, it queries all tso servers and gets the minimum timestamp across // all keyspace groups. func (s *GrpcServer) GetMinTS( diff --git a/tests/cluster.go b/tests/cluster.go index b5ce735448e..4189b43902a 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -429,8 +429,8 @@ func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigO return createTestCluster(ctx, initialServerCount, nil, opts...) } -// NewTestMSCluster creates a new TestCluster with PD service. -func NewTestMSCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { +// NewTestPDServiceCluster creates a new TestCluster with PD service. +func NewTestPDServiceCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { return createTestCluster(ctx, initialServerCount, []string{constant.PDServiceName}, opts...) 
} diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 8528297809c..a6ec24d5364 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -335,7 +335,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestMSCluster(ctx, 1) + cluster, err := tests.NewTestPDServiceCluster(ctx, 1) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index cd353802261..eb8933e10d8 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -54,7 +54,7 @@ func (suite *serverRegisterTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index b93b4fc4140..b31d919324d 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestMSCluster(suite.ctx, 1) + cluster, err := tests.NewTestPDServiceCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index f3de932c640..7e83ea570b9 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestMSCluster(suite.ctx, 1) + cluster, err := tests.NewTestPDServiceCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 6465605aa41..6c770d3e4c1 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -62,7 +62,7 @@ func (suite *configTestSuite) SetupSuite() { schedulers.Register() var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index 9e38d097bdd..8df576b82ca 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ 
diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go
index 9e38d097bdd..8df576b82ca 100644
--- a/tests/integrations/mcs/scheduling/meta_test.go
+++ b/tests/integrations/mcs/scheduling/meta_test.go
@@ -53,7 +53,7 @@ func (suite *metaTestSuite) SetupSuite() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go
index c407a742ed9..706c5784831 100644
--- a/tests/integrations/mcs/scheduling/rule_test.go
+++ b/tests/integrations/mcs/scheduling/rule_test.go
@@ -54,7 +54,7 @@ func (suite *ruleTestSuite) SetupSuite() {
 
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index 060d93a3566..9a3d33d1dcf 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -66,7 +66,7 @@ func (suite *serverTestSuite) SetupSuite() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 
 	err = suite.cluster.RunInitialServers()
@@ -636,7 +636,7 @@ func (suite *multipleServerTestSuite) SetupSuite() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 2)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 2)
 	re.NoError(err)
 
 	err = suite.cluster.RunInitialServers()
diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go
index 0e155e3c789..dceb5ccdf7c 100644
--- a/tests/integrations/mcs/tso/api_test.go
+++ b/tests/integrations/mcs/tso/api_test.go
@@ -62,7 +62,7 @@ func (suite *tsoAPITestSuite) SetupTest() {
 
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.pdCluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.pdCluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 	err = suite.pdCluster.RunInitialServers()
 	re.NoError(err)
@@ -137,7 +137,7 @@ func TestTSOServerStartFirst(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	apiCluster, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	apiCluster, err := tests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = []string{"k1", "k2"}
 	})
 	defer apiCluster.Destroy()
@@ -200,7 +200,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	tc, err := tests.NewTestMSCluster(ctx, 1)
+	tc, err := tests.NewTestPDServiceCluster(ctx, 1)
 	defer tc.Destroy()
 	re.NoError(err)
 	err = tc.RunInitialServers()
diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index d1253fb16a1..ecbc0295845 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -82,7 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
 
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
@@ -538,7 +538,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))
 
 	// Init PD service config but not start.
-	tc, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := tests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = []string{
 			"keyspace_a", "keyspace_b",
 		}
@@ -735,7 +735,7 @@ func TestGetTSOImmediately(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))
 
 	// Init PD service config but not start.
-	tc, err := tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := tests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = []string{
 			"keyspace_a", "keyspace_b",
 		}
diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go
index 2950047f0ab..50583ebbbb4 100644
--- a/tests/integrations/mcs/tso/proxy_test.go
+++ b/tests/integrations/mcs/tso/proxy_test.go
@@ -62,7 +62,7 @@ func (s *tsoProxyTestSuite) SetupSuite() {
 	var err error
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	// Create an API cluster with 1 server
-	s.apiCluster, err = tests.NewTestMSCluster(s.ctx, 1)
+	s.apiCluster, err = tests.NewTestPDServiceCluster(s.ctx, 1)
 	re.NoError(err)
 	err = s.apiCluster.RunInitialServers()
 	re.NoError(err)
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index 6bf0787b064..7416a314949 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -75,7 +75,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
 	re := suite.Require()
 
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 
 	err = suite.cluster.RunInitialServers()
@@ -168,7 +168,7 @@ func checkTSOPath(re *require.Assertions, isPDServiceMode bool) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	if isPDServiceMode {
-		cluster, err = tests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+		cluster, err = tests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 			conf.MicroService.EnableTSODynamicSwitching = false
 		})
 	} else {
@@ -233,7 +233,7 @@ func NewPDServiceForward(re *require.Assertions) PDServiceForward {
 	}
 	var err error
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 3)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 3)
 	re.NoError(err)
 
 	err = suite.cluster.RunInitialServers()
@@ -512,7 +512,7 @@ func (suite *CommonTestSuite) SetupSuite() {
 	var err error
 	re := suite.Require()
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestMSCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 
 	err = suite.cluster.RunInitialServers()
@@ -598,7 +598,7 @@ func TestTSOServiceSwitch(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	tc, err := tests.NewTestMSCluster(ctx, 1,
+	tc, err := tests.NewTestPDServiceCluster(ctx, 1,
 		func(conf *config.Config, _ string) {
 			conf.MicroService.EnableTSODynamicSwitching = true
 		},
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index cfd729edd10..a06e44ed4ab 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 	if suite.legacy {
 		suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount)
 	} else {
-		suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount, func(conf *config.Config, _ string) {
+		suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, serverCount, func(conf *config.Config, _ string) {
 			conf.MicroService.EnableTSODynamicSwitching = false
 		})
 	}
@@ -544,7 +544,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	// Create an API cluster which has 3 servers
-	apiCluster, err := tests.NewTestMSCluster(ctx, 3)
+	apiCluster, err := tests.NewTestPDServiceCluster(ctx, 3)
 	re.NoError(err)
 	err = apiCluster.RunInitialServers()
 	re.NoError(err)
diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go
index a4fed7d9463..b29ae696f26 100644
--- a/tests/integrations/tso/consistency_test.go
+++ b/tests/integrations/tso/consistency_test.go
@@ -76,7 +76,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
 	if suite.legacy {
 		suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount)
 	} else {
-		suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount)
+		suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, serverCount)
 	}
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go
index 814d1d4b8f0..1428dbcd1a6 100644
--- a/tests/integrations/tso/server_test.go
+++ b/tests/integrations/tso/server_test.go
@@ -74,7 +74,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
 	if suite.legacy {
 		suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount)
 	} else {
-		suite.cluster, err = tests.NewTestMSCluster(suite.ctx, serverCount)
+		suite.cluster, err = tests.NewTestPDServiceCluster(suite.ctx, serverCount)
 	}
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
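
NOTE (usage sketch, not part of the patch): the variadic options threaded
through NewTestPDServiceCluster are plain callbacks over the server config;
the TSO hunks above toggle dynamic switching that way. A hypothetical fragment,
assuming only the option signature and config field shown in these hunks:

	// Hypothetical: name the ConfigOption instead of passing an inline
	// literal; it mutates the config exactly as the hunks above do.
	disableTSOSwitching := func(conf *config.Config, _ string) {
		conf.MicroService.EnableTSODynamicSwitching = false
	}
	cluster, err := tests.NewTestPDServiceCluster(ctx, 1, disableTSOSwitching)
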
diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go
index 03b8c28d879..851df9b5fd1 100644
--- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go
+++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go
@@ -42,7 +42,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) {
 func (suite *keyspaceGroupTestSuite) SetupTest() {
 	re := suite.Require()
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
-	cluster, err := tests.NewTestMSCluster(suite.ctx, 1)
+	cluster, err := tests.NewTestPDServiceCluster(suite.ctx, 1)
 	suite.cluster = cluster
 	re.NoError(err)
 	re.NoError(cluster.RunInitialServers())
diff --git a/tests/testutil.go b/tests/testutil.go
index 6781d537260..4bbfa8155b4 100644
--- a/tests/testutil.go
+++ b/tests/testutil.go
@@ -380,7 +380,7 @@ func (s *SchedulingTestEnvironment) startCluster(m SchedulerMode) {
 		re.NoError(leaderServer.BootstrapCluster())
 		s.clusters[PDMode] = cluster
 	case PDServiceMode:
-		cluster, err := NewTestMSCluster(ctx, 1, s.opts...)
+		cluster, err := NewTestPDServiceCluster(ctx, 1, s.opts...)
 		re.NoError(err)
 		err = cluster.RunInitialServers()
 		re.NoError(err)
diff --git a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go
index 2734348d7de..fff95856931 100644
--- a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go
+++ b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go
@@ -41,7 +41,7 @@ func TestKeyspaceGroup(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	tc, err := pdTests.NewTestMSCluster(ctx, 1)
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1)
 	re.NoError(err)
 	defer tc.Destroy()
 	err = tc.RunInitialServers()
@@ -102,7 +102,7 @@ func TestSplitKeyspaceGroup(t *testing.T) {
 	for i := range 129 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 3, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -157,7 +157,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) {
 	for i := range 10 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -197,7 +197,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) {
 	for i := range 10 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 3, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -301,7 +301,7 @@ func TestMergeKeyspaceGroup(t *testing.T) {
 	for i := range 129 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -420,7 +420,7 @@ func TestKeyspaceGroupState(t *testing.T) {
 	for i := range 10 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -511,7 +511,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) {
 	for i := range 10 {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 1, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
diff --git a/tools/pd-ctl/tests/keyspace/keyspace_test.go b/tools/pd-ctl/tests/keyspace/keyspace_test.go
index 24342b5240e..6a523ced7b8 100644
--- a/tools/pd-ctl/tests/keyspace/keyspace_test.go
+++ b/tools/pd-ctl/tests/keyspace/keyspace_test.go
@@ -49,7 +49,7 @@ func TestKeyspace(t *testing.T) {
 	for i := 1; i < 10; i++ {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
-	tc, err := pdTests.NewTestMSCluster(ctx, 3, func(conf *config.Config, _ string) {
+	tc, err := pdTests.NewTestPDServiceCluster(ctx, 3, func(conf *config.Config, _ string) {
 		conf.Keyspace.PreAlloc = keyspaces
 	})
 	re.NoError(err)
@@ -155,7 +155,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
-	tc, err := pdTests.NewTestMSCluster(suite.ctx, 1)
+	tc, err := pdTests.NewTestPDServiceCluster(suite.ctx, 1)
 	re.NoError(err)
 	re.NoError(tc.RunInitialServers())
 	tc.WaitLeader()
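
NOTE (usage sketch, not part of the patch): the pd-ctl keyspace tests above all
pre-allocate keyspaces through the same config callback. A condensed,
hypothetical fragment of that pattern, assuming only the fields visible in
these hunks:

	// Hypothetical: pre-allocate ten keyspaces at cluster start, as the
	// keyspace tests above do. `for i := range 10` is the Go 1.22
	// range-over-int form already used in this patch.
	keyspaces := make([]string, 0, 10)
	for i := range 10 {
		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
	}
	tc, err := pdTests.NewTestPDServiceCluster(ctx, 1, func(conf *config.Config, _ string) {
		conf.Keyspace.PreAlloc = keyspaces
	})
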