From 8a6d0343147aed504387a32e2c119c71269985bf Mon Sep 17 00:00:00 2001
From: Evan Zhou
Date: Tue, 20 Jul 2021 13:48:03 +0800
Subject: [PATCH] support witness

---
 engine/meta.go                 | 10 ++++++++
 tikv/raftstore/applier.go      | 41 ++++++++++++++++++++++----------
 tikv/raftstore/fsm_peer.go     | 13 +++++-----
 tikv/raftstore/fsm_store.go    |  3 ---
 tikv/raftstore/peer.go         | 18 +++++++++++++-
 tikv/raftstore/peer_storage.go | 43 ++++++++++++++++++++--------------
 tikv/raftstore/recover.go      | 16 ++++++++++++-
 tikv/region.go                 | 10 ++++++++
 8 files changed, 112 insertions(+), 42 deletions(-)

diff --git a/engine/meta.go b/engine/meta.go
index 5090a049..cb12ca1a 100644
--- a/engine/meta.go
+++ b/engine/meta.go
@@ -65,6 +65,12 @@ func NewShardMeta(cs *enginepb.ChangeSet) *ShardMeta {
 	return shardMeta
 }
 
+func NewShardMetaFromBin(bin []byte) *ShardMeta {
+	cs := new(enginepb.ChangeSet)
+	_ = cs.Unmarshal(bin)
+	return NewShardMeta(cs)
+}
+
 func (si *ShardMeta) FileLevel(fid uint64) (int, bool) {
 	fm, ok := si.files[fid]
 	if ok {
@@ -330,6 +336,10 @@ func (si *ShardMeta) PreSplitKeys() [][]byte {
 	return si.preSplit.Keys
 }
 
+func (si *ShardMeta) HasParent() bool {
+	return si.parent != nil
+}
+
 // LevelCF is the struct that contains shard id and level id,
 type LevelCF struct {
 	Level uint16
diff --git a/tikv/raftstore/applier.go b/tikv/raftstore/applier.go
index 900712d8..3e6a61d0 100644
--- a/tikv/raftstore/applier.go
+++ b/tikv/raftstore/applier.go
@@ -592,6 +592,9 @@ func (a *applier) applyRaftCmd(aCtx *applyContext, index, term uint64,
 	if cl, ok := rlog.(*raftlog.CustomRaftLog); ok {
 		switch cl.Type() {
 		case raftlog.TypeFlush, raftlog.TypeCompaction, raftlog.TypeSplitFiles:
+			if a.peer.Witness {
+				break
+			}
 			change, err := cl.GetShardChangeSet()
 			y.Assert(err == nil)
 			// Assign the raft log's index as the sequence number of the ChangeSet to ensure monotonic increase.
@@ -684,6 +687,10 @@ func (a *applier) execRaftCmd(aCtx *applyContext, rlog raftlog.RaftLog) (
 	if req.GetAdminRequest() != nil {
 		return a.execAdminCmd(aCtx, req)
 	}
+	if a.peer.Witness {
+		// Skip write command for witness.
+		return
+	}
 	resp, result = a.execWriteCmd(aCtx, rlog)
 	return
 }
@@ -981,17 +988,19 @@ func (a *applier) execBatchSplit(aCtx *applyContext, req *raft_cmdpb.AdminReques
 		return
 	}
 	a.applyState.appliedIndex = aCtx.execCtx.index
-	cs := buildSplitChangeSet(a.region, regions, a.applyState)
-	err = aCtx.engines.kv.FinishSplit(cs)
-	if err != nil {
-		if err == engine.ErrFinishSplitWrongStage {
-			// This must be a follower that fall behind, we need to pause the apply and wait for split files to finish
-			// in the background worker.
-			log.S().Warnf("%d:%d is not in split file done stage for finish split, pause apply",
-				a.region.Id, a.region.RegionEpoch.Version)
-			result = applyResult{tp: applyResultTypePause}
+	if !a.peer.Witness {
+		cs := buildSplitChangeSet(a.region, regions, a.applyState)
+		err = aCtx.engines.kv.FinishSplit(cs)
+		if err != nil {
+			if err == engine.ErrFinishSplitWrongStage {
+				// This must be a follower that fall behind, we need to pause the apply and wait for split files to finish
+				// in the background worker.
+				log.S().Warnf("%d:%d is not in split file done stage for finish split, pause apply",
+					a.region.Id, a.region.RegionEpoch.Version)
+				result = applyResult{tp: applyResultTypePause}
+			}
+			return
 		}
-		return
 	}
 	// clear the cache here or the locks doesn't belong to the new range would never have chance to delete.
 	a.lockCache = map[string][]byte{}
@@ -1061,6 +1070,7 @@ func splitGenNewRegionMetas(oldRegion *metapb.Region, splitReqs *raft_cmdpb.Batc
 				Id:      request.NewPeerIds[j],
 				StoreId: derived.Peers[j].StoreId,
 				Role:    derived.Peers[j].Role,
+				Witness: derived.Peers[j].Witness,
 			}
 		}
 		regions = append(regions, newRegion)
@@ -1197,9 +1207,14 @@ func (a *applier) handleApply(aCtx *applyContext, apply *apply) {
 	if len(apply.entries) == 0 || a.pendingRemove || a.stopped {
 		return
 	}
-	a.metrics = applyMetrics{}
-	shard := aCtx.engines.kv.GetShard(a.region.GetId())
-	a.metrics.approximateSize = uint64(shard.GetEstimatedSize())
+	if !a.peer.Witness {
+		a.metrics = applyMetrics{}
+		shard := aCtx.engines.kv.GetShard(a.region.GetId())
+		if shard == nil {
+			log.S().Warnf("%d shard not found for peer %s", a.region.Id, a.peer)
+		}
+		a.metrics.approximateSize = uint64(shard.GetEstimatedSize())
+	}
 	a.term = apply.term
 	a.handleRaftCommittedEntries(aCtx, apply.entries)
 	if a.waitMergeState != nil {
diff --git a/tikv/raftstore/fsm_peer.go b/tikv/raftstore/fsm_peer.go
index e0e5fac7..fb80b0af 100644
--- a/tikv/raftstore/fsm_peer.go
+++ b/tikv/raftstore/fsm_peer.go
@@ -391,6 +391,11 @@ func (d *peerMsgHandler) onRaftMsg(msg *rspb.RaftMessage) error {
 	if err != nil {
 		return err
 	}
+	if d.peer.IsLeader() && d.peer.Meta.Witness && !msg.FromPeer.Witness {
+		log.S().Infof("%d:%d witness transfer leader to store %d",
+			d.regionID(), d.region().RegionEpoch.Version, msg.FromPeer.StoreId)
+		d.peer.RaftGroup.TransferLeader(msg.FromPeer.Id)
+	}
 	if d.peer.AnyNewPeerCatchUp(msg.FromPeer.Id) {
 		d.peer.HeartbeatPd(d.ctx.pdTaskSender)
 	}
@@ -774,8 +779,6 @@ func (d *peerMsgHandler) onReadySplitRegion(derived *metapb.Region, regions []*m
 			// The raft state key changed when region version change, we need to set it here.
 			y.Assert(store.raftState.commit > 0)
 			d.ctx.raftWB.SetState(regionID, RaftStateKey(d.region().RegionEpoch.Version), store.raftState.Marshal())
-			// Reset the flush state for derived region.
-			store.initialFlushed = false
 			store.splitStage = enginepb.SplitStage_INITIAL
 			continue
 		}
@@ -1030,7 +1033,7 @@ func (d *peerMsgHandler) onSplitRegionCheckTick() {
 		return
 	}
 
-	if !d.peer.IsLeader() {
+	if !d.peer.IsLeader() || d.peer.Meta.Witness {
 		return
 	}
 	d.ctx.splitCheckTaskSender <- task{
@@ -1112,7 +1115,7 @@ func (d *peerMsgHandler) validateSplitRegion(epoch *metapb.RegionEpoch, splitKey
 }
 
 func (d *peerMsgHandler) onScheduleHalfSplitRegion(regionEpoch *metapb.RegionEpoch) {
-	if !d.peer.IsLeader() {
+	if !d.peer.IsLeader() || d.peer.Meta.Witness {
 		return
 	}
 	region := d.region()
@@ -1389,7 +1392,6 @@ func (d *peerMsgHandler) onApplyChangeSetResult(result *MsgApplyChangeSetResult)
 	}
 	if change.Snapshot != nil {
 		log.S().Infof("%d on apply change set result", d.regionID())
-		store.initialFlushed = true
 		store.snapState = SnapState_Relax
 		props := change.Snapshot.Properties
 		if props != nil {
@@ -1397,7 +1399,6 @@ func (d *peerMsgHandler) onApplyChangeSetResult(result *MsgApplyChangeSetResult)
 		}
 	}
 	if change.Flush != nil {
-		store.initialFlushed = true
 		props := change.Flush.Properties
 		if props != nil {
 			store.stableApplyState = getApplyStateFromProps(props)
diff --git a/tikv/raftstore/fsm_store.go b/tikv/raftstore/fsm_store.go
index 5db4df4e..1c8fe417 100644
--- a/tikv/raftstore/fsm_store.go
+++ b/tikv/raftstore/fsm_store.go
@@ -247,9 +247,6 @@ func (bs *raftBatchSystem) loadPeers() ([]*peerFsm, error) {
 		if err != nil {
 			return err
 		}
-		shard := ctx.engine.kv.GetShard(regionID)
-		peer.peer.Store().initialFlushed = shard.IsInitialFlushed()
-		peer.peer.Store().splitStage = shard.GetSplitStage()
 		ctx.peerEventObserver.OnPeerCreate(peer.peer.getEventContext(), region)
 		if localState.State == rspb.PeerState_Merging {
 			log.S().Infof("region %d is merging", regionID)
diff --git a/tikv/raftstore/peer.go b/tikv/raftstore/peer.go
index 4e2fe125..926bea6d 100644
--- a/tikv/raftstore/peer.go
+++ b/tikv/raftstore/peer.go
@@ -721,6 +721,9 @@ func (p *Peer) OnRoleChanged(observer PeerEventObserver, ready *raft.Ready) {
 			shard.SetPassive(ss.RaftState != raft.StateLeader)
 		}
 		if ss.RaftState == raft.StateLeader {
+			if p.Meta.Witness {
+				return
+			}
 			// The local read can only be performed after a new leader has applied
 			// the first empty entry on its term. After that the lease expiring time
 			// should be updated to
@@ -873,7 +876,7 @@ func (p *Peer) handleChangeSet(ctx *RaftContext, e *eraftpb.Entry) {
 	change.Sequence = e.Index
 	if clog.Type() == raftlog.TypePreSplit {
 		store.splitStage = enginepb.SplitStage_PRE_SPLIT
-	} else {
+	} else if !p.Meta.Witness {
 		store.applyingChanges = append(store.applyingChanges, change)
 	}
 	if store.splitStage >= enginepb.SplitStage_PRE_SPLIT && change.Compaction != nil {
@@ -882,6 +885,19 @@ func (p *Peer) handleChangeSet(ctx *RaftContext, e *eraftpb.Entry) {
 	}
 	shardMeta := store.GetEngineMeta()
 	shardMeta.ApplyChangeSet(change)
+	if p.Meta.Witness {
+		// For witness, we don't apply the change set to the Engine, so there will be no apply change set result.
+		// We have to do the work in OnResultSet here.
+		if change.Flush != nil {
+			props := change.Flush.Properties
+			if props != nil {
+				store.stableApplyState = getApplyStateFromProps(props)
+			}
+		}
+		if store.splitStage < change.Stage {
+			store.splitStage = change.Stage
+		}
+	}
 	log.S().Infof("%d:%d handle change set set engine meta, apply change %s", shardMeta.ID, shardMeta.Ver, change)
 	ctx.raftWB.SetState(p.regionId, KVEngineMetaKey(), shardMeta.Marshal())
 }
diff --git a/tikv/raftstore/peer_storage.go b/tikv/raftstore/peer_storage.go
index ea627d84..c327ee48 100644
--- a/tikv/raftstore/peer_storage.go
+++ b/tikv/raftstore/peer_storage.go
@@ -122,7 +122,6 @@ type PeerStorage struct {
 
 	applyingChanges []*enginepb.ChangeSet
 	splitStage      enginepb.SplitStage
-	initialFlushed  bool
 
 	Tag string
 
@@ -139,6 +138,9 @@ func NewPeerStorage(engines *Engines, region *metapb.Region, regionSched chan<-
 	if err != nil {
 		return nil, err
 	}
+	if peer.Witness {
+		applyState.appliedIndex = raftState.commit
+	}
 	if raftState.lastIndex < applyState.appliedIndex {
 		panic(fmt.Sprintf("%s unexpected raft log index: lastIndex %d < appliedIndex %d",
 			tag, raftState.lastIndex, applyState.appliedIndex))
@@ -147,24 +149,28 @@ func NewPeerStorage(engines *Engines, region *metapb.Region, regionSched chan<-
 	if err != nil {
 		return nil, err
 	}
-	var initialFlushed bool
+	var shardMeta *engine.ShardMeta
+	if shardMetaBin := engines.raft.GetState(region.Id, KVEngineMetaKey()); len(shardMetaBin) > 0 {
+		shardMeta = engine.NewShardMetaFromBin(shardMetaBin)
+	}
 	splitStage := enginepb.SplitStage_INITIAL
 	if shard := engines.kv.GetShard(region.Id); shard != nil {
-		initialFlushed = shard.IsInitialFlushed()
 		splitStage = shard.GetSplitStage()
+	} else if shardMeta != nil {
+		splitStage = shardMeta.SplitStage
 	}
 	return &PeerStorage{
-		Engines:        engines,
-		peer:           peer,
-		region:         region,
-		Tag:            tag,
-		raftState:      raftState,
-		applyState:     applyState,
-		lastTerm:       lastTerm,
-		regionSched:    regionSched,
-		stats:          &CacheQueryStats{},
-		splitStage:     splitStage,
-		initialFlushed: initialFlushed,
+		Engines:     engines,
+		peer:        peer,
+		region:      region,
+		Tag:         tag,
+		raftState:   raftState,
+		applyState:  applyState,
+		lastTerm:    lastTerm,
+		regionSched: regionSched,
+		stats:       &CacheQueryStats{},
+		splitStage:  splitStage,
+		shardMeta:   shardMeta,
 	}, nil
 }
 
@@ -357,11 +363,12 @@ func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool {
 
 func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {
 	var snap eraftpb.Snapshot
-	if !ps.initialFlushed {
-		log.S().Infof("shard %d:%d has not flushed for generating snapshot", ps.region.Id, ps.region.RegionEpoch.Version)
+	shardMeta := ps.GetEngineMeta()
+	if shardMeta.HasParent() {
+		log.S().Infof("shard %d:%d has not flushed for generating snapshot", shardMeta.ID, shardMeta.Ver)
 		return snap, raft.ErrSnapshotTemporarilyUnavailable
 	}
-	changeSet := ps.GetEngineMeta().ToChangeSet()
+	changeSet := shardMeta.ToChangeSet()
 	applyState := getApplyStateFromProps(changeSet.Snapshot.Properties)
 
 	snapData := &snapData{
@@ -563,7 +570,7 @@ func (ps *PeerStorage) SaveReadyState(raftWB *raftengine.WriteBatch, ready *raft
 }
 
 func PeerEqual(l, r *metapb.Peer) bool {
-	return l.Id == r.Id && l.StoreId == r.StoreId && l.Role == r.Role
+	return l.Id == r.Id && l.StoreId == r.StoreId && l.Role == r.Role && l.Witness == r.Witness
 }
 
 func RegionEqual(l, r *metapb.Region) bool {
diff --git a/tikv/raftstore/recover.go b/tikv/raftstore/recover.go
index 490a52aa..c66e6217 100644
--- a/tikv/raftstore/recover.go
+++ b/tikv/raftstore/recover.go
@@ -165,10 +165,24 @@ func (h *RecoverHandler) IterateMeta(fn func(meta *enginepb.ChangeSet) error) er
 	if h == nil {
 		return nil
 	}
+	var lastRegionLocalStateBin []byte
 	err := h.raftEngine.IterateAllStates(false, func(regionID uint64, key, val []byte) error {
+		if key[0] == RegionMetaKeyByte {
+			lastRegionLocalStateBin = append(lastRegionLocalStateBin[:0], val...)
+		}
 		if key[0] == KVEngineMetaKeyByte {
+			state := new(raft_serverpb.RegionLocalState)
+			err := state.Unmarshal(lastRegionLocalStateBin)
+			if err != nil {
+				return err
+			}
+			for _, peer := range state.Region.Peers {
+				if peer.StoreId == h.storeID && peer.Witness {
+					return nil
+				}
+			}
 			cs := new(enginepb.ChangeSet)
-			err := cs.Unmarshal(val)
+			err = cs.Unmarshal(val)
 			if err != nil {
 				return err
 			}
diff --git a/tikv/region.go b/tikv/region.go
index 52da3bea..ec0c2e39 100644
--- a/tikv/region.go
+++ b/tikv/region.go
@@ -265,6 +265,16 @@ func (rm *regionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *er
 			},
 		}
 	}
+	for _, peer := range ri.meta.Peers {
+		if peer.StoreId == rm.storeMeta.Id && peer.Witness {
+			return nil, &errorpb.Error{
+				Message: "peer is witness",
+				NotLeader: &errorpb.NotLeader{
+					RegionId: ri.meta.Id,
+				},
+			}
+		}
+	}
 	return ri, nil
 }
 
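
The patch above boils down to one rule on the apply path: a witness peer keeps the raft log and the applied index, but never applies write commands or engine change sets to its KV engine. The following standalone sketch models that gating with simplified stand-in types (Peer, applier and handleEntry are illustrative only, not the unistore API):

package main

import "fmt"

// Simplified stand-ins for the patch's types; the real ones live in
// kvproto's metapb and tikv/raftstore.
type Peer struct {
	ID      uint64
	StoreID uint64
	Witness bool
}

type applier struct {
	peer Peer
}

// execWriteCmd and applyChangeSet stand in for the real apply paths.
func (a *applier) execWriteCmd(cmd string) {
	fmt.Printf("peer %d applied write %q\n", a.peer.ID, cmd)
}

func (a *applier) applyChangeSet(seq uint64) {
	fmt.Printf("peer %d applied change set at seq %d\n", a.peer.ID, seq)
}

// handleEntry mirrors the gating added in applier.go: a witness advances its
// apply progress but skips both write commands and engine change sets.
func (a *applier) handleEntry(index uint64, isChangeSet bool, cmd string) {
	if a.peer.Witness {
		// A witness keeps only the raft log and applied index; no KV data.
		return
	}
	if isChangeSet {
		a.applyChangeSet(index)
		return
	}
	a.execWriteCmd(cmd)
}

func main() {
	voter := &applier{peer: Peer{ID: 1, StoreID: 1}}
	witness := &applier{peer: Peer{ID: 2, StoreID: 2, Witness: true}}
	voter.handleEntry(10, false, "put k1=v1")
	witness.handleEntry(10, false, "put k1=v1") // no-op on the witness
}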
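Because a witness holds no data, it should not stay leader, and it must not serve client reads; the patch enforces the former in onRaftMsg (a witness leader transfers leadership to the first non-witness peer that contacts it) and the latter in region.go (a NotLeader error for requests hitting a witness). A minimal model of the leadership hand-off rule, again with hypothetical stand-in types rather than the real peer state machine:

package main

import "fmt"

// Minimal stand-ins; the real types come from kvproto's metapb and the
// raftstore peer state machine.
type PeerMeta struct {
	ID      uint64
	StoreID uint64
	Witness bool
}

type peer struct {
	meta     PeerMeta
	isLeader bool
}

func (p *peer) transferLeader(to uint64) {
	fmt.Printf("witness %d transfers leadership to peer %d\n", p.meta.ID, to)
}

// maybeYieldLeadership mirrors the check added in onRaftMsg: a witness may win
// an election, but it hands leadership back to a non-witness peer as soon as
// one contacts it.
func (p *peer) maybeYieldLeadership(from PeerMeta) {
	if p.isLeader && p.meta.Witness && !from.Witness {
		p.transferLeader(from.ID)
	}
}

func main() {
	w := &peer{meta: PeerMeta{ID: 3, StoreID: 3, Witness: true}, isLeader: true}
	w.maybeYieldLeadership(PeerMeta{ID: 1, StoreID: 1}) // triggers the transfer
}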
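On restart the peer storage can no longer rely on the in-memory initialFlushed flag that this patch removes: the ShardMeta persisted in the raft engine (decoded via NewShardMetaFromBin) takes over, snapshot generation is refused while the meta still has a parent shard, and the split stage falls back to the persisted meta when the KV shard is absent, as it is on a witness. The sketch below models those two decisions; ShardMeta here is a simplified stand-in and the stage values are assumptions, not the real enginepb constants:

package main

import (
	"errors"
	"fmt"
)

// Simplified model of the peer_storage.go changes; ShardMeta and the stage
// values are stand-ins, not the real engine/enginepb types.
type ShardMeta struct {
	ID     uint64
	Ver    uint64
	Stage  int
	parent *ShardMeta
}

func (m *ShardMeta) HasParent() bool { return m.parent != nil }

var errSnapshotTemporarilyUnavailable = errors.New("snapshot temporarily unavailable")

// snapshotAllowed mirrors PeerStorage.Snapshot: instead of an in-memory
// initialFlushed flag, snapshot generation is refused while the persisted
// shard meta still references its parent shard.
func snapshotAllowed(meta *ShardMeta) error {
	if meta.HasParent() {
		return errSnapshotTemporarilyUnavailable
	}
	return nil
}

// splitStageOnStart mirrors NewPeerStorage: prefer the live KV shard, but fall
// back to the ShardMeta decoded from the raft engine when the shard is absent,
// as it is on a witness store.
func splitStageOnStart(kvShardStage *int, meta *ShardMeta) int {
	if kvShardStage != nil {
		return *kvShardStage
	}
	if meta != nil {
		return meta.Stage
	}
	return 0 // stands in for enginepb.SplitStage_INITIAL
}

func main() {
	meta := &ShardMeta{ID: 7, Ver: 2, Stage: 1, parent: &ShardMeta{ID: 3}}
	fmt.Println(snapshotAllowed(meta))        // snapshot temporarily unavailable
	fmt.Println(splitStageOnStart(nil, meta)) // 1, recovered from the persisted meta
}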