From a02b0df4bd66d7d3ede9ef6d74415b01082c9f47 Mon Sep 17 00:00:00 2001
From: glorv
Date: Thu, 21 Oct 2021 20:12:25 +0800
Subject: [PATCH 1/8] support flush checkpoint async

---
 syncer/checkpoint.go      | 246 +++++++++++++++++++++++++++++++++-----
 syncer/checkpoint_test.go |   6 +-
 syncer/dml_worker.go      |  21 ++--
 syncer/job.go             |   3 +
 syncer/syncer.go          | 151 +++++++++++++++++++----
 5 files changed, 361 insertions(+), 66 deletions(-)

diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go
index fc1edd3ca..a31d3c2bf 100644
--- a/syncer/checkpoint.go
+++ b/syncer/checkpoint.go
@@ -61,24 +61,32 @@ var (
 	maxCheckPointTimeout = "1m"
 )
 
+type tablePoint struct {
+	location binlog.Location
+	ti *model.TableInfo
+}
+
 type binlogPoint struct {
 	sync.RWMutex
 
-	location binlog.Location
-	ti *model.TableInfo
+	location tablePoint
 
-	flushedLocation binlog.Location // location which flushed permanently
-	flushedTI *model.TableInfo
+	// flushedLocation.location which flushed permanently
+	flushedLocation tablePoint
 
 	enableGTID bool
}
 
 func newBinlogPoint(location, flushedLocation binlog.Location, ti, flushedTI *model.TableInfo, enableGTID bool) *binlogPoint {
 	return &binlogPoint{
-		location: location,
-		ti: ti,
-		flushedLocation: flushedLocation,
-		flushedTI: flushedTI,
+		location: tablePoint{
+			location: location,
+			ti: ti,
+		},
+		flushedLocation: tablePoint{
+			location: flushedLocation,
+			ti: flushedTI,
+		},
 		enableGTID: enableGTID,
 	}
 }
@@ -87,21 +95,25 @@ func (b *binlogPoint) save(location binlog.Location, ti *model.TableInfo) error
 	b.Lock()
 	defer b.Unlock()
 
-	if binlog.CompareLocation(location, b.location, b.enableGTID) < 0 {
+	if binlog.CompareLocation(location, b.location.location, b.enableGTID) < 0 {
 		// support to save equal location, but not older location
-		return terror.ErrCheckpointSaveInvalidPos.Generate(location, b.location)
+		return terror.ErrCheckpointSaveInvalidPos.Generate(location, b.location.location)
 	}
 
-	b.location = location
-	b.ti = ti
+	b.location.location = location
+	b.location.ti = ti
 	return nil
 }
 
 func (b *binlogPoint) flush() {
+	b.flushBy(b.location)
+}
+
+func (b *binlogPoint) flushBy(sp tablePoint) {
 	b.Lock()
 	defer b.Unlock()
-	b.flushedLocation = b.location
-	b.flushedTI = b.ti
+	b.flushedLocation.location = sp.location
+	b.flushedLocation.ti = sp.ti
 }
 
 func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) {
 	defer b.Unlock()
 
 	// set suffix to 0 when we meet error
-	b.flushedLocation.ResetSuffix()
-	b.location = b.flushedLocation
-	if b.ti == nil {
+	b.flushedLocation.location.ResetSuffix()
+	b.location.location = b.flushedLocation.location
+	if b.location.ti == nil {
 		return // for global checkpoint, no need to rollback the schema.
 	}
 
 	// NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly,
 	// and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now.
-	trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough.
+	trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.location.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough.
 	// may three versions of schema exist:
 	// - the one tracked in the TiDB-with-mockTiKV.
 	// - the one in the checkpoint but not flushed.
@@ -125,8 +137,8 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is
 	// if any of them are not equal, then we rollback them:
 	// - set the one in the checkpoint but not flushed to the one flushed.
 	// - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now)
-	if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged {
-		b.ti = b.flushedTI
+	if isSchemaChanged = (trackedTi != b.location.ti) || (b.location.ti != b.flushedLocation.ti); isSchemaChanged {
+		b.location.ti = b.flushedLocation.ti
 	}
 	return
 }
@@ -135,35 +147,50 @@ func (b *binlogPoint) outOfDate() bool {
 	b.RLock()
 	defer b.RUnlock()
 
-	return binlog.CompareLocation(b.location, b.flushedLocation, b.enableGTID) > 0
+	return binlog.CompareLocation(b.location.location, b.flushedLocation.location, b.enableGTID) > 0
+}
+
+func (b *binlogPoint) outOfDateBy(pos binlog.Location) bool {
+	b.RLock()
+	defer b.RUnlock()
+
+	return binlog.CompareLocation(pos, b.flushedLocation.location, b.enableGTID) > 0
 }
 
 // MySQLLocation returns point as binlog.Location.
 func (b *binlogPoint) MySQLLocation() binlog.Location {
 	b.RLock()
 	defer b.RUnlock()
-	return b.location
+	return b.location.location
 }
 
 // FlushedMySQLLocation returns flushed point as binlog.Location.
 func (b *binlogPoint) FlushedMySQLLocation() binlog.Location {
 	b.RLock()
 	defer b.RUnlock()
-	return b.flushedLocation
+	return b.flushedLocation.location
 }
 
 // TableInfo returns the table schema associated at the current binlog position.
 func (b *binlogPoint) TableInfo() *model.TableInfo {
 	b.RLock()
 	defer b.RUnlock()
-	return b.ti
+	return b.location.ti
 }
 
 func (b *binlogPoint) String() string {
 	b.RLock()
 	defer b.RUnlock()
 
-	return fmt.Sprintf("%v(flushed %v)", b.location, b.flushedLocation)
+	return fmt.Sprintf("%v(flushed %v)", b.location.location, b.flushedLocation.location)
+}
+
+// SnapshotID identifies a created checkpoint snapshot and records its global position.
+type SnapshotID struct {
+	// the corresponding snapshot id
+	id int
+	// global checkpoint position.
+	pos binlog.Location
 }
 
 // CheckPoint represents checkpoints status for syncer
@@ -207,6 +234,9 @@ type CheckPoint interface {
 	// corresponding to Meta.Save
 	SaveGlobalPoint(point binlog.Location)
 
+	// Snapshot makes a snapshot of the current checkpoint and returns its id
+	Snapshot() SnapshotID
+
 	// FlushGlobalPointsExcept flushes the global checkpoint and tables'
 	// checkpoints except exceptTables, it also flushes SQLs with Args providing
 	// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
 	// corresponding to Meta.Flush
 	FlushPointsExcept(tctx *tcontext.Context, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error
 
+	// FlushSnapshotPointsExcept flushes the global checkpoint and tables'
+	// checkpoints except exceptTables with the specified snapshot.
+	FlushSnapshotPointsExcept(tctx *tcontext.Context, snapshot int, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error
+
 	// FlushPointWithTableInfo flushed the table point with given table info
 	FlushPointWithTableInfo(tctx *tcontext.Context, table *filter.Table, ti *model.TableInfo) error
@@ -256,6 +290,14 @@ type CheckPoint interface {
 	CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error
 }
 
+type removeCheckpointSnapshot struct {
+	id int
+	globalPoint *tablePoint
+	globalPointSaveTime time.Time
+	needFlushSafeModeExitPoint bool
+	points map[string]map[string]tablePoint
+}
+
 // RemoteCheckPoint implements CheckPoint
 // which using target database to store info
 // NOTE: now we sync from relay log, so not add GTID support yet
@@ -293,6 +335,10 @@ type RemoteCheckPoint struct {
 	needFlushSafeModeExitPoint bool
 
 	logCtx *tcontext.Context
+
+	// these fields are used for async flush checkpoint
+	snapshots []*removeCheckpointSnapshot
+	snapshotSeq int
 }
 
 // NewRemoteCheckPoint creates a new RemoteCheckPoint.
@@ -309,6 +355,50 @@ func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
 	return cp
 }
 
+// Snapshot make a snapshot of checkpoint and return the snapshot id
+func (cp *RemoteCheckPoint) Snapshot() SnapshotID {
+	cp.RLock()
+	defer cp.RUnlock()
+	//make snapshot is visit in single thread, so depend on rlock should be enough
+	cp.snapshotSeq++
+
+	id := cp.snapshotSeq
+
+	tableCheckPoints := make(map[string]map[string]tablePoint, len(cp.points))
+	for s, tableCps := range cp.points {
+		tableCpSnapshots := make(map[string]tablePoint)
+		for tbl, point := range tableCps {
+			if point.outOfDate() {
+				tableCpSnapshots[tbl] = point.location
+			}
+		}
+		if len(tableCpSnapshots) > 0 {
+			tableCheckPoints[s] = tableCpSnapshots
+		}
+	}
+
+	// if there is no change just return an empty snapshot
+	if len(tableCheckPoints) == 0 && (cp.globalPoint == nil || !cp.globalPoint.outOfDate()) {
+		return 0
+	}
+
+	snapshot := &removeCheckpointSnapshot{
+		id: id,
+		globalPointSaveTime: cp.globalPointSaveTime,
+		needFlushSafeModeExitPoint: cp.needFlushSafeModeExitPoint,
+		points: tableCheckPoints,
+	}
+	if cp.globalPoint != nil {
+		snapshot.globalPoint = &cp.globalPoint.location
+	}
+
+	cp.snapshots = append(cp.snapshots, snapshot)
+	return SnapshotID{
+		id: id,
+		pos: cp.globalPoint.location.location,
+	}
+}
+
 // Init implements CheckPoint.Init.
 func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error {
 	checkPointDB := cp.cfg.To
@@ -368,7 +458,7 @@ func (cp *RemoteCheckPoint) SaveTablePoint(table *filter.Table, point binlog.Loc
 // saveTablePoint saves single table's checkpoint without mutex.Lock.
func (cp *RemoteCheckPoint) saveTablePoint(sourceTable *filter.Table, location binlog.Location, ti *model.TableInfo) { - if binlog.CompareLocation(cp.globalPoint.location, location, cp.cfg.EnableGTID) > 0 { + if binlog.CompareLocation(cp.globalPoint.location.location, location, cp.cfg.EnableGTID) > 0 { panic(fmt.Sprintf("table checkpoint %+v less than global checkpoint %+v", location, cp.globalPoint)) } @@ -534,7 +624,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl } } if point.outOfDate() { - tiBytes, err := json.Marshal(point.ti) + tiBytes, err := json.Marshal(point.location.ti) if err != nil { return terror.ErrSchemaTrackerCannotSerialize.Delegate(err, schema, table) } @@ -571,6 +661,100 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl return nil } +// FlushSnapshotPointsExcept implements CheckPoint.FlushSnapshotPointsExcept. +func (cp *RemoteCheckPoint) FlushSnapshotPointsExcept( + tctx *tcontext.Context, + snapshot int, + exceptTables []*filter.Table, + extraSQLs []string, + extraArgs [][]interface{}, +) error { + cp.RLock() + defer cp.RUnlock() + + if len(cp.snapshots) == 0 || cp.snapshots[0].id != snapshot { + cp.logCtx.Logger.DPanic("snapshot not found", zap.Int("id", snapshot)) + } + snapshotCp := cp.snapshots[0] + + // convert slice to map + excepts := make(map[string]map[string]struct{}) + for _, schemaTable := range exceptTables { + schema, table := schemaTable.Schema, schemaTable.Name + m, ok := excepts[schema] + if !ok { + m = make(map[string]struct{}) + excepts[schema] = m + } + m[table] = struct{}{} + } + + sqls := make([]string, 0, 100) + args := make([][]interface{}, 0, 100) + + if (snapshotCp.globalPoint != nil && cp.globalPoint.outOfDateBy(snapshotCp.globalPoint.location)) || snapshotCp.globalPointSaveTime.IsZero() || snapshotCp.needFlushSafeModeExitPoint { + locationG := snapshotCp.globalPoint.location + sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true) + sqls = append(sqls, sqlG) + args = append(args, argG) + } + + type binlogPointSp struct { + pos *binlogPoint + spLoc tablePoint + } + + points := make([]*binlogPointSp, 0, 100) + + for schema, mSchema := range snapshotCp.points { + schemaCp := cp.points[schema] + for table, point := range mSchema { + if _, ok1 := excepts[schema]; ok1 { + if _, ok2 := excepts[schema][table]; ok2 { + continue + } + } + tableCP := schemaCp[table] + if tableCP.outOfDateBy(point.location) { + tiBytes, err := json.Marshal(point.ti) + if err != nil { + return terror.ErrSchemaTrackerCannotSerialize.Delegate(err, schema, table) + } + + sql2, arg := cp.genUpdateSQL(schema, table, point.location, nil, tiBytes, false) + sqls = append(sqls, sql2) + args = append(args, arg) + + points = append(points, &binlogPointSp{ + pos: tableCP, + spLoc: point, + }) + } + } + } + for i := range extraSQLs { + sqls = append(sqls, extraSQLs[i]) + args = append(args, extraArgs[i]) + } + + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() + _, err := cp.dbConn.ExecuteSQL(tctx2, sqls, args...) 
+ if err != nil { + return err + } + + cp.globalPoint.flush() + for _, point := range points { + point.pos.flushBy(point.spLoc) + } + + cp.globalPointSaveTime = time.Now() + cp.needFlushSafeModeExitPoint = false + return nil +} + // FlushPointWithTableInfo implements CheckPoint.FlushPointWithTableInfo. func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, table *filter.Table, ti *model.TableInfo) error { cp.Lock() @@ -604,7 +788,7 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, tabl return err } - err = point.save(point.location, ti) + err = point.save(point.location.location, ti) if err != nil { return err } @@ -703,12 +887,12 @@ func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { if err := schemaTracker.DropTable(table); err != nil { logger.Warn("failed to drop table from schema tracker", log.ShortError(err)) } - if point.ti != nil { + if point.location.ti != nil { // TODO: Figure out how to recover from errors. if err := schemaTracker.CreateSchemaIfNotExists(schemaName); err != nil { logger.Error("failed to rollback schema on schema tracker: cannot create schema", log.ShortError(err)) } - if err := schemaTracker.CreateTableIfNotExists(table, point.ti); err != nil { + if err := schemaTracker.CreateTableIfNotExists(table, point.location.ti); err != nil { logger.Error("failed to rollback schema on schema tracker: cannot create table", log.ShortError(err)) } } @@ -1043,7 +1227,7 @@ func (cp *RemoteCheckPoint) GetFlushedTableInfo(table *filter.Table) *model.Tabl if tables, ok := cp.points[table.Schema]; ok { if point, ok2 := tables[table.Name]; ok2 { - return point.flushedTI + return point.flushedLocation.ti } } return nil diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 88ae25b62..2f8c30b0f 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -399,12 +399,12 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { cp.SaveTablePoint(table, binlog.Location{Position: pos1}, ti) rcp := cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, IsNil) cp.Rollback(s.tracker) rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), IsNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, IsNil) _, err = s.tracker.GetTableInfo(table) c.Assert(strings.Contains(err.Error(), "doesn't exist"), IsTrue) @@ -478,6 +478,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.GlobalPoint(), DeepEquals, binlog.InitLocation(pos2, gs)) rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, NotNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, NotNil) c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs)) } diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go index 08a888277..31f3b6f35 100644 --- a/syncer/dml_worker.go +++ b/syncer/dml_worker.go @@ -112,20 +112,22 @@ func (w *DMLWorker) run() { for j := range w.inCh { metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh))) if j.tp == flush || j.tp == conflict { - if j.tp == conflict { - w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable) - } - 
w.wg.Add(w.workerCount) + // todo how to track conflict job metrics + //if j.tp == conflict { + // w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable) + //} // flush for every DML queue for i, jobCh := range jobChs { startTime := time.Now() jobCh <- j metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[i], w.source).Observe(time.Since(startTime).Seconds()) } - w.wg.Wait() - if j.tp == conflict { - w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) - } else { + //if j.tp == conflict { + // w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) + //} else { + // w.flushCh <- j + //} + if j.tp == flush { w.flushCh <- j } } else { @@ -168,6 +170,9 @@ func (w *DMLWorker) executeJobs(queueID int, jobCh chan *job) { w.executeBatchJobs(queueID, jobs) if j.tp == conflict || j.tp == flush { w.wg.Done() + if j.wg != nil { + j.wg.Done() + } } jobs = jobs[0:0] diff --git a/syncer/job.go b/syncer/job.go index e85ae2bd5..96c33ea6e 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -15,6 +15,7 @@ package syncer import ( "fmt" + "sync" "time" "github.com/go-mysql-org/go-mysql/replication" @@ -83,6 +84,7 @@ type job struct { eventHeader *replication.EventHeader jobAddTime time.Time // job commit time + wg *sync.WaitGroup // wait group for flush/conflict job } func (j *job) String() string { @@ -169,6 +171,7 @@ func newFlushJob() *job { tp: flush, targetTable: &filter.Table{}, jobAddTime: time.Now(), + wg: &sync.WaitGroup{}, } } diff --git a/syncer/syncer.go b/syncer/syncer.go index 8478820a5..386b21f95 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -184,6 +184,7 @@ type Syncer struct { done chan struct{} checkpoint CheckPoint + flushCpWorker checkpointFlushWorker onlineDDL onlineddl.OnlinePlugin // record process error rather than log.Fatal @@ -398,6 +399,12 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } } } + s.flushCpWorker = checkpointFlushWorker{ + input: make(chan *flushCpTask, 16), + wg: sync.WaitGroup{}, + cp: s.checkpoint, + afterFlushFn: s.afterFlushCheckpoint, + } // when Init syncer, set active relay log info err = s.setInitActiveRelayLog(ctx) @@ -928,9 +935,9 @@ func (s *Syncer) addJob(job *job) error { s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) s.jobWg.Add(1) s.dmlJobCh <- job - s.jobWg.Wait() - s.addCount(true, adminQueueName, job.tp, 1, job.targetTable) - return s.flushCheckPoints() + job.wg.Add(s.cfg.WorkerCount) + s.flushCheckPointsAsync(job.wg) + return nil case ddl: s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) s.updateReplicationJobTS(job, ddlJobIdx) @@ -957,10 +964,13 @@ func (s *Syncer) addJob(job *job) error { needFlush = true } }) + if needFlush { s.jobWg.Add(1) + j := newFlushJob() + j.wg.Add(s.cfg.WorkerCount) s.dmlJobCh <- newFlushJob() - s.jobWg.Wait() + s.flushCheckPointsAsync(j.wg) } if s.execError.Load() != nil { @@ -1005,7 +1015,7 @@ func (s *Syncer) addJob(job *job) error { } } - if needFlush || job.tp == ddl { + if job.tp == ddl { // interrupted after save checkpoint and before flush checkpoint. 
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint") @@ -1038,18 +1048,93 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) { } } +type flushCpTask struct { + snapshot SnapshotID + wg *sync.WaitGroup + // extra sharding ddl sqls + exceptTables []*filter.Table + shardMetaSQLs []string + shardMetaArgs [][]interface{} + // result chan + resChan chan error +} + +type checkpointFlushWorker struct { + input chan *flushCpTask + // wg is used to sync whether all the flush msg are done + wg sync.WaitGroup + cp CheckPoint + afterFlushFn func(location binlog.Location) error +} + +// Add add a new flush checkpoint job +func (w *checkpointFlushWorker) Add(msg *flushCpTask) { + w.wg.Add(1) + w.input <- msg +} + +// Run read flush tasks from input and execute one by one +func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { + for msg := range w.input { + // wait all worker finish execute flush job + if msg.wg != nil { + msg.wg.Wait() + } + + err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs) + w.wg.Done() + if err != nil { + ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg)) + if msg.resChan != nil { + msg.resChan <- err + } + continue + } + ctx.L().Info("flushed checkpoint", zap.Int("snapshot_id", msg.snapshot.id), + zap.Stringer("pos", msg.snapshot.pos)) + if err = w.afterFlushFn(msg.snapshot.pos); err != nil { + if msg.resChan != nil { + msg.resChan <- err + } + ctx.L().Warn("flush post process failed", zap.Error(err)) + } + } +} + +// Wait wait all pending flus checkpoint jobs finish. +func (w *checkpointFlushWorker) Wait() { + w.wg.Wait() +} + +// Close wait all pending job finish and stop this worker +func (w *checkpointFlushWorker) Close() { + close(w.input) + w.Wait() +} + // flushCheckPoints flushes previous saved checkpoint in memory to persistent storage, like TiDB -// we flush checkpoints in four cases: +// we flush checkpoints in 3 four cases: // 1. DDL executed -// 2. at intervals (and job executed) -// 3. pausing / stopping the sync (driven by `s.flushJobs`) -// 4. IsFreshTask return true +// 2. pausing / stopping the sync (driven by `s.flushJobs`) +// 3. IsFreshTask return true // but when error occurred, we can not flush checkpoint, otherwise data may lost // and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before // this should be handled when `s.Run` returned // // we may need to refactor the concurrency model to make the work-flow more clearer later. func (s *Syncer) flushCheckPoints() error { + errCh := make(chan error, 1) + s.doFlushCheckPointsAsync(nil, errCh) + return <- errCh +} + +// flushCheckPointsAsync flushes previous saved checkpoint asyncronously after all worker finsh flush. +// it it triggered by intervals (and job executed) +func (s *Syncer) flushCheckPointsAsync(wg *sync.WaitGroup) { + s.doFlushCheckPointsAsync(wg, nil) +} + +func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error) { err := s.execError.Load() // TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put // optimistic shard info, DM-master may resolved the optimistic lock and let other worker execute DDL. 
So after this @@ -1059,7 +1144,7 @@ func (s *Syncer) flushCheckPoints() error { s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint), zap.Error(err)) - return nil + return } var ( @@ -1079,14 +1164,22 @@ func (s *Syncer) flushCheckPoints() error { s.tctx.L().Info("prepare flush sqls", zap.Strings("shard meta sqls", shardMetaSQLs), zap.Reflect("shard meta arguments", shardMetaArgs)) } - err = s.checkpoint.FlushPointsExcept(s.tctx, exceptTables, shardMetaSQLs, shardMetaArgs) - if err != nil { - return terror.Annotatef(err, "flush checkpoint %s", s.checkpoint) + snapshotID := s.checkpoint.Snapshot() + task := &flushCpTask{ + snapshot: snapshotID, + wg: wg, + exceptTables: exceptTables, + shardMetaSQLs: shardMetaSQLs, + shardMetaArgs: shardMetaArgs, + resChan: outCh, } - s.tctx.L().Info("flushed checkpoint", zap.Stringer("checkpoint", s.checkpoint)) + s.flushCpWorker.Add(task) +} +func (s *Syncer) afterFlushCheckpoint(loc binlog.Location) error { + s.addCount(true, adminQueueName, flush, 1, nil) // update current active relay log after checkpoint flushed - err = s.updateActiveRelayLog(s.checkpoint.GlobalPoint().Position) + err := s.updateActiveRelayLog(loc.Position) if err != nil { return err } @@ -1098,18 +1191,22 @@ func (s *Syncer) flushCheckPoints() error { } s.lastCheckpointFlushedTime = now - s.tctx.L().Info("after last flushing checkpoint, DM has ignored row changes by expression filter", - zap.Int64("number of filtered insert", s.filteredInsert.Load()), - zap.Int64("number of filtered update", s.filteredUpdate.Load()), - zap.Int64("number of filtered delete", s.filteredDelete.Load())) - - s.filteredInsert.Store(0) - s.filteredUpdate.Store(0) - s.filteredDelete.Store(0) - + s.logAndClearFilteredStatistics() return nil } +func (s *Syncer) logAndClearFilteredStatistics() { + filteredInsert := s.filteredInsert.Swap(0) + filteredUpdate := s.filteredUpdate.Swap(0) + filteredDelete := s.filteredDelete.Swap(0) + if filteredInsert > 0 || filteredUpdate > 0 || filteredDelete > 0 { + s.tctx.L().Info("after last flushing checkpoint, DM has ignored row changes by expression filter", + zap.Int64("number of filtered insert", filteredInsert), + zap.Int64("number of filtered update", filteredUpdate), + zap.Int64("number of filtered delete", filteredDelete)) + } +} + // DDL synced one by one, so we only need to process one DDL at a time. 
func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { defer s.wg.Done() @@ -1418,6 +1515,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.flushCpWorker.Run(s.tctx) + }() + s.wg.Add(1) go s.syncDML() From 7ce2ef6214e26039e1f083557b047289c6c18cca Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 21 Oct 2021 20:16:23 +0800 Subject: [PATCH 2/8] fix build --- syncer/checkpoint.go | 4 +++- syncer/syncer.go | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index a31d3c2bf..35e613c79 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -379,7 +379,9 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { // if there is no change just return an empty snapshot if len(tableCheckPoints) == 0 && (cp.globalPoint == nil || !cp.globalPoint.outOfDate()) { - return 0 + return SnapshotID{ + id: 0, + } } snapshot := &removeCheckpointSnapshot{ diff --git a/syncer/syncer.go b/syncer/syncer.go index 386b21f95..39663be1e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1165,6 +1165,10 @@ func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error) { } snapshotID := s.checkpoint.Snapshot() + if snapshotID.id == 0 { + log.L().Info("checkpoint has no change, skip save checkpoint") + return + } task := &flushCpTask{ snapshot: snapshotID, wg: wg, From 57d4a95ea0c438d6ca7fda7263d862b5782e710a Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 25 Oct 2021 12:32:09 +0800 Subject: [PATCH 3/8] wait flush worker to close --- syncer/syncer.go | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 39663be1e..f42aaea83 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -401,7 +401,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } s.flushCpWorker = checkpointFlushWorker{ input: make(chan *flushCpTask, 16), - wg: sync.WaitGroup{}, cp: s.checkpoint, afterFlushFn: s.afterFlushCheckpoint, } @@ -1061,15 +1060,12 @@ type flushCpTask struct { type checkpointFlushWorker struct { input chan *flushCpTask - // wg is used to sync whether all the flush msg are done - wg sync.WaitGroup cp CheckPoint afterFlushFn func(location binlog.Location) error } // Add add a new flush checkpoint job func (w *checkpointFlushWorker) Add(msg *flushCpTask) { - w.wg.Add(1) w.input <- msg } @@ -1082,7 +1078,6 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { } err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs) - w.wg.Done() if err != nil { ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg)) if msg.resChan != nil { @@ -1101,15 +1096,9 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { } } -// Wait wait all pending flus checkpoint jobs finish. -func (w *checkpointFlushWorker) Wait() { - w.wg.Wait() -} - // Close wait all pending job finish and stop this worker func (w *checkpointFlushWorker) Close() { close(w.input) - w.Wait() } // flushCheckPoints flushes previous saved checkpoint in memory to persistent storage, like TiDB @@ -1387,6 +1376,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { }() go func() { + // close flush checkpoint worker after all jobs are done. 
+ defer s.flushCpWorker.Close() <-ctx.Done() select { case <-runCtx.Done(): @@ -1426,6 +1417,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } + // start flush checkpoints worker. + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.flushCpWorker.Run(s.tctx) + }() + var ( flushCheckpoint bool delLoadTask bool @@ -1519,12 +1517,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.flushCpWorker.Run(s.tctx) - }() - s.wg.Add(1) go s.syncDML() From 1d4924925653b8458c014244b2d4ef595a6ccf54 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 25 Oct 2021 17:18:43 +0800 Subject: [PATCH 4/8] make causality compatible with async flush --- syncer/causality.go | 104 +++++++++++++++++++++++++++++++++------ syncer/causality_test.go | 6 +-- syncer/dml_worker.go | 16 +++--- syncer/job.go | 4 ++ syncer/syncer.go | 37 ++++++++++---- 5 files changed, 130 insertions(+), 37 deletions(-) diff --git a/syncer/causality.go b/syncer/causality.go index b3594b9bc..bfa4df42f 100644 --- a/syncer/causality.go +++ b/syncer/causality.go @@ -14,6 +14,7 @@ package syncer import ( + "math" "time" "go.uber.org/zap" @@ -27,7 +28,7 @@ import ( // if some conflicts exist in more than one groups, causality generate a conflict job and reset. // this mechanism meets quiescent consistency to ensure correctness. type causality struct { - relations map[string]string + relations *rollingMap outCh chan *job inCh chan *job logger log.Logger @@ -40,7 +41,7 @@ type causality struct { // causalityWrap creates and runs a causality instance. func causalityWrap(inCh chan *job, syncer *Syncer) chan *job { causality := &causality{ - relations: make(map[string]string), + relations: newRollingMap(), task: syncer.cfg.Name, source: syncer.cfg.SourceID, logger: syncer.tctx.Logger.WithFields(zap.String("component", "causality")), @@ -63,16 +64,19 @@ func (c *causality) run() { metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh))) startTime := time.Now() - if j.tp == flush { - c.reset() - } else { + switch j.tp { + case flush: + c.relations.rotate() + case gc: + c.relations.gc(j.seq) + default: // detectConflict before add if c.detectConflict(j.keys) { c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", j.keys)) c.outCh <- newConflictJob() - c.reset() + c.relations.clear() } - j.key = c.add(j.keys) + j.key = c.add(j.keys, j.seq) c.logger.Debug("key for keys", zap.String("key", j.key), zap.Strings("keys", j.keys)) } metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds()) @@ -87,7 +91,7 @@ func (c *causality) close() { } // add adds keys relation and return the relation. The keys must `detectConflict` first to ensure correctness. 
-func (c *causality) add(keys []string) string { +func (c *causality) add(keys []string, version int64) string { if len(keys) == 0 { return "" } @@ -96,7 +100,7 @@ func (c *causality) add(keys []string) string { selectedRelation := keys[0] var nonExistKeys []string for _, key := range keys { - if val, ok := c.relations[key]; ok { + if val, ok := c.relations.get(key); ok { selectedRelation = val } else { nonExistKeys = append(nonExistKeys, key) @@ -104,17 +108,12 @@ func (c *causality) add(keys []string) string { } // set causal relations for those non-exist keys for _, key := range nonExistKeys { - c.relations[key] = selectedRelation + c.relations.set(key, selectedRelation, version) } return selectedRelation } -// reset resets relations. -func (c *causality) reset() { - c.relations = make(map[string]string) -} - // detectConflict detects whether there is a conflict. func (c *causality) detectConflict(keys []string) bool { if len(keys) == 0 { @@ -123,7 +122,7 @@ func (c *causality) detectConflict(keys []string) bool { var existedRelation string for _, key := range keys { - if val, ok := c.relations[key]; ok { + if val, ok := c.relations.get(key); ok { if existedRelation != "" && val != existedRelation { return true } @@ -133,3 +132,76 @@ func (c *causality) detectConflict(keys []string) bool { return false } + +type versionedMap struct { + data map[string]string + maxVer int64 +} + +// rollingMap is a map this contains multi map instances +type rollingMap struct { + maps []*versionedMap + // current map fro write + cur *versionedMap +} + +func newRollingMap() *rollingMap { + m := &rollingMap{ + maps: make([]*versionedMap, 0), + } + m.rotate() + return m +} + +func (m *rollingMap) get(key string) (string, bool) { + for i := len(m.maps)-1; i >=0; i-- { + if v, ok := m.maps[i].data[key]; ok { + return v, true + } + } + return "", false +} + +func (m *rollingMap) set(key string, val string, version int64) { + m.cur.data[key] = val + if m.cur.maxVer < version { + m.cur.maxVer = version + } +} + +func (m *rollingMap) len() int { + cnt := 0 + for _, d := range m.maps { + cnt += len(d.data) + } + return cnt +} + +func (m *rollingMap) rotate() { + if len(m.maps) == 0 || len(m.maps[len(m.maps)-1].data) > 0 { + m.maps = append(m.maps, &versionedMap{ + data: make(map[string]string), + }) + m.cur = m.maps[len(m.maps)-1] + } +} + +func (m *rollingMap) clear() { + m.gc(math.MaxInt64) +} + +func (m *rollingMap) gc(version int64) { + idx := 0 + for i, m := range m.maps { + if m.maxVer == 0 || m.maxVer > version { + idx = i + break + } + } + if idx == len(m.maps) -1 { + m.maps = m.maps[:0] + m.rotate() + } else if idx > 0 { + m.maps = m.maps[idx:] + } +} \ No newline at end of file diff --git a/syncer/causality_test.go b/syncer/causality_test.go index dc5f223e3..f92371cae 100644 --- a/syncer/causality_test.go +++ b/syncer/causality_test.go @@ -30,7 +30,7 @@ import ( func (s *testSyncerSuite) TestDetectConflict(c *C) { ca := &causality{ - relations: make(map[string]string), + relations: newRollingMap(), } caseData := []string{"test_1", "test_2", "test_3"} excepted := map[string]string{ @@ -47,8 +47,8 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) { c.Assert(ca.relations, DeepEquals, excepted) conflictData := []string{"test_4", "test_3"} c.Assert(ca.detectConflict(conflictData), IsTrue) - ca.reset() - c.Assert(ca.relations, HasLen, 0) + ca.relations.clear() + c.Assert(ca.relations.len(), Equals, 0) } func (s *testSyncerSuite) TestCasuality(c *C) { diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go 
index 31f3b6f35..67f3307ad 100644 --- a/syncer/dml_worker.go +++ b/syncer/dml_worker.go @@ -113,21 +113,19 @@ func (w *DMLWorker) run() { metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh))) if j.tp == flush || j.tp == conflict { // todo how to track conflict job metrics - //if j.tp == conflict { - // w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable) - //} + if j.tp == conflict { + w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable) + } // flush for every DML queue for i, jobCh := range jobChs { startTime := time.Now() jobCh <- j metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[i], w.source).Observe(time.Since(startTime).Seconds()) } - //if j.tp == conflict { - // w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) - //} else { - // w.flushCh <- j - //} - if j.tp == flush { + if j.tp == conflict { + w.wg.Wait() + w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) + } else { w.flushCh <- j } } else { diff --git a/syncer/job.go b/syncer/job.go index 96c33ea6e..072325207 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -37,6 +37,7 @@ const ( skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL rotate conflict + gc // used to clean up out dated causality keys ) func (t opType) String() string { @@ -59,6 +60,8 @@ func (t opType) String() string { return "rotate" case conflict: return "conflict" + case gc: + return "gc" } return "" @@ -84,6 +87,7 @@ type job struct { eventHeader *replication.EventHeader jobAddTime time.Time // job commit time + seq int64 // sequence number for this job wg *sync.WaitGroup // wait group for flush/conflict job } diff --git a/syncer/syncer.go b/syncer/syncer.go index f42aaea83..da5973ee6 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "fmt" + "math" "os" "path" "reflect" @@ -212,6 +213,7 @@ type Syncer struct { } addJobFunc func(*job) error + seq int64 // `lower_case_table_names` setting of upstream db SourceTableNamesFlavor utils.LowerCaseTableNamesFlavor @@ -922,6 +924,7 @@ func (s *Syncer) addJob(job *job) error { s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job)) return nil } + job.seq = s.getSeq() switch job.tp { case xid: s.waitXIDJob.CAS(int64(waiting), int64(waitComplete)) @@ -933,9 +936,9 @@ func (s *Syncer) addJob(job *job) error { case flush: s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) s.jobWg.Add(1) - s.dmlJobCh <- job job.wg.Add(s.cfg.WorkerCount) - s.flushCheckPointsAsync(job.wg) + s.dmlJobCh <- job + s.flushCheckPointsAsync(job.wg, job.seq) return nil case ddl: s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) @@ -968,8 +971,9 @@ func (s *Syncer) addJob(job *job) error { s.jobWg.Add(1) j := newFlushJob() j.wg.Add(s.cfg.WorkerCount) + j.seq = s.getSeq() s.dmlJobCh <- newFlushJob() - s.flushCheckPointsAsync(j.wg) + s.flushCheckPointsAsync(j.wg, j.seq) } if s.execError.Load() != nil { @@ -1050,6 +1054,8 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) { type flushCpTask struct { snapshot SnapshotID wg *sync.WaitGroup + // job version, causality gc max version + version int64 // extra sharding ddl sqls exceptTables []*filter.Table shardMetaSQLs []string @@ -1061,7 +1067,7 @@ type flushCpTask struct { type checkpointFlushWorker struct { input chan *flushCpTask cp CheckPoint - afterFlushFn func(location binlog.Location) error + 
afterFlushFn func(task *flushCpTask, location binlog.Location) error } // Add add a new flush checkpoint job @@ -1077,6 +1083,7 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { msg.wg.Wait() } + err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs) if err != nil { ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg)) @@ -1113,17 +1120,18 @@ func (w *checkpointFlushWorker) Close() { // we may need to refactor the concurrency model to make the work-flow more clearer later. func (s *Syncer) flushCheckPoints() error { errCh := make(chan error, 1) - s.doFlushCheckPointsAsync(nil, errCh) + // use math.MaxInt64 to clear all the data in causality component. + s.doFlushCheckPointsAsync(nil, errCh, math.MaxInt64) return <- errCh } // flushCheckPointsAsync flushes previous saved checkpoint asyncronously after all worker finsh flush. // it it triggered by intervals (and job executed) -func (s *Syncer) flushCheckPointsAsync(wg *sync.WaitGroup) { - s.doFlushCheckPointsAsync(wg, nil) +func (s *Syncer) flushCheckPointsAsync(wg *sync.WaitGroup, version int64) { + s.doFlushCheckPointsAsync(wg, nil, version) } -func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error) { +func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error, version int64) { err := s.execError.Load() // TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put // optimistic shard info, DM-master may resolved the optimistic lock and let other worker execute DDL. So after this @@ -1165,11 +1173,16 @@ func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error) { shardMetaSQLs: shardMetaSQLs, shardMetaArgs: shardMetaArgs, resChan: outCh, + version: version, } s.flushCpWorker.Add(task) } -func (s *Syncer) afterFlushCheckpoint(loc binlog.Location) error { +func (s *Syncer) afterFlushCheckpoint(task *flushCpTask, loc binlog.Location) error { + s.dmlJobCh <- &job{ + tp: gc, + seq: task.version, + } s.addCount(true, adminQueueName, flush, 1, nil) // update current active relay log after checkpoint flushed err := s.updateActiveRelayLog(loc.Position) @@ -2046,6 +2059,12 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) return nil } +func (s *Syncer) getSeq() int64 { + seq := s.seq + s.seq++ + return seq +} + func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) error { sourceTable := &filter.Table{ Schema: string(ev.Table.Schema), From b759d6632c410f80f54276cbb4f33c5c27ce28d2 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 25 Oct 2021 17:21:50 +0800 Subject: [PATCH 5/8] fmt code --- syncer/causality.go | 8 ++++---- syncer/checkpoint.go | 30 +++++++++++++++--------------- syncer/job.go | 8 ++++---- syncer/syncer.go | 41 ++++++++++++++++++++--------------------- 4 files changed, 43 insertions(+), 44 deletions(-) diff --git a/syncer/causality.go b/syncer/causality.go index bfa4df42f..276f401f6 100644 --- a/syncer/causality.go +++ b/syncer/causality.go @@ -134,7 +134,7 @@ func (c *causality) detectConflict(keys []string) bool { } type versionedMap struct { - data map[string]string + data map[string]string maxVer int64 } @@ -154,7 +154,7 @@ func newRollingMap() *rollingMap { } func (m *rollingMap) get(key string) (string, bool) { - for i := len(m.maps)-1; i >=0; i-- { + for i := len(m.maps) - 1; i >= 0; i-- { if v, ok := m.maps[i].data[key]; ok { return v, true } @@ 
-198,10 +198,10 @@ func (m *rollingMap) gc(version int64) { break } } - if idx == len(m.maps) -1 { + if idx == len(m.maps)-1 { m.maps = m.maps[:0] m.rotate() } else if idx > 0 { m.maps = m.maps[idx:] } -} \ No newline at end of file +} diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 35e613c79..7115d0b38 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -81,13 +81,13 @@ func newBinlogPoint(location, flushedLocation binlog.Location, ti, flushedTI *mo return &binlogPoint{ location: tablePoint{ location: location, - ti: ti, + ti: ti, }, flushedLocation: tablePoint{ location: flushedLocation, ti: flushedTI, }, - enableGTID: enableGTID, + enableGTID: enableGTID, } } @@ -291,11 +291,11 @@ type CheckPoint interface { } type removeCheckpointSnapshot struct { - id int - globalPoint *tablePoint - globalPointSaveTime time.Time + id int + globalPoint *tablePoint + globalPointSaveTime time.Time needFlushSafeModeExitPoint bool - points map[string]map[string]tablePoint + points map[string]map[string]tablePoint } // RemoteCheckPoint implements CheckPoint @@ -337,7 +337,7 @@ type RemoteCheckPoint struct { logCtx *tcontext.Context // these fields are used for async flush checkpoint - snapshots []*removeCheckpointSnapshot + snapshots []*removeCheckpointSnapshot snapshotSeq int } @@ -356,10 +356,10 @@ func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s } // Snapshot make a snapshot of checkpoint and return the snapshot id -func (cp *RemoteCheckPoint) Snapshot() SnapshotID { +func (cp *RemoteCheckPoint) Snapshot() SnapshotID { cp.RLock() defer cp.RUnlock() - //make snapshot is visit in single thread, so depend on rlock should be enough + // make snapshot is visit in single thread, so depend on rlock should be enough cp.snapshotSeq++ id := cp.snapshotSeq @@ -385,10 +385,10 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { } snapshot := &removeCheckpointSnapshot{ - id: id, - globalPointSaveTime: cp.globalPointSaveTime, + id: id, + globalPointSaveTime: cp.globalPointSaveTime, needFlushSafeModeExitPoint: cp.needFlushSafeModeExitPoint, - points: tableCheckPoints, + points: tableCheckPoints, } if cp.globalPoint != nil { snapshot.globalPoint = &cp.globalPoint.location @@ -396,7 +396,7 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { cp.snapshots = append(cp.snapshots, snapshot) return SnapshotID{ - id: id, + id: id, pos: cp.globalPoint.location.location, } } @@ -702,7 +702,7 @@ func (cp *RemoteCheckPoint) FlushSnapshotPointsExcept( } type binlogPointSp struct { - pos *binlogPoint + pos *binlogPoint spLoc tablePoint } @@ -728,7 +728,7 @@ func (cp *RemoteCheckPoint) FlushSnapshotPointsExcept( args = append(args, arg) points = append(points, &binlogPointSp{ - pos: tableCP, + pos: tableCP, spLoc: point, }) } diff --git a/syncer/job.go b/syncer/job.go index 072325207..75cdcaef6 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -37,7 +37,7 @@ const ( skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL rotate conflict - gc // used to clean up out dated causality keys + gc // used to clean up out dated causality keys ) func (t opType) String() string { @@ -86,8 +86,8 @@ type job struct { originSQL string // show origin sql when error, only DDL now eventHeader *replication.EventHeader - jobAddTime time.Time // job commit time - seq int64 // sequence number for this job + jobAddTime time.Time // job commit time + seq int64 // sequence number for this job wg *sync.WaitGroup // wait group for flush/conflict job } @@ 
-175,7 +175,7 @@ func newFlushJob() *job { tp: flush, targetTable: &filter.Table{}, jobAddTime: time.Now(), - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, } } diff --git a/syncer/syncer.go b/syncer/syncer.go index da5973ee6..d09e4bad6 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -184,9 +184,9 @@ type Syncer struct { done chan struct{} - checkpoint CheckPoint + checkpoint CheckPoint flushCpWorker checkpointFlushWorker - onlineDDL onlineddl.OnlinePlugin + onlineDDL onlineddl.OnlinePlugin // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError @@ -402,8 +402,8 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } } s.flushCpWorker = checkpointFlushWorker{ - input: make(chan *flushCpTask, 16), - cp: s.checkpoint, + input: make(chan *flushCpTask, 16), + cp: s.checkpoint, afterFlushFn: s.afterFlushCheckpoint, } @@ -1053,11 +1053,11 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) { type flushCpTask struct { snapshot SnapshotID - wg *sync.WaitGroup + wg *sync.WaitGroup // job version, causality gc max version version int64 // extra sharding ddl sqls - exceptTables []*filter.Table + exceptTables []*filter.Table shardMetaSQLs []string shardMetaArgs [][]interface{} // result chan @@ -1065,9 +1065,9 @@ type flushCpTask struct { } type checkpointFlushWorker struct { - input chan *flushCpTask - cp CheckPoint - afterFlushFn func(task *flushCpTask, location binlog.Location) error + input chan *flushCpTask + cp CheckPoint + afterFlushFn func(task *flushCpTask) error } // Add add a new flush checkpoint job @@ -1083,7 +1083,6 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { msg.wg.Wait() } - err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs) if err != nil { ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg)) @@ -1094,7 +1093,7 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { } ctx.L().Info("flushed checkpoint", zap.Int("snapshot_id", msg.snapshot.id), zap.Stringer("pos", msg.snapshot.pos)) - if err = w.afterFlushFn(msg.snapshot.pos); err != nil { + if err = w.afterFlushFn(msg); err != nil { if msg.resChan != nil { msg.resChan <- err } @@ -1122,7 +1121,7 @@ func (s *Syncer) flushCheckPoints() error { errCh := make(chan error, 1) // use math.MaxInt64 to clear all the data in causality component. s.doFlushCheckPointsAsync(nil, errCh, math.MaxInt64) - return <- errCh + return <-errCh } // flushCheckPointsAsync flushes previous saved checkpoint asyncronously after all worker finsh flush. 
@@ -1162,30 +1161,30 @@ func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error, v } snapshotID := s.checkpoint.Snapshot() - if snapshotID.id == 0 { + if snapshotID.id == 0 { log.L().Info("checkpoint has no change, skip save checkpoint") return } task := &flushCpTask{ - snapshot: snapshotID, - wg: wg, - exceptTables: exceptTables, + snapshot: snapshotID, + wg: wg, + exceptTables: exceptTables, shardMetaSQLs: shardMetaSQLs, shardMetaArgs: shardMetaArgs, - resChan: outCh, - version: version, + resChan: outCh, + version: version, } s.flushCpWorker.Add(task) } -func (s *Syncer) afterFlushCheckpoint(task *flushCpTask, loc binlog.Location) error { +func (s *Syncer) afterFlushCheckpoint(task *flushCpTask) error { s.dmlJobCh <- &job{ - tp: gc, + tp: gc, seq: task.version, } s.addCount(true, adminQueueName, flush, 1, nil) // update current active relay log after checkpoint flushed - err := s.updateActiveRelayLog(loc.Position) + err := s.updateActiveRelayLog(task.snapshot.pos.Position) if err != nil { return err } From 5616b2b5cf72b7abdd2ed801d3a2d8220eb4a48b Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 26 Oct 2021 10:54:42 +0800 Subject: [PATCH 6/8] fix --- syncer/causality.go | 7 +++++-- syncer/checkpoint.go | 6 +++--- syncer/syncer.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/syncer/causality.go b/syncer/causality.go index e532452c5..81ca99bc4 100644 --- a/syncer/causality.go +++ b/syncer/causality.go @@ -193,8 +193,11 @@ func (m *rollingMap) clear() { func (m *rollingMap) gc(version int64) { idx := 0 - for i, m := range m.maps { - if m.maxVer == 0 || m.maxVer > version { + for i, d := range m.maps { + if d.maxVer > 0 && d.maxVer <= version { + // set nil value to trigger go gc + m.maps[i] = nil + } else { idx = i break } diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index ccc20349c..05021f47e 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -290,7 +290,7 @@ type CheckPoint interface { CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error } -type removeCheckpointSnapshot struct { +type remoteCheckpointSnapshot struct { id int globalPoint *tablePoint globalPointSaveTime time.Time @@ -337,7 +337,7 @@ type RemoteCheckPoint struct { logCtx *tcontext.Context // these fields are used for async flush checkpoint - snapshots []*removeCheckpointSnapshot + snapshots []*remoteCheckpointSnapshot snapshotSeq int } @@ -384,7 +384,7 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { } } - snapshot := &removeCheckpointSnapshot{ + snapshot := &remoteCheckpointSnapshot{ id: id, globalPointSaveTime: cp.globalPointSaveTime, needFlushSafeModeExitPoint: cp.needFlushSafeModeExitPoint, diff --git a/syncer/syncer.go b/syncer/syncer.go index 568f3d366..246435bf0 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -972,7 +972,7 @@ func (s *Syncer) addJob(job *job) error { j := newFlushJob() j.wg.Add(s.cfg.WorkerCount) j.seq = s.getSeq() - s.dmlJobCh <- newFlushJob() + s.dmlJobCh <- j s.flushCheckPointsAsync(j.wg, j.seq) } From 4c90b4bb83f6781cfd8139fef6cf1afb9c9aad42 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 26 Oct 2021 11:47:38 +0800 Subject: [PATCH 7/8] fix sync flush and check global checkpoint time --- syncer/causality_test.go | 4 ++-- syncer/checkpoint.go | 18 ++++++++++-------- syncer/checkpoint_test.go | 4 ++-- syncer/job.go | 3 ++- syncer/syncer.go | 21 ++++++++++++++------- 5 files changed, 30 insertions(+), 20 deletions(-) diff --git a/syncer/causality_test.go 
b/syncer/causality_test.go index 17abd42c0..78d95abd7 100644 --- a/syncer/causality_test.go +++ b/syncer/causality_test.go @@ -39,10 +39,10 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) { "test_3": "test_1", } c.Assert(ca.detectConflict(caseData), IsFalse) - ca.add(caseData) + ca.add(caseData, 1) c.Assert(ca.relations, DeepEquals, excepted) c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse) - ca.add([]string{"test_4"}) + ca.add([]string{"test_4"}, 2) excepted["test_4"] = "test_4" c.Assert(ca.relations, DeepEquals, excepted) conflictData := []string{"test_4", "test_3"} diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 05021f47e..59d560b53 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -322,8 +322,8 @@ type RemoteCheckPoint struct { // this global checkpoint is min(next-binlog-pos, min(all-syncing-sharding-group-first-pos)) // else // this global checkpoint is next-binlog-pos - globalPoint *binlogPoint - globalPointSaveTime time.Time + globalPoint *binlogPoint + globalPointCheckOrSaveTime time.Time // safeModeExitPoint is set in RemoteCheckPoint.Load (from downstream DB) and LoadMeta (from metadata file). // it is unset (set nil) in RemoteCheckPoint.Clear, and when syncer's stream pass its location. @@ -386,7 +386,7 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { snapshot := &remoteCheckpointSnapshot{ id: id, - globalPointSaveTime: cp.globalPointSaveTime, + globalPointSaveTime: cp.globalPointCheckOrSaveTime, needFlushSafeModeExitPoint: cp.needFlushSafeModeExitPoint, points: tableCheckPoints, } @@ -444,7 +444,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { } cp.globalPoint = newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) - cp.globalPointSaveTime = time.Time{} + cp.globalPointCheckOrSaveTime = time.Time{} cp.points = make(map[string]map[string]*binlogPoint) cp.safeModeExitPoint = nil @@ -609,7 +609,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl sqls := make([]string, 0, 100) args := make([][]interface{}, 0, 100) - if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() || cp.needFlushSafeModeExitPoint { + if cp.globalPoint.outOfDate() || cp.globalPointCheckOrSaveTime.IsZero() || cp.needFlushSafeModeExitPoint { locationG := cp.GlobalPoint() sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true) sqls = append(sqls, sqlG) @@ -658,7 +658,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl point.flush() } - cp.globalPointSaveTime = time.Now() + cp.globalPointCheckOrSaveTime = time.Now() cp.needFlushSafeModeExitPoint = false return nil } @@ -752,7 +752,7 @@ func (cp *RemoteCheckPoint) FlushSnapshotPointsExcept( point.pos.flushBy(point.spLoc) } - cp.globalPointSaveTime = time.Now() + cp.globalPointCheckOrSaveTime = time.Now() cp.needFlushSafeModeExitPoint = false return nil } @@ -866,7 +866,9 @@ func (cp *RemoteCheckPoint) String() string { func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { cp.RLock() defer cp.RUnlock() - return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second + t := cp.globalPointCheckOrSaveTime + cp.globalPointCheckOrSaveTime = time.Now() + return time.Since(t) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second } // Rollback implements CheckPoint.Rollback. 
diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 2f8c30b0f..8abc4d004 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -271,7 +271,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.cfg.Dir = dir c.Assert(cp.LoadMeta(), IsNil) - // should flush because globalPointSaveTime is zero + // should flush because globalPointCheckOrSaveTime is zero s.mock.ExpectBegin() s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", "", 0, "", "null", true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() @@ -310,7 +310,7 @@ SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ c.Assert(err, IsNil) c.Assert(cp.LoadMeta(), IsNil) - // should flush because globalPointSaveTime is zero + // should flush because globalPointCheckOrSaveTime is zero s.mock.ExpectBegin() s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", pos2.Name, pos2.Pos, "", "null", true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() diff --git a/syncer/job.go b/syncer/job.go index 664b7c362..97bb27a39 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -85,7 +85,8 @@ type job struct { eventHeader *replication.EventHeader jobAddTime time.Time // job commit time seq int64 // sequence number for this job - wg *sync.WaitGroup // wait group for flush/conflict job + sync bool // whether the flush job is sync or async + wg *sync.WaitGroup // wait group for flush job } func (j *job) String() string { diff --git a/syncer/syncer.go b/syncer/syncer.go index 246435bf0..b93359f5c 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -938,8 +938,14 @@ func (s *Syncer) addJob(job *job) error { s.jobWg.Add(1) job.wg.Add(s.cfg.WorkerCount) s.dmlJobCh <- job - s.flushCheckPointsAsync(job.wg, job.seq) - return nil + var err error + if job.sync { + err = s.flushCheckPoints() + } else { + s.flushCheckPointsAsync(job.wg, job.seq) + } + + return err case ddl: s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) s.updateReplicationJobTS(job, ddlJobIdx) @@ -1589,7 +1595,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.checkpoint.SaveSafeModeExitPoint(&exitSafeModeLoc) // flush all jobs before exit - if err2 = s.flushJobs(); err2 != nil { + if err2 = s.flushJobs(true); err2 != nil { tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) } @@ -1844,7 +1850,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { currentLocation = startLocation } else if op == pb.ErrorOp_Skip { s.saveGlobalPoint(currentLocation) - err = s.flushJobs() + err = s.flushJobs(true) if err != nil { tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) } else { @@ -1959,7 +1965,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // flush checkpoint even if there are no real binlog events if s.checkpoint.CheckGlobalPoint() { tctx.L().Info("meet heartbeat event and then flush jobs") - err2 = s.flushJobs() + err2 = s.flushJobs(false) } } } @@ -2470,7 +2476,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o // flush previous DMLs and checkpoint if needing to handle the DDL. // NOTE: do this flush before operations on shard groups which may lead to skip a table caused by `UnresolvedTables`. 
- if err = s.flushJobs(); err != nil { + if err = s.flushJobs(true); err != nil { return err } @@ -3035,9 +3041,10 @@ func (s *Syncer) recordSkipSQLsLocation(ec *eventContext) error { } // flushJobs add a flush job and wait for all jobs finished. -func (s *Syncer) flushJobs() error { +func (s *Syncer) flushJobs(sync bool) error { s.tctx.L().Info("flush all jobs", zap.Stringer("global checkpoint", s.checkpoint)) job := newFlushJob() + job.sync = sync return s.addJobFunc(job) } From 888d9b54fffca27671394e748f296917e38a8ab2 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 26 Oct 2021 13:41:48 +0800 Subject: [PATCH 8/8] fix flush checkpoint sync --- syncer/checkpoint.go | 6 +++++- syncer/syncer.go | 7 ++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 59d560b53..868a69546 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -391,7 +391,11 @@ func (cp *RemoteCheckPoint) Snapshot() SnapshotID { points: tableCheckPoints, } if cp.globalPoint != nil { - snapshot.globalPoint = &cp.globalPoint.location + globalLocation := &tablePoint{ + location: cp.globalPoint.location.location.Clone(), + ti: cp.globalPoint.location.ti, + } + snapshot.globalPoint = globalLocation } cp.snapshots = append(cp.snapshots, snapshot) diff --git a/syncer/syncer.go b/syncer/syncer.go index b93359f5c..ceae53619 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1090,11 +1090,12 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) { } err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs) + if msg.resChan != nil { + msg.resChan <- err + } if err != nil { ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg)) - if msg.resChan != nil { - msg.resChan <- err - } + continue } ctx.L().Info("flushed checkpoint", zap.Int("snapshot_id", msg.snapshot.id),