upstream (ticdc): add upstream to manage all upstream related resources. (#5282)

close #5288

asddongmen authored May 5, 2022
1 parent 196de8b commit 9bcebc1

Showing 25 changed files with 490 additions and 238 deletions.
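
The pkg/upstream package introduced by this commit is among the changed files not shown in this excerpt. The sketch below is inferred only from how the package is used in the hunks that follow (Manager.Add/Get/Close, upstream.DefaultClusterID, and an Upstream exposing PDClient, KVStorage, PDClock, GCManager and IsNormal); the field types are taken from the resources they replace on Capture, while everything else, including the DefaultClusterID value and all method bodies, is an assumption rather than the actual implementation.

package upstream

import (
	"context"

	tidbkv "github.com/pingcap/tidb/kv"
	"github.com/pingcap/tiflow/pkg/pdtime"
	"github.com/pingcap/tiflow/pkg/txnutil/gc"
	pd "github.com/tikv/pd/client"
)

// DefaultClusterID identifies the single upstream that callers resolve
// today (see verifyCreateChangefeedConfig below); the value is assumed.
const DefaultClusterID uint64 = 0

// Upstream bundles the per-cluster resources that previously hung directly
// off Capture and the cdcContext global vars.
type Upstream struct {
	PDClient  pd.Client
	KVStorage tidbkv.Storage
	PDClock   *pdtime.PDClock
	GCManager gc.Manager
}

// IsNormal reports whether the upstream's resources are ready for use;
// changefeed.Tick skips a tick when it returns false.
func (u *Upstream) IsNormal() bool { return true } // sketch only

// Manager owns all upstreams of a capture, keyed by cluster ID.
type Manager struct {
	ctx       context.Context
	upstreams map[uint64]*Upstream
}

// NewManager returns an empty manager whose upstreams live as long as ctx.
func NewManager(ctx context.Context) *Manager {
	return &Manager{ctx: ctx, upstreams: make(map[uint64]*Upstream)}
}

// Add dials the PD endpoints and registers the resulting upstream under
// clusterID; resource construction is elided in this sketch.
func (m *Manager) Add(clusterID uint64, pdEndpoints []string) error {
	m.upstreams[clusterID] = &Upstream{}
	return nil
}

// Get returns the upstream registered under clusterID.
func (m *Manager) Get(clusterID uint64) *Upstream {
	return m.upstreams[clusterID]
}

// Close releases every managed upstream; teardown is elided here.
func (m *Manager) Close() {}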
11 changes: 8 additions & 3 deletions cdc/api/validator.go
@@ -30,6 +30,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
@@ -42,6 +43,9 @@ func verifyCreateChangefeedConfig(
changefeedConfig model.ChangefeedConfig,
capture *capture.Capture,
) (*model.ChangeFeedInfo, error) {
// TODO(dongmen): we should pass ClusterID in ChangefeedConfig in the upcoming future
upStream := capture.UpstreamManager.Get(upstream.DefaultClusterID)

// verify sinkURI
if changefeedConfig.SinkURI == "" {
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
@@ -63,7 +67,7 @@

// verify start-ts
if changefeedConfig.StartTS == 0 {
ts, logical, err := capture.PDClient.GetTS(ctx)
ts, logical, err := upStream.PDClient.GetTS(ctx)
if err != nil {
return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
}
@@ -73,7 +77,8 @@
// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx, capture.PDClient,
ctx,
upStream.PDClient,
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
@@ -137,7 +142,7 @@ func verifyCreateChangefeedConfig(
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.Storage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, upStream.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
90 changes: 27 additions & 63 deletions cdc/capture/capture.go
@@ -24,15 +24,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
@@ -44,7 +40,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
)

@@ -55,19 +51,16 @@ type Capture struct {
info *model.CaptureInfo
processorManager *processor.Manager

ownerMu sync.Mutex
owner owner.Owner
pdEnpoints []string
UpstreamManager *upstream.Manager
ownerMu sync.Mutex
owner owner.Owner

// session keeps alive between the capture and etcd
session *concurrency.Session
election *concurrency.Election

PDClient pd.Client
Storage tidbkv.Storage
EtcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
pdClock *pdtime.PDClock
sorterSystem *ssystem.System

enableNewScheduler bool
@@ -89,20 +82,18 @@ type Capture struct {

cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) owner.Owner
newProcessorManager func(upstreamManager *upstream.Manager) *processor.Manager
newOwner func(upstreamManager *upstream.Manager) owner.Owner
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
func NewCapture(pdEnpoints []string, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
conf := config.GetGlobalServerConfig()
return &Capture{
PDClient: pdClient,
Storage: kvStorage,
EtcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},

EtcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},
pdEnpoints: pdEnpoints,
enableNewScheduler: conf.Debug.EnableNewScheduler,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
@@ -135,23 +126,26 @@ func (c *Capture) reset(ctx context.Context) error {
AdvertiseAddr: conf.AdvertiseAddr,
Version: version.ReleaseVersion,
}
c.processorManager = c.newProcessorManager()

if c.UpstreamManager != nil {
c.UpstreamManager.Close()
}
c.UpstreamManager = upstream.NewManager(ctx)
err = c.UpstreamManager.Add(upstream.DefaultClusterID, c.pdEnpoints)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"add default upstream failed")
}

c.processorManager = c.newProcessorManager(c.UpstreamManager)
if c.session != nil {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

if c.pdClock != nil {
c.pdClock.Stop()
}

c.pdClock, err = pdtime.NewClock(ctx, c.PDClient)
if err != nil {
return errors.Trace(err)
}

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
}
@@ -183,9 +177,6 @@ func (c *Capture) reset(ctx context.Context) error {
"create sorter system")
}
}
if c.grpcPool != nil {
c.grpcPool.Close()
}

if c.enableNewScheduler {
c.grpcService.Reset(nil)
@@ -197,12 +188,6 @@
}
}

c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.PDClient)

if c.enableNewScheduler {
messageServerConfig := conf.Debug.Messages.ToMessageServerConfig()
c.MessageServer = p2p.NewMessageServer(c.info.ID, messageServerConfig)
@@ -265,13 +250,8 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.PDClient,
KVStorage: c.Storage,
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
PDClock: c.pdClock,
TableActorSystem: c.tableActorSystem,
SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
@@ -324,16 +304,6 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
go func() {
defer wg.Done()
c.pdClock.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
if c.enableNewScheduler {
wg.Add(1)
go func() {
@@ -419,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

owner := c.newOwner(c.PDClient)
owner := c.newOwner(c.UpstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()
@@ -541,13 +511,7 @@ func (c *Capture) AsyncClose() {
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
c.regionCache = nil
}

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
c.tableActorSystem = nil
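
With this change the server no longer builds a pd.Client and TiKV storage up front; it only hands the PD endpoints to the capture, and the resources reset() used to create directly (PD client, KV storage, PD clock, gRPC pool, region cache) are presumably set up inside UpstreamManager.Add instead. A hedged caller-side sketch, with the helper name newCaptureFromEndpoints invented for illustration (the real call site, presumably cdc/server.go, is among the files not shown):

package server

import (
	"github.com/pingcap/tiflow/cdc/capture"
	"github.com/pingcap/tiflow/pkg/etcd"
	"github.com/pingcap/tiflow/pkg/p2p"
)

// newCaptureFromEndpoints forwards the PD endpoints instead of pre-built
// upstream clients; Capture.reset() now registers the default upstream via
// c.UpstreamManager.Add(upstream.DefaultClusterID, c.pdEnpoints).
func newCaptureFromEndpoints(
	pdEndpoints []string,
	etcdClient *etcd.CDCEtcdClient,
	grpcService *p2p.ServerWrapper,
) *capture.Capture {
	return capture.NewCapture(pdEndpoints, etcdClient, grpcService)
}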
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
@@ -114,6 +114,7 @@ func (s FeedState) IsNeeded(need string) bool {

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
ClusterID uint64 `json:"cluster-id"`
SinkURI string `json:"sink-uri"`
Opts map[string]string `json:"opts"`
CreateTime time.Time `json:"create-time"`
32 changes: 18 additions & 14 deletions cdc/owner/changefeed.go
@@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
@@ -42,10 +43,10 @@ type changefeed struct {
id model.ChangeFeedID
state *orchestrator.ChangefeedReactorState

upStream *upstream.Upstream
scheduler scheduler
barriers *barriers
feedStateManager *feedStateManager
gcManager gc.Manager
redoManager redo.LogManager

schema *schemaWrap4Owner
@@ -80,19 +81,19 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler, error)
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefeed {
c := &changefeed{
id: id,
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,
upStream: upStream,

errCh: make(chan error, defaultErrChSize),
cancel: func() {},
@@ -105,19 +106,22 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
id model.ChangeFeedID, upStream *upstream.Upstream,
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c := newChangefeed(id, upStream)
c.newDDLPuller = newDDLPuller
c.newSink = newSink
return c
}

func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) {
startTime := time.Now()

// skip this tick
if !c.upStream.IsNormal() {
return
}
ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
c.errCh <- errors.Trace(err)
return nil
@@ -162,7 +166,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
})
if err := c.gcManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
if err := c.upStream.GCManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
}
@@ -238,7 +242,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
return errors.Trace(err)
}

pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()
pdTime, _ := c.upStream.PDClock.CurrentTime()
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
@@ -300,7 +304,7 @@ LOOP:
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, ctx.GlobalVars().PDClient, c.state.ID, ensureTTL, checkpointTs)
ctx, c.upStream.PDClient, c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
@@ -318,20 +322,19 @@
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
// the schema cache.
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage,
c.schema, err = newSchemaWrap4Owner(c.upStream.KVStorage,
checkpointTs-1, c.state.Info.Config, ctx.ChangefeedVars().ID)
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
c.ddlPuller, err = c.newDDLPuller(cancelCtx, c.upStream, checkpointTs-1)
if err != nil {
return errors.Trace(err)
}
@@ -650,6 +653,7 @@ func (c *changefeed) Close(ctx cdcContext.Context) {
startTime := time.Now()

c.releaseResources(ctx)

costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed close took too long",
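
On the owner side, the changefeed now receives an *upstream.Upstream in place of the old gc.Manager, and newOwner takes the capture's upstream.Manager (see capture.go above). A plausible wiring, with the helper name newChangefeedForDefaultUpstream invented purely for illustration:

package owner

import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/upstream"
)

// newChangefeedForDefaultUpstream resolves the single default upstream from
// the manager the owner was constructed with and hands it to newChangefeed,
// replacing the former gc.Manager argument. ChangeFeedInfo now carries a
// ClusterID field (see cdc/model/changefeed.go above), so a multi-upstream
// owner could later key this lookup on it instead.
func newChangefeedForDefaultUpstream(
	upstreamManager *upstream.Manager, id model.ChangeFeedID,
) *changefeed {
	up := upstreamManager.Get(upstream.DefaultClusterID)
	return newChangefeed(id, up)
}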