Skip to content

Commit

Permalink
should fatal when get schema error (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan-Git authored Oct 20, 2019
1 parent dc97ba7 commit bbe2df4
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
72 changes: 72 additions & 0 deletions integration_test/mysql_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/moiot/gravity/pkg/inputs"

"github.com/stretchr/testify/require"

"github.com/moiot/gravity/pkg/app"
Expand Down Expand Up @@ -125,6 +127,76 @@ func TestMySQLToMySQLStream(t *testing.T) {
r.NoError(generator.TestChecksum())
}

func TestTableNotExists(t *testing.T) {
r := require.New(t)

sourceDBName := strings.ToLower(t.Name()) + "_source"
targetDBName := strings.ToLower(t.Name()) + "_target"

sourceDB := mysql_test.MustSetupSourceDB(sourceDBName)
defer sourceDB.Close()
targetDB := mysql_test.MustSetupTargetDB(targetDBName)
defer targetDB.Close()

sourceDBConfig := mysql_test.SourceDBConfig()
targetDBConfig := mysql_test.TargetDBConfig()

dbUtil := utils.NewMySQLDB(sourceDB)
binlogFilePos, gtid, err := dbUtil.GetMasterStatus()
r.NoError(err)

pipelineConfig := config.PipelineConfigV3{
PipelineName: t.Name(),
Version: config.PipelineConfigV3Version,
InputPlugin: config.InputConfig{
Type: inputs.Mysql,
Mode: config.Stream,
Config: utils.MustAny2Map(mysqlstream.MySQLBinlogInputPluginConfig{
Source: sourceDBConfig,
StartPosition: &config.MySQLBinlogPosition{
BinLogFileName: binlogFilePos.Name,
BinLogFilePos: binlogFilePos.Pos,
BinlogGTID: gtid.String(),
},
}),
},
OutputPlugin: config.GenericPluginConfig{
Type: "mysql",
Config: utils.MustAny2Map(mysql.MySQLPluginConfig{
DBConfig: targetDBConfig,
EnableDDL: true,
Routes: []map[string]interface{}{
{
"match-schema": sourceDBName,
"match-table": "*",
"target-schema": targetDBName,
},
},
}),
},
}

fullTblName := fmt.Sprintf("`%s`.`t`", sourceDBName)
_, err = sourceDB.Exec(fmt.Sprintf("CREATE TABLE %s (`id` int(11) unsigned NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;", fullTblName))
r.NoError(err)
_, err = sourceDB.Exec(fmt.Sprintf("insert into %s(id) values (1);", fullTblName))
r.NoError(err)
_, err = sourceDB.Exec(fmt.Sprintf("drop table %s;", fullTblName))
r.NoError(err)

err = mysql_test.SendDeadSignal(sourceDB, pipelineConfig.PipelineName)
r.NoError(err)

// start the server
server, err := app.NewServer(pipelineConfig)
r.NoError(err)

r.NoError(server.Start())

server.Input.Wait()
server.Close()
}

func TestMySQLBatch(t *testing.T) {
r := require.New(t)

Expand Down
5 changes: 2 additions & 3 deletions pkg/inputs/mysqlstream/binlog_tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ func (tailer *BinlogTailer) Start() error {
// TODO: introduce schema store, so that we won't have stale schema
schema, err := tailer.sourceSchemaStore.GetSchema(schemaName)
if err != nil {
log.Errorf("[binlogTailer] failed GetSchema %v. err: %v.", schemaName, errors.ErrorStack(err))
continue
log.Fatalf("[binlogTailer] failed GetSchema %v. err: %v.", schemaName, errors.ErrorStack(err))
}

tableDef := schema[tableName]
Expand All @@ -329,7 +328,7 @@ func (tailer *BinlogTailer) Start() error {
log.Fatalf("[binlogTailer] failed to get internal traffic table: schemaName: %v, tableName: %v",
schemaName, tableName)
} else {
log.Errorf("[binlogTailer] failed to get table def, schemaName: %v, tableName: %v", schemaName, tableName)
log.Warnf("[binlogTailer] failed to get table def, skip this mutation. schemaName: %v, tableName: %v", schemaName, tableName)
continue
}
}
Expand Down

0 comments on commit bbe2df4

Please sign in to comment.