diff --git a/docs/configuration-discovery-classifying.md b/docs/configuration-discovery-classifying.md index c28e8c906..5220683ca 100644 --- a/docs/configuration-discovery-classifying.md +++ b/docs/configuration-discovery-classifying.md @@ -10,6 +10,7 @@ "DataCenterPattern": "", "DetectDataCenterQuery": "select substring_index(substring_index(@@hostname, '-',3), '-', -1) as dc", "PhysicalEnvironmentPattern": "", + "DetectSemiSyncEnforcedQuery": "" } ``` @@ -60,3 +61,75 @@ You will configure data center awareness in one of two methods: ### Cluster domain To a lesser importance, and mostly for visibility, `DetectClusterDomainQuery` should return the VIP or CNAME or otherwise the address of the cluster's master + +### Semi-sync topology + +In some environments, it is important to control not only the number of semi-sync replicas, but also whether a replica is a semi-sync or an async replica. +`orchestrator` can detect an undesired semi-sync configuration and toggle the semi-sync flags +`rpl_semi_sync_slave_enabled` and `rpl_semi_sync_master_enabled` to correct the situation. + +#### Semi-sync master (`rpl_semi_sync_master_enabled`) + +`orchestrator` enables the semi-sync master flag during a master failover (e.g. `DeadMaster`) if `DetectSemiSyncEnforcedQuery` returns a value > 0 +for the new master. `orchestrator` does not trigger any recoveries if the master flag is otherwise changed or incorrectly set. + +A semi-sync master can enter two failure scenarios: [`LockedSemiSyncMaster`](failure-detection.md#lockedsemisyncmaster) and +[`MasterWithTooManySemiSyncReplicas`](failure-detection.md#masterwithtoomanysemisyncreplicas). `orchestrator` disables the +semi-sync master flag on semi-sync replicas during a recovery of either of these two conditions. + +#### Semi-sync replicas (`rpl_semi_sync_slave_enabled`) + +`orchestrator` can detect if there is an incorrect number of semi-sync replicas in the topology ([`LockedSemiSyncMaster`](failure-detection.md#lockedsemisyncmaster) and +[`MasterWithTooManySemiSyncReplicas`](failure-detection.md#masterwithtoomanysemisyncreplicas)), and can then correct the situation by enabling/disabling +the semi-sync replica flags accordingly. + +This behavior can be controlled by the following options: + +- `DetectSemiSyncEnforcedQuery`: query that returns the semi-sync priority (zero means async replica; higher number means higher priority) +- `EnforceExactSemiSyncReplicas`: flag that decides whether to enforce a _strict_ semi-sync replica topology. If enabled, the recovery of `LockedSemiSyncMaster` + and `MasterWithTooManySemiSyncReplicas` will enable _and disable_ semi-sync on the replicas to match the desired topology exactly based on the priority order. +- `RecoverLockedSemiSyncMaster`: flag that decides whether to recover from a `LockedSemiSyncMaster` scenario. If enabled, the recovery of `LockedSemiSyncMaster` + will enable _(but never disable)_ semi-sync on the replicas in the priority order to match the master wait count. This option has no effect if + `EnforceExactSemiSyncReplicas` is set. It is only useful if you'd like to handle a situation in which there are too few semi-sync replicas, + but not one in which there are too many.
+- `ReasonableLockedSemiSyncMasterSeconds`: number of seconds after which the `LockedSemiSyncMaster` condition is triggered; if not set, falls back to `ReasonableReplicationLagSeconds` + +The priority order is defined by `DetectSemiSyncEnforcedQuery` (zero means async replica; higher number is higher priority), the promotion rule (`DetectPromotionRuleQuery`) +and the hostname (fallback). + +**Example 1**: Enforcing a strict semi-sync replica topology with two replicas and `rpl_semi_sync_master_wait_for_slave_count=1`: + +``` + "DetectSemiSyncEnforcedQuery": "select priority from meta.semi_sync where cluster_member = @@hostname", + "EnforceExactSemiSyncReplicas": true +``` + +Assuming this topology: + +``` + ,- replica1 (priority = 10, rpl_semi_sync_slave_enabled = 1) + master + `- replica2 (priority = 20, rpl_semi_sync_slave_enabled = 1) +``` + +`orchestrator` would detect a [`MasterWithTooManySemiSyncReplicas`](failure-detection.md#masterwithtoomanysemisyncreplicas) scenario +and disable semi-sync on replica2. + +**Example 2**: Enforcing a weak semi-sync replica topology with two replicas and `rpl_semi_sync_master_wait_for_slave_count=1`: + +``` + "DetectSemiSyncEnforcedQuery": "select 2586", + "DetectPromotionRuleQuery": "select promotion_rule from meta.promotion_rules where cluster_member = @@hostname", + "RecoverLockedSemiSyncMaster": true +``` + +Assuming this topology: + +``` + ,- replica1 (priority = 2586, promotion rule = prefer, rpl_semi_sync_slave_enabled = 0) + master + `- replica2 (priority = 2586, promotion rule = neutral, rpl_semi_sync_slave_enabled = 0) +``` + +`orchestrator` would detect a [`LockedSemiSyncMaster`](failure-detection.md#lockedsemisyncmaster) scenario +and enable semi-sync on replica1. diff --git a/docs/failure-detection.md b/docs/failure-detection.md index 89cb780dc..19d36f8ca 100644 --- a/docs/failure-detection.md +++ b/docs/failure-detection.md @@ -38,6 +38,7 @@ Observe the following list of potential failures: * UnreachableMasterWithLaggingReplicas * UnreachableMaster * LockedSemiSyncMaster +* MasterWithTooManySemiSyncReplicas * AllMasterReplicasNotReplicating * AllMasterReplicasNotReplicatingOrDead * DeadCoMaster @@ -96,15 +97,43 @@ This scenario can happen when the master is overloaded. Clients would see a "Too `orchestrator` responds to this scenario by restarting replication on all of master's immediate replicas. This will close the old client connections on those replicas and attempt to initiate new ones. These may now fail to connect, leading to a complete replication failure on all replicas. This will next lead `orchestrator` to analyze a `DeadMaster`. -### LockedSemiSyncMaster +#### `LockedSemiSyncMaster` -1. Master is running with semi-sync enabled +1. Master is running with semi-sync enabled (`rpl_semi_sync_master_enabled=1`) 2. Number of connected semi-sync replicas falls short of expected `rpl_semi_sync_master_wait_for_slave_count` 3. `rpl_semi_sync_master_timeout` is high enough such that master locks writes and does not fall back to asynchronous replication -Remediation can be to disable semi-sync on the master, or to bring up (or enable) sufficient semi-sync replicas. +This condition only triggers after `ReasonableLockedSemiSyncMasterSeconds` has passed. If `ReasonableLockedSemiSyncMasterSeconds` is not set, +it triggers after `ReasonableReplicationLagSeconds`. -At this time `orchestrator` does not invoke processes for this type of analysis.
+Remediation of this condition can be to disable semi-sync on the master, or to bring up (or enable) sufficient semi-sync replicas. + +If `EnforceExactSemiSyncReplicas` is enabled, `orchestrator` will determine the desired semi-sync topology and enable/disable semi-sync on the replicas to match it. +The desired topology is defined by the priority order (see below) and the master wait count. + +If `RecoverLockedSemiSyncMaster` is enabled, `orchestrator` will enable (but never disable) semi-sync on the replicas in priority order until +the number of semi-sync replicas matches the master wait count. Please note that `RecoverLockedSemiSyncMaster` has no effect if `EnforceExactSemiSyncReplicas` is set. + +The priority order is defined by `DetectSemiSyncEnforcedQuery` (higher number is higher priority), the promotion rule (`DetectPromotionRuleQuery`) and the hostname (fallback). + +If `EnforceExactSemiSyncReplicas` and `RecoverLockedSemiSyncMaster` are both disabled (default), `orchestrator` does not invoke any recovery processes for this type of analysis. + +Please also consult the [semi-sync topology](configuration-discovery-classifying.md#semi-sync-topology) documentation for more details. + +#### `MasterWithTooManySemiSyncReplicas` + +1. Master is running with semi-sync enabled (`rpl_semi_sync_master_enabled=1`) +2. Number of connected semi-sync replicas is higher than the expected `rpl_semi_sync_master_wait_for_slave_count` +3. `EnforceExactSemiSyncReplicas` is enabled (this analysis is not triggered if this flag is not enabled) + +If `EnforceExactSemiSyncReplicas` is enabled, `orchestrator` will determine the desired semi-sync topology and enable/disable semi-sync on the replicas to match it. +The desired topology is defined by the priority order and the master wait count. + +The priority order is defined by `DetectSemiSyncEnforcedQuery` (higher number is higher priority), the promotion rule (`DetectPromotionRuleQuery`) and the hostname (fallback). + +If `EnforceExactSemiSyncReplicas` is disabled (default), `orchestrator` does not invoke any recovery processes for this type of analysis. + +Please also consult the [semi-sync topology](configuration-discovery-classifying.md#semi-sync-topology) documentation for more details. 
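+
+A minimal configuration sketch that opts into these recoveries might look as follows; the `meta.semi_sync` priority table and the value of `30` seconds are
+illustrative only, mirroring the examples in the [semi-sync topology](configuration-discovery-classifying.md#semi-sync-topology) documentation:
+
+```
+  "DetectSemiSyncEnforcedQuery": "select priority from meta.semi_sync where cluster_member = @@hostname",
+  "EnforceExactSemiSyncReplicas": true,
+  "ReasonableLockedSemiSyncMasterSeconds": 30
+```
+
+With `EnforceExactSemiSyncReplicas` enabled, `RecoverLockedSemiSyncMaster` does not need to be set, as it has no effect in that case.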
### Failures of no interest diff --git a/etc/systemd/orchestrator.service b/etc/systemd/orchestrator.service index c80b9d920..71a1d6e6a 100644 --- a/etc/systemd/orchestrator.service +++ b/etc/systemd/orchestrator.service @@ -9,6 +9,7 @@ WorkingDirectory=/usr/local/orchestrator ExecStart=/usr/local/orchestrator/orchestrator http EnvironmentFile=-/etc/sysconfig/orchestrator ExecReload=/bin/kill -HUP $MAINPID +LimitNOFILE=16384 [Install] WantedBy=multi-user.target diff --git a/go/app/cli.go b/go/app/cli.go index bae687479..71f3ecbbe 100644 --- a/go/app/cli.go +++ b/go/app/cli.go @@ -450,7 +450,7 @@ func Cli(command string, strict bool, instance string, destination string, owner } validateInstanceIsFound(instanceKey) - lostReplicas, movedReplicas, cannotReplicateReplicas, promotedReplica, err := inst.RegroupReplicasGTID(instanceKey, false, func(candidateReplica *inst.Instance) { fmt.Println(candidateReplica.Key.DisplayString()) }, postponedFunctionsContainer, nil) + lostReplicas, movedReplicas, cannotReplicateReplicas, promotedReplica, err := inst.RegroupReplicasGTID(instanceKey, false, true, func(candidateReplica *inst.Instance) { fmt.Println(candidateReplica.Key.DisplayString()) }, postponedFunctionsContainer, nil) lostReplicas = append(lostReplicas, cannotReplicateReplicas...) if promotedReplica == nil { diff --git a/go/config/config.go b/go/config/config.go index b07790ebf..5d65da4b9 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -136,10 +136,11 @@ type Configuration struct { DefaultInstancePort int // In case port was not specified on command line SlaveLagQuery string // Synonym to ReplicationLagQuery ReplicationLagQuery string // custom query to check on replica lg (e.g. heartbeat table). Must return a single row with a single numeric column, which is the lag. - ReplicationCredentialsQuery string // custom query to get replication credentials. Must return a single row, with two text columns: 1st is username, 2nd is password. This is optional, and can be used by orchestrator to configure replication after master takeover or setup of co-masters. You need to ensure the orchestrator user has the privileges to run this query + ReplicationCredentialsQuery string // custom query to get replication credentials. Must return a single row, with five text columns: 1st is username, 2nd is password, 3rd is SSLCaCert, 4th is SSLCert, 5th is SSLKey. This is optional, and can be used by orchestrator to configure replication after master takeover or setup of co-masters. You need to ensure the orchestrator user has the privileges to run this query DiscoverByShowSlaveHosts bool // Attempt SHOW SLAVE HOSTS before PROCESSLIST UseSuperReadOnly bool // Should orchestrator super_read_only any time it sets read_only InstancePollSeconds uint // Number of seconds between instance reads + ReasonableInstanceCheckSeconds uint // Number of seconds an instance read is allowed to take before it is considered invalid, i.e. before LastCheckValid will be false InstanceWriteBufferSize int // Instance write buffer size (max number of instances to flush in one INSERT ODKU) BufferInstanceWrites bool // Set to 'true' for write-optimization on backend table (compromise: writes can be stale and overwrite non stale data) InstanceFlushIntervalMilliseconds int // Max interval between instance write buffer flushes @@ -262,7 +263,7 @@ type Configuration struct { GraphitePollSeconds int // Graphite writes interval. 0 disables. URLPrefix string // URL prefix to run orchestrator on non-root web path, e.g. 
/orchestrator to put it behind nginx. DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps - DiscoveryIgnoreMasterHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a master. Usage: pointing your master temporarily to replicate seom data from external host + DiscoveryIgnoreMasterHostnameFilters []string // Regexp filters to apply to prevent auto-discovering a master. Usage: pointing your master temporarily to replicate some data from external host DiscoveryIgnoreHostnameFilters []string // Regexp filters to apply to prevent discovering instances of any kind ConsulAddress string // Address where Consul HTTP api is found. Example: 127.0.0.1:8500 ConsulScheme string // Scheme (http or https) for Consul @@ -274,6 +275,9 @@ type Configuration struct { KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master" WebMessage string // If provided, will be shown on all web pages below the title bar MaxConcurrentReplicaOperations int // Maximum number of concurrent operations on replicas + EnforceExactSemiSyncReplicas bool // If true, semi-sync replicas will be enabled/disabled to match the wait count in the desired priority order; this applies to LockedSemiSyncMaster and MasterWithTooManySemiSyncReplicas + RecoverLockedSemiSyncMaster bool // If true, orchestrator will recover from a LockedSemiSync state by enabling semi-sync on replicas to match the wait count; this behavior can be overridden by EnforceExactSemiSyncReplicas + ReasonableLockedSemiSyncMasterSeconds uint // Time to evaluate the LockedSemiSyncHypothesis before triggering the LockedSemiSync analysis; falls back to ReasonableReplicationLagSeconds if not set } // ToJSONString will marshal this configuration as JSON @@ -320,6 +324,7 @@ func newConfiguration() *Configuration { DefaultInstancePort: 3306, TLSCacheTTLFactor: 100, InstancePollSeconds: 5, + ReasonableInstanceCheckSeconds: 1, InstanceWriteBufferSize: 100, BufferInstanceWrites: false, InstanceFlushIntervalMilliseconds: 100, @@ -444,6 +449,9 @@ func newConfiguration() *Configuration { KVClusterMasterPrefix: "mysql/master", WebMessage: "", MaxConcurrentReplicaOperations: 5, + EnforceExactSemiSyncReplicas: false, + RecoverLockedSemiSyncMaster: false, + ReasonableLockedSemiSyncMasterSeconds: 0, } } @@ -606,6 +614,9 @@ func (this *Configuration) postReadAdjustments() error { } else if this.ConsulMaxKVsPerTransaction > ConsulMaxTransactionOps { this.ConsulMaxKVsPerTransaction = ConsulMaxTransactionOps } + if this.ReasonableLockedSemiSyncMasterSeconds == 0 { + this.ReasonableLockedSemiSyncMasterSeconds = uint(this.ReasonableReplicationLagSeconds) + } return nil } diff --git a/go/http/api.go b/go/http/api.go index 7e1625436..d4c43d0e0 100644 --- a/go/http/api.go +++ b/go/http/api.go @@ -1157,7 +1157,7 @@ func (this *HttpAPI) RegroupReplicasGTID(params martini.Params, r render.Render, return } - lostReplicas, movedReplicas, cannotReplicateReplicas, promotedReplica, err := inst.RegroupReplicasGTID(&instanceKey, false, nil, nil, nil) + lostReplicas, movedReplicas, cannotReplicateReplicas, promotedReplica, err := inst.RegroupReplicasGTID(&instanceKey, false, true, nil, nil, nil) lostReplicas = append(lostReplicas, cannotReplicateReplicas...) 
if err != nil { diff --git a/go/inst/analysis.go b/go/inst/analysis.go index 8c392d403..d34d8f1d2 100644 --- a/go/inst/analysis.go +++ b/go/inst/analysis.go @@ -40,6 +40,7 @@ const ( AllMasterReplicasNotReplicatingOrDead = "AllMasterReplicasNotReplicatingOrDead" LockedSemiSyncMasterHypothesis = "LockedSemiSyncMasterHypothesis" LockedSemiSyncMaster = "LockedSemiSyncMaster" + MasterWithTooManySemiSyncReplicas = "MasterWithTooManySemiSyncReplicas" MasterWithoutReplicas = "MasterWithoutReplicas" DeadCoMaster = "DeadCoMaster" DeadCoMasterAndSomeReplicas = "DeadCoMasterAndSomeReplicas" @@ -228,5 +229,5 @@ func (this *ReplicationAnalysis) GetAnalysisInstanceType() AnalysisInstanceType // ValidSecondsFromSeenToLastAttemptedCheck returns the maximum allowed elapsed time // between last_attempted_check to last_checked before we consider the instance as invalid. func ValidSecondsFromSeenToLastAttemptedCheck() uint { - return config.Config.InstancePollSeconds + 1 + return config.Config.InstancePollSeconds + config.Config.ReasonableInstanceCheckSeconds } diff --git a/go/inst/analysis_dao.go b/go/inst/analysis_dao.go index 4b00281e4..d352abbcd 100644 --- a/go/inst/analysis_dao.go +++ b/go/inst/analysis_dao.go @@ -55,7 +55,7 @@ func initializeAnalysisDaoPostConfiguration() { func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) { result := []ReplicationAnalysis{} - args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName) + args := sqlutils.Args(config.Config.ReasonableLockedSemiSyncMasterSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName) analysisQueryReductionClause := `` if config.Config.ReduceReplicationAnalysisCount { @@ -531,6 +531,10 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) a.Description = "Semi sync master seems to be locked, more samplings needed to validate" } // + } else if config.Config.EnforceExactSemiSyncReplicas && a.IsMaster && a.SemiSyncMasterEnabled && a.SemiSyncMasterStatus && a.SemiSyncMasterWaitForReplicaCount > 0 && a.SemiSyncMasterClients > a.SemiSyncMasterWaitForReplicaCount { + a.Analysis = MasterWithTooManySemiSyncReplicas + a.Description = "Semi sync master has more semi sync replicas than configured" + // } else if a.IsMaster && a.LastCheckValid && a.IsReadOnly && a.CountValidReplicatingReplicas > 0 && config.Config.RecoverNonWriteableMaster { a.Analysis = NoWriteableMasterStructureWarning a.Description = "Master with replicas is read_only" diff --git a/go/inst/instance.go b/go/inst/instance.go index 5b0d4e115..0f563b773 100644 --- a/go/inst/instance.go +++ b/go/inst/instance.go @@ -94,7 +94,7 @@ type Instance struct { HasReplicationCredentials bool ReplicationCredentialsAvailable bool SemiSyncAvailable bool // when both semi sync plugins (master & replica) are loaded - SemiSyncEnforced bool + SemiSyncPriority uint // higher value means higher priority, zero means async replica SemiSyncMasterEnabled bool SemiSyncReplicaEnabled bool SemiSyncMasterTimeout uint64 diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 04e4168d7..63324c749 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -204,6 +204,20 @@ func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) { return ReadTopologyInstanceBufferable(instanceKey, false, nil) } +// ReadTopologyInstances is a 
convenience method that calls ReadTopologyInstance +// for all the instance keys and returns a slice of Instance. +func ReadTopologyInstances(instanceKeys []InstanceKey) ([]*Instance, error) { + instances := make([]*Instance, 0) + for _, instanceKey := range instanceKeys { + instance, err := ReadTopologyInstance(&instanceKey) + if err != nil { + return nil, err + } + instances = append(instances, instance) + } + return instances, nil +} + func RetryInstanceFunction(f func() (*Instance, error)) (instance *Instance, err error) { for i := 0; i < retryInstanceFunctionCount; i++ { if instance, err = f(); err == nil { @@ -776,7 +790,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, waitGroup.Add(1) go func() { defer waitGroup.Done() - err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) + err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncPriority) logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) }() } @@ -1209,7 +1223,7 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.DataCenter = m.GetString("data_center") instance.Region = m.GetString("region") instance.PhysicalEnvironment = m.GetString("physical_environment") - instance.SemiSyncEnforced = m.GetBool("semi_sync_enforced") + instance.SemiSyncPriority = m.GetUint("semi_sync_enforced") instance.SemiSyncAvailable = m.GetBool("semi_sync_available") instance.SemiSyncMasterEnabled = m.GetBool("semi_sync_master_enabled") instance.SemiSyncMasterTimeout = m.GetUint64("semi_sync_master_timeout") @@ -2610,7 +2624,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo args = append(args, instance.ReplicationCredentialsAvailable) args = append(args, instance.HasReplicationCredentials) args = append(args, instance.AllowTLS) - args = append(args, instance.SemiSyncEnforced) + args = append(args, instance.SemiSyncPriority) args = append(args, instance.SemiSyncAvailable) args = append(args, instance.SemiSyncMasterEnabled) args = append(args, instance.SemiSyncMasterTimeout) diff --git a/go/inst/instance_dao_test.go b/go/inst/instance_dao_test.go index 79ea6f502..a0b1a6ba7 100644 --- a/go/inst/instance_dao_test.go +++ b/go/inst/instance_dao_test.go @@ -70,7 +70,7 @@ func TestMkInsertOdkuSingle(t *testing.T) { ` a1 := `i710, 3306, 0, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, - false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, false, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, ` + false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, 0, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, ` sql1, args1, err := mkInsertOdkuForInstances(instances[:1], false, true) test.S(t).ExpectNil(err) @@ -96,9 +96,9 @@ func TestMkInsertOdkuThree(t *testing.T) { instance_alias=VALUES(instance_alias), last_discovery_latency=VALUES(last_discovery_latency), replication_group_name=VALUES(replication_group_name), replication_group_is_single_primary_mode=VALUES(replication_group_is_single_primary_mode), replication_group_member_state=VALUES(replication_group_member_state), replication_group_member_role=VALUES(replication_group_member_role), 
replication_group_members=VALUES(replication_group_members), replication_group_primary_host=VALUES(replication_group_primary_host), replication_group_primary_port=VALUES(replication_group_primary_port), last_seen=VALUES(last_seen) ` a3 := ` - i710, 3306, 0, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, false, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, - i720, 3306, 0, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, false, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, - i730, 3306, 0, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, false, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, + i710, 3306, 0, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, 0, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, + i720, 3306, 0, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, 0, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, + i730, 3306, 0, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, [], , , , , , 0, false, false, false, false, 0, false, false, 0, 0, false, false, 0, false, , 0, , false, , , [], , 0, ` sql3, args3, err := mkInsertOdkuForInstances(instances[:3], true, true) diff --git a/go/inst/instance_topology.go b/go/inst/instance_topology.go index efc65e72e..26b93113b 100644 --- a/go/inst/instance_topology.go +++ b/go/inst/instance_topology.go @@ -2509,6 +2509,7 @@ func RegroupReplicasPseudoGTIDIncludingSubReplicasOfBinlogServers( func RegroupReplicasGTID( masterKey *InstanceKey, returnReplicaEvenOnFailureToRegroup bool, + startReplicationOnCandidate bool, onCandidateReplicaChosen func(*Instance), postponedFunctionsContainer *PostponedFunctionsContainer, postponeAllMatchOperations func(*Instance, bool) bool, @@ -2554,7 +2555,9 @@ func RegroupReplicasGTID( err = moveGTIDFunc() } - StartReplication(&candidateReplica.Key) + if startReplicationOnCandidate { + StartReplication(&candidateReplica.Key) + } log.Debugf("RegroupReplicasGTID: done") AuditOperation("regroup-replicas-gtid", masterKey, fmt.Sprintf("regrouped replicas of %+v via GTID; promoted %+v", *masterKey, candidateReplica.Key)) @@ -2629,7 +2632,7 @@ func RegroupReplicas(masterKey *InstanceKey, returnReplicaEvenOnFailureToRegroup } if allGTID { log.Debugf("RegroupReplicas: using GTID to regroup replicas of %+v", *masterKey) - unmovedReplicas, 
movedReplicas, cannotReplicateReplicas, candidateReplica, err := RegroupReplicasGTID(masterKey, returnReplicaEvenOnFailureToRegroup, onCandidateReplicaChosen, nil, nil) + unmovedReplicas, movedReplicas, cannotReplicateReplicas, candidateReplica, err := RegroupReplicasGTID(masterKey, returnReplicaEvenOnFailureToRegroup, true, onCandidateReplicaChosen, nil, nil) return unmovedReplicas, emptyReplicas, movedReplicas, cannotReplicateReplicas, candidateReplica, err } if allBinlogServers { diff --git a/go/inst/instance_topology_dao.go b/go/inst/instance_topology_dao.go index 1631fe8eb..f63ab2db1 100644 --- a/go/inst/instance_topology_dao.go +++ b/go/inst/instance_topology_dao.go @@ -20,6 +20,7 @@ import ( "context" "database/sql" "fmt" + "sort" "strings" "time" @@ -372,7 +373,7 @@ func StopReplicas(replicas [](*Instance), stopReplicationMethod StopReplicationM defer func() { barrier <- *updatedReplica }() // Wait your turn to read a replica ExecuteOnTopology(func() { - if stopReplicationMethod == StopReplicationNice { + if stopReplicationMethod == StopReplicationNice && !replica.IsMariaDB() { StopReplicationNicely(&replica.Key, timeout) } replica, _ = StopReplication(&replica.Key) @@ -386,11 +387,6 @@ func StopReplicas(replicas [](*Instance), stopReplicationMethod StopReplicationM return refreshedReplicas } -// StopReplicasNicely will attemt to stop all given replicas nicely, up to timeout -func StopReplicasNicely(replicas [](*Instance), timeout time.Duration) [](*Instance) { - return StopReplicas(replicas, StopReplicationNice, timeout) -} - // StopReplication stops replication on a given instance func StopReplication(instanceKey *InstanceKey) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) @@ -451,18 +447,19 @@ func StartReplication(instanceKey *InstanceKey) (*Instance, error) { return instance, fmt.Errorf("instance is not a replica: %+v", instanceKey) } + instance, err = MaybeDisableSemiSyncMaster(instance) + if err != nil { + return instance, log.Errore(err) + } + // If async fallback is disallowed, we'd better make sure to enable replicas to // send ACKs before START SLAVE. Replica ACKing is off at mysqld startup because // some replicas (those that must never be promoted) should never ACK. // Note: We assume that replicas use 'skip-slave-start' so they won't // START SLAVE on their own upon restart. - if instance.SemiSyncEnforced { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - // Always disable master setting, in case we're converting a former master. - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { - return instance, log.Errore(err) - } + instance, err = MaybeEnableSemiSyncReplica(instance) + if err != nil { + return instance, log.Errore(err) } _, err = ExecInstance(instanceKey, `start slave`) @@ -534,7 +531,7 @@ func WaitForExecBinlogCoordinatesToReach(instanceKey *InstanceKey, coordinates * } } -// StartReplicationUntilMasterCoordinates issuesa START SLAVE UNTIL... statement on given instance +// StartReplicationUntilMasterCoordinates issues a START SLAVE UNTIL... 
statement on given instance func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoordinates *BinlogCoordinates) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) if err != nil { @@ -550,13 +547,13 @@ func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoor log.Infof("Will start replication on %+v until coordinates: %+v", instanceKey, masterCoordinates) - if instance.SemiSyncEnforced { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - // Always disable master setting, in case we're converting a former master. - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { - return instance, log.Errore(err) - } + instance, err = MaybeDisableSemiSyncMaster(instance) + if err != nil { + return instance, log.Errore(err) + } + instance, err = MaybeEnableSemiSyncReplica(instance) + if err != nil { + return instance, log.Errore(err) } // MariaDB has a bug: a CHANGE MASTER TO statement does not work properly with prepared statement... :P @@ -584,14 +581,194 @@ func StartReplicationUntilMasterCoordinates(instanceKey *InstanceKey, masterCoor return instance, err } -// EnableSemiSync sets the rpl_semi_sync_(master|replica)_enabled variables -// on a given instance. -func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error { - log.Infof("instance %+v rpl_semi_sync_master_enabled: %t, rpl_semi_sync_slave_enabled: %t", instanceKey, master, replica) - _, err := ExecInstance(instanceKey, - `set global rpl_semi_sync_master_enabled = ?, global rpl_semi_sync_slave_enabled = ?`, - master, replica) - return err +// MaybeDisableSemiSyncMaster always disables the semi-sync master (rpl_semi_sync_master_enabled) if the semi-sync priority is > 0. This is +// a little odd but in line with the legacy behavior and we really should disable the semi-sync master flag for replicas when starting replication. +func MaybeDisableSemiSyncMaster(replicaInstance *Instance) (*Instance, error) { + if replicaInstance.SemiSyncPriority > 0 && replicaInstance.SemiSyncMasterEnabled { + log.Infof("semi-sync: %s: setting rpl_semi_sync_master_enabled: %t", &replicaInstance.Key, false) + replicaInstance, err := SetSemiSyncMaster(&replicaInstance.Key, false) + if err != nil { + log.Warningf("semi-sync: %s: cannot disable rpl_semi_sync_master_enabled; that's not that bad though", &replicaInstance.Key) + } + return replicaInstance, err + } + return replicaInstance, nil +} + +// MaybeEnableSemiSyncReplica sets the semi-sync replica variable (rpl_semi_sync_replica_enabled) on a given instance based on the config and +// state of the world. If EnforceExactSemiSyncReplicas or RecoverLockedSemiSyncMaster are enabled, the semi-sync replica variable is enabled +// only if the given instance is supposed to be enabled according to the semi-sync priority order and the number of desired semi-sync replicas. +// If the flags are both turned off, the legacy behavior kicks in: If SemiSyncPriority > 0 and the instance is promotable (not "must_not"), +// semi-sync is enabled. +func MaybeEnableSemiSyncReplica(replicaInstance *Instance) (*Instance, error) { + // Backwards compatible logic: Enable semi-sync if SemiSyncPriority > 0 (formerly SemiSyncEnforced) + // Note that this logic NEVER enables semi-sync if the promotion rule is "must_not". 
+ if !config.Config.EnforceExactSemiSyncReplicas && !config.Config.RecoverLockedSemiSyncMaster { + return maybeEnableSemiSyncReplicaLegacy(replicaInstance) + } + + // New logic: If EnforceExactSemiSyncReplicas or RecoverLockedSemiSyncMaster are set, we enable semi-sync only if the + // given replica instance is in the list of replicas to have semi-sync enabled (according to the priority). + _, _, actions, err := AnalyzeSemiSyncReplicaTopology(&replicaInstance.MasterKey, &replicaInstance.Key, config.Config.EnforceExactSemiSyncReplicas) + if err != nil { + return replicaInstance, log.Errorf("semi-sync: %s", err.Error()) + } + for replica, enable := range actions { + if replica.Key.Equals(&replicaInstance.Key) { + log.Infof("semi-sync: %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable) + if _, err := SetSemiSyncReplica(&replica.Key, enable); err != nil { + return nil, fmt.Errorf("cannot enable semi sync on replica %+v", replica.Key) + } + return replicaInstance, nil + } + } + + // We are not taking any action for anything but replicaInstance, so if we detect that another replica has to be enabled, + // we won't act here and leave it to a future MasterWithTooManySemiSyncReplicas or LockedSemiSyncMaster event to correct. + + log.Infof("semi-sync: %+v: no action taken; this may lead to future recoveries", &replicaInstance.Key) + return replicaInstance, nil +} + +// maybeEnableSemiSyncReplicaLegacy enable semi-sync if SemiSyncPriority > 0 (formerly SemiSyncEnforced). This is a backwards +// compatible logic that NEVER enables semi-sync if the promotion rule is "must_not". +func maybeEnableSemiSyncReplicaLegacy(replicaInstance *Instance) (*Instance, error) { + if replicaInstance.SemiSyncPriority > 0 { + enable := replicaInstance.PromotionRule != MustNotPromoteRule // Send ACK only from promotable instances + log.Infof("semi-sync: %+v: setting rpl_semi_sync_slave_enabled = %t (legacy behavior)", &replicaInstance.Key, enable) + return SetSemiSyncReplica(&replicaInstance.Key, enable) + } + return replicaInstance, nil +} + +// AnalyzeSemiSyncReplicaTopology analyzes the replica topology for the given master and determines actions for the semi-sync replica enabled +// variable. It does not take any action itself. 
+func AnalyzeSemiSyncReplicaTopology(masterKey *InstanceKey, includeNonReplicatingInstance *InstanceKey, exactReplicaTopology bool) (masterInstance *Instance, replicas []*Instance, actions map[*Instance]bool, err error) { + // Read entire topology of master and its replicas to ensure we have the most up-to-date information + masterInstance, err = ReadTopologyInstance(masterKey) + if err != nil { + return nil, nil, nil, err + } + replicas, err = ReadTopologyInstances(masterInstance.Replicas.GetInstanceKeys()) + if err != nil { + replicas, err = ReadReplicaInstances(masterKey) // Falling back to just reading from our backend + if err != nil { + return nil, nil, nil, err + } + } + + // Classify and prioritize replicas & figure out which replicas need to be acted upon + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, includeNonReplicatingInstance) + actions = determineSemiSyncReplicaActions(masterInstance, possibleSemiSyncReplicas, asyncReplicas, exactReplicaTopology) + logSemiSyncReplicaAnalysis(masterInstance, possibleSemiSyncReplicas, asyncReplicas, excludedReplicas, actions) + + return masterInstance, replicas, actions, nil +} + +// classifyAndPrioritizeReplicas takes a list of replica instances and classifies them based on their semi-sync priority, excluding replicas +// that are down. The function furthermore prioritizes the possible semi-sync replicas based on SemiSyncPriority, PromotionRule and hostname (fallback). +func classifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingInstance *InstanceKey) (possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance) { + // Classify based on state and semi-sync priority + possibleSemiSyncReplicas = make([]*Instance, 0) + asyncReplicas = make([]*Instance, 0) + excludedReplicas = make([]*Instance, 0) + for _, replica := range replicas { + isReplicating := replica.Key.Equals(includeNonReplicatingInstance) || replica.ReplicaRunning() + if !replica.IsLastCheckValid || !isReplicating { + excludedReplicas = append(excludedReplicas, replica) + } else if replica.SemiSyncPriority == 0 { + asyncReplicas = append(asyncReplicas, replica) + } else { + possibleSemiSyncReplicas = append(possibleSemiSyncReplicas, replica) + } + } + + // Sort replicas by priority (higher number means higher priority), promotion rule and name + sort.Slice(possibleSemiSyncReplicas, func(i, j int) bool { + if possibleSemiSyncReplicas[i].SemiSyncPriority != possibleSemiSyncReplicas[j].SemiSyncPriority { + return possibleSemiSyncReplicas[i].SemiSyncPriority > possibleSemiSyncReplicas[j].SemiSyncPriority + } + if possibleSemiSyncReplicas[i].PromotionRule != possibleSemiSyncReplicas[j].PromotionRule { + return possibleSemiSyncReplicas[i].PromotionRule.BetterThan(possibleSemiSyncReplicas[j].PromotionRule) + } + return strings.Compare(possibleSemiSyncReplicas[i].Key.String(), possibleSemiSyncReplicas[j].Key.String()) < 0 + }) + + return +} + +// determineSemiSyncReplicaActions returns a map of replicas for which to change the semi-sync replica setting. +// A value of true indicates semi-sync needs to be enabled, false that it needs to be disabled. 
+func determineSemiSyncReplicaActions(masterInstance *Instance, possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, exactReplicaTopology bool) map[*Instance]bool { + if exactReplicaTopology { + return determineSemiSyncReplicaActionsForExactTopology(masterInstance, possibleSemiSyncReplicas, asyncReplicas) + } + return determineSemiSyncReplicaActionsForEnoughTopology(masterInstance, possibleSemiSyncReplicas) +} + +// determineSemiSyncReplicaActionsForExactTopology takes a priority-list of possible semi-sync replicas and always-async replicas and returns a list +// of actions to perform on them. If the current state of a replica's semi-sync flag does not match the desired state, an action is returned for it. +func determineSemiSyncReplicaActionsForExactTopology(masterInstance *Instance, possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance) map[*Instance]bool { + actions := make(map[*Instance]bool, 0) // true = enable semi-sync, false = disable semi-sync + for i, replica := range possibleSemiSyncReplicas { + isSemiSyncEnabled := replica.SemiSyncReplicaEnabled + shouldSemiSyncBeEnabled := uint(i) < masterInstance.SemiSyncMasterWaitForReplicaCount + if shouldSemiSyncBeEnabled && !isSemiSyncEnabled { + actions[replica] = true + } else if !shouldSemiSyncBeEnabled && isSemiSyncEnabled { + actions[replica] = false + } + } + for _, replica := range asyncReplicas { + if replica.SemiSyncReplicaEnabled { + actions[replica] = false + } + } + return actions +} + +// determineSemiSyncReplicaActionsForEnoughTopology takes a priority-list of possible semi-sync replicas and returns a list of actions to increase the +// number of semi-sync replicas to the semi-sync master wait count. This function will never return actions to disable a semi-sync replica. 
+func determineSemiSyncReplicaActionsForEnoughTopology(masterInstance *Instance, possibleSemiSyncReplicas []*Instance) map[*Instance]bool { + actions := make(map[*Instance]bool, 0) // true = enable semi-sync, false = disable semi-sync + enabled := uint(0) + for _, replica := range possibleSemiSyncReplicas { + if !replica.SemiSyncReplicaEnabled { + actions[replica] = true + enabled++ + } + if enabled == masterInstance.SemiSyncMasterWaitForReplicaCount-masterInstance.SemiSyncMasterClients { + break + } + } + return actions +} + +func logSemiSyncReplicaAnalysis(masterInstance *Instance, possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance, actions map[*Instance]bool) { + log.Debugf("semi-sync: analysis results for recovery of cluster %+v:", masterInstance.ClusterName) + log.Debugf("semi-sync: master = %+v, master semi-sync wait count = %d, master semi-sync replica count = %d", masterInstance.Key, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients) + logSemiSyncReplicaList("possible semi-sync replicas (in priority order)", possibleSemiSyncReplicas) + logSemiSyncReplicaList("always-async replicas", asyncReplicas) + logSemiSyncReplicaList("excluded replicas (defunct)", excludedReplicas) + if len(actions) > 0 { + log.Debugf("semi-sync: suggested actions:") + for replica, enable := range actions { + log.Debugf("semi-sync: - %+v: should set semi-sync enabled = %t", replica.Key, enable) + } + } else { + log.Debugf("semi-sync: suggested actions: (none)") + } +} + +func logSemiSyncReplicaList(description string, replicas []*Instance) { + if len(replicas) > 0 { + log.Debugf("semi-sync: %s:", description) + for _, replica := range replicas { + log.Debugf("semi-sync: - %s: semi-sync enabled = %t, priority = %d, promotion rule = %s, last check = %t, replicating = %t", replica.Key.String(), replica.SemiSyncReplicaEnabled, replica.SemiSyncPriority, replica.PromotionRule, replica.IsLastCheckValid, replica.ReplicaRunning()) + } + } else { + log.Debugf("semi-sync: %s: (none)", description) + } } // DelayReplication set the replication delay given seconds @@ -1044,13 +1221,36 @@ func MasterPosWait(instanceKey *InstanceKey, binlogCoordinates *BinlogCoordinate func ReadReplicationCredentials(instanceKey *InstanceKey) (creds *ReplicationCredentials, err error) { creds = &ReplicationCredentials{} if config.Config.ReplicationCredentialsQuery != "" { - err = ScanInstanceRow(instanceKey, config.Config.ReplicationCredentialsQuery, - &creds.User, - &creds.Password, - &creds.SSLCaCert, - &creds.SSLCert, - &creds.SSLKey, - ) + + db, err := db.OpenTopology(instanceKey.Hostname, instanceKey.Port) + if err != nil { + return creds, log.Errore(err) + } + { + resultData, err := sqlutils.QueryResultData(db, config.Config.ReplicationCredentialsQuery) + if err != nil { + return creds, log.Errore(err) + } + if len(resultData) > 0 { + // A row is found + row := resultData[0] + if len(row) > 0 { + creds.User = row[0].String + } + if len(row) > 1 { + creds.Password = row[1].String + } + if len(row) > 2 { + creds.SSLCaCert = row[2].String + } + if len(row) > 3 { + creds.SSLCert = row[3].String + } + if len(row) > 4 { + creds.SSLKey = row[4].String + } + } + } if err == nil && creds.User == "" { err = fmt.Errorf("Empty username retrieved by ReplicationCredentialsQuery") } @@ -1090,10 +1290,8 @@ func SetReadOnly(instanceKey *InstanceKey, readOnly bool) (*Instance, error) { // If async fallback is disallowed, we're responsible for flipping the master // semi-sync 
switch ON before accepting writes. The setting is off by default. - if instance.SemiSyncEnforced && !readOnly { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - if err := EnableSemiSync(instanceKey, true, sendACK); err != nil { + if instance.SemiSyncPriority > 0 && !readOnly { + if _, err := SetSemiSyncMaster(instanceKey, true); err != nil { return instance, log.Errore(err) } } @@ -1111,13 +1309,14 @@ func SetReadOnly(instanceKey *InstanceKey, readOnly bool) (*Instance, error) { } } instance, err = ReadTopologyInstance(instanceKey) + if err != nil { + return instance, log.Errore(err) + } // If we just went read-only, it's safe to flip the master semi-sync switch // OFF, which is the default value so that replicas can make progress. - if instance.SemiSyncEnforced && readOnly { - // Send ACK only from promotable instances. - sendACK := instance.PromotionRule != MustNotPromoteRule - if err := EnableSemiSync(instanceKey, false, sendACK); err != nil { + if instance.SemiSyncPriority > 0 && readOnly { + if _, err := SetSemiSyncMaster(instanceKey, false); err != nil { return instance, log.Errore(err) } } diff --git a/go/inst/instance_topology_dao_test.go b/go/inst/instance_topology_dao_test.go new file mode 100644 index 000000000..dce235734 --- /dev/null +++ b/go/inst/instance_topology_dao_test.go @@ -0,0 +1,146 @@ +package inst + +import ( + "github.com/openark/golib/log" + test "github.com/openark/golib/tests" + "github.com/openark/orchestrator/go/config" + "testing" +) + +func init() { + config.Config.HostnameResolveMethod = "none" + config.MarkConfigurationLoaded() + log.SetLevel(log.ERROR) +} + +func newTestReplica(key string, masterKey string, lastCheckValid bool, semiSyncPriority uint, promotionRule CandidatePromotionRule, replicationState ReplicationThreadState) *Instance { + return &Instance{ + Key: InstanceKey{Hostname: key, Port: 3306}, + MasterKey: InstanceKey{Hostname: masterKey, Port: 3306}, + ReadBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000001", LogPos: 10}, + ReplicationSQLThreadState: replicationState, + ReplicationIOThreadState: replicationState, + IsLastCheckValid: lastCheckValid, + SemiSyncPriority: semiSyncPriority, + PromotionRule: promotionRule, + } +} + +func expectInstancesMatch(t *testing.T, actual []*Instance, expected []*Instance) { + if len(expected) != len(actual) { + t.Fatalf("Actual instance list %+v does not match expected list %+v", actual, expected) + } + for i := range actual { + if actual[i] != expected[i] { + t.Fatalf("Actual instance %+v does not match expected %+v", actual[i], expected[i]) + } + } +} + +func TestClassifyAndPrioritizeReplicas_NoPrioritiesSamePromotionRule_NameTiebreaker(t *testing.T) { + replica1 := newTestReplica("replica1", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replica2 := newTestReplica("replica2", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replica3 := newTestReplica("replica3", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replicas := []*Instance{replica3, replica2, replica1} // inverse order! 
+ + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, nil) + expectInstancesMatch(t, possibleSemiSyncReplicas, []*Instance{replica1, replica2, replica3}) + expectInstancesMatch(t, asyncReplicas, []*Instance{}) + expectInstancesMatch(t, excludedReplicas, []*Instance{}) +} + +func TestClassifyAndPrioritizeReplicas_WithPriorities(t *testing.T) { + replica1 := newTestReplica("replica1", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replica2 := newTestReplica("replica2", "master1", true, 3, NeutralPromoteRule, ReplicationThreadStateRunning) + replica3 := newTestReplica("replica3", "master1", true, 2, NeutralPromoteRule, ReplicationThreadStateRunning) + replicas := []*Instance{replica1, replica2, replica3} + + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, nil) + expectInstancesMatch(t, possibleSemiSyncReplicas, []*Instance{replica2, replica3, replica1}) + expectInstancesMatch(t, asyncReplicas, []*Instance{}) + expectInstancesMatch(t, excludedReplicas, []*Instance{}) +} + +func TestClassifyAndPrioritizeReplicas_WithPrioritiesAndPromotionRules_PriorityTakesPrecedence(t *testing.T) { + replica1 := newTestReplica("replica1", "master1", true, 1, PreferPromoteRule, ReplicationThreadStateRunning) + replica2 := newTestReplica("replica2", "master1", true, 3, MustNotPromoteRule, ReplicationThreadStateRunning) + replica3 := newTestReplica("replica3", "master1", true, 2, NeutralPromoteRule, ReplicationThreadStateRunning) + replicas := []*Instance{replica1, replica2, replica3} + + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, nil) + expectInstancesMatch(t, possibleSemiSyncReplicas, []*Instance{replica2, replica3, replica1}) + expectInstancesMatch(t, asyncReplicas, []*Instance{}) + expectInstancesMatch(t, excludedReplicas, []*Instance{}) +} + +func TestClassifyAndPrioritizeReplicas_LastCheckInvalidAndNotReplication(t *testing.T) { + replica1 := newTestReplica("replica1", "master1", true, 1, MustNotPromoteRule, ReplicationThreadStateRunning) + replica2 := newTestReplica("replica2", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateStopped) + replica3 := newTestReplica("replica3", "master1", false, 1, PreferPromoteRule, ReplicationThreadStateRunning) + replicas := []*Instance{replica1, replica2, replica3} + + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, nil) + expectInstancesMatch(t, possibleSemiSyncReplicas, []*Instance{replica1}) + expectInstancesMatch(t, asyncReplicas, []*Instance{}) + expectInstancesMatch(t, excludedReplicas, []*Instance{replica2, replica3}) +} + +func TestClassifyAndPrioritizeReplicas_NonReplicatingReplica(t *testing.T) { + replica1 := newTestReplica("replica1", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replica2 := newTestReplica("replica2", "master1", true, 1, NeutralPromoteRule, ReplicationThreadStateRunning) + replica3 := newTestReplica("replica3", "master1", true, 1, MustNotPromoteRule, ReplicationThreadStateStopped) + replicas := []*Instance{replica1, replica2, replica3} + + possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, &replica3.Key) // Non-replicating instance + expectInstancesMatch(t, possibleSemiSyncReplicas, []*Instance{replica1, replica2, replica3}) + expectInstancesMatch(t, asyncReplicas, []*Instance{}) + expectInstancesMatch(t, 
excludedReplicas, []*Instance{}) +} + +func TestDetermineSemiSyncReplicaActionsForExactTopology_EnableSomeDisableSome(t *testing.T) { + master := &Instance{SemiSyncMasterWaitForReplicaCount: 2} + replica1 := &Instance{SemiSyncReplicaEnabled: true} + replica2 := &Instance{SemiSyncReplicaEnabled: false} + replica3 := &Instance{SemiSyncReplicaEnabled: true} + replica4 := &Instance{SemiSyncReplicaEnabled: false} + replica5 := &Instance{SemiSyncReplicaEnabled: true} + possibleSemiSyncReplicas := []*Instance{replica1, replica2, replica3, replica4} + asyncReplicas := []*Instance{replica5} + actions := determineSemiSyncReplicaActionsForExactTopology(master, possibleSemiSyncReplicas, asyncReplicas) + test.S(t).ExpectTrue(len(actions) == 3) + test.S(t).ExpectTrue(actions[replica2]) + test.S(t).ExpectFalse(actions[replica3]) + test.S(t).ExpectFalse(actions[replica5]) +} + +func TestDetermineSemiSyncReplicaActionsForExactTopology_NoActions(t *testing.T) { + master := &Instance{SemiSyncMasterWaitForReplicaCount: 1, SemiSyncMasterClients: 1} + replica1 := &Instance{SemiSyncReplicaEnabled: true} + replica2 := &Instance{SemiSyncReplicaEnabled: false} + replica3 := &Instance{SemiSyncReplicaEnabled: false} + possibleSemiSyncReplicas := []*Instance{replica1, replica2, replica3} + asyncReplicas := []*Instance{} + actions := determineSemiSyncReplicaActionsForExactTopology(master, possibleSemiSyncReplicas, asyncReplicas) + test.S(t).ExpectTrue(len(actions) == 0) +} + +func TestDetermineSemiSyncReplicaActionsForEnoughTopology_MoreThanWaitCountNoActions(t *testing.T) { + master := &Instance{SemiSyncMasterWaitForReplicaCount: 1, SemiSyncMasterClients: 3} + replica1 := &Instance{SemiSyncReplicaEnabled: true} + replica2 := &Instance{SemiSyncReplicaEnabled: true} + replica3 := &Instance{SemiSyncReplicaEnabled: true} + possibleSemiSyncReplicas := []*Instance{replica1, replica2, replica3} + actions := determineSemiSyncReplicaActionsForEnoughTopology(master, possibleSemiSyncReplicas) + test.S(t).ExpectTrue(len(actions) == 0) +} + +func TestDetermineSemiSyncReplicaActionsForEnoughTopology_LessThanWaitCountEnableOne(t *testing.T) { + master := &Instance{SemiSyncMasterWaitForReplicaCount: 2, SemiSyncMasterClients: 1} + replica1 := &Instance{SemiSyncReplicaEnabled: false} + replica2 := &Instance{SemiSyncReplicaEnabled: true} + replica3 := &Instance{SemiSyncReplicaEnabled: false} + possibleSemiSyncReplicas := []*Instance{replica1, replica2, replica3} + actions := determineSemiSyncReplicaActionsForEnoughTopology(master, possibleSemiSyncReplicas) + test.S(t).ExpectTrue(len(actions) == 1) + test.S(t).ExpectTrue(actions[replica1]) +} diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index 49cc36039..855a79387 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -554,7 +554,7 @@ func recoverDeadMaster(topologyRecovery *TopologyRecovery, candidateInstanceKey case MasterRecoveryGTID: { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: regrouping replicas via GTID")) - lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, nil, &topologyRecovery.PostponedFunctionsContainer, promotedReplicaIsIdeal) + lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, false, nil, &topologyRecovery.PostponedFunctionsContainer, promotedReplicaIsIdeal) } case MasterRecoveryPseudoGTID: { @@ -870,11 +870,13 @@ func checkAndRecoverDeadMaster(analysisEntry 
inst.ReplicationAnalysis, candidate if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&analysisEntry, promotedReplica); !satisfied { return nil, fmt.Errorf("RecoverDeadMaster: failed %+v promotion; %s", promotedReplica.Key, reason) } + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promoted replica lag seconds: %+v", promotedReplica.ReplicationLagSeconds.Int64)) if config.Config.FailMasterPromotionOnLagMinutes > 0 && time.Duration(promotedReplica.ReplicationLagSeconds.Int64)*time.Second >= time.Duration(config.Config.FailMasterPromotionOnLagMinutes)*time.Minute { // candidate replica lags too much return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionOnLagMinutes is set to %d (minutes) and promoted replica %+v 's lag is %d (seconds)", config.Config.FailMasterPromotionOnLagMinutes, promotedReplica.Key, promotedReplica.ReplicationLagSeconds.Int64) } + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promoted replica sql thread up-to-date: %+v", promotedReplica.SQLThreadUpToDate())) if config.Config.FailMasterPromotionIfSQLThreadNotUpToDate && !promotedReplica.SQLThreadUpToDate() { return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionIfSQLThreadNotUpToDate is set and promoted replica %+v 's sql thread is not up to date (relay logs still unapplied). Aborting promotion", promotedReplica.Key) } @@ -886,6 +888,7 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("DelayMasterPromotionIfSQLThreadNotUpToDate: SQL thread caught up on %+v", promotedReplica.Key)) } // All seems well. No override done. + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: found no reason to override promotion of %+v", promotedReplica.Key)) return promotedReplica, err } if promotedReplica, err = overrideMasterPromotion(); err != nil { @@ -1315,7 +1318,7 @@ func RecoverDeadCoMaster(topologyRecovery *TopologyRecovery, skipProcesses bool) switch coMasterRecoveryType { case MasterRecoveryGTID: { - lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, nil, &topologyRecovery.PostponedFunctionsContainer, nil) + lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, false, nil, &topologyRecovery.PostponedFunctionsContainer, nil) } case MasterRecoveryPseudoGTID: { @@ -1479,14 +1482,72 @@ func checkAndRecoverNonWriteableMaster(analysisEntry inst.ReplicationAnalysis, c // checkAndRecoverLockedSemiSyncMaster func checkAndRecoverLockedSemiSyncMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true) if topologyRecovery == nil { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. 
Will not issue another RecoverLockedSemiSyncMaster.", analysisEntry.AnalyzedInstanceKey)) return false, nil, err } + if config.Config.EnforceExactSemiSyncReplicas { + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, true) + } + if config.Config.RecoverLockedSemiSyncMaster { + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, false) + } + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no action taken to recover locked semi sync master on %+v. Enable RecoverLockedSemiSyncMaster or EnforceExactSemiSyncReplicas to change this behavior.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err +} - return false, nil, nil +// checkAndRecoverMasterWithTooManySemiSyncReplicas registers and performs a recovery for MasterWithTooManySemiSyncReplicas +func checkAndRecoverMasterWithTooManySemiSyncReplicas(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { + topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, true, true) + if topologyRecovery == nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverMasterWithTooManySemiSyncReplicas.", analysisEntry.AnalyzedInstanceKey)) + return false, nil, err + } + return recoverSemiSyncReplicas(topologyRecovery, analysisEntry, true) +} + +// recoverSemiSyncReplicas analyzes the replica topology for the given master and applies the actions needed to repair it. If exactReplicaTopology is set, it will enable/disable the semi-sync enabled +// variable (rpl_semi_sync_replica_enabled) of the replicas depending on their semi-sync priority and promotion rule. If exactReplicaTopology is not set, the function will only ever +// enable semi-sync on replicas and never disable it. This parameter typically corresponds to the EnforceExactSemiSyncReplicas config variable.
+func recoverSemiSyncReplicas(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, exactReplicaTopology bool) (recoveryAttempted bool, topologyRecoveryOut *TopologyRecovery, err error) { + masterInstance, replicas, actions, err := inst.AnalyzeSemiSyncReplicaTopology(&analysisEntry.AnalyzedInstanceKey, nil, exactReplicaTopology) + if err != nil { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: %s", err.Error())) + return true, topologyRecovery, log.Errorf("semi-sync: %s", err.Error()) + } else if len(actions) == 0 { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot determine actions based on possible semi-sync replicas; cannot recover on %+v", &analysisEntry.AnalyzedInstanceKey)) + return true, topologyRecovery, log.Errorf("cannot determine actions based on possible semi-sync replicas; cannot recover on %+v", &analysisEntry.AnalyzedInstanceKey) + } + + // Disable semi-sync master on all replicas; this is to avoid semi-sync failures on the replicas (rpl_semi_sync_master_no_tx) + // and to make it consistent with the logic in SetReadOnly + for _, replica := range replicas { + inst.MaybeDisableSemiSyncMaster(replica) // it's okay if this fails + } + + // Take action: we first enable and then disable (two loops) in order to avoid "locked master" scenarios + AuditTopologyRecovery(topologyRecovery, "semi-sync: taking actions:") + for replica, enable := range actions { + if enable { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable)) + if _, err := inst.SetSemiSyncReplica(&replica.Key, enable); err != nil { + return true, topologyRecovery, log.Errorf("cannot enable semi sync on replica %+v", replica.Key) + } + } + } + for replica, enable := range actions { + if !enable { + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable)) + if _, err := inst.SetSemiSyncReplica(&replica.Key, enable); err != nil { + return true, topologyRecovery, fmt.Errorf("cannot disable semi sync on replica %+v", replica.Key) + } + } + } + + resolveRecovery(topologyRecovery, masterInstance) + AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: recovery complete; success = %t", topologyRecovery.IsSuccessful)) + return true, topologyRecovery, nil } // checkAndRecoverGenericProblem is a general-purpose recovery function @@ -1659,6 +1720,8 @@ func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstance } else { return checkAndRecoverLockedSemiSyncMaster, true } + case inst.MasterWithTooManySemiSyncReplicas: + return checkAndRecoverMasterWithTooManySemiSyncReplicas, true // intermediate master case inst.DeadIntermediateMaster: return checkAndRecoverDeadIntermediateMaster, true @@ -2059,7 +2122,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, return nil, nil, fmt.Errorf("Sanity check failure. It seems like the designated instance %+v does not replicate from the master %+v (designated instance's master key is %+v). This error is strange. Panicking", designatedInstance.Key, clusterMaster.Key, designatedInstance.MasterKey) } if !designatedInstance.HasReasonableMaintenanceReplicationLag() { - return nil, nil, fmt.Errorf("Desginated instance %+v seems to be lagging to much for thie operation. 
Aborting.", designatedInstance.Key) + return nil, nil, fmt.Errorf("Desginated instance %+v seems to be lagging too much for this operation. Aborting.", designatedInstance.Key) } if len(clusterMasterDirectReplicas) > 1 {