diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index faa06822127..30b34e4596a 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -371,14 +371,6 @@ func (m *ModeManager) Run(ctx context.Context) { wg.Wait() } -func storeIDs(stores []*core.StoreInfo) []uint64 { - ids := make([]uint64, len(stores)) - for i, s := range stores { - ids[i] = s.GetID() - } - return ids -} - func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int { if rule.Role == placement.Learner { return 0 @@ -411,7 +403,7 @@ func (m *ModeManager) tickUpdateState() { drTickCounter.Inc() - stores := m.checkStoreStatus() + stores, storeIDs := m.checkStoreStatus() var primaryHasVoter, drHasVoter bool var totalVoter, totalUpVoter int @@ -440,10 +432,10 @@ func (m *ModeManager) tickUpdateState() { hasMajority := totalUpVoter*2 > totalVoter log.Debug("replication store status", - zap.Uint64s("up-primary", storeIDs(stores[primaryUp])), - zap.Uint64s("up-dr", storeIDs(stores[drUp])), - zap.Uint64s("down-primary", storeIDs(stores[primaryDown])), - zap.Uint64s("down-dr", storeIDs(stores[drDown])), + zap.Uint64s("up-primary", storeIDs[primaryUp]), + zap.Uint64s("up-dr", storeIDs[drUp]), + zap.Uint64s("down-primary", storeIDs[primaryDown]), + zap.Uint64s("down-dr", storeIDs[drDown]), zap.Bool("can-sync", canSync), zap.Bool("has-majority", hasMajority), ) @@ -470,31 +462,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + m.drSwitchToAsyncWait(storeIDs[primaryUp]) } case drStateAsyncWait: if canSync { m.drSwitchToSync() break } - if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) { - m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) + if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) { + m.drSwitchToAsyncWait(storeIDs[primaryUp]) break } - if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: if canSync { m.drSwitchToSyncRecover() break } - if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) { + m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(storeIDs(stores[primaryUp])) + m.drSwitchToAsync(storeIDs[primaryUp]) } else { m.updateProgress() progress := m.estimateProgress() @@ -569,39 +561,40 @@ const ( storeStatusTypeCount ) -func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo { +func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) { m.RLock() defer m.RUnlock() - stores := make([][]*core.StoreInfo, storeStatusTypeCount) + stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount) for _, s := range m.cluster.GetStores() { if s.IsRemoved() { continue } - // learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store. - if s.GetRegionCount() == s.GetLearnerCount() { - continue - } down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) if labelValue == m.config.DRAutoSync.Primary { if down { stores[primaryDown] = append(stores[primaryDown], s) + storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID()) } else { stores[primaryUp] = append(stores[primaryUp], s) + storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID()) } } if labelValue == m.config.DRAutoSync.DR { if down { stores[drDown] = append(stores[drDown], s) + storeIDs[drDown] = append(storeIDs[drDown], s.GetID()) } else { stores[drUp] = append(stores[drUp], s) + storeIDs[drUp] = append(storeIDs[drUp], s.GetID()) } } } for i := range stores { sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() }) + sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] }) } - return stores + return stores, storeIDs } // UpdateStoreDRStatus saves the dr-autosync status of a store. diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index bb9b6559cf6..e01fb7a0b9a 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -245,8 +245,8 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) - // once the voter node down, even learner node up, swith to async state. - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + // once zone2 down, swith to async state. + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) @@ -264,18 +264,18 @@ func TestStateSwitch(t *testing.T) { re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) // async_wait -> async_wait - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "down", "up", "up", "up", "down", "up") + setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) - setStoreState(cluster, "up", "down", "up", "up", "down", "up") + setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() @@ -294,7 +294,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async -> async - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. rep.tickReplicateStatus() @@ -319,14 +319,14 @@ func TestStateSwitch(t *testing.T) { // sync_recover -> async rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) - setStoreState(cluster, "up", "up", "up", "up", "down", "up") + setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. rep.drSwitchToSyncRecover() assertStateIDUpdate() - setStoreState(cluster, "down", "down", "up", "up", "down", "up") + setStoreState(cluster, "down", "down", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -636,6 +636,8 @@ func TestComplexPlacementRules(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -695,6 +697,47 @@ func TestComplexPlacementRules2(t *testing.T) { setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) +} + +func TestComplexPlacementRules3(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Learner, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // zone2 down, switch state, available stores should contain logic2 (learner) + setStoreState(cluster, "up", "up", "up", "up", "down") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + rep.tickReplicateStatus() + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {