Skip to content

Commit

Permalink
fix bidirectional drop (#230)
Browse files Browse the repository at this point in the history
* fix bidirectional drop
  • Loading branch information
Ryan-Git authored and ming535 committed Nov 21, 2019
1 parent 0d71dcb commit d6a14f9
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
14 changes: 12 additions & 2 deletions integration_test/mysql_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,20 +1291,30 @@ func TestMySQLTagDDL(t *testing.T) {
r.NoError(server.Start())

tbl := "abc"
tbl2 := "abcd"
_, err = sourceDB.Exec(fmt.Sprintf("%screate table `%s`.`%s`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB", consts.DDLTag, sourceDBName, tbl))
r.NoError(err)
_, err = sourceDB.Exec(fmt.Sprintf("create table `%s`.`%s`(`id` int(11), PRIMARY KEY (`id`)) ENGINE=InnoDB", sourceDBName, tbl2))
r.NoError(err)
_, err = sourceDB.Exec(fmt.Sprintf("%sdrop table `%s`.`%s`;", consts.DDLTag, sourceDBName, tbl))
r.NoError(err)
_, err = sourceDB.Exec(fmt.Sprintf("%sdrop table `%s`.`%s`;", consts.DDLTag, sourceDBName, tbl2))
r.NoError(err)

err = mysql_test.SendDeadSignal(sourceDB, pipelineConfig.PipelineName)
r.NoError(err)

<-server.Input.Done()

server.Close()

row := targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = '%s'", targetDBName, tbl))
var tblName string
err = row.Scan(&tblName)
r.Equal(sql.ErrNoRows, err)
r.Equal(sql.ErrNoRows, err) // create ignored by gravity

row = targetDB.QueryRow(fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' and table_name = '%s'", targetDBName, tbl2))
err = row.Scan(&tblName)
r.Equal(sql.ErrNoRows, err) // mysql ignores annotation in drop stmt, it will be executed
}

func TestMySQLDDL(t *testing.T) {
Expand Down
64 changes: 54 additions & 10 deletions pkg/outputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"database/sql"
"fmt"
"strings"
"sync"
"time"

mysqldriver "github.com/go-sql-driver/mysql"
"github.com/juju/errors"
Expand Down Expand Up @@ -46,8 +48,14 @@ type MySQLOutput struct {
sqlExecutor sql_execution_engine.EngineExecutor
tableConfigs []config.TableConfig
isTiDB bool

// MySQL ignores comment in drop table stmt in some versions, see https://bugs.mysql.com/bug.php?id=87852
// to prevent endless bidirectional drop tables, we keep the recent dropped table names
droppedTable sync.Map
}

const keepDropTableSeconds = 30

func init() {
registry.RegisterPlugin(registry.OutputPlugin, Name, &MySQLOutput{}, false)
}
Expand Down Expand Up @@ -155,6 +163,36 @@ func (output *MySQLOutput) route0(s, t string) (schema, table string) {
return
}

func (output *MySQLOutput) markTableDropped(schema, table string) {
output.droppedTable.Store(utils.TableIdentity(schema, table), time.Now())
output.cleanupDroppedTable()
}

func (output *MySQLOutput) markTableCreated(schema, table string) {
output.droppedTable.Delete(utils.TableIdentity(schema, table))
output.cleanupDroppedTable()
}

func (output *MySQLOutput) hasDropped(schema, table string) bool {
_, ok := output.droppedTable.Load(utils.TableIdentity(schema, table))
return ok
}

func (output *MySQLOutput) cleanupDroppedTable() {
now := time.Now()
var toDelete []string
output.droppedTable.Range(func(key, value interface{}) bool {
if now.Sub(value.(time.Time)).Seconds() > keepDropTableSeconds {
toDelete = append(toDelete, key.(string))
}
return true
})

for _, k := range toDelete {
output.droppedTable.Delete(k)
}
}

func toTableName(s, t string) *ast.TableName {
return &ast.TableName{
Schema: model.CIStr{
Expand Down Expand Up @@ -247,19 +285,25 @@ func (output *MySQLOutput) Execute(msgs []*core.Msg) error {
log.Info("[output-mysql] executed ddl: ", stmt)
metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "create-table").Add(1)
output.targetSchemaStore.InvalidateSchemaCache(targetSchema)
output.markTableCreated(msg.Database, msg.Table)

case *ast.DropTableStmt:
tmp := *node
tmp.Tables[0] = toTableName(targetSchema, targetTable)
tmp.IfExists = true
stmt := restore(&tmp)
err := output.executeDDL(targetSchema, stmt)
if err != nil {
log.Fatal("[output-mysql] error exec ddl: ", stmt, ". err:", err)
if !output.hasDropped(msg.Database, msg.Table) {
tmp := *node
tmp.Tables[0] = toTableName(targetSchema, targetTable)
tmp.IfExists = true
stmt := restore(&tmp)
err := output.executeDDL(targetSchema, stmt)
if err != nil {
log.Fatal("[output-mysql] error exec ddl: ", stmt, ". err:", err)
}
log.Info("[output-mysql] executed ddl: ", stmt)
metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "drop-table").Add(1)
output.targetSchemaStore.InvalidateSchemaCache(targetSchema)
output.markTableDropped(msg.Database, msg.Table)
} else {
log.Warnf("table %s has been dropped recently. This might be a bidirectional stmt, ignore", utils.TableIdentity(msg.Database, msg.Table))
}
log.Info("[output-mysql] executed ddl: ", stmt)
metrics.OutputCounter.WithLabelValues(output.pipelineName, targetSchema, targetTable, string(core.MsgDDL), "drop-table").Add(1)
output.targetSchemaStore.InvalidateSchemaCache(targetSchema)

case *ast.AlterTableStmt:
var targetDDLs []string
Expand Down
11 changes: 8 additions & 3 deletions pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,14 @@ func (scheduler *batchScheduler) startTableDispatcher(tableKey string) {
}

if len(curBatch) > 0 {
queueIdx := round % uint(scheduler.cfg.NrWorker)
round++
scheduler.workerQueues[queueIdx] <- curBatch
if curBatch[0].Type == core.MsgDDL {
ddlIdx := utils.GenHashKey(utils.TableIdentity(curBatch[0].Database, curBatch[0].Table)) % uint32(scheduler.cfg.NrWorker)
scheduler.workerQueues[ddlIdx] <- curBatch
} else {
queueIdx := round % uint(scheduler.cfg.NrWorker)
round++
scheduler.workerQueues[queueIdx] <- curBatch
}
}

// delete the delivered messages
Expand Down

0 comments on commit d6a14f9

Please sign in to comment.