From 5f883b7117d92cc3c54690b134f0ddf6a3ce206c Mon Sep 17 00:00:00 2001 From: nexustar Date: Wed, 15 Nov 2023 15:08:52 +0800 Subject: [PATCH] add global component_sources --- components/dm/spec/logic.go | 22 ++++++++++++++++++++-- components/dm/spec/topology_dm.go | 23 +++++++---------------- pkg/cluster/spec/alertmanager.go | 5 +++++ pkg/cluster/spec/cdc.go | 19 ++++++++++--------- pkg/cluster/spec/dashboard.go | 17 +++++++++-------- pkg/cluster/spec/drainer.go | 19 ++++++++++--------- pkg/cluster/spec/grafana.go | 5 +++++ pkg/cluster/spec/instance.go | 9 ++++++--- pkg/cluster/spec/monitoring.go | 5 +++++ pkg/cluster/spec/pd.go | 19 ++++++++++--------- pkg/cluster/spec/pump.go | 19 ++++++++++--------- pkg/cluster/spec/spec.go | 14 ++++++++++++++ pkg/cluster/spec/tidb.go | 17 +++++++++-------- pkg/cluster/spec/tiflash.go | 19 ++++++++++--------- pkg/cluster/spec/tikv.go | 17 +++++++++-------- pkg/cluster/spec/tikv_cdc.go | 19 ++++++++++--------- pkg/cluster/spec/tiproxy.go | 6 +++++- pkg/cluster/spec/tispark.go | 10 ++++++++++ 18 files changed, 164 insertions(+), 100 deletions(-) diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index 3f8c48c20a..64a2b3e03f 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -68,6 +68,15 @@ func (c *DMMasterComponent) Role() string { return ComponentDMMaster } +// Source implements Component interface. +func (c *DMMasterComponent) Source() string { + source := c.Topology.ComponentSources.Master + if source != "" { + return source + } + return ComponentDMMaster +} + // CalculateVersion implements the Component interface func (c *DMMasterComponent) CalculateVersion(clusterVersion string) string { return clusterVersion @@ -93,7 +102,7 @@ func (c *DMMasterComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, Ports: []int{ s.Port, @@ -283,6 +292,15 @@ func (c *DMWorkerComponent) Role() string { return ComponentDMWorker } +// Source implements Component interface. +func (c *DMWorkerComponent) Source() string { + source := c.Topology.ComponentSources.Worker + if source != "" { + return source + } + return ComponentDMWorker +} + // CalculateVersion implements the Component interface func (c *DMWorkerComponent) CalculateVersion(clusterVersion string) string { return clusterVersion @@ -308,7 +326,7 @@ func (c *DMWorkerComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, Ports: []int{ s.Port, diff --git a/components/dm/spec/topology_dm.go b/components/dm/spec/topology_dm.go index 6e073fb7c3..d3ccebf139 100644 --- a/components/dm/spec/topology_dm.go +++ b/components/dm/spec/topology_dm.go @@ -95,10 +95,17 @@ type ( Grafana map[string]string `yaml:"grafana"` } + // ComponentSources represents the source of components + ComponentSources struct { + Master string `yaml:"master,omitempty"` + Worker string `yaml:"worker,omitempty"` + } + // Specification represents the specification of topology.yaml Specification struct { GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` MonitoredOptions *MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` + ComponentSources ComponentSources `yaml:"component_sources,omitempty" validate:"component_sources:editable"` ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` Masters []*MasterSpec `yaml:"master_servers"` Workers []*WorkerSpec `yaml:"worker_servers"` @@ -203,14 +210,6 @@ func (s *MasterSpec) GetAdvertisePeerURL(enableTLS bool) string { return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort)) } -// GetSource returns source to download the component -func (s *MasterSpec) GetSource() string { - if s.Source == "" { - return ComponentDMMaster - } - return s.Source -} - // WorkerSpec represents the Master topology specification in topology.yaml type WorkerSpec struct { Host string `yaml:"host"` @@ -282,14 +281,6 @@ func (s *WorkerSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *WorkerSpec) GetSource() string { - if s.Source == "" { - return ComponentDMWorker - } - return s.Source -} - // UnmarshalYAML sets default values when unmarshaling the topology file func (s *Specification) UnmarshalYAML(unmarshal func(any) error) error { type topology Specification diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index b71392b858..5884aa72fc 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -98,6 +98,11 @@ func (c *AlertManagerComponent) Role() string { return RoleMonitor } +// Source implements Component interface. +func (c *AlertManagerComponent) Source() string { + return ComponentAlertmanager +} + // CalculateVersion implements the Component interface func (c *AlertManagerComponent) CalculateVersion(_ string) string { // always not follow cluster version, use ""(latest) by default diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 23a03b1c12..c4ae85913c 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -92,14 +92,6 @@ func (s *CDCSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *CDCSpec) GetSource() string { - if s.Source == "" { - return ComponentCDC - } - return s.Source -} - // CDCComponent represents CDC component. type CDCComponent struct{ Topology *Specification } @@ -113,6 +105,15 @@ func (c *CDCComponent) Role() string { return ComponentCDC } +// Source implements Component interface. +func (c *CDCComponent) Source() string { + source := c.Topology.ComponentSources.CDC + if source != "" { + return source + } + return ComponentCDC +} + // CalculateVersion implements the Component interface func (c *CDCComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.CDC @@ -140,7 +141,7 @@ func (c *CDCComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: "", diff --git a/pkg/cluster/spec/dashboard.go b/pkg/cluster/spec/dashboard.go index 2d236ff5ff..435e9d9306 100644 --- a/pkg/cluster/spec/dashboard.go +++ b/pkg/cluster/spec/dashboard.go @@ -94,14 +94,6 @@ func (s *DashboardSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *DashboardSpec) GetSource() string { - if s.Source == "" { - return ComponentDashboard - } - return s.Source -} - // DashboardComponent represents Drainer component. type DashboardComponent struct{ Topology *Specification } @@ -115,6 +107,15 @@ func (c *DashboardComponent) Role() string { return ComponentDashboard } +// Source implements Component interface. +func (c *DashboardComponent) Source() string { + source := c.Topology.ComponentSources.Dashboard + if source != "" { + return source + } + return ComponentDashboard +} + // CalculateVersion implements the Component interface func (c *DashboardComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.Dashboard diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index 402b6bbb49..f66c111c0f 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -113,14 +113,6 @@ func (s *DrainerSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *DrainerSpec) GetSource() string { - if s.Source == "" { - return ComponentDrainer - } - return s.Source -} - // DrainerComponent represents Drainer component. type DrainerComponent struct{ Topology *Specification } @@ -134,6 +126,15 @@ func (c *DrainerComponent) Role() string { return ComponentDrainer } +// Source implements Component interface. +func (c *DrainerComponent) Source() string { + source := c.Topology.ComponentSources.Drainer + if source != "" { + return source + } + return ComponentDrainer +} + // CalculateVersion implements the Component interface func (c *DrainerComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.Drainer @@ -161,7 +162,7 @@ func (c *DrainerComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: "", diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index e18a543448..52940915b6 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -107,6 +107,11 @@ func (c *GrafanaComponent) Role() string { return RoleMonitor } +// Source implements Component interface. +func (c *GrafanaComponent) Source() string { + return ComponentGrafana +} + // CalculateVersion implements the Component interface func (c *GrafanaComponent) CalculateVersion(clusterVersion string) string { // always not follow cluster version, use ""(latest) by default diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 3281484a80..cbe43aca58 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -70,6 +70,7 @@ var ( type Component interface { Name() string Role() string + Source() string Instances() []Instance CalculateVersion(string) string SetVersion(string) @@ -316,10 +317,12 @@ func (i *BaseInstance) ComponentName() string { // ComponentSource implements Instance interface func (i *BaseInstance) ComponentSource() string { - if i.Source == "" { - return i.Name + if i.Source != "" { + return i.Source + } else if i.Component.Source() == "" { + return i.Component.Source() } - return i.Source + return i.Name } // InstanceName implements Instance interface diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 07aafaf5c4..486370b549 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -124,6 +124,11 @@ func (c *MonitorComponent) Role() string { return RoleMonitor } +// Source implements Component interface. +func (c *MonitorComponent) Source() string { + return ComponentPrometheus +} + // CalculateVersion implements the Component interface func (c *MonitorComponent) CalculateVersion(clusterVersion string) string { // always not follow cluster version, use ""(latest) by default diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index f505db7b18..25fa6230e7 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -138,14 +138,6 @@ func (s *PDSpec) GetAdvertisePeerURL(enableTLS bool) string { return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort)) } -// GetSource returns source to download the component -func (s *PDSpec) GetSource() string { - if s.Source == "" { - return ComponentPD - } - return s.Source -} - // PDComponent represents PD component. type PDComponent struct{ Topology *Specification } @@ -159,6 +151,15 @@ func (c *PDComponent) Role() string { return ComponentPD } +// Source implements Component interface. +func (c *PDComponent) Source() string { + source := c.Topology.ComponentSources.PD + if source != "" { + return source + } + return ComponentPD +} + // CalculateVersion implements the Component interface func (c *PDComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.PD @@ -188,7 +189,7 @@ func (c *PDComponent) Instances() []Instance { ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), Port: s.ClientPort, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: "", diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index 00dc89ddfd..796cf2ad8d 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -112,14 +112,6 @@ func (s *PumpSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *PumpSpec) GetSource() string { - if s.Source == "" { - return ComponentPump - } - return s.Source -} - // PumpComponent represents Pump component. type PumpComponent struct{ Topology *Specification } @@ -133,6 +125,15 @@ func (c *PumpComponent) Role() string { return ComponentPump } +// Source implements Component interface. +func (c *PumpComponent) Source() string { + source := c.Topology.ComponentSources.Pump + if source != "" { + return source + } + return ComponentPump +} + // CalculateVersion implements the Component interface func (c *PumpComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.Pump @@ -160,7 +161,7 @@ func (c *PumpComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: "", diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index b6c44c1d6d..7e5d4f7827 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -141,11 +141,25 @@ type ( // BlackboxExporter string `yaml:"blackbox_exporter,omitempty"` } + // ComponentSources represents the source of components + ComponentSources struct { + TiDB string `yaml:"tidb,omitempty"` + TiKV string `yaml:"tikv,omitempty"` + TiFlash string `yaml:"tiflash,omitempty"` + PD string `yaml:"pd,omitempty"` + Dashboard string `yaml:"tidb_dashboard,omitempty"` + Pump string `yaml:"pump,omitempty"` + Drainer string `yaml:"drainer,omitempty"` + CDC string `yaml:"cdc,omitempty"` + TiKVCDC string `yaml:"kvcdc,omitempty"` + } + // Specification represents the specification of topology.yaml Specification struct { GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` MonitoredOptions MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` ComponentVersions ComponentVersions `yaml:"component_versions,omitempty" validate:"component_versions:editable"` + ComponentSources ComponentSources `yaml:"component_sources,omitempty" validate:"component_versions:editable"` ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` TiDBServers []*TiDBSpec `yaml:"tidb_servers"` TiKVServers []*TiKVSpec `yaml:"tikv_servers"` diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index 3d793f9a76..b12d8928f9 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -89,14 +89,6 @@ func (s *TiDBSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *TiDBSpec) GetSource() string { - if s.Source == "" { - return ComponentTiDB - } - return s.Source -} - // TiDBComponent represents TiDB component. type TiDBComponent struct{ Topology *Specification } @@ -110,6 +102,15 @@ func (c *TiDBComponent) Role() string { return ComponentTiDB } +// Source implements Component interface. +func (c *TiDBComponent) Source() string { + source := c.Topology.ComponentSources.TiDB + if source != "" { + return source + } + return ComponentTiDB +} + // CalculateVersion implements the Component interface func (c *TiDBComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.TiDB diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index 45ed2e8dd0..e81280151a 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -162,14 +162,6 @@ func (s *TiFlashSpec) IgnoreMonitorAgent() bool { return s.IgnoreExporter } -// GetSource returns source to download the component -func (s *TiFlashSpec) GetSource() string { - if s.Source == "" { - return ComponentTiFlash - } - return s.Source -} - // key names for storage config const ( TiFlashStorageKeyMainDirs string = "storage.main.dir" @@ -282,6 +274,15 @@ func (c *TiFlashComponent) Role() string { return ComponentTiFlash } +// Source implements Component interface. +func (c *TiFlashComponent) Source() string { + source := c.Topology.ComponentSources.TiFlash + if source != "" { + return source + } + return ComponentTiFlash +} + // CalculateVersion implements the Component interface func (c *TiFlashComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.TiFlash @@ -308,7 +309,7 @@ func (c *TiFlashComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.GetMainPort(), SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: s.NumaCores, diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index 48f24dcf38..4c40fb1408 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -164,14 +164,6 @@ func (s *TiKVSpec) Labels() (map[string]string, error) { return lbs, nil } -// GetSource returns source to download the component -func (s *TiKVSpec) GetSource() string { - if s.Source == "" { - return ComponentTiKV - } - return s.Source -} - // TiKVComponent represents TiKV component. type TiKVComponent struct{ Topology *Specification } @@ -185,6 +177,15 @@ func (c *TiKVComponent) Role() string { return ComponentTiKV } +// Source implements Component interface. +func (c *TiKVComponent) Source() string { + source := c.Topology.ComponentSources.TiKV + if source != "" { + return source + } + return ComponentTiKV +} + // CalculateVersion implements the Component interface func (c *TiKVComponent) CalculateVersion(clusterVersion string) string { version := c.Topology.ComponentVersions.TiKV diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go index ee9d6c5c5d..2392815d20 100644 --- a/pkg/cluster/spec/tikv_cdc.go +++ b/pkg/cluster/spec/tikv_cdc.go @@ -105,6 +105,15 @@ func (c *TiKVCDCComponent) Role() string { return ComponentTiKVCDC } +// Source implements Component interface. +func (c *TiKVCDCComponent) Source() string { + source := c.Topology.ComponentSources.TiKVCDC + if source != "" { + return source + } + return ComponentTiKVCDC +} + // CalculateVersion implements the Component interface func (c *TiKVCDCComponent) CalculateVersion(clusterVersion string) string { // always not follow global version, use ""(latest) by default @@ -117,14 +126,6 @@ func (c *TiKVCDCComponent) SetVersion(version string) { c.Topology.ComponentVersions.TiKVCDC = version } -// GetSource returns source to download the component -func (s *TiKVCDCSpec) GetSource() string { - if s.Source == "" { - return ComponentTiKVCDC - } - return s.Source -} - // Instances implements Component interface. func (c *TiKVCDCComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiKVCDCServers)) @@ -138,7 +139,7 @@ func (c *TiKVCDCComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: s.GetSource(), + Source: s.Source, NumaNode: s.NumaNode, NumaCores: "", diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go index 15d167ef94..2cfe1c421e 100644 --- a/pkg/cluster/spec/tiproxy.go +++ b/pkg/cluster/spec/tiproxy.go @@ -129,6 +129,11 @@ func (c *TiProxyComponent) Role() string { return ComponentTiProxy } +// Source implements Component interface. +func (c *TiProxyComponent) Source() string { + return ComponentTiProxy +} + // CalculateVersion implements the Component interface func (c *TiProxyComponent) CalculateVersion(clusterVersion string) string { // always not follow global version, use ""(latest) by default @@ -154,7 +159,6 @@ func (c *TiProxyComponent) Instances() []Instance { ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost, Port: s.Port, SSHP: s.SSHPort, - Source: ComponentTiProxy, NumaNode: s.NumaNode, NumaCores: "", Ports: []int{ diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index 59aea16985..bf0105ace5 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -141,6 +141,11 @@ func (c *TiSparkMasterComponent) Role() string { return RoleTiSparkMaster } +// Source implements Component interface. +func (c *TiSparkMasterComponent) Source() string { + return ComponentTiSpark +} + // CalculateVersion implements the Component interface func (c *TiSparkMasterComponent) CalculateVersion(clusterVersion string) string { return "" @@ -334,6 +339,11 @@ func (c *TiSparkWorkerComponent) Role() string { return RoleTiSparkWorker } +// Source implements Component interface. +func (c *TiSparkWorkerComponent) Source() string { + return ComponentTiSpark +} + // CalculateVersion implements the Component interface func (c *TiSparkWorkerComponent) CalculateVersion(clusterVersion string) string { return ""