support witness #605

Open
wants to merge 1 commit into sharding
10 changes: 10 additions & 0 deletions engine/meta.go
@@ -65,6 +65,12 @@ func NewShardMeta(cs *enginepb.ChangeSet) *ShardMeta {
return shardMeta
}

func NewShardMetaFromBin(bin []byte) *ShardMeta {
cs := new(enginepb.ChangeSet)
_ = cs.Unmarshal(bin)
return NewShardMeta(cs)
}

func (si *ShardMeta) FileLevel(fid uint64) (int, bool) {
fm, ok := si.files[fid]
if ok {
@@ -330,6 +336,10 @@ func (si *ShardMeta) PreSplitKeys() [][]byte {
return si.preSplit.Keys
}

func (si *ShardMeta) HasParent() bool {
return si.parent != nil
}

// LevelCF is the struct that contains shard ID and level ID.
type LevelCF struct {
Level uint16
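The two helpers above are consumed later in this PR by the peer storage: NewShardMetaFromBin rebuilds the shard meta from the binary persisted under KVEngineMetaKey, and HasParent gates snapshot generation while the meta still references a parent shard. A minimal usage sketch under those assumptions; the stateGetter interface and canGenerateSnapshot function are illustrative names, not part of the change:

// Sketch: decide from the persisted meta whether this shard can serve a snapshot.
// stateGetter stands in for the raft engine's GetState call used in peer_storage.go.
type stateGetter interface {
	GetState(regionID uint64, key []byte) []byte
}

func canGenerateSnapshot(raft stateGetter, regionID uint64) bool {
	bin := raft.GetState(regionID, KVEngineMetaKey())
	if len(bin) == 0 {
		return false // no persisted shard meta yet
	}
	meta := engine.NewShardMetaFromBin(bin)
	// A meta that still has a parent has not finished the initial flush after a
	// split, so its change set cannot yet be used as a snapshot.
	return !meta.HasParent()
}

In the change itself, PeerStorage.Snapshot performs the same HasParent check on the meta returned by GetEngineMeta.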
41 changes: 28 additions & 13 deletions tikv/raftstore/applier.go
@@ -592,6 +592,9 @@ func (a *applier) applyRaftCmd(aCtx *applyContext, index, term uint64,
if cl, ok := rlog.(*raftlog.CustomRaftLog); ok {
switch cl.Type() {
case raftlog.TypeFlush, raftlog.TypeCompaction, raftlog.TypeSplitFiles:
if a.peer.Witness {
// A witness doesn't apply change sets to the KV engine, so skip flush/compaction/split-files logs here.
break
}
change, err := cl.GetShardChangeSet()
y.Assert(err == nil)
// Assign the raft log's index as the sequence number of the ChangeSet to ensure monotonic increase.
@@ -684,6 +687,10 @@ func (a *applier) execRaftCmd(aCtx *applyContext, rlog raftlog.RaftLog) (
if req.GetAdminRequest() != nil {
return a.execAdminCmd(aCtx, req)
}
if a.peer.Witness {
// Skip write commands for witness peers.
return
}
resp, result = a.execWriteCmd(aCtx, rlog)
return
}
@@ -981,17 +988,19 @@ func (a *applier) execBatchSplit(aCtx *applyContext, req *raft_cmdpb.AdminReques
return
}
a.applyState.appliedIndex = aCtx.execCtx.index
cs := buildSplitChangeSet(a.region, regions, a.applyState)
err = aCtx.engines.kv.FinishSplit(cs)
if err != nil {
if err == engine.ErrFinishSplitWrongStage {
// This must be a follower that fall behind, we need to pause the apply and wait for split files to finish
// in the background worker.
log.S().Warnf("%d:%d is not in split file done stage for finish split, pause apply",
a.region.Id, a.region.RegionEpoch.Version)
result = applyResult{tp: applyResultTypePause}
if !a.peer.Witness {
cs := buildSplitChangeSet(a.region, regions, a.applyState)
err = aCtx.engines.kv.FinishSplit(cs)
if err != nil {
if err == engine.ErrFinishSplitWrongStage {
// This must be a follower that fell behind; we need to pause the apply and wait for split files to finish
// in the background worker.
log.S().Warnf("%d:%d is not in split file done stage for finish split, pause apply",
a.region.Id, a.region.RegionEpoch.Version)
result = applyResult{tp: applyResultTypePause}
}
return
}
return
}
// Clear the cache here, otherwise the locks that don't belong to the new range would never get a chance to be deleted.
a.lockCache = map[string][]byte{}
@@ -1061,6 +1070,7 @@ func splitGenNewRegionMetas(oldRegion *metapb.Region, splitReqs *raft_cmdpb.Batc
Id: request.NewPeerIds[j],
StoreId: derived.Peers[j].StoreId,
Role: derived.Peers[j].Role,
Witness: derived.Peers[j].Witness,
}
}
regions = append(regions, newRegion)
@@ -1197,9 +1207,14 @@ func (a *applier) handleApply(aCtx *applyContext, apply *apply) {
if len(apply.entries) == 0 || a.pendingRemove || a.stopped {
return
}
a.metrics = applyMetrics{}
shard := aCtx.engines.kv.GetShard(a.region.GetId())
a.metrics.approximateSize = uint64(shard.GetEstimatedSize())
if !a.peer.Witness {
a.metrics = applyMetrics{}
shard := aCtx.engines.kv.GetShard(a.region.GetId())
if shard == nil {
log.S().Warnf("%d shard not found for peer %s", a.region.Id, a.peer)
} else {
a.metrics.approximateSize = uint64(shard.GetEstimatedSize())
}
}
a.term = apply.term
a.handleRaftCommittedEntries(aCtx, apply.entries)
if a.waitMergeState != nil {
13 changes: 7 additions & 6 deletions tikv/raftstore/fsm_peer.go
@@ -391,6 +391,11 @@ func (d *peerMsgHandler) onRaftMsg(msg *rspb.RaftMessage) error {
if err != nil {
return err
}
if d.peer.IsLeader() && d.peer.Meta.Witness && !msg.FromPeer.Witness {
log.S().Infof("%d:%d witness transfer leader to store %d",
d.regionID(), d.region().RegionEpoch.Version, msg.FromPeer.StoreId)
d.peer.RaftGroup.TransferLeader(msg.FromPeer.Id)
}
if d.peer.AnyNewPeerCatchUp(msg.FromPeer.Id) {
d.peer.HeartbeatPd(d.ctx.pdTaskSender)
}
@@ -774,8 +779,6 @@ func (d *peerMsgHandler) onReadySplitRegion(derived *metapb.Region, regions []*m
// The raft state key changes when the region version changes, so we need to set it here.
y.Assert(store.raftState.commit > 0)
d.ctx.raftWB.SetState(regionID, RaftStateKey(d.region().RegionEpoch.Version), store.raftState.Marshal())
// Reset the flush state for derived region.
store.initialFlushed = false
store.splitStage = enginepb.SplitStage_INITIAL
continue
}
@@ -1030,7 +1033,7 @@ func (d *peerMsgHandler) onSplitRegionCheckTick() {
return
}

if !d.peer.IsLeader() {
if !d.peer.IsLeader() || d.peer.Meta.Witness {
return
}
d.ctx.splitCheckTaskSender <- task{
@@ -1112,7 +1115,7 @@ func (d *peerMsgHandler) validateSplitRegion(epoch *metapb.RegionEpoch, splitKey
}

func (d *peerMsgHandler) onScheduleHalfSplitRegion(regionEpoch *metapb.RegionEpoch) {
if !d.peer.IsLeader() {
if !d.peer.IsLeader() || d.peer.Meta.Witness {
return
}
region := d.region()
@@ -1389,15 +1392,13 @@ func (d *peerMsgHandler) onApplyChangeSetResult(result *MsgApplyChangeSetResult)
}
if change.Snapshot != nil {
log.S().Infof("%d on apply change set result", d.regionID())
store.initialFlushed = true
store.snapState = SnapState_Relax
props := change.Snapshot.Properties
if props != nil {
store.stableApplyState = getApplyStateFromProps(props)
}
}
if change.Flush != nil {
store.initialFlushed = true
props := change.Flush.Properties
if props != nil {
store.stableApplyState = getApplyStateFromProps(props)
3 changes: 0 additions & 3 deletions tikv/raftstore/fsm_store.go
@@ -247,9 +247,6 @@ func (bs *raftBatchSystem) loadPeers() ([]*peerFsm, error) {
if err != nil {
return err
}
shard := ctx.engine.kv.GetShard(regionID)
peer.peer.Store().initialFlushed = shard.IsInitialFlushed()
peer.peer.Store().splitStage = shard.GetSplitStage()
ctx.peerEventObserver.OnPeerCreate(peer.peer.getEventContext(), region)
if localState.State == rspb.PeerState_Merging {
log.S().Infof("region %d is merging", regionID)
18 changes: 17 additions & 1 deletion tikv/raftstore/peer.go
@@ -721,6 +721,9 @@ func (p *Peer) OnRoleChanged(observer PeerEventObserver, ready *raft.Ready) {
shard.SetPassive(ss.RaftState != raft.StateLeader)
}
if ss.RaftState == raft.StateLeader {
if p.Meta.Witness {
return
}
// The local read can only be performed after a new leader has applied
// the first empty entry on its term. After that the lease expiring time
// should be updated to
@@ -873,7 +876,7 @@ func (p *Peer) handleChangeSet(ctx *RaftContext, e *eraftpb.Entry) {
change.Sequence = e.Index
if clog.Type() == raftlog.TypePreSplit {
store.splitStage = enginepb.SplitStage_PRE_SPLIT
} else {
} else if !p.Meta.Witness {
store.applyingChanges = append(store.applyingChanges, change)
}
if store.splitStage >= enginepb.SplitStage_PRE_SPLIT && change.Compaction != nil {
Expand All @@ -882,6 +885,19 @@ func (p *Peer) handleChangeSet(ctx *RaftContext, e *eraftpb.Entry) {
}
shardMeta := store.GetEngineMeta()
shardMeta.ApplyChangeSet(change)
if p.Meta.Witness {
// For a witness, the change set is not applied to the engine, so there will be no apply-change-set result.
// Do the work that onApplyChangeSetResult would normally do here instead.
if change.Flush != nil {
props := change.Flush.Properties
if props != nil {
store.stableApplyState = getApplyStateFromProps(props)
}
}
if store.splitStage < change.Stage {
store.splitStage = change.Stage
}
}
log.S().Infof("%d:%d handle change set set engine meta, apply change %s", shardMeta.ID, shardMeta.Ver, change)
ctx.raftWB.SetState(p.regionId, KVEngineMetaKey(), shardMeta.Marshal())
}
43 changes: 25 additions & 18 deletions tikv/raftstore/peer_storage.go
@@ -122,7 +122,6 @@ type PeerStorage struct {

applyingChanges []*enginepb.ChangeSet
splitStage enginepb.SplitStage
initialFlushed bool

Tag string

Expand All @@ -139,6 +138,9 @@ func NewPeerStorage(engines *Engines, region *metapb.Region, regionSched chan<-
if err != nil {
return nil, err
}
if peer.Witness {
// A witness applies no data, so fast-forward its applied index to the raft commit index on restart.
applyState.appliedIndex = raftState.commit
}
if raftState.lastIndex < applyState.appliedIndex {
panic(fmt.Sprintf("%s unexpected raft log index: lastIndex %d < appliedIndex %d",
tag, raftState.lastIndex, applyState.appliedIndex))
Expand All @@ -147,24 +149,28 @@ func NewPeerStorage(engines *Engines, region *metapb.Region, regionSched chan<-
if err != nil {
return nil, err
}
var initialFlushed bool
var shardMeta *engine.ShardMeta
if shardMetaBin := engines.raft.GetState(region.Id, KVEngineMetaKey()); len(shardMetaBin) > 0 {
shardMeta = engine.NewShardMetaFromBin(shardMetaBin)
}
splitStage := enginepb.SplitStage_INITIAL
if shard := engines.kv.GetShard(region.Id); shard != nil {
initialFlushed = shard.IsInitialFlushed()
splitStage = shard.GetSplitStage()
} else if shardMeta != nil {
splitStage = shardMeta.SplitStage
}
return &PeerStorage{
Engines: engines,
peer: peer,
region: region,
Tag: tag,
raftState: raftState,
applyState: applyState,
lastTerm: lastTerm,
regionSched: regionSched,
stats: &CacheQueryStats{},
splitStage: splitStage,
initialFlushed: initialFlushed,
Engines: engines,
peer: peer,
region: region,
Tag: tag,
raftState: raftState,
applyState: applyState,
lastTerm: lastTerm,
regionSched: regionSched,
stats: &CacheQueryStats{},
splitStage: splitStage,
shardMeta: shardMeta,
}, nil
}

@@ -357,11 +363,12 @@ func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool {

func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {
var snap eraftpb.Snapshot
if !ps.initialFlushed {
log.S().Infof("shard %d:%d has not flushed for generating snapshot", ps.region.Id, ps.region.RegionEpoch.Version)
shardMeta := ps.GetEngineMeta()
if shardMeta.HasParent() {
log.S().Infof("shard %d:%d has not flushed for generating snapshot", shardMeta.ID, shardMeta.Ver)
return snap, raft.ErrSnapshotTemporarilyUnavailable
}
changeSet := ps.GetEngineMeta().ToChangeSet()
changeSet := shardMeta.ToChangeSet()

applyState := getApplyStateFromProps(changeSet.Snapshot.Properties)
snapData := &snapData{
@@ -563,7 +570,7 @@ func (ps *PeerStorage) SaveReadyState(raftWB *raftengine.WriteBatch, ready *raft
}

func PeerEqual(l, r *metapb.Peer) bool {
return l.Id == r.Id && l.StoreId == r.StoreId && l.Role == r.Role
return l.Id == r.Id && l.StoreId == r.StoreId && l.Role == r.Role && l.Witness == r.Witness
}

func RegionEqual(l, r *metapb.Region) bool {
16 changes: 15 additions & 1 deletion tikv/raftstore/recover.go
@@ -165,10 +165,24 @@ func (h *RecoverHandler) IterateMeta(fn func(meta *enginepb.ChangeSet) error) er
if h == nil {
return nil
}
var lastRegionLocalStateBin []byte
err := h.raftEngine.IterateAllStates(false, func(regionID uint64, key, val []byte) error {
if key[0] == RegionMetaKeyByte {
lastRegionLocalStateBin = append(lastRegionLocalStateBin[:0], val...)
}
if key[0] == KVEngineMetaKeyByte {
state := new(raft_serverpb.RegionLocalState)
err := state.Unmarshal(lastRegionLocalStateBin)
if err != nil {
return err
}
for _, peer := range state.Region.Peers {
if peer.StoreId == h.storeID && peer.Witness {
return nil
}
}
cs := new(enginepb.ChangeSet)
err := cs.Unmarshal(val)
err = cs.Unmarshal(val)
if err != nil {
return err
}
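The recovery change above skips restoring KV engine meta for regions whose local peer is a witness, using the most recently seen RegionLocalState; this relies on a region's RegionMetaKey entry being iterated before its KVEngineMetaKey entry. The inline check could also be expressed as a small predicate; isWitnessOnStore below is a hypothetical helper equivalent to the added loop:

// isWitnessOnStore reports whether this store's peer in the region is a witness.
// Hypothetical helper mirroring the inline loop in IterateMeta above.
func isWitnessOnStore(state *raft_serverpb.RegionLocalState, storeID uint64) bool {
	for _, peer := range state.Region.GetPeers() {
		if peer.StoreId == storeID && peer.Witness {
			return true
		}
	}
	return false
}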
10 changes: 10 additions & 0 deletions tikv/region.go
@@ -265,6 +265,16 @@ func (rm *regionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *er
},
}
}
for _, peer := range ri.meta.Peers {
if peer.StoreId == rm.storeMeta.Id && peer.Witness {
return nil, &errorpb.Error{
Message: "peer is witness",
NotLeader: &errorpb.NotLeader{
RegionId: ri.meta.Id,
},
}
}
}
return ri, nil
}

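On the client side the new check surfaces as a NotLeader error carrying only the region ID and no leader hint, since a witness holds no data and cannot serve the request. A hedged sketch of how a caller might react; regionCache, InvalidateRegion, and UpdateLeader are illustrative names rather than an API from this repository:

// regionCache is a stand-in for whatever region routing cache the client keeps.
type regionCache interface {
	InvalidateRegion(regionID uint64)
	UpdateLeader(regionID, storeID uint64)
}

// onRegionError reports whether the request should be retried after updating the
// cache. A NotLeader error without a Leader hint, as returned for a witness peer
// above, forces a refresh so the retry lands on a data-carrying peer.
func onRegionError(regionErr *errorpb.Error, cache regionCache) bool {
	nl := regionErr.GetNotLeader()
	if nl == nil {
		return false
	}
	if leader := nl.GetLeader(); leader != nil {
		cache.UpdateLeader(nl.GetRegionId(), leader.GetStoreId())
	} else {
		cache.InvalidateRegion(nl.GetRegionId())
	}
	return true
}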