Skip to content

Commit

Permalink
fix repliation mode deadlock (#4)
Browse files Browse the repository at this point in the history
* fix repliation mode hang when close the server
  • Loading branch information
ming535 authored Dec 28, 2018
1 parent 8cd206f commit 797ad2d
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 21 deletions.
61 changes: 61 additions & 0 deletions docs/2.0/example-mysql2mysql-full.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# 整个配置由 4 部分组成:
# - input: 定义 input plugin 的配置
# - filters: 定义 filters plugin 的配置,filter 用来对数据流做变更操作
# - output: 定义 output plugin 的配置
# - system: 定义系统级配置
#
# 围绕 core.Msg, 系统定义若干个 match 函数,在配置文件里使用 match 函数
# 来匹配 filter 和 output 的路由,filter/output 里的每一个 match 函数
# 都匹配才算满足匹配规则
#
[input.mysql]
mode = "stream"
ignore-bidirectional-data = true

[input.mysql]
mode = "stream"
[input.mysql.source]
host = "127.0.0.1"
username = "root"
password = ""
port = 3306

[[filters]]
type = "reject"
match-schema = "test_db"
match-table = "test_table"

[[filters]]
type = "rename-dml-column"
match-schema = "test"
match-table = "test_table_2"
from = ["b"]
to = ["d"]

[[filters]]
type = "delete-dml-column"
match-schema = "test"
match-table = "test_table"
columns = ["e", "f"]

[[filters]]
type = "dml-pk-override"
match-schema = "test"
match-table = "test_table"
id = "another_id"

[output.kafka]
broker-addrs = ["1.2.3.4:9002"]
mode = "async"

[[output.kafka.routes]]
match-schema = "test_db"
match-table = "test_table_2"
dml-topic = "binlog.test_db.test_table_2"

[system.scheduler]
type = "batch-scheduler"
nr-worker = 1
batch-size = 2
queue-size = 1024
sliding-window-size = 1024
12 changes: 9 additions & 3 deletions gravity/inputs/helper/two_stage_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,18 @@ func (i *TwoStageInputPlugin) Start(emitter core.Emitter) error {
}

go func() {

pos, ok := <-i.full.Done()
if !ok {
log.Info("[TwoStageInputPlugin] full stage done")
return
}

i.transitionMutex.Lock()
defer i.transitionMutex.Unlock()

pos, ok := <-i.full.Done()
if !ok || i.closed {
log.Info("[TwoStageInputPlugin] full stage cancelled")
if i.closed {
log.Info("[TwoStageInputPlugin] full stage closed")
return
}

Expand Down
15 changes: 11 additions & 4 deletions gravity/inputs/mysqlbatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,22 @@ func (plugin *mysqlFullInput) Wait() {
}

func (plugin *mysqlFullInput) Close() {
log.Infof("[scanner server] closing...")

plugin.closeOnce.Do(func() {
plugin.cancel()
log.Infof("[scanner server] closing...")

if plugin.cancel != nil {
plugin.cancel()
}

for i := range plugin.tableScanners {
plugin.tableScanners[i].Wait()
}

plugin.sourceDB.Close()
if plugin.sourceDB != nil {
plugin.sourceDB.Close()
}


if plugin.scanDB != nil {
plugin.scanDB.Close()
Expand All @@ -254,9 +260,10 @@ func (plugin *mysqlFullInput) Close() {
if plugin.sourceSchemaStore != nil {
plugin.sourceSchemaStore.Close()
}

log.Infof("[mysqlFullInput] closed")
})

log.Infof("[mysqlFullInput] closed")
}

func (plugin *mysqlFullInput) waitFinish() {
Expand Down
5 changes: 4 additions & 1 deletion gravity/inputs/mysqlbatch/msg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mysqlbatch

import (
"github.com/moiot/gravity/position_store"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -22,7 +23,8 @@ func NewMsg(
rowPtrs []interface{},
columnTypes []*sql.ColumnType,
sourceTableDef *schema_store.Table,
callbackFunc core.AfterMsgCommitFunc) *core.Msg {
callbackFunc core.AfterMsgCommitFunc,
position position_store.MySQLTablePosition) *core.Msg {

columnDataMap := mysql.SQLDataPtrs2Val(rowPtrs, columnTypes)
msg := core.Msg{
Expand Down Expand Up @@ -57,6 +59,7 @@ func NewMsg(
msg.OutputStreamKey = utils.NewStringPtr(msg.GetPkSign())
msg.Done = make(chan struct{})
msg.AfterCommitCallback = callbackFunc
msg.InputContext = position
msg.Metrics = core.Metrics{
MsgCreateTime: time.Now(),
}
Expand Down
7 changes: 3 additions & 4 deletions gravity/inputs/mysqlbatch/mysql_table_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store
tableScanner.positionStore.PutCurrent(utils.TableIdentity(tableDef.Schema, tableDef.Name), min)
currentMinPos = min
}
log.Infof("[LoopInBatch] prepare current: %v", currentMinPos)

currentMinValue := currentMinPos.Value
resultCount := 0
Expand Down Expand Up @@ -312,9 +313,7 @@ func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store
posV := mysql.NormalizeSQLType(reflect.ValueOf(rowPtrs[scanIdx]).Elem().Interface())
position := position_store.MySQLTablePosition{Value: posV, Column: scanColumn}

msg := NewMsg(rowPtrs, columnTypes, tableDef, tableScanner.AfterMsgCommit)
msg.AfterCommitCallback = tableScanner.AfterMsgCommit
msg.InputContext = position
msg := NewMsg(rowPtrs, columnTypes, tableDef, tableScanner.AfterMsgCommit, position)

if err := tableScanner.emitter.Emit(msg); err != nil {
log.Fatalf("[LoopInBatch] failed to emit job: %v", errors.ErrorStack(err))
Expand Down Expand Up @@ -355,7 +354,7 @@ func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Tab

for i := range allData {
rowPtrs := allData[i]
msg := NewMsg(rowPtrs, columnTypes, tableDef, nil)
msg := NewMsg(rowPtrs, columnTypes, tableDef, nil, position_store.MySQLTablePosition{})
if err := tableScanner.emitter.Emit(msg); err != nil {
log.Fatalf("[tableScanner] failed to emit: %v", errors.ErrorStack(err))
}
Expand Down
25 changes: 17 additions & 8 deletions gravity/inputs/mysqlstream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,28 @@ func (plugin *mysqlInputPlugin) Start(emitter core.Emitter) error {
}

func (plugin *mysqlInputPlugin) Close() {
log.Infof("[mysqlInputPlugin] closing...")

plugin.closeOnce.Do(func() {
plugin.binlogChecker.Stop()
log.Infof("[mysqlInputPlugin] closing...")

plugin.binlogTailer.Close()
if plugin.binlogChecker != nil {
plugin.binlogChecker.Stop()
}

plugin.sourceSchemaStore.Close()
if plugin.binlogTailer != nil {
plugin.binlogTailer.Close()
}

if err := plugin.sourceDB.Close(); err != nil {
log.Errorf("[mysqlInputPlugin.Close] error close db. %s", errors.Trace(err))
if plugin.sourceSchemaStore != nil {
plugin.sourceSchemaStore.Close()
}
})

log.Infof("[mysqlInputPlugin] closed")
if plugin.sourceDB != nil {
if err := plugin.sourceDB.Close(); err != nil {
log.Errorf("[mysqlInputPlugin.Close] error close db. %s", errors.Trace(err))
}
}

log.Infof("[mysqlInputPlugin] closed")
})
}
2 changes: 1 addition & 1 deletion position_store/position_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var myJson = jsoniter.Config{SortMapKeys: true}.Froze()

var (
oldTable = `cluster_gravity_binlog_position`
positionTableName = `gravity_binlog_position`
positionTableName = `gravity_positions`
positionFullTableName = fmt.Sprintf("%s.%s", config.GravityDBName, positionTableName)
createPositionTableStatement = fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
Expand Down

0 comments on commit 797ad2d

Please sign in to comment.