Skip to content

Commit

Permalink
update cdc gc service safe point to pd (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Aug 13, 2024
1 parent 8b7bb72 commit 43f6faa
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 30 deletions.
73 changes: 72 additions & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package coordinator
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand All @@ -31,7 +32,12 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -56,14 +62,26 @@ type coordinator struct {
lastSaveTime time.Time
lastTickTime time.Time
scheduledChangefeeds utils.Map[scheduler.InferiorID, scheduler.Inferior]

gcManager gc.Manager
pdClient pd.Client
pdClock pdutil.Clock
etcdClient etcd.CDCEtcdClient
}

func NewCoordinator(capture *common.NodeInfo, version int64) server.Coordinator {
func NewCoordinator(capture *common.NodeInfo,
pdClient pd.Client,
pdClock pdutil.Clock,
etcdClient etcd.CDCEtcdClient, version int64) server.Coordinator {
c := &coordinator{
version: version,
nodeInfo: capture,
scheduledChangefeeds: utils.NewBtreeMap[scheduler.InferiorID, scheduler.Inferior](),
lastTickTime: time.Now(),
gcManager: gc.NewManager(etcdClient.GetGCServiceID(), pdClient, pdClock),
pdClient: pdClient,
etcdClient: etcdClient,
pdClock: pdClock,
}
id := scheduler.ChangefeedID(model.DefaultChangeFeedID("coordinator"))
c.supervisor = scheduler.NewSupervisor(
Expand Down Expand Up @@ -99,6 +117,15 @@ func (c *coordinator) Tick(
metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second))
c.lastTickTime = now

// Owner should update GC safepoint before initializing changefeed, so
// changefeed can remove its "ticdc-creating" service GC safepoint during
// initializing.
//
// See the GC documentation for more details.
if err := c.updateGCSafepoint(ctx, state); err != nil {
return nil, errors.Trace(err)
}

// 1. handle grpc messages
err := c.handleMessages()
if err != nil {
Expand Down Expand Up @@ -348,6 +375,50 @@ func updateStatus(
})
}

// updateGCSafepoint derives the GC safepoint upper bound from the changefeeds
// in state and hands it to gcManager.TryUpdateGCSafePoint.
//
// The minimum checkpoint and the force-update flag both come from
// calculateGCSafepoint. Any error from the GC manager is returned after being
// passed through errors.Trace; a nil error stays nil.
func (c *coordinator) updateGCSafepoint(
	ctx context.Context, state *orchestrator.GlobalReactorState,
) error {
	minCheckpointTs, forceUpdate := c.calculateGCSafepoint(state)
	// When the changefeed starts up, CDC will do a snapshot read at
	// (checkpointTs - 1) from TiKV, so (checkpointTs - 1) should be an upper
	// bound for the GC safepoint.
	gcSafepointUpperBound := minCheckpointTs
	if gcSafepointUpperBound > 0 {
		// Only decrement a non-zero checkpoint: unsigned subtraction wraps
		// around in Go, so 0 - 1 would silently become math.MaxUint64 and be
		// reported as the upper bound instead of the smallest possible one.
		gcSafepointUpperBound--
	}
	err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, forceUpdate)
	return errors.Trace(err)
}

// calculateGCSafepoint returns the smallest checkpointTs among the changefeeds
// in state that still need to block GC, together with a flag telling the
// caller whether the safepoint update should be forced.
//
// Changefeeds with a nil Info, or whose Info.NeedBlockGC() reports false, are
// ignored. The flag is set as soon as one of the remaining changefeeds is not
// yet present in c.scheduledChangefeeds, i.e. it is newly added from the
// coordinator's point of view. When no changefeed contributes a checkpoint,
// the current time of c.pdClock, converted to a TSO with oracle.GoTimeToTS,
// is returned instead.
func (c *coordinator) calculateGCSafepoint(state *orchestrator.GlobalReactorState) (
	uint64, bool,
) {
	lowestTs := uint64(math.MaxUint64)
	needForce := false

	for id, cfState := range state.Changefeeds {
		info := cfState.Info
		if info != nil && info.NeedBlockGC() {
			if ts := info.GetCheckpointTs(cfState.Status); ts < lowestTs {
				lowestTs = ts
			}
			// A changefeed the coordinator has not scheduled yet is a new
			// one, which requires a forced update.
			if !c.scheduledChangefeeds.Has(scheduler.ChangefeedID(id)) {
				needForce = true
			}
		}
	}

	// Still at the sentinel value: no changefeed supplied a checkpoint, so
	// fall back to the current PD clock time.
	if lowestTs == math.MaxUint64 {
		lowestTs = oracle.GoTimeToTS(c.pdClock.CurrentTime())
	}
	return lowestTs, needForce
}

func (c *coordinator) printStatus() {
if time.Since(c.lastCheckTime) > time.Second*10 {
workingTask := 0
Expand Down
1 change: 1 addition & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func (m *Maintainer) initChangefeed() error {
m.tableSpans.ReplaceOrInsert(tableSpan, replicaSet)
}
m.supervisor.MarkNeedAddInferior()
//todo: remove gc service checkpoint ts
return err
}

Expand Down
60 changes: 31 additions & 29 deletions server/module_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ import (
)

type elector struct {
election *concurrency.Election
captureImpl *serverImpl
election *concurrency.Election
serverImpl *serverImpl
}

func NewElector(captureImpl *serverImpl) SubModule {
election := concurrency.NewElection(captureImpl.session,
etcd.CaptureOwnerKey(captureImpl.EtcdClient.GetClusterID()))
func NewElector(serverImpl *serverImpl) SubModule {
election := concurrency.NewElection(serverImpl.session,
etcd.CaptureOwnerKey(serverImpl.EtcdClient.GetClusterID()))
return &elector{
captureImpl: captureImpl,
election: election,
serverImpl: serverImpl,
election: election,
}
}

Expand Down Expand Up @@ -80,13 +80,13 @@ func (e *elector) campaignCoordinator(ctx context.Context) error {
return errors.Trace(err)
}
// Before campaign check liveness
if e.captureImpl.liveness.Load() == model.LivenessCaptureStopping {
if e.serverImpl.liveness.Load() == model.LivenessCaptureStopping {
log.Info("do not campaign coordinator, liveness is stopping",
zap.String("captureID", e.captureImpl.info.ID))
zap.String("captureID", e.serverImpl.info.ID))
return nil
}
// Campaign to be the coordinator, it blocks until it been elected.
if err := e.election.Campaign(ctx, e.captureImpl.info.ID); err != nil {
if err := e.election.Campaign(ctx, e.serverImpl.info.ID); err != nil {
rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
Expand All @@ -96,81 +96,83 @@ func (e *elector) campaignCoordinator(ctx context.Context) error {
continue
}
log.Warn("campaign coordinator failed",
zap.String("captureID", e.captureImpl.info.ID), zap.Error(err))
zap.String("captureID", e.serverImpl.info.ID), zap.Error(err))
return cerror.ErrCaptureSuicide.GenWithStackByArgs()
}
// After campaign check liveness again.
// It is possible it becomes the coordinator right after receiving SIGTERM.
if e.captureImpl.liveness.Load() == model.LivenessCaptureStopping {
if e.serverImpl.liveness.Load() == model.LivenessCaptureStopping {
// If the server is stopping, resign actively.
log.Info("resign coordinator actively, liveness is stopping")
if resignErr := e.resign(ctx); resignErr != nil {
log.Warn("resign coordinator actively failed",
zap.String("captureID", e.captureImpl.info.ID), zap.Error(resignErr))
zap.String("captureID", e.serverImpl.info.ID), zap.Error(resignErr))
return errors.Trace(err)
}
return nil
}

coordinatorVersion, err := e.captureImpl.EtcdClient.GetOwnerRevision(ctx,
e.captureImpl.info.ID)
coordinatorVersion, err := e.serverImpl.EtcdClient.GetOwnerRevision(ctx,
e.serverImpl.info.ID)
if err != nil {
return errors.Trace(err)
}

log.Info("campaign coordinator successfully",
zap.String("captureID", e.captureImpl.info.ID),
zap.String("captureID", e.serverImpl.info.ID),
zap.Int64("coordinatorVersion", coordinatorVersion))

co := coordinator.NewCoordinator(e.captureImpl.info, coordinatorVersion)
e.captureImpl.setCoordinator(co)
co := coordinator.NewCoordinator(e.serverImpl.info,
e.serverImpl.pdClient, e.serverImpl.PDClock, e.serverImpl.EtcdClient,
coordinatorVersion)
e.serverImpl.setCoordinator(co)

// watcher changefeed changes
watcher := watcher.NewEtcdWatcher(e.captureImpl.EtcdClient,
e.captureImpl.session,
watcher := watcher.NewEtcdWatcher(e.serverImpl.EtcdClient,
e.serverImpl.session,
// changefeed info key prefix
etcd.BaseKey(e.captureImpl.EtcdClient.GetClusterID()),
etcd.BaseKey(e.serverImpl.EtcdClient.GetClusterID()),
"coordinator")

err = watcher.RunEtcdWorker(ctx, co.(orchestrator.Reactor),
orchestrator.NewGlobalState(e.captureImpl.EtcdClient.GetClusterID(),
orchestrator.NewGlobalState(e.serverImpl.EtcdClient.GetClusterID(),
cfg.CaptureSessionTTL),
ownerFlushInterval)
e.captureImpl.coordinator.AsyncStop()
e.captureImpl.setCoordinator(nil)
e.serverImpl.coordinator.AsyncStop()
e.serverImpl.setCoordinator(nil)

if !cerror.ErrNotOwner.Equal(err) {
// if coordinator exits, resign the coordinator key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := e.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("coordinator resign failed", zap.String("captureID", e.captureImpl.info.ID),
log.Info("coordinator resign failed", zap.String("captureID", e.serverImpl.info.ID),
zap.Error(resignErr), zap.Int64("coordinatorVersion", coordinatorVersion))
cancel()
return errors.Trace(resignErr)
}

log.Warn("coordinator resign timeout", zap.String("captureID", e.captureImpl.info.ID),
log.Warn("coordinator resign timeout", zap.String("captureID", e.serverImpl.info.ID),
zap.Error(resignErr), zap.Int64("coordinatorVersion", coordinatorVersion))
}
cancel()
}

log.Info("coordinator resigned successfully",
zap.String("captureID", e.captureImpl.info.ID),
zap.String("captureID", e.serverImpl.info.ID),
zap.Int64("coordinatorVersion", coordinatorVersion))
if err != nil {
log.Warn("run coordinator exited with error",
zap.String("captureID", e.captureImpl.info.ID),
zap.String("captureID", e.serverImpl.info.ID),
zap.Int64("coordinatorVersion", coordinatorVersion),
zap.Error(err))
// for errors, return error and let server exits or restart
return errors.Trace(err)
}
// if the coordinator exits normally, continue the campaign loop and try to be elected coordinator again
log.Info("run coordinator exited normally",
zap.String("captureID", e.captureImpl.info.ID),
zap.String("captureID", e.serverImpl.info.ID),
zap.Int64("coordinatorVersion", coordinatorVersion))
}
}
Expand Down

0 comments on commit 43f6faa

Please sign in to comment.