From f1cf54f62b057290536258e75cd268d1f1eac7c4 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 25 Nov 2019 12:21:43 +0800 Subject: [PATCH 1/3] minor fixes --- integration_test/mysql_mysql_test.go | 112 ++++++++++++++++++++++++ pkg/inputs/mysqlstream/binlog_tailer.go | 33 ++++--- pkg/outputs/mysql/mysql.go | 13 +-- 3 files changed, 143 insertions(+), 15 deletions(-) diff --git a/integration_test/mysql_mysql_test.go b/integration_test/mysql_mysql_test.go index 1cfd15a4..89758f5d 100644 --- a/integration_test/mysql_mysql_test.go +++ b/integration_test/mysql_mysql_test.go @@ -1239,6 +1239,118 @@ func TestMySQLToMyBidirection(t *testing.T) { r.Equal(2, ids[0]) } +func TestMySQLToMyBidirectionDrop(t *testing.T) { + r := require.New(t) + + sourceDBName := strings.ToLower(t.Name()) + "_source" + targetDBName := strings.ToLower(t.Name()) + "_target" + + sourceDB := mysql_test.MustSetupSourceDB(sourceDBName) + defer sourceDB.Close() + + err := utils.InitInternalTxnTags(sourceDB) + r.NoError(err) + + targetDB := mysql_test.MustSetupTargetDB(targetDBName) + defer targetDB.Close() + + sourceDBConfig := mysql_test.SourceDBConfig() + targetDBConfig := mysql_test.TargetDBConfig() + + pipelineConfig1 := config.PipelineConfigV3{ + PipelineName: t.Name() + "_1", + Version: config.PipelineConfigV3Version, + InputPlugin: config.InputConfig{ + Type: "mysql", + Mode: config.Stream, + Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{ + IgnoreBiDirectionalData: true, + Source: sourceDBConfig, + }), + }, + OutputPlugin: config.GenericPluginConfig{ + Type: "mysql", + Config: utils.MustAny2Map(mysql.MySQLPluginConfig{ + DBConfig: targetDBConfig, + EnableDDL: true, + Routes: []map[string]interface{}{ + { + "match-schema": sourceDBName, + "match-table": "*", + "target-schema": targetDBName, + }, + }, + EngineConfig: &config.GenericPluginConfig{ + Type: sql_execution_engine.MySQLReplaceEngine, + Config: map[string]interface{}{ + "tag-internal-txn": true, + }, + }, + }), + }, + } + + pipelineConfig2 := config.PipelineConfigV3{ + PipelineName: t.Name() + "_2", + Version: config.PipelineConfigV3Version, + InputPlugin: config.InputConfig{ + Type: "mysql", + Mode: config.Stream, + Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{ + IgnoreBiDirectionalData: true, + Source: targetDBConfig, + }), + }, + OutputPlugin: config.GenericPluginConfig{ + Type: "mysql", + Config: utils.MustAny2Map(mysql.MySQLPluginConfig{ + DBConfig: sourceDBConfig, + EnableDDL: true, + Routes: []map[string]interface{}{ + { + "match-schema": targetDBName, + "match-table": "*", + "target-schema": sourceDBName, + }, + }, + EngineConfig: &config.GenericPluginConfig{ + Type: sql_execution_engine.MySQLReplaceEngine, + Config: map[string]interface{}{ + "tag-internal-txn": true, + }, + }, + }), + }, + } + + server1, err := app.NewServer(pipelineConfig1) + r.NoError(err) + r.NoError(server1.Start()) + + server2, err := app.NewServer(pipelineConfig2) + r.NoError(err) + r.NoError(server2.Start()) + + _, err = sourceDB.Exec(fmt.Sprintf("create table `%s`.t(id int(11), primary key(id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ", sourceDBName)) + r.NoError(err) + + time.Sleep(time.Second) + + var tblName string + row := targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = 't'", targetDBName)) + r.NoError(row.Scan(&tblName)) + r.Equal("t", tblName) + + _, err = targetDB.Exec(fmt.Sprintf("drop table `%s`.t", targetDBName)) + r.NoError(err) + + time.Sleep(time.Second) + + row = sourceDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = 't'", sourceDBName)) + err = row.Scan(&tblName) + r.Equal(sql.ErrNoRows, err) +} + func TestMySQLTagDDL(t *testing.T) { r := require.New(t) diff --git a/pkg/inputs/mysqlstream/binlog_tailer.go b/pkg/inputs/mysqlstream/binlog_tailer.go index 0add48b9..d3b1c50f 100644 --- a/pkg/inputs/mysqlstream/binlog_tailer.go +++ b/pkg/inputs/mysqlstream/binlog_tailer.go @@ -436,6 +436,8 @@ func (tailer *BinlogTailer) Start() error { log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) } + sent := 0 + for i := range dbNames { dbName := dbNames[i] table := tables[i] @@ -473,23 +475,34 @@ func (tailer *BinlogTailer) Start() error { ddlSQL, int64(e.Header.Timestamp), received) + + // do not send messages without router to the system + if consts.IsInternalDBTraffic(dbName) || + (tailer.router != nil && !tailer.router.Exists(ddlMsg)) { + continue + } + if err := tailer.emitter.Emit(ddlMsg); err != nil { log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err)) } + sent++ } - // emit barrier msg - barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit) - barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition} - if err := tailer.emitter.Emit(barrierMsg); err != nil { - log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) - } - <-barrierMsg.Done - if err := tailer.positionCache.Flush(); err != nil { - log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) + if sent > 0 { + // emit barrier msg + barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit) + barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition} + if err := tailer.emitter.Emit(barrierMsg); err != nil { + log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err)) + } + <-barrierMsg.Done + if err := tailer.positionCache.Flush(); err != nil { + log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err)) + } + + log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query)) } - log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query)) case *replication.GTIDEvent: // GTID stands for Global Transaction IDentifier // It is composed of two parts: diff --git a/pkg/outputs/mysql/mysql.go b/pkg/outputs/mysql/mysql.go index f1043a76..37965cbc 100644 --- a/pkg/outputs/mysql/mysql.go +++ b/pkg/outputs/mysql/mysql.go @@ -352,12 +352,15 @@ func (output *MySQLOutput) Execute(msgs []*core.Msg) error { if output.isTiDB { a := &ast.RenameTableStmt{ - OldTable: toTableName(os, ot), - NewTable: toTableName(ns, nt), - TableToTables: make([]*ast.TableToTable, 1), + OldTable: toTableName(os, ot), + NewTable: toTableName(ns, nt), + TableToTables: []*ast.TableToTable{ + { + OldTable: toTableName(os, ot), + NewTable: toTableName(ns, nt), + }, + }, } - a.TableToTables[0].OldTable = a.OldTable - a.TableToTables[0].NewTable = a.NewTable targetDDLs = append(targetDDLs, restore(a)) } else { tmp.TableToTables[i].OldTable = toTableName(os, ot) From 4abca60ec7bd0aa64744e36d84b11396f6d3e4be Mon Sep 17 00:00:00 2001 From: renhongdi Date: Mon, 25 Nov 2019 22:28:20 +0800 Subject: [PATCH 2/3] fix test --- integration_test/mysql_mysql_test.go | 112 --------------------------- 1 file changed, 112 deletions(-) diff --git a/integration_test/mysql_mysql_test.go b/integration_test/mysql_mysql_test.go index 89758f5d..1cfd15a4 100644 --- a/integration_test/mysql_mysql_test.go +++ b/integration_test/mysql_mysql_test.go @@ -1239,118 +1239,6 @@ func TestMySQLToMyBidirection(t *testing.T) { r.Equal(2, ids[0]) } -func TestMySQLToMyBidirectionDrop(t *testing.T) { - r := require.New(t) - - sourceDBName := strings.ToLower(t.Name()) + "_source" - targetDBName := strings.ToLower(t.Name()) + "_target" - - sourceDB := mysql_test.MustSetupSourceDB(sourceDBName) - defer sourceDB.Close() - - err := utils.InitInternalTxnTags(sourceDB) - r.NoError(err) - - targetDB := mysql_test.MustSetupTargetDB(targetDBName) - defer targetDB.Close() - - sourceDBConfig := mysql_test.SourceDBConfig() - targetDBConfig := mysql_test.TargetDBConfig() - - pipelineConfig1 := config.PipelineConfigV3{ - PipelineName: t.Name() + "_1", - Version: config.PipelineConfigV3Version, - InputPlugin: config.InputConfig{ - Type: "mysql", - Mode: config.Stream, - Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{ - IgnoreBiDirectionalData: true, - Source: sourceDBConfig, - }), - }, - OutputPlugin: config.GenericPluginConfig{ - Type: "mysql", - Config: utils.MustAny2Map(mysql.MySQLPluginConfig{ - DBConfig: targetDBConfig, - EnableDDL: true, - Routes: []map[string]interface{}{ - { - "match-schema": sourceDBName, - "match-table": "*", - "target-schema": targetDBName, - }, - }, - EngineConfig: &config.GenericPluginConfig{ - Type: sql_execution_engine.MySQLReplaceEngine, - Config: map[string]interface{}{ - "tag-internal-txn": true, - }, - }, - }), - }, - } - - pipelineConfig2 := config.PipelineConfigV3{ - PipelineName: t.Name() + "_2", - Version: config.PipelineConfigV3Version, - InputPlugin: config.InputConfig{ - Type: "mysql", - Mode: config.Stream, - Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{ - IgnoreBiDirectionalData: true, - Source: targetDBConfig, - }), - }, - OutputPlugin: config.GenericPluginConfig{ - Type: "mysql", - Config: utils.MustAny2Map(mysql.MySQLPluginConfig{ - DBConfig: sourceDBConfig, - EnableDDL: true, - Routes: []map[string]interface{}{ - { - "match-schema": targetDBName, - "match-table": "*", - "target-schema": sourceDBName, - }, - }, - EngineConfig: &config.GenericPluginConfig{ - Type: sql_execution_engine.MySQLReplaceEngine, - Config: map[string]interface{}{ - "tag-internal-txn": true, - }, - }, - }), - }, - } - - server1, err := app.NewServer(pipelineConfig1) - r.NoError(err) - r.NoError(server1.Start()) - - server2, err := app.NewServer(pipelineConfig2) - r.NoError(err) - r.NoError(server2.Start()) - - _, err = sourceDB.Exec(fmt.Sprintf("create table `%s`.t(id int(11), primary key(id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ", sourceDBName)) - r.NoError(err) - - time.Sleep(time.Second) - - var tblName string - row := targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = 't'", targetDBName)) - r.NoError(row.Scan(&tblName)) - r.Equal("t", tblName) - - _, err = targetDB.Exec(fmt.Sprintf("drop table `%s`.t", targetDBName)) - r.NoError(err) - - time.Sleep(time.Second) - - row = sourceDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = 't'", sourceDBName)) - err = row.Scan(&tblName) - r.Equal(sql.ErrNoRows, err) -} - func TestMySQLTagDDL(t *testing.T) { r := require.New(t) From b7fc80e2c2e0702eee8832f0f108ac245d4ea356 Mon Sep 17 00:00:00 2001 From: renhongdi Date: Tue, 26 Nov 2019 13:46:36 +0800 Subject: [PATCH 3/3] fix potential dead lock --- pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go index cddcda0e..daf87b5f 100644 --- a/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go +++ b/pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go @@ -391,7 +391,7 @@ func (scheduler *batchScheduler) dispatchMsg(msg *core.Msg) error { func (scheduler *batchScheduler) startTableDispatcher(tableKey string) { scheduler.tableBuffers[tableKey] = make(chan *core.Msg, scheduler.cfg.MaxBatchPerWorker*10) - scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.MaxBatchPerWorker*10) + scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.SlidingWindowSize) scheduler.tableBufferWg.Add(1) go func(c chan *core.Msg, tableLatchC chan uint64, key string) {