diff --git a/pkg/inputs/mysqlstream/binlog_tailer.go b/pkg/inputs/mysqlstream/binlog_tailer.go index 0add48b9..d3b1c50f 100644 --- a/pkg/inputs/mysqlstream/binlog_tailer.go +++ b/pkg/inputs/mysqlstream/binlog_tailer.go @@ -436,6 +436,8 @@ func (tailer *BinlogTailer) Start() error { log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) } + sent := 0 + for i := range dbNames { dbName := dbNames[i] table := tables[i] @@ -473,23 +475,34 @@ func (tailer *BinlogTailer) Start() error { ddlSQL, int64(e.Header.Timestamp), received) + + // do not send messages without router to the system + if consts.IsInternalDBTraffic(dbName) || + (tailer.router != nil && !tailer.router.Exists(ddlMsg)) { + continue + } + if err := tailer.emitter.Emit(ddlMsg); err != nil { log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err)) } + sent++ } - // emit barrier msg - barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit) - barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition} - if err := tailer.emitter.Emit(barrierMsg); err != nil { - log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) - } - <-barrierMsg.Done - if err := tailer.positionCache.Flush(); err != nil { - log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) + if sent > 0 { + // emit barrier msg + barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit) + barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition} + if err := tailer.emitter.Emit(barrierMsg); err != nil { + log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) + } + <-barrierMsg.Done + if err := tailer.positionCache.Flush(); err != nil { + log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) + } + + log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query)) } - log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query)) case *replication.GTIDEvent: // GTID stands for Global Transaction IDentifier // It is composed of two parts: diff --git a/pkg/outputs/mysql/mysql.go b/pkg/outputs/mysql/mysql.go index f1043a76..37965cbc 100644 --- a/pkg/outputs/mysql/mysql.go +++ b/pkg/outputs/mysql/mysql.go @@ -352,12 +352,15 @@ func (output *MySQLOutput) Execute(msgs []*core.Msg) error { if output.isTiDB { a := &ast.RenameTableStmt{ - OldTable: toTableName(os, ot), - NewTable: toTableName(ns, nt), - TableToTables: make([]*ast.TableToTable, 1), + OldTable: toTableName(os, ot), + NewTable: toTableName(ns, nt), + TableToTables: []*ast.TableToTable{ + { + OldTable: toTableName(os, ot), + NewTable: toTableName(ns, nt), + }, + }, } - a.TableToTables[0].OldTable = a.OldTable - a.TableToTables[0].NewTable = a.NewTable targetDDLs = append(targetDDLs, restore(a)) } else { tmp.TableToTables[i].OldTable = toTableName(os, ot) diff --git a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go index cddcda0e..daf87b5f 100644 --- a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go +++ b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go @@ -391,7 +391,7 @@ func (scheduler *batchScheduler) dispatchMsg(msg *core.Msg) error { func (scheduler *batchScheduler) startTableDispatcher(tableKey string) { scheduler.tableBuffers[tableKey] = make(chan *core.Msg, scheduler.cfg.MaxBatchPerWorker*10) - scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.MaxBatchPerWorker*10) + scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.SlidingWindowSize) scheduler.tableBufferWg.Add(1) go func(c chan *core.Msg, tableLatchC chan uint64, key string) {