diff --git a/Makefile b/Makefile index a14f97e13a..9f9cdf00d6 100644 --- a/Makefile +++ b/Makefile @@ -157,10 +157,8 @@ dm_integration_test_build: retool_setup -coverpkg=github.com/pingcap/dm/... \ -o bin/dm-tracer.test github.com/pingcap/dm/cmd/dm-tracer \ || { $(FAILPOINT_DISABLE); exit 1; } - $(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \ - -coverpkg=github.com/pingcap/dm/... \ - -o bin/dm-syncer.test github.com/pingcap/dm/cmd/dm-syncer \ - || { $(FAILPOINT_DISABLE); exit 1; } + CGO_ENABLED=1 GO111MODULE=on go build -o bin/dm-syncer ./cmd/dm-syncer + CGO_ENABLED=1 GO111MODULE=on go build -o bin/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go $(FAILPOINT_DISABLE) tests/prepare_tools.sh @@ -173,7 +171,7 @@ integration_test: check_third_party_binary @which bin/dm-master.test @which bin/dm-worker.test @which bin/dm-tracer.test - @which bin/dm-syncer.test + @which bin/dm-syncer tests/run.sh $(CASE) compatibility_test: check_third_party_binary diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index de802c0261..53f5d03d30 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list" ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s" ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected" +ErrSyncerLoadPlugin,[code=36063:class=sync-unit:scope=internal:level=high],"fail to load plugin from %s" ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid" ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported" ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid" diff --git a/cmd/dm-syncer/config.go b/cmd/dm-syncer/config.go index 548b88483f..96f38ef5b8 100644 --- a/cmd/dm-syncer/config.go +++ b/cmd/dm-syncer/config.go @@ -56,6 +56,8 @@ type commonConfig struct { EnableANSIQuotes bool TimezoneStr string + PluginPath string + SyncerConfigFormat bool } @@ -77,6 +79,7 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask MaxRetry: c.MaxRetry, EnableANSIQuotes: c.EnableANSIQuotes, TimezoneStr: c.TimezoneStr, + PluginPath: c.PluginPath, } cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError) @@ -271,6 +274,8 @@ type syncerConfig struct { TimezoneStr string `toml:"timezone" json:"timezone"` Timezone *time.Location `json:"-"` + PluginPath string `toml:"plugin-path" json:"plugin-path"` + printVersion bool } @@ -349,6 +354,8 @@ func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) { Timezone: oc.TimezoneStr, From: oc.From, To: oc.To, + + PluginPath: oc.PluginPath, } for _, rule := range oc.RouteRules { diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 12cde3a473..68250cf0e6 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -181,6 +181,8 @@ type SubTaskConfig struct { PprofAddr string `toml:"pprof-addr" json:"pprof-addr"` StatusAddr string `toml:"status-addr" json:"status-addr"` + PluginPath string `toml:"plugin-path" json:"plugin-path"` + ConfigFile string `toml:"-" json:"config-file"` // still needed by Syncer / Loader bin diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 0621398ade..797add249b 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -365,6 +365,7 @@ const ( codeSyncerUnitGenBWList codeSyncerUnitHandleDDLFailed codeSyncerShardDDLConflict + codeSyncerLoadPlugin ) // DM-master error code @@ -878,6 +879,8 @@ var ( ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s") ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected") + ErrSyncerLoadPlugin = New(codeSyncerLoadPlugin, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to load plugin from %s") + // DM-master error ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") ErrMasterSQLOpNotSupport = New(codeMasterSQLOpNotSupport, ClassDMMaster, ScopeInternal, LevelMedium, "op %s not supported") diff --git a/syncer/optimist.go b/syncer/optimist.go index 1b83798a3e..0b4f080fed 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -170,8 +170,8 @@ func (s *Syncer) handleQueryEventOptimistic( return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } for _, table := range onlineDDLTableNames { diff --git a/syncer/plugin/demo/demo.go b/syncer/plugin/demo/demo.go new file mode 100644 index 0000000000..a733ae7cba --- /dev/null +++ b/syncer/plugin/demo/demo.go @@ -0,0 +1,156 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/log" + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/format" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" +) + +// DemoPlugin is a demo to show how to use plugin +type DemoPlugin struct { + db *sql.DB +} + +// NewPlugin creates a new DemoPlugin +func NewPlugin() interface{} { + return &DemoPlugin{} +} + +// Init implements Plugin's Init +func (dp *DemoPlugin) Init(cfg *config.SubTaskConfig) error { + log.Info("demo plugin initialize") + + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true", + cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port) + db, err := sql.Open("mysql", dsn) + if err != nil { + return err + } + dp.db = db + + return nil +} + +// HandleDDLJobResult implements Plugin's HandleDDLJobResult +// for example: +// ev.Query is `ALTER TABLE test.t1 MODIFY COLUMN name varchar(50);` +// error is `unsupported modify column length 50 is less than origin 100` +func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error { + if err == nil { + return nil + } + + log.Info("demo plugin handle ddl job result", zap.String("query", string(ev.Query)), zap.Error(err)) + + if !strings.Contains(err.Error(), "unsupported modify column length") { + log.Info("don't contain error message \"unsupported modify column length\"") + return nil + } + + stmt, err := parser.New().ParseOneStmt(string(ev.Query), "", "") + if err != nil { + log.Info("parser failed", zap.Error(err)) + return err + } + + schema := string(ev.Schema) + + switch st := stmt.(type) { + case *ast.AlterTableStmt: + switch st.Specs[0].Tp { + case ast.AlterTableModifyColumn: + originColName := st.Specs[0].NewColumns[0].Name.Name.O + tmpColName := fmt.Sprintf("%s_tmp", st.Specs[0].NewColumns[0].Name) + + // get origin column from ast, originCol is `name varchar(50)` + var sb strings.Builder + st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)) + originCol := sb.String() + + // generate tmp column, tmpCol is `name_tmp varchar(50)` + st.Specs[0].NewColumns[0].Name.Name = model.NewCIStr(tmpColName) + var sb2 strings.Builder + st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb2)) + tmpCol := sb2.String() + + // get table infomation, used to get primary key and unique key + ctx := context.Background() + tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, schema, st.Table.Name.O, "") + if err != nil { + log.Info("get table information failed", zap.Error(err)) + return err + } + keys, _ := dbutil.SelectUniqueOrderKey(tableInfo) + keysList := strings.Join(keys, ", ") + + addColSQL := fmt.Sprintf("alter table `%s`.`%s` add column %s after %s", schema, st.Table.Name.O, tmpCol, originColName) + log.Info("execute", zap.String("sql", addColSQL)) + _, err = dp.db.ExecContext(ctx, addColSQL) + if err != nil { + log.Info("execute sql failed", zap.String("sql", addColSQL), zap.Error(err)) + return err + } + + insertSQL := fmt.Sprintf("replace into `%s`.`%s`(%s, %s) SELECT %s, %s AS %s FROM `%s`.`%s`;", schema, st.Table.Name.O, keysList, tmpColName, keysList, originColName, tmpColName, schema, st.Table.Name.O) + log.Info("execute", zap.String("sql", insertSQL)) + _, err = dp.db.ExecContext(ctx, insertSQL) + if err != nil { + log.Info("execute sql failed", zap.String("sql", insertSQL), zap.Error(err)) + return err + } + + dropColSQL := fmt.Sprintf("alter table `%s`.`%s` drop column %s", schema, st.Table.Name.O, originColName) + log.Info("execute", zap.String("sql", dropColSQL)) + _, err = dp.db.ExecContext(ctx, dropColSQL) + if err != nil { + log.Info("execute sql failed", zap.String("sql", dropColSQL), zap.Error(err)) + return err + } + + changeColSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN %s %s;", schema, st.Table.Name.O, tmpColName, originCol) + log.Info("execute", zap.String("sql", changeColSQL)) + _, err = dp.db.ExecContext(ctx, changeColSQL) + if err != nil { + log.Info("execute sql failed", zap.String("sql", changeColSQL), zap.Error(err)) + return err + } + default: + log.Info("unhandle ddl type") + return nil + } + default: + log.Info("unhandle ddl type") + return nil + } + + return nil +} + +// HandleDMLJobResult implements Plugin's HandleDMLJobResult +func (dp *DemoPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { + return err +} diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go new file mode 100644 index 0000000000..973895481f --- /dev/null +++ b/syncer/plugin/plugin.go @@ -0,0 +1,86 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "plugin" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/terror" + "github.com/siddontang/go-mysql/replication" +) + +var ( + createPluginFunc = "NewPlugin" +) + +// LoadPlugin loads plugin by plugin's file path +func LoadPlugin(filepath string) (Plugin, error) { + if len(filepath) == 0 { + return new(NilPlugin), nil + } + + p, err := plugin.Open(filepath) + if err != nil { + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) + } + + pluginSymbol, err := p.Lookup(createPluginFunc) + if err != nil { + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) + } + + newPlugin, ok := pluginSymbol.(func() interface{}) + if !ok { + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) + } + + plg := newPlugin() + plg2, ok := plg.(Plugin) + if !ok { + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) + } + + return plg2, nil +} + +// Plugin is a struct of plugin used in syncer unit +type Plugin interface { + // Init do some init job + Init(cfg *config.SubTaskConfig) error + + // HandleDDLJobResult handles the result of ddl job + HandleDDLJobResult(ev *replication.QueryEvent, err error) error + + // HandleDMLJobResult handles the result of dml job + HandleDMLJobResult(ev *replication.RowsEvent, err error) error +} + +// NilPlugin is a plugin which do nothing +type NilPlugin struct{} + +// Init implements Plugin's Init +func (n *NilPlugin) Init(cfg *config.SubTaskConfig) error { + return nil +} + +// HandleDDLJobResult implements Plugin's HandleDDLJobResult +func (n *NilPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error { + return err +} + +// HandleDMLJobResult implements Plugin's HandleDMLJobResult +func (n *NilPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { + return err +} diff --git a/syncer/syncer.go b/syncer/syncer.go index a192501b90..98db46de0b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -55,6 +55,7 @@ import ( "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/tracing" "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/dm/syncer/plugin" sm "github.com/pingcap/dm/syncer/safe-mode" "github.com/pingcap/dm/syncer/shardddl" operator "github.com/pingcap/dm/syncer/sql-operator" @@ -156,7 +157,7 @@ type Syncer struct { // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError // record whether error occurred when execute SQLs - execErrorDetected sync2.AtomicBool + execError executeError execErrors struct { sync.Mutex @@ -178,6 +179,8 @@ type Syncer struct { } addJobFunc func(*job) error + + plugin plugin.Plugin } // NewSyncer creates a new Syncer. @@ -374,10 +377,26 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) + // init plugin + err = s.initPlugin() + if err != nil { + return err + } + s.reset() return nil } +// initPlugin initializes the plugin +func (s *Syncer) initPlugin() error { + plugin, err := plugin.LoadPlugin(s.cfg.PluginPath) + if err != nil { + return err + } + s.plugin = plugin + return s.plugin.Init(s.cfg) +} + // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules // NOTE: now we don't support modify router rules after task has started func (s *Syncer) initShardingGroups() error { @@ -443,7 +462,7 @@ func (s *Syncer) reset() { // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) - s.execErrorDetected.Set(false) + s.execError.Set(nil) s.resetExecErrors() switch s.cfg.ShardMode { @@ -813,7 +832,7 @@ func (s *Syncer) resetShardingGroup(schema, table string) { // // we may need to refactor the concurrency model to make the work-flow more clearer later func (s *Syncer) flushCheckPoints() error { - if s.execErrorDetected.Get() { + if detected, _ := s.execError.Detected(); detected { s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint)) return nil } @@ -923,10 +942,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } s.jobWg.Done() if err != nil { - s.execErrorDetected.Set(true) - if !utils.IsContextCanceledError(err) { - s.runFatalChan <- unit.NewProcessError(err) - } + s.execError.Set(err) continue } s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls))) @@ -955,7 +971,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo } fatalF := func(err error) { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -1299,13 +1315,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case *replication.RowsEvent: err = s.handleRowsEvent(ev, ec) - if err != nil { - return terror.Annotatef(err, "current location %s", currentLocation) + err1 := s.plugin.HandleDMLJobResult(ev, err) + if err1 != nil { + return terror.Annotatef(err1, "current location %s", currentLocation) } case *replication.QueryEvent: err = s.handleQueryEvent(ev, ec) - if err != nil { - return terror.Annotatef(err, "current location %s", currentLocation) + err1 := s.plugin.HandleDDLJobResult(ev, err) + if err1 != nil { + return terror.Annotatef(err1, "current location %s", currentLocation) } case *replication.XIDEvent: if shardingReSync != nil { @@ -1749,9 +1767,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } // when add ddl job, will execute ddl and then flush checkpoint. - // if execute ddl failed, the execErrorDetected will be true. - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + // if execute ddl failed, the detected will be true. + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) @@ -1935,8 +1953,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } if len(onlineDDLTableNames) > 0 { diff --git a/syncer/util.go b/syncer/util.go index c264e71e3f..1682fd5e0d 100644 --- a/syncer/util.go +++ b/syncer/util.go @@ -17,6 +17,7 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/pingcap/parser/ast" "github.com/pingcap/tidb-tools/pkg/filter" @@ -103,6 +104,25 @@ func getDBConfigFromEnv() config.DBConfig { } } +type executeError struct { + sync.RWMutex + + err error +} + +func (e *executeError) Detected() (bool, error) { + e.RLock() + defer e.RUnlock() + + return e.err != nil, e.err +} + +func (e *executeError) Set(err error) { + e.Lock() + e.err = err + e.Unlock() +} + // record source tbls record the tables that need to flush checkpoints func recordSourceTbls(sourceTbls map[string]map[string]struct{}, stmt ast.StmtNode, table *filter.Table) { schema, name := table.Schema, table.Name diff --git a/tests/_utils/run_dm_syncer b/tests/_utils/run_dm_syncer index 9b4c81f3bd..bee13db761 100755 --- a/tests/_utils/run_dm_syncer +++ b/tests/_utils/run_dm_syncer @@ -5,7 +5,7 @@ set -eu workdir=$1 config=$2 -binary=$PWD/bin/dm-syncer.test +binary=$PWD/bin/dm-syncer PWD=$(pwd) @@ -23,7 +23,6 @@ fi cd $workdir echo "$binary --log-file="$workdir/log/dm-syncer.log" --config="$config" $meta $format >> $workdir/log/stdout.log 2>&1 &" -$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.syncer.out" DEVEL \ --L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $name $meta $format >> $workdir/log/stdout.log 2>&1 & +$binary -L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $name $meta $format >> $workdir/log/stdout.log 2>&1 & cd $PWD diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 1a5c291bec..a52c830bd5 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -14,12 +14,12 @@ function cleanup_process() { pkill -hup dm-worker.test 2>/dev/null || true pkill -hup dm-master.test 2>/dev/null || true pkill -hup dm-tracer.test 2>/dev/null || true - pkill -hup dm-syncer.test 2>/dev/null || true + pkill -hup dm-syncer 2>/dev/null || true wait_process_exit dm-master.test wait_process_exit dm-worker.test wait_process_exit dm-tracer.test - wait_process_exit dm-syncer.test + wait_process_exit dm-syncer } if [ "$RESET_MASTER" = true ]; then diff --git a/tests/dm_syncer/conf/dm-syncer-1.toml b/tests/dm_syncer/conf/dm-syncer-1.toml index 3d9b8bef54..b97e3529e7 100644 --- a/tests/dm_syncer/conf/dm-syncer-1.toml +++ b/tests/dm_syncer/conf/dm-syncer-1.toml @@ -23,6 +23,8 @@ disable-heartbeat = true # replicate from relay log or remote binlog binlog-type = "remote" +# plugin configuration + # Mydumper configuration # -t, --threads diff --git a/tests/dm_syncer/data/db1.increment.sql b/tests/dm_syncer/data/db1.increment.sql index e8f75e3ea2..94f449d9a3 100644 --- a/tests/dm_syncer/data/db1.increment.sql +++ b/tests/dm_syncer/data/db1.increment.sql @@ -23,3 +23,9 @@ insert into t1 (id, name, info) values (7, 'gentest', '{"id": 126}'); update t1 set name = 'gentestxxxxxx' where gen_id = 124; -- delete with unique key delete from t1 where gen_id > 124; + +create table t11(id int primary key, name varchar(100)); +insert into t11 values(1, "a"),(2, "b"); +/* alter table will failed when execute in TiDB, will be handled by plugin */ +ALTER TABLE t11 MODIFY COLUMN name varchar(50); +insert into t11 values(3, "c"),(4, "d"); diff --git a/tests/dm_syncer/run.sh b/tests/dm_syncer/run.sh index ce2e803cb4..87b54c57ed 100755 --- a/tests/dm_syncer/run.sh +++ b/tests/dm_syncer/run.sh @@ -63,10 +63,14 @@ function run() { name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ') pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ') fi + sed -i "s/binlog-name-placeholder-1/\"$name1\"/g" $WORK_DIR/dm-syncer-1.toml sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-syncer-1.toml sed -i "s/binlog-name-placeholder-2/\"$name2\"/g" $WORK_DIR/old_meta_file sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/old_meta_file + plugin_so=$PWD/bin/demo.so + sed -i "/plugin/i\plugin-path = \"$plugin_so\"" $WORK_DIR/dm-syncer-1.toml + run_dm_syncer $WORK_DIR/syncer1 $WORK_DIR/dm-syncer-1.toml meta_file=$WORK_DIR/old_meta_file run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --syncer-config-format syncer2