From 797ad2d1baddb33c0cf5a5b4da49b06f8f02cea6 Mon Sep 17 00:00:00 2001 From: Hu Ming Date: Fri, 28 Dec 2018 18:16:23 +0800 Subject: [PATCH] fix replication mode deadlock (#4) * fix replication mode hang when closing the server --- docs/2.0/example-mysql2mysql-full.toml | 61 +++++++++++++++++++ gravity/inputs/helper/two_stage_input.go | 12 +++- gravity/inputs/mysqlbatch/input.go | 15 +++-- gravity/inputs/mysqlbatch/msg.go | 5 +- .../inputs/mysqlbatch/mysql_table_scanner.go | 7 +-- gravity/inputs/mysqlstream/input.go | 25 +++++--- position_store/position_store.go | 2 +- 7 files changed, 106 insertions(+), 21 deletions(-) diff --git a/docs/2.0/example-mysql2mysql-full.toml b/docs/2.0/example-mysql2mysql-full.toml index e69de29b..22e70087 100644 --- a/docs/2.0/example-mysql2mysql-full.toml +++ b/docs/2.0/example-mysql2mysql-full.toml @@ -0,0 +1,61 @@ +# 整个配置由 4 部分组成: +# - input: 定义 input plugin 的配置 +# - filters: 定义 filters plugin 的配置,filter 用来对数据流做变更操作 +# - output: 定义 output plugin 的配置 +# - system: 定义系统级配置 +# +# 围绕 core.Msg, 系统定义若干个 match 函数,在配置文件里使用 match 函数 +# 来匹配 filter 和 output 的路由,filter/output 里的每一个 match 函数 +# 都匹配才算满足匹配规则 +# +[input.mysql] +mode = "stream" +ignore-bidirectional-data = true + +[input.mysql] +mode = "stream" +[input.mysql.source] +host = "127.0.0.1" +username = "root" +password = "" +port = 3306 + +[[filters]] +type = "reject" +match-schema = "test_db" +match-table = "test_table" + +[[filters]] +type = "rename-dml-column" +match-schema = "test" +match-table = "test_table_2" +from = ["b"] +to = ["d"] + +[[filters]] +type = "delete-dml-column" +match-schema = "test" +match-table = "test_table" +columns = ["e", "f"] + +[[filters]] +type = "dml-pk-override" +match-schema = "test" +match-table = "test_table" +id = "another_id" + +[output.kafka] +broker-addrs = ["1.2.3.4:9002"] +mode = "async" + +[[output.kafka.routes]] +match-schema = "test_db" +match-table = "test_table_2" +dml-topic = "binlog.test_db.test_table_2" + +[system.scheduler] +type = 
"batch-scheduler" +nr-worker = 1 +batch-size = 2 +queue-size = 1024 +sliding-window-size = 1024 diff --git a/gravity/inputs/helper/two_stage_input.go b/gravity/inputs/helper/two_stage_input.go index c5d9cde1..f5905474 100644 --- a/gravity/inputs/helper/two_stage_input.go +++ b/gravity/inputs/helper/two_stage_input.go @@ -135,12 +135,18 @@ func (i *TwoStageInputPlugin) Start(emitter core.Emitter) error { } go func() { + + pos, ok := <-i.full.Done() + if !ok { + log.Info("[TwoStageInputPlugin] full stage done") + return + } + i.transitionMutex.Lock() defer i.transitionMutex.Unlock() - pos, ok := <-i.full.Done() - if !ok || i.closed { - log.Info("[TwoStageInputPlugin] full stage cancelled") + if i.closed { + log.Info("[TwoStageInputPlugin] full stage closed") return } diff --git a/gravity/inputs/mysqlbatch/input.go b/gravity/inputs/mysqlbatch/input.go index f8bbf53b..31d180c5 100644 --- a/gravity/inputs/mysqlbatch/input.go +++ b/gravity/inputs/mysqlbatch/input.go @@ -236,16 +236,22 @@ func (plugin *mysqlFullInput) Wait() { } func (plugin *mysqlFullInput) Close() { - log.Infof("[scanner server] closing...") plugin.closeOnce.Do(func() { - plugin.cancel() + log.Infof("[scanner server] closing...") + + if plugin.cancel != nil { + plugin.cancel() + } for i := range plugin.tableScanners { plugin.tableScanners[i].Wait() } - plugin.sourceDB.Close() + if plugin.sourceDB != nil { + plugin.sourceDB.Close() + } + if plugin.scanDB != nil { plugin.scanDB.Close() @@ -254,9 +260,10 @@ func (plugin *mysqlFullInput) Close() { if plugin.sourceSchemaStore != nil { plugin.sourceSchemaStore.Close() } + + log.Infof("[mysqlFullInput] closed") }) - log.Infof("[mysqlFullInput] closed") } func (plugin *mysqlFullInput) waitFinish() { diff --git a/gravity/inputs/mysqlbatch/msg.go b/gravity/inputs/mysqlbatch/msg.go index 74507774..8d0a5256 100644 --- a/gravity/inputs/mysqlbatch/msg.go +++ b/gravity/inputs/mysqlbatch/msg.go @@ -1,6 +1,7 @@ package mysqlbatch import ( + 
"github.com/moiot/gravity/position_store" "time" log "github.com/sirupsen/logrus" @@ -22,7 +23,8 @@ func NewMsg( rowPtrs []interface{}, columnTypes []*sql.ColumnType, sourceTableDef *schema_store.Table, - callbackFunc core.AfterMsgCommitFunc) *core.Msg { + callbackFunc core.AfterMsgCommitFunc, + position position_store.MySQLTablePosition) *core.Msg { columnDataMap := mysql.SQLDataPtrs2Val(rowPtrs, columnTypes) msg := core.Msg{ @@ -57,6 +59,7 @@ func NewMsg( msg.OutputStreamKey = utils.NewStringPtr(msg.GetPkSign()) msg.Done = make(chan struct{}) msg.AfterCommitCallback = callbackFunc + msg.InputContext = position msg.Metrics = core.Metrics{ MsgCreateTime: time.Now(), } diff --git a/gravity/inputs/mysqlbatch/mysql_table_scanner.go b/gravity/inputs/mysqlbatch/mysql_table_scanner.go index 063ad6c4..7f5bd2e1 100644 --- a/gravity/inputs/mysqlbatch/mysql_table_scanner.go +++ b/gravity/inputs/mysqlbatch/mysql_table_scanner.go @@ -235,6 +235,7 @@ func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store tableScanner.positionStore.PutCurrent(utils.TableIdentity(tableDef.Schema, tableDef.Name), min) currentMinPos = min } + log.Infof("[LoopInBatch] prepare current: %v", currentMinPos) currentMinValue := currentMinPos.Value resultCount := 0 @@ -312,9 +313,7 @@ func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store posV := mysql.NormalizeSQLType(reflect.ValueOf(rowPtrs[scanIdx]).Elem().Interface()) position := position_store.MySQLTablePosition{Value: posV, Column: scanColumn} - msg := NewMsg(rowPtrs, columnTypes, tableDef, tableScanner.AfterMsgCommit) - msg.AfterCommitCallback = tableScanner.AfterMsgCommit - msg.InputContext = position + msg := NewMsg(rowPtrs, columnTypes, tableDef, tableScanner.AfterMsgCommit, position) if err := tableScanner.emitter.Emit(msg); err != nil { log.Fatalf("[LoopInBatch] failed to emit job: %v", errors.ErrorStack(err)) @@ -355,7 +354,7 @@ func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef 
*schema_store.Tab for i := range allData { rowPtrs := allData[i] - msg := NewMsg(rowPtrs, columnTypes, tableDef, nil) + msg := NewMsg(rowPtrs, columnTypes, tableDef, nil, position_store.MySQLTablePosition{}) if err := tableScanner.emitter.Emit(msg); err != nil { log.Fatalf("[tableScanner] failed to emit: %v", errors.ErrorStack(err)) } diff --git a/gravity/inputs/mysqlstream/input.go b/gravity/inputs/mysqlstream/input.go index d9cd9e59..5b34e6c0 100644 --- a/gravity/inputs/mysqlstream/input.go +++ b/gravity/inputs/mysqlstream/input.go @@ -220,19 +220,28 @@ func (plugin *mysqlInputPlugin) Start(emitter core.Emitter) error { } func (plugin *mysqlInputPlugin) Close() { - log.Infof("[mysqlInputPlugin] closing...") plugin.closeOnce.Do(func() { - plugin.binlogChecker.Stop() + log.Infof("[mysqlInputPlugin] closing...") - plugin.binlogTailer.Close() + if plugin.binlogChecker != nil { + plugin.binlogChecker.Stop() + } - plugin.sourceSchemaStore.Close() + if plugin.binlogTailer != nil { + plugin.binlogTailer.Close() + } - if err := plugin.sourceDB.Close(); err != nil { - log.Errorf("[mysqlInputPlugin.Close] error close db. %s", errors.Trace(err)) + if plugin.sourceSchemaStore != nil { + plugin.sourceSchemaStore.Close() } - }) - log.Infof("[mysqlInputPlugin] closed") + if plugin.sourceDB != nil { + if err := plugin.sourceDB.Close(); err != nil { + log.Errorf("[mysqlInputPlugin.Close] error close db. 
%s", errors.Trace(err)) + } + } + + log.Infof("[mysqlInputPlugin] closed") + }) } diff --git a/position_store/position_store.go b/position_store/position_store.go index 89fd6882..e5c8d3d1 100644 --- a/position_store/position_store.go +++ b/position_store/position_store.go @@ -19,7 +19,7 @@ var myJson = jsoniter.Config{SortMapKeys: true}.Froze() var ( oldTable = `cluster_gravity_binlog_position` - positionTableName = `gravity_binlog_position` + positionTableName = `gravity_positions` positionFullTableName = fmt.Sprintf("%s.%s", config.GravityDBName, positionTableName) createPositionTableStatement = fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (