diff --git a/syncer/causality.go b/syncer/causality.go index ae28b1ff3..81ca99bc4 100644 --- a/syncer/causality.go +++ b/syncer/causality.go @@ -14,6 +14,7 @@ package syncer import ( + "math" "time" "go.uber.org/zap" @@ -27,7 +28,7 @@ import ( // if some conflicts exist in more than one groups, causality generate a conflict job and reset. // this mechanism meets quiescent consistency to ensure correctness. type causality struct { - relations map[string]string + relations *rollingMap outCh chan *job inCh chan *job logger log.Logger @@ -40,7 +41,7 @@ type causality struct { // causalityWrap creates and runs a causality instance. func causalityWrap(inCh chan *job, syncer *Syncer) chan *job { causality := &causality{ - relations: make(map[string]string), + relations: newRollingMap(), task: syncer.cfg.Name, source: syncer.cfg.SourceID, logger: syncer.tctx.Logger.WithFields(zap.String("component", "causality")), @@ -63,17 +64,20 @@ func (c *causality) run() { metrics.QueueSizeGauge.WithLabelValues(c.task, "causality_input", c.source).Set(float64(len(c.inCh))) startTime := time.Now() - if j.tp == flush { - c.reset() - } else { + switch j.tp { + case flush: + c.relations.rotate() + case gc: + c.relations.gc(j.seq) + default: keys := j.dml.identifyKeys() // detectConflict before add if c.detectConflict(keys) { c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys)) c.outCh <- newConflictJob() - c.reset() + c.relations.clear() } - j.dml.key = c.add(keys) + j.dml.key = c.add(keys, j.seq) c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys)) } metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds()) @@ -88,7 +92,7 @@ func (c *causality) close() { } // add adds keys relation and return the relation. The keys must `detectConflict` first to ensure correctness. -func (c *causality) add(keys []string) string { +func (c *causality) add(keys []string, version int64) string { if len(keys) == 0 { return "" } @@ -97,7 +101,7 @@ func (c *causality) add(keys []string) string { selectedRelation := keys[0] var nonExistKeys []string for _, key := range keys { - if val, ok := c.relations[key]; ok { + if val, ok := c.relations.get(key); ok { selectedRelation = val } else { nonExistKeys = append(nonExistKeys, key) @@ -105,17 +109,12 @@ func (c *causality) add(keys []string) string { } // set causal relations for those non-exist keys for _, key := range nonExistKeys { - c.relations[key] = selectedRelation + c.relations.set(key, selectedRelation, version) } return selectedRelation } -// reset resets relations. -func (c *causality) reset() { - c.relations = make(map[string]string) -} - // detectConflict detects whether there is a conflict. 
 func (c *causality) detectConflict(keys []string) bool {
 	if len(keys) == 0 {
@@ -124,7 +123,7 @@ func (c *causality) detectConflict(keys []string) bool {
 	var existedRelation string
 	for _, key := range keys {
-		if val, ok := c.relations[key]; ok {
+		if val, ok := c.relations.get(key); ok {
 			if existedRelation != "" && val != existedRelation {
 				return true
 			}
@@ -134,3 +133,79 @@ func (c *causality) detectConflict(keys []string) bool {
 
 	return false
 }
+
+type versionedMap struct {
+	data   map[string]string
+	maxVer int64
+}
+
+// rollingMap is a map that contains multiple map instances.
+type rollingMap struct {
+	maps []*versionedMap
+	// current map for write
+	cur *versionedMap
+}
+
+func newRollingMap() *rollingMap {
+	m := &rollingMap{
+		maps: make([]*versionedMap, 0),
+	}
+	m.rotate()
+	return m
+}
+
+func (m *rollingMap) get(key string) (string, bool) {
+	for i := len(m.maps) - 1; i >= 0; i-- {
+		if v, ok := m.maps[i].data[key]; ok {
+			return v, true
+		}
+	}
+	return "", false
+}
+
+func (m *rollingMap) set(key string, val string, version int64) {
+	m.cur.data[key] = val
+	if m.cur.maxVer < version {
+		m.cur.maxVer = version
+	}
+}
+
+func (m *rollingMap) len() int {
+	cnt := 0
+	for _, d := range m.maps {
+		cnt += len(d.data)
+	}
+	return cnt
+}
+
+func (m *rollingMap) rotate() {
+	if len(m.maps) == 0 || len(m.maps[len(m.maps)-1].data) > 0 {
+		m.maps = append(m.maps, &versionedMap{
+			data: make(map[string]string),
+		})
+		m.cur = m.maps[len(m.maps)-1]
+	}
+}
+
+func (m *rollingMap) clear() {
+	m.gc(math.MaxInt64)
+}
+
+func (m *rollingMap) gc(version int64) {
+	idx := 0
+	for i, d := range m.maps {
+		if d.maxVer > 0 && d.maxVer <= version {
+			// set nil value to trigger go gc
+			m.maps[i] = nil
+		} else {
+			idx = i
+			break
+		}
+	}
+	if idx == len(m.maps)-1 {
+		m.maps = m.maps[:0]
+		m.rotate()
+	} else if idx > 0 {
+		m.maps = m.maps[idx:]
+	}
+}
diff --git a/syncer/causality_test.go b/syncer/causality_test.go
index 2e4988fb4..78d95abd7 100644
--- a/syncer/causality_test.go
+++ b/syncer/causality_test.go
@@ -30,7 +30,7 @@ import (
 
 func (s *testSyncerSuite) TestDetectConflict(c *C) {
 	ca := &causality{
-		relations: make(map[string]string),
+		relations: newRollingMap(),
 	}
 	caseData := []string{"test_1", "test_2", "test_3"}
 	excepted := map[string]string{
@@ -39,16 +39,16 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) {
 		"test_3": "test_1",
 	}
 	c.Assert(ca.detectConflict(caseData), IsFalse)
-	ca.add(caseData)
+	ca.add(caseData, 1)
 	c.Assert(ca.relations, DeepEquals, excepted)
 	c.Assert(ca.detectConflict([]string{"test_4"}), IsFalse)
-	ca.add([]string{"test_4"})
+	ca.add([]string{"test_4"}, 2)
 	excepted["test_4"] = "test_4"
 	c.Assert(ca.relations, DeepEquals, excepted)
 	conflictData := []string{"test_4", "test_3"}
 	c.Assert(ca.detectConflict(conflictData), IsTrue)
-	ca.reset()
-	c.Assert(ca.relations, HasLen, 0)
+	ca.relations.clear()
+	c.Assert(ca.relations.len(), Equals, 0)
 }
 
 func (s *testSyncerSuite) TestCasuality(c *C) {
diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go
index a57d27722..868a69546 100644
--- a/syncer/checkpoint.go
+++ b/syncer/checkpoint.go
@@ -61,25 +61,33 @@ var (
 	maxCheckPointTimeout = "1m"
 )
 
+type tablePoint struct {
+	location binlog.Location
+	ti       *model.TableInfo
+}
+
 type binlogPoint struct {
 	sync.RWMutex
 
-	location binlog.Location
-	ti       *model.TableInfo
+	location tablePoint
 
-	flushedLocation binlog.Location // location which flushed permanently
-	flushedTI       *model.TableInfo
+	// flushedLocation.location is the location which has been flushed permanently
+	flushedLocation tablePoint
 
 	enableGTID
bool } func newBinlogPoint(location, flushedLocation binlog.Location, ti, flushedTI *model.TableInfo, enableGTID bool) *binlogPoint { return &binlogPoint{ - location: location, - ti: ti, - flushedLocation: flushedLocation, - flushedTI: flushedTI, - enableGTID: enableGTID, + location: tablePoint{ + location: location, + ti: ti, + }, + flushedLocation: tablePoint{ + location: flushedLocation, + ti: flushedTI, + }, + enableGTID: enableGTID, } } @@ -87,21 +95,25 @@ func (b *binlogPoint) save(location binlog.Location, ti *model.TableInfo) error b.Lock() defer b.Unlock() - if binlog.CompareLocation(location, b.location, b.enableGTID) < 0 { + if binlog.CompareLocation(location, b.location.location, b.enableGTID) < 0 { // support to save equal location, but not older location - return terror.ErrCheckpointSaveInvalidPos.Generate(location, b.location) + return terror.ErrCheckpointSaveInvalidPos.Generate(location, b.location.location) } - b.location = location - b.ti = ti + b.location.location = location + b.location.ti = ti return nil } func (b *binlogPoint) flush() { + b.flushBy(b.location) +} + +func (b *binlogPoint) flushBy(sp tablePoint) { b.Lock() defer b.Unlock() - b.flushedLocation = b.location - b.flushedTI = b.ti + b.flushedLocation.location = sp.location + b.flushedLocation.ti = sp.ti } func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) { @@ -109,15 +121,15 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is defer b.Unlock() // set suffix to 0 when we meet error - b.flushedLocation.ResetSuffix() - b.location = b.flushedLocation - if b.ti == nil { + b.flushedLocation.location.ResetSuffix() + b.location.location = b.flushedLocation.location + if b.location.ti == nil { return // for global checkpoint, no need to rollback the schema. } // NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly, // and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now. - trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough. + trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.location.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough. // may three versions of schema exist: // - the one tracked in the TiDB-with-mockTiKV. // - the one in the checkpoint but not flushed. @@ -125,8 +137,8 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is // if any of them are not equal, then we rollback them: // - set the one in the checkpoint but not flushed to the one flushed. 
 // - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now)
-	if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged {
-		b.ti = b.flushedTI
+	if isSchemaChanged = (trackedTi != b.location.ti) || (b.location.ti != b.flushedLocation.ti); isSchemaChanged {
+		b.location.ti = b.flushedLocation.ti
 	}
 	return
 }
@@ -135,35 +147,50 @@ func (b *binlogPoint) outOfDate() bool {
 	b.RLock()
 	defer b.RUnlock()
 
-	return binlog.CompareLocation(b.location, b.flushedLocation, b.enableGTID) > 0
+	return binlog.CompareLocation(b.location.location, b.flushedLocation.location, b.enableGTID) > 0
+}
+
+func (b *binlogPoint) outOfDateBy(pos binlog.Location) bool {
+	b.RLock()
+	defer b.RUnlock()
+
+	return binlog.CompareLocation(pos, b.flushedLocation.location, b.enableGTID) > 0
 }
 
 // MySQLLocation returns point as binlog.Location.
 func (b *binlogPoint) MySQLLocation() binlog.Location {
 	b.RLock()
 	defer b.RUnlock()
 
-	return b.location
+	return b.location.location
 }
 
 // FlushedMySQLLocation returns flushed point as binlog.Location.
 func (b *binlogPoint) FlushedMySQLLocation() binlog.Location {
 	b.RLock()
 	defer b.RUnlock()
 
-	return b.flushedLocation
+	return b.flushedLocation.location
 }
 
 // TableInfo returns the table schema associated at the current binlog position.
 func (b *binlogPoint) TableInfo() *model.TableInfo {
 	b.RLock()
 	defer b.RUnlock()
 
-	return b.ti
+	return b.location.ti
 }
 
 func (b *binlogPoint) String() string {
 	b.RLock()
 	defer b.RUnlock()
 
-	return fmt.Sprintf("%v(flushed %v)", b.location, b.flushedLocation)
+	return fmt.Sprintf("%v(flushed %v)", b.location.location, b.flushedLocation.location)
+}
+
+// SnapshotID identifies a checkpoint snapshot and records the global position at which it was taken.
+type SnapshotID struct {
+	// the corresponding snapshot id
+	id int
+	// global checkpoint position.
+	pos binlog.Location
 }
 
 // CheckPoint represents checkpoints status for syncer
@@ -207,6 +234,9 @@ type CheckPoint interface {
 	// corresponding to Meta.Save
 	SaveGlobalPoint(point binlog.Location)
 
+	// Snapshot makes a snapshot of the current checkpoint and returns its id
+	Snapshot() SnapshotID
+
 	// FlushGlobalPointsExcept flushes the global checkpoint and tables'
 	// checkpoints except exceptTables, it also flushes SQLs with Args providing
 	// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
@@ -214,6 +244,10 @@ type CheckPoint interface {
 	// corresponding to Meta.Flush
 	FlushPointsExcept(tctx *tcontext.Context, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error
 
+	// FlushSnapshotPointsExcept flushes the global checkpoint and tables'
+	// checkpoints except exceptTables, using the specified snapshot.
+	FlushSnapshotPointsExcept(tctx *tcontext.Context, snapshot int, exceptTables []*filter.Table, extraSQLs []string, extraArgs [][]interface{}) error
+
 	// FlushPointWithTableInfo flushed the table point with given table info
 	FlushPointWithTableInfo(tctx *tcontext.Context, table *filter.Table, ti *model.TableInfo) error
@@ -256,6 +290,14 @@ type CheckPoint interface {
 	CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error
 }
 
+type remoteCheckpointSnapshot struct {
+	id                         int
+	globalPoint                *tablePoint
+	globalPointSaveTime        time.Time
+	needFlushSafeModeExitPoint bool
+	points                     map[string]map[string]tablePoint
+}
+
 // RemoteCheckPoint implements CheckPoint
 // which using target database to store info
 // NOTE: now we sync from relay log, so not add GTID support yet
@@ -280,8 +322,8 @@ type RemoteCheckPoint struct {
 	//      this global checkpoint is min(next-binlog-pos, min(all-syncing-sharding-group-first-pos))
 	// else
 	//      this global checkpoint is next-binlog-pos
-	globalPoint         *binlogPoint
-	globalPointSaveTime time.Time
+	globalPoint                *binlogPoint
+	globalPointCheckOrSaveTime time.Time
 
 	// safeModeExitPoint is set in RemoteCheckPoint.Load (from downstream DB) and LoadMeta (from metadata file).
 	// it is unset (set nil) in RemoteCheckPoint.Clear, and when syncer's stream pass its location.
@@ -293,6 +335,10 @@ type RemoteCheckPoint struct {
 	needFlushSafeModeExitPoint bool
 
 	logCtx *tcontext.Context
+
+	// these fields are used for async flush checkpoint
+	snapshots   []*remoteCheckpointSnapshot
+	snapshotSeq int
 }
 
 // NewRemoteCheckPoint creates a new RemoteCheckPoint.
@@ -309,6 +355,56 @@ func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
 	return cp
 }
 
+// Snapshot makes a snapshot of the current checkpoint and returns its snapshot id.
+func (cp *RemoteCheckPoint) Snapshot() SnapshotID {
+	cp.RLock()
+	defer cp.RUnlock()
+	// snapshots are taken in a single thread, so relying on the read lock should be enough
+	cp.snapshotSeq++
+
+	id := cp.snapshotSeq
+
+	tableCheckPoints := make(map[string]map[string]tablePoint, len(cp.points))
+	for s, tableCps := range cp.points {
+		tableCpSnapshots := make(map[string]tablePoint)
+		for tbl, point := range tableCps {
+			if point.outOfDate() {
+				tableCpSnapshots[tbl] = point.location
+			}
+		}
+		if len(tableCpSnapshots) > 0 {
+			tableCheckPoints[s] = tableCpSnapshots
+		}
+	}
+
+	// if there is no change just return an empty snapshot
+	if len(tableCheckPoints) == 0 && (cp.globalPoint == nil || !cp.globalPoint.outOfDate()) {
+		return SnapshotID{
+			id: 0,
+		}
+	}
+
+	snapshot := &remoteCheckpointSnapshot{
+		id:                         id,
+		globalPointSaveTime:        cp.globalPointCheckOrSaveTime,
+		needFlushSafeModeExitPoint: cp.needFlushSafeModeExitPoint,
+		points:                     tableCheckPoints,
+	}
+	if cp.globalPoint != nil {
+		globalLocation := &tablePoint{
+			location: cp.globalPoint.location.location.Clone(),
+			ti:       cp.globalPoint.location.ti,
+		}
+		snapshot.globalPoint = globalLocation
+	}
+
+	cp.snapshots = append(cp.snapshots, snapshot)
+	return SnapshotID{
+		id:  id,
+		pos: cp.globalPoint.location.location,
+	}
+}
+
 // Init implements CheckPoint.Init.
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error { checkPointDB := cp.cfg.To @@ -352,7 +448,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { } cp.globalPoint = newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) - cp.globalPointSaveTime = time.Time{} + cp.globalPointCheckOrSaveTime = time.Time{} cp.points = make(map[string]map[string]*binlogPoint) cp.safeModeExitPoint = nil @@ -368,7 +464,7 @@ func (cp *RemoteCheckPoint) SaveTablePoint(table *filter.Table, point binlog.Loc // saveTablePoint saves single table's checkpoint without mutex.Lock. func (cp *RemoteCheckPoint) saveTablePoint(sourceTable *filter.Table, location binlog.Location, ti *model.TableInfo) { - if binlog.CompareLocation(cp.globalPoint.location, location, cp.cfg.EnableGTID) > 0 { + if binlog.CompareLocation(cp.globalPoint.location.location, location, cp.cfg.EnableGTID) > 0 { panic(fmt.Sprintf("table checkpoint %+v less than global checkpoint %+v", location, cp.globalPoint)) } @@ -517,7 +613,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl sqls := make([]string, 0, 100) args := make([][]interface{}, 0, 100) - if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() || cp.needFlushSafeModeExitPoint { + if cp.globalPoint.outOfDate() || cp.globalPointCheckOrSaveTime.IsZero() || cp.needFlushSafeModeExitPoint { locationG := cp.GlobalPoint() sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true) sqls = append(sqls, sqlG) @@ -534,7 +630,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl } } if point.outOfDate() { - tiBytes, err := json.Marshal(point.ti) + tiBytes, err := json.Marshal(point.location.ti) if err != nil { return terror.ErrSchemaTrackerCannotSerialize.Delegate(err, schema, table) } @@ -566,7 +662,101 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl point.flush() } - cp.globalPointSaveTime = time.Now() + cp.globalPointCheckOrSaveTime = time.Now() + cp.needFlushSafeModeExitPoint = false + return nil +} + +// FlushSnapshotPointsExcept implements CheckPoint.FlushSnapshotPointsExcept. 
+func (cp *RemoteCheckPoint) FlushSnapshotPointsExcept( + tctx *tcontext.Context, + snapshot int, + exceptTables []*filter.Table, + extraSQLs []string, + extraArgs [][]interface{}, +) error { + cp.RLock() + defer cp.RUnlock() + + if len(cp.snapshots) == 0 || cp.snapshots[0].id != snapshot { + cp.logCtx.Logger.DPanic("snapshot not found", zap.Int("id", snapshot)) + } + snapshotCp := cp.snapshots[0] + + // convert slice to map + excepts := make(map[string]map[string]struct{}) + for _, schemaTable := range exceptTables { + schema, table := schemaTable.Schema, schemaTable.Name + m, ok := excepts[schema] + if !ok { + m = make(map[string]struct{}) + excepts[schema] = m + } + m[table] = struct{}{} + } + + sqls := make([]string, 0, 100) + args := make([][]interface{}, 0, 100) + + if (snapshotCp.globalPoint != nil && cp.globalPoint.outOfDateBy(snapshotCp.globalPoint.location)) || snapshotCp.globalPointSaveTime.IsZero() || snapshotCp.needFlushSafeModeExitPoint { + locationG := snapshotCp.globalPoint.location + sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true) + sqls = append(sqls, sqlG) + args = append(args, argG) + } + + type binlogPointSp struct { + pos *binlogPoint + spLoc tablePoint + } + + points := make([]*binlogPointSp, 0, 100) + + for schema, mSchema := range snapshotCp.points { + schemaCp := cp.points[schema] + for table, point := range mSchema { + if _, ok1 := excepts[schema]; ok1 { + if _, ok2 := excepts[schema][table]; ok2 { + continue + } + } + tableCP := schemaCp[table] + if tableCP.outOfDateBy(point.location) { + tiBytes, err := json.Marshal(point.ti) + if err != nil { + return terror.ErrSchemaTrackerCannotSerialize.Delegate(err, schema, table) + } + + sql2, arg := cp.genUpdateSQL(schema, table, point.location, nil, tiBytes, false) + sqls = append(sqls, sql2) + args = append(args, arg) + + points = append(points, &binlogPointSp{ + pos: tableCP, + spLoc: point, + }) + } + } + } + for i := range extraSQLs { + sqls = append(sqls, extraSQLs[i]) + args = append(args, extraArgs[i]) + } + + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() + _, err := cp.dbConn.ExecuteSQL(tctx2, sqls, args...) + if err != nil { + return err + } + + cp.globalPoint.flush() + for _, point := range points { + point.pos.flushBy(point.spLoc) + } + + cp.globalPointCheckOrSaveTime = time.Now() cp.needFlushSafeModeExitPoint = false return nil } @@ -604,7 +794,7 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, tabl return err } - err = point.save(point.location, ti) + err = point.save(point.location.location, ti) if err != nil { return err } @@ -680,7 +870,9 @@ func (cp *RemoteCheckPoint) String() string { func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { cp.RLock() defer cp.RUnlock() - return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second + t := cp.globalPointCheckOrSaveTime + cp.globalPointCheckOrSaveTime = time.Now() + return time.Since(t) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second } // Rollback implements CheckPoint.Rollback. 
@@ -703,12 +895,12 @@ func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { if err := schemaTracker.DropTable(table); err != nil { logger.Warn("failed to drop table from schema tracker", log.ShortError(err)) } - if point.ti != nil { + if point.location.ti != nil { // TODO: Figure out how to recover from errors. if err := schemaTracker.CreateSchemaIfNotExists(schemaName); err != nil { logger.Error("failed to rollback schema on schema tracker: cannot create schema", log.ShortError(err)) } - if err := schemaTracker.CreateTableIfNotExists(table, point.ti); err != nil { + if err := schemaTracker.CreateTableIfNotExists(table, point.location.ti); err != nil { logger.Error("failed to rollback schema on schema tracker: cannot create table", log.ShortError(err)) } } @@ -1043,7 +1235,7 @@ func (cp *RemoteCheckPoint) GetFlushedTableInfo(table *filter.Table) *model.Tabl if tables, ok := cp.points[table.Schema]; ok { if point, ok2 := tables[table.Name]; ok2 { - return point.flushedTI + return point.flushedLocation.ti } } return nil diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 88ae25b62..8abc4d004 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -271,7 +271,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.cfg.Dir = dir c.Assert(cp.LoadMeta(), IsNil) - // should flush because globalPointSaveTime is zero + // should flush because globalPointCheckOrSaveTime is zero s.mock.ExpectBegin() s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", "", 0, "", "null", true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() @@ -310,7 +310,7 @@ SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */ c.Assert(err, IsNil) c.Assert(cp.LoadMeta(), IsNil) - // should flush because globalPointSaveTime is zero + // should flush because globalPointCheckOrSaveTime is zero s.mock.ExpectBegin() s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", pos2.Name, pos2.Pos, "", "null", true).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() @@ -399,12 +399,12 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { cp.SaveTablePoint(table, binlog.Location{Position: pos1}, ti) rcp := cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, IsNil) cp.Rollback(s.tracker) rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), IsNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, IsNil) _, err = s.tracker.GetTableInfo(table) c.Assert(strings.Contains(err.Error(), "doesn't exist"), IsTrue) @@ -478,6 +478,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.GlobalPoint(), DeepEquals, binlog.InitLocation(pos2, gs)) rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) - c.Assert(rcp.points[schemaName][tableName].flushedTI, NotNil) + c.Assert(rcp.points[schemaName][tableName].flushedLocation.ti, NotNil) c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs)) } diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go index c9c7c09c4..53afc0b2e 100644 --- a/syncer/dml_worker.go +++ b/syncer/dml_worker.go @@ -112,18 +112,18 @@ func (w *DMLWorker) run() { for j 
:= range w.inCh { metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh))) if j.tp == flush || j.tp == conflict { + // todo how to track conflict job metrics if j.tp == conflict { w.addCountFunc(false, adminQueueName, j.tp, 1, j.targetTable) } - w.wg.Add(w.workerCount) // flush for every DML queue for i, jobCh := range jobChs { startTime := time.Now() jobCh <- j metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[i], w.source).Observe(time.Since(startTime).Seconds()) } - w.wg.Wait() if j.tp == conflict { + w.wg.Wait() w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) } else { w.flushCh <- j @@ -168,6 +168,9 @@ func (w *DMLWorker) executeJobs(queueID int, jobCh chan *job) { w.executeBatchJobs(queueID, jobs) if j.tp == conflict || j.tp == flush { w.wg.Done() + if j.wg != nil { + j.wg.Done() + } } jobs = jobs[0:0] diff --git a/syncer/job.go b/syncer/job.go index ffa89350b..97bb27a39 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -15,6 +15,7 @@ package syncer import ( "fmt" + "sync" "time" "github.com/go-mysql-org/go-mysql/replication" @@ -36,6 +37,7 @@ const ( skip // used by Syncer.recordSkipSQLsLocation to record global location, but not execute SQL rotate conflict + gc // used to clean up out dated causality keys ) func (t opType) String() string { @@ -58,6 +60,8 @@ func (t opType) String() string { return "rotate" case conflict: return "conflict" + case gc: + return "gc" } return "" @@ -79,7 +83,10 @@ type job struct { originSQL string // show origin sql when error, only DDL now eventHeader *replication.EventHeader - jobAddTime time.Time // job commit time + jobAddTime time.Time // job commit time + seq int64 // sequence number for this job + sync bool // whether the flush job is sync or async + wg *sync.WaitGroup // wait group for flush job } func (j *job) String() string { @@ -167,6 +174,7 @@ func newFlushJob() *job { tp: flush, targetTable: &filter.Table{}, jobAddTime: time.Now(), + wg: &sync.WaitGroup{}, } } diff --git a/syncer/syncer.go b/syncer/syncer.go index a17448de3..ceae53619 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "fmt" + "math" "os" "path" "reflect" @@ -183,8 +184,9 @@ type Syncer struct { done chan struct{} - checkpoint CheckPoint - onlineDDL onlineddl.OnlinePlugin + checkpoint CheckPoint + flushCpWorker checkpointFlushWorker + onlineDDL onlineddl.OnlinePlugin // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError @@ -211,6 +213,7 @@ type Syncer struct { } addJobFunc func(*job) error + seq int64 // `lower_case_table_names` setting of upstream db SourceTableNamesFlavor utils.LowerCaseTableNamesFlavor @@ -398,6 +401,11 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } } } + s.flushCpWorker = checkpointFlushWorker{ + input: make(chan *flushCpTask, 16), + cp: s.checkpoint, + afterFlushFn: s.afterFlushCheckpoint, + } // when Init syncer, set active relay log info err = s.setInitActiveRelayLog(ctx) @@ -916,6 +924,7 @@ func (s *Syncer) addJob(job *job) error { s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job)) return nil } + job.seq = s.getSeq() switch job.tp { case xid: s.waitXIDJob.CAS(int64(waiting), int64(waitComplete)) @@ -927,10 +936,16 @@ func (s *Syncer) addJob(job *job) error { case flush: s.addCount(false, adminQueueName, job.tp, 1, job.targetTable) s.jobWg.Add(1) + job.wg.Add(s.cfg.WorkerCount) s.dmlJobCh <- job - 
s.jobWg.Wait()
-		s.addCount(true, adminQueueName, job.tp, 1, job.targetTable)
-		return s.flushCheckPoints()
+		var err error
+		if job.sync {
+			err = s.flushCheckPoints()
+		} else {
+			s.flushCheckPointsAsync(job.wg, job.seq)
+		}
+
+		return err
 	case ddl:
 		s.addCount(false, adminQueueName, job.tp, 1, job.targetTable)
 		s.updateReplicationJobTS(job, ddlJobIdx)
@@ -957,10 +972,14 @@ func (s *Syncer) addJob(job *job) error {
 			needFlush = true
 		}
 	})
+
 	if needFlush {
 		s.jobWg.Add(1)
-		s.dmlJobCh <- newFlushJob()
-		s.jobWg.Wait()
+		j := newFlushJob()
+		j.wg.Add(s.cfg.WorkerCount)
+		j.seq = s.getSeq()
+		s.dmlJobCh <- j
+		s.flushCheckPointsAsync(j.wg, j.seq)
 	}
 
 	if s.execError.Load() != nil {
@@ -1005,7 +1024,7 @@ func (s *Syncer) addJob(job *job) error {
 		}
 	}
 
-	if needFlush || job.tp == ddl {
+	if job.tp == ddl {
 		// interrupted after save checkpoint and before flush checkpoint.
 		failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
 			err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")
@@ -1038,18 +1057,87 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
 	}
 }
 
+type flushCpTask struct {
+	snapshot SnapshotID
+	wg       *sync.WaitGroup
+	// job version, causality gc max version
+	version int64
+	// extra sharding ddl sqls
+	exceptTables  []*filter.Table
+	shardMetaSQLs []string
+	shardMetaArgs [][]interface{}
+	// result chan
+	resChan chan error
+}
+
+type checkpointFlushWorker struct {
+	input        chan *flushCpTask
+	cp           CheckPoint
+	afterFlushFn func(task *flushCpTask) error
+}
+
+// Add adds a new flush checkpoint job.
+func (w *checkpointFlushWorker) Add(msg *flushCpTask) {
+	w.input <- msg
+}
+
+// Run reads flush tasks from input and executes them one by one.
+func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
+	for msg := range w.input {
+		// wait for all workers to finish executing the jobs covered by this flush
+		if msg.wg != nil {
+			msg.wg.Wait()
+		}
+
+		err := w.cp.FlushSnapshotPointsExcept(ctx, msg.snapshot.id, msg.exceptTables, msg.shardMetaSQLs, msg.shardMetaArgs)
+		if msg.resChan != nil {
+			msg.resChan <- err
+		}
+		if err != nil {
+			ctx.L().Warn("flush checkpoint snapshot failed, ignore this error", zap.Any("snapshot", msg))
+
+			continue
+		}
+		ctx.L().Info("flushed checkpoint", zap.Int("snapshot_id", msg.snapshot.id),
			zap.Stringer("pos", msg.snapshot.pos))
+		if err = w.afterFlushFn(msg); err != nil {
+			if msg.resChan != nil {
+				msg.resChan <- err
+			}
+			ctx.L().Warn("flush post process failed", zap.Error(err))
+		}
+	}
+}
+
+// Close waits for all pending jobs to finish and stops this worker.
+func (w *checkpointFlushWorker) Close() {
+	close(w.input)
+}
+
 // flushCheckPoints flushes previous saved checkpoint in memory to persistent storage, like TiDB
-// we flush checkpoints in four cases:
+// we flush checkpoints in three cases:
 // 1. DDL executed
-// 2. at intervals (and job executed)
-// 3. pausing / stopping the sync (driven by `s.flushJobs`)
-// 4. IsFreshTask return true
+// 2. pausing / stopping the sync (driven by `s.flushJobs`)
+// 3. IsFreshTask return true
 // but when error occurred, we can not flush checkpoint, otherwise data may lost
 // and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
 // this should be handled when `s.Run` returned
 //
 // we may need to refactor the concurrency model to make the work-flow more clearer later.
 func (s *Syncer) flushCheckPoints() error {
+	errCh := make(chan error, 1)
+	// use math.MaxInt64 to clear all the data in causality component.
+	s.doFlushCheckPointsAsync(nil, errCh, math.MaxInt64)
+	return <-errCh
+}
+
+// flushCheckPointsAsync flushes the previously saved checkpoint asynchronously after all workers finish their flush.
+// it is triggered at intervals (and job executed)
+func (s *Syncer) flushCheckPointsAsync(wg *sync.WaitGroup, version int64) {
+	s.doFlushCheckPointsAsync(wg, nil, version)
+}
+
+func (s *Syncer) doFlushCheckPointsAsync(wg *sync.WaitGroup, outCh chan error, version int64) {
 	err := s.execError.Load()
 	// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
 	// optimistic shard info, DM-master may resolved the optimistic lock and let other worker execute DDL. So after this
@@ -1059,7 +1147,7 @@ func (s *Syncer) flushCheckPoints() error {
 		s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint",
 			zap.Stringer("checkpoint", s.checkpoint),
 			zap.Error(err))
-		return nil
+		return
 	}
 
 	var (
@@ -1079,14 +1167,31 @@ func (s *Syncer) flushCheckPoints() error {
 		s.tctx.L().Info("prepare flush sqls", zap.Strings("shard meta sqls", shardMetaSQLs), zap.Reflect("shard meta arguments", shardMetaArgs))
 	}
 
-	err = s.checkpoint.FlushPointsExcept(s.tctx, exceptTables, shardMetaSQLs, shardMetaArgs)
-	if err != nil {
-		return terror.Annotatef(err, "flush checkpoint %s", s.checkpoint)
+	snapshotID := s.checkpoint.Snapshot()
+	if snapshotID.id == 0 {
+		log.L().Info("checkpoint has no change, skip save checkpoint")
+		return
+	}
+	task := &flushCpTask{
+		snapshot:      snapshotID,
+		wg:            wg,
+		exceptTables:  exceptTables,
+		shardMetaSQLs: shardMetaSQLs,
+		shardMetaArgs: shardMetaArgs,
+		resChan:       outCh,
+		version:       version,
 	}
-	s.tctx.L().Info("flushed checkpoint", zap.Stringer("checkpoint", s.checkpoint))
+	s.flushCpWorker.Add(task)
+}
+func (s *Syncer) afterFlushCheckpoint(task *flushCpTask) error {
+	s.dmlJobCh <- &job{
+		tp:  gc,
+		seq: task.version,
+	}
+	s.addCount(true, adminQueueName, flush, 1, nil)
 	// update current active relay log after checkpoint flushed
-	err = s.updateActiveRelayLog(s.checkpoint.GlobalPoint().Position)
+	err := s.updateActiveRelayLog(task.snapshot.pos.Position)
 	if err != nil {
 		return err
 	}
@@ -1098,18 +1203,22 @@ func (s *Syncer) flushCheckPoints() error {
 	}
 
 	s.lastCheckpointFlushedTime = now
-	s.tctx.L().Info("after last flushing checkpoint, DM has ignored row changes by expression filter",
-		zap.Int64("number of filtered insert", s.filteredInsert.Load()),
-		zap.Int64("number of filtered update", s.filteredUpdate.Load()),
-		zap.Int64("number of filtered delete", s.filteredDelete.Load()))
-
-	s.filteredInsert.Store(0)
-	s.filteredUpdate.Store(0)
-	s.filteredDelete.Store(0)
-
+	s.logAndClearFilteredStatistics()
 	return nil
 }
 
+func (s *Syncer) logAndClearFilteredStatistics() {
+	filteredInsert := s.filteredInsert.Swap(0)
+	filteredUpdate := s.filteredUpdate.Swap(0)
+	filteredDelete := s.filteredDelete.Swap(0)
+	if filteredInsert > 0 || filteredUpdate > 0 || filteredDelete > 0 {
+		s.tctx.L().Info("after last flushing checkpoint, DM has ignored row changes by expression filter",
+			zap.Int64("number of filtered insert", filteredInsert),
+			zap.Int64("number of filtered update", filteredUpdate),
+			zap.Int64("number of filtered delete", filteredDelete))
+	}
+}
+
 // DDL synced one by one, so we only need to process one DDL at a time.
func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { defer s.wg.Done() @@ -1286,6 +1395,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { }() go func() { + // close flush checkpoint worker after all jobs are done. + defer s.flushCpWorker.Close() <-ctx.Done() select { case <-runCtx.Done(): @@ -1325,6 +1436,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } + // start flush checkpoints worker. + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.flushCpWorker.Run(s.tctx) + }() + var ( flushCheckpoint bool delLoadTask bool @@ -1478,7 +1596,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.checkpoint.SaveSafeModeExitPoint(&exitSafeModeLoc) // flush all jobs before exit - if err2 = s.flushJobs(); err2 != nil { + if err2 = s.flushJobs(true); err2 != nil { tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) } @@ -1733,7 +1851,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { currentLocation = startLocation } else if op == pb.ErrorOp_Skip { s.saveGlobalPoint(currentLocation) - err = s.flushJobs() + err = s.flushJobs(true) if err != nil { tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) } else { @@ -1848,7 +1966,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // flush checkpoint even if there are no real binlog events if s.checkpoint.CheckGlobalPoint() { tctx.L().Info("meet heartbeat event and then flush jobs") - err2 = s.flushJobs() + err2 = s.flushJobs(false) } } } @@ -1947,6 +2065,12 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) return nil } +func (s *Syncer) getSeq() int64 { + seq := s.seq + s.seq++ + return seq +} + func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) error { sourceTable := &filter.Table{ Schema: string(ev.Table.Schema), @@ -2353,7 +2477,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o // flush previous DMLs and checkpoint if needing to handle the DDL. // NOTE: do this flush before operations on shard groups which may lead to skip a table caused by `UnresolvedTables`. - if err = s.flushJobs(); err != nil { + if err = s.flushJobs(true); err != nil { return err } @@ -2918,9 +3042,10 @@ func (s *Syncer) recordSkipSQLsLocation(ec *eventContext) error { } // flushJobs add a flush job and wait for all jobs finished. -func (s *Syncer) flushJobs() error { +func (s *Syncer) flushJobs(sync bool) error { s.tctx.L().Info("flush all jobs", zap.Stringer("global checkpoint", s.checkpoint)) job := newFlushJob() + job.sync = sync return s.addJobFunc(job) }
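
The causality change above replaces the single `relations` map with generations of maps, so that keys whose jobs are already flushed downstream can be garbage-collected instead of clearing everything on each flush. Below is a minimal standalone sketch of that rolling-map idea, using simplified stand-in names (`rollingRelations`, `generation`) rather than the patch's `rollingMap`/`versionedMap`, and a gc that drops any fully flushed generation rather than only a leading prefix:

```go
package main

import "fmt"

// generation holds causality relations written before one flush point;
// maxVer is the highest job sequence number stored in it.
type generation struct {
	data   map[string]string
	maxVer int64
}

// rollingRelations looks keys up from newest to oldest generation, starts a
// new generation on flush, and drops generations once their jobs are flushed.
type rollingRelations struct {
	gens []*generation
}

func newRollingRelations() *rollingRelations {
	r := &rollingRelations{}
	r.rotate()
	return r
}

func (r *rollingRelations) cur() *generation { return r.gens[len(r.gens)-1] }

// rotate starts a new generation unless the current one is still empty.
func (r *rollingRelations) rotate() {
	if len(r.gens) == 0 || len(r.cur().data) > 0 {
		r.gens = append(r.gens, &generation{data: map[string]string{}})
	}
}

func (r *rollingRelations) set(key, val string, version int64) {
	g := r.cur()
	g.data[key] = val
	if g.maxVer < version {
		g.maxVer = version
	}
}

func (r *rollingRelations) get(key string) (string, bool) {
	for i := len(r.gens) - 1; i >= 0; i-- {
		if v, ok := r.gens[i].data[key]; ok {
			return v, true
		}
	}
	return "", false
}

// gc removes every generation whose newest job (maxVer) is already flushed.
func (r *rollingRelations) gc(version int64) {
	kept := r.gens[:0]
	for _, g := range r.gens {
		if g.maxVer == 0 || g.maxVer > version {
			kept = append(kept, g)
		}
	}
	r.gens = kept
	if len(r.gens) == 0 {
		r.rotate()
	}
}

func main() {
	r := newRollingRelations()
	r.set("a.1", "a.1", 1) // job 1
	r.rotate()             // flush job arrives: start a new generation
	r.set("a.2", "a.2", 2) // job 2
	r.gc(1)                // checkpoint for job 1 flushed: drop generation 1
	_, ok := r.get("a.1")
	fmt.Println(ok) // false: key from the flushed generation is gone
	_, ok = r.get("a.2")
	fmt.Println(ok) // true: generation 2 is still live
}
```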
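In checkpoint.go, `FlushSnapshotPointsExcept` persists a snapshot taken earlier and then calls `flushBy` with the snapshotted table point instead of `flush`, since the live point may have advanced while the flush task was queued. A tiny illustration of why that distinction matters, with `location` standing in for `binlog.Location` (simplified types, not the patch's code):

```go
package main

import "fmt"

// location stands in for binlog.Location; a larger value means a later position.
type location int64

// binlogPoint is reduced to the two fields relevant here: the live (saved)
// point and the last persisted (flushed) point.
type binlogPoint struct {
	saved   location
	flushed location
}

// flushBy marks a *snapshotted* location as persisted. Flushing the live
// location instead could mark progress as flushed that was saved after the
// snapshot was taken but never actually written downstream.
func (b *binlogPoint) flushBy(snap location) { b.flushed = snap }

func main() {
	p := &binlogPoint{saved: 100}
	snap := p.saved // Snapshot(): copy the point when the flush task is created
	p.saved = 120   // replication advances while the checkpoint flush runs asynchronously
	p.flushBy(snap) // the async flush finished: flushed is 100, not 120
	fmt.Println(p.saved, p.flushed) // prints: 120 100
}
```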
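In syncer.go, flush jobs no longer block `addJob`: the flush job carries a `sync.WaitGroup` sized to the DML worker count, and the new `checkpointFlushWorker` waits on it before persisting the snapshot and then triggering a causality `gc` job. A self-contained sketch of that pattern under simplified assumptions (`flushTask`, `flushWorker`, and plain callbacks standing in for the real CheckPoint and job types):

```go
package main

import (
	"fmt"
	"sync"
)

// flushTask loosely mirrors flushCpTask: the WaitGroup is released once every
// DML queue has drained the jobs that precede this flush.
type flushTask struct {
	seq int64
	wg  *sync.WaitGroup
}

// flushWorker is a simplified checkpointFlushWorker: it serializes checkpoint
// flushes and only writes the snapshot after all queues confirmed the flush.
type flushWorker struct {
	input chan *flushTask
	done  chan struct{}
}

func (w *flushWorker) run(flushSnapshot func(seq int64) error, afterFlush func(seq int64)) {
	defer close(w.done)
	for task := range w.input {
		task.wg.Wait() // all DML workers finished the jobs before this flush
		if err := flushSnapshot(task.seq); err != nil {
			fmt.Println("flush checkpoint snapshot failed, ignore this error:", err)
			continue
		}
		afterFlush(task.seq) // e.g. enqueue a causality gc job with this seq
	}
}

func main() {
	const workerCount = 4
	w := &flushWorker{input: make(chan *flushTask, 16), done: make(chan struct{})}
	go w.run(
		func(seq int64) error { fmt.Println("flushed snapshot up to seq", seq); return nil },
		func(seq int64) { fmt.Println("gc causality keys up to seq", seq) },
	)

	// a flush job is fanned out to every DML queue; each queue calls Done()
	// after it has executed everything queued before the flush.
	task := &flushTask{seq: 7, wg: &sync.WaitGroup{}}
	task.wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go task.wg.Done() // stands in for a DML worker draining its queue
	}
	w.input <- task

	close(w.input)
	<-w.done
}
```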