From 41be7558a1dd5a5ee95885017144f603351a5a13 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 28 Jun 2019 17:50:19 +0800 Subject: [PATCH] syncer: support multiple ddls in single shard (#177) --- syncer/checkpoint.go | 15 +- syncer/checkpoint_test.go | 8 +- syncer/sharding-meta/shardmeta.go | 256 ++++++++++++ syncer/sharding-meta/shardmeta_test.go | 272 ++++++++++++ syncer/sharding_group.go | 392 +++++++++++++----- syncer/sharding_group_test.go | 23 +- syncer/syncer.go | 227 ++++++---- .../check_safe_mode.go | 0 tests/others_integration.txt | 2 + tests/safe_mode/run.sh | 3 +- .../sequence_safe_mode/conf/diff_config.toml | 57 +++ tests/sequence_safe_mode/conf/dm-master.toml | 9 + tests/sequence_safe_mode/conf/dm-task.yaml | 87 ++++ tests/sequence_safe_mode/conf/dm-tracer.toml | 4 + tests/sequence_safe_mode/conf/dm-worker1.toml | 21 + tests/sequence_safe_mode/conf/dm-worker2.toml | 21 + .../sequence_safe_mode/data/db1.increment.sql | 18 + .../data/db1.increment2.sql | 16 + tests/sequence_safe_mode/data/db1.prepare.sql | 7 + .../sequence_safe_mode/data/db2.increment.sql | 13 + .../data/db2.increment2.sql | 10 + tests/sequence_safe_mode/data/db2.prepare.sql | 7 + tests/sequence_safe_mode/run.sh | 99 +++++ tests/sequence_sharding/conf/diff_config.toml | 58 +++ tests/sequence_sharding/conf/dm-master.toml | 9 + tests/sequence_sharding/conf/dm-task.yaml | 87 ++++ tests/sequence_sharding/conf/dm-worker1.toml | 15 + tests/sequence_sharding/conf/dm-worker2.toml | 15 + .../sequence_sharding/data/db1.increment.sql | 22 + tests/sequence_sharding/data/db1.prepare.sql | 7 + .../sequence_sharding/data/db2.increment.sql | 27 ++ tests/sequence_sharding/data/db2.prepare.sql | 9 + tests/sequence_sharding/run.sh | 42 ++ 33 files changed, 1661 insertions(+), 197 deletions(-) create mode 100644 syncer/sharding-meta/shardmeta.go create mode 100644 syncer/sharding-meta/shardmeta_test.go rename tests/{safe_mode => _dmctl_tools}/check_safe_mode.go (100%) create mode 100644 tests/sequence_safe_mode/conf/diff_config.toml create mode 100644 tests/sequence_safe_mode/conf/dm-master.toml create mode 100644 tests/sequence_safe_mode/conf/dm-task.yaml create mode 100644 tests/sequence_safe_mode/conf/dm-tracer.toml create mode 100644 tests/sequence_safe_mode/conf/dm-worker1.toml create mode 100644 tests/sequence_safe_mode/conf/dm-worker2.toml create mode 100644 tests/sequence_safe_mode/data/db1.increment.sql create mode 100644 tests/sequence_safe_mode/data/db1.increment2.sql create mode 100644 tests/sequence_safe_mode/data/db1.prepare.sql create mode 100644 tests/sequence_safe_mode/data/db2.increment.sql create mode 100644 tests/sequence_safe_mode/data/db2.increment2.sql create mode 100644 tests/sequence_safe_mode/data/db2.prepare.sql create mode 100755 tests/sequence_safe_mode/run.sh create mode 100644 tests/sequence_sharding/conf/diff_config.toml create mode 100644 tests/sequence_sharding/conf/dm-master.toml create mode 100644 tests/sequence_sharding/conf/dm-task.yaml create mode 100644 tests/sequence_sharding/conf/dm-worker1.toml create mode 100644 tests/sequence_sharding/conf/dm-worker2.toml create mode 100644 tests/sequence_sharding/data/db1.increment.sql create mode 100644 tests/sequence_sharding/data/db1.prepare.sql create mode 100644 tests/sequence_sharding/data/db2.increment.sql create mode 100644 tests/sequence_sharding/data/db2.prepare.sql create mode 100755 tests/sequence_sharding/run.sh diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 465f789029..4f5452b5d1 100644 --- a/syncer/checkpoint.go +++ 
b/syncer/checkpoint.go @@ -145,10 +145,12 @@ type CheckPoint interface { // corresponding to Meta.Save SaveGlobalPoint(pos mysql.Position) - // FlushGlobalPointsExcept flushes the global checkpoint and tables' checkpoints except exceptTables + // FlushGlobalPointsExcept flushes the global checkpoint and tables' + // checkpoints except exceptTables, it also flushes SQLs with Args providing + // by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only. // @exceptTables: [[schema, table]... ] // corresponding to Meta.Flush - FlushPointsExcept(exceptTables [][]string) error + FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error // GlobalPoint returns the global binlog stream's checkpoint // corresponding to to Meta.Pos @@ -180,7 +182,7 @@ type RemoteCheckPoint struct { db *Conn schema string // schema name, set through task config table string // table name, now it's task name - id string // checkpoint ID, now it is `server-id` used as MySQL slave + id string // checkpoint ID, now it is `source-id` // source-schema -> source-table -> checkpoint // used to filter the synced binlog when re-syncing for sharding group @@ -330,7 +332,7 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) { } // FlushPointsExcept implements CheckPoint.FlushPointsExcept -func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error { +func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error { cp.RLock() defer cp.RUnlock() @@ -373,9 +375,12 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error { points = append(points, point) } - } } + for i := range extraSQLs { + sqls = append(sqls, extraSQLs[i]) + args = append(args, extraArgs[i]) + } err := cp.db.executeSQL(sqls, args, maxRetryCount) if err != nil { diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 2c8573b20d..0713cd279e 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -145,7 +145,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, true, pos1.Name, pos1.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil) + err = cp.FlushPointsExcept(nil, nil, nil) c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, pos1) c.Assert(cp.FlushedGlobalPoint(), Equals, pos1) @@ -185,7 +185,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil) + err = cp.FlushPointsExcept(nil, nil, nil) c.Assert(err, IsNil) cp.Rollback() c.Assert(cp.GlobalPoint(), Equals, pos2) @@ -267,7 +267,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, schema, table, pos2.Name, pos2.Pos, false, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept(nil) + err = cp.FlushPointsExcept(nil, nil, nil) c.Assert(err, IsNil) cp.Rollback() newer = cp.IsNewerTablePoint(schema, table, pos1) @@ -303,7 +303,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectBegin() 
s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) s.mock.ExpectCommit() - err = cp.FlushPointsExcept([][]string{{schema, table}}) + err = cp.FlushPointsExcept([][]string{{schema, table}}, nil, nil) c.Assert(err, IsNil) cp.Rollback() newer = cp.IsNewerTablePoint(schema, table, pos1) diff --git a/syncer/sharding-meta/shardmeta.go b/syncer/sharding-meta/shardmeta.go new file mode 100644 index 0000000000..fb156a18d7 --- /dev/null +++ b/syncer/sharding-meta/shardmeta.go @@ -0,0 +1,256 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package shardmeta + +import ( + "encoding/json" + "fmt" + + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" +) + +const ( + // MetaTableFormat is used in meta table name constructor + MetaTableFormat = "%s_syncer_sharding_meta" +) + +// DDLItem records ddl information used in sharding sequence organization +type DDLItem struct { + FirstPos mysql.Position `json:"first-pos"` // first DDL's binlog Pos, not the End_log_pos of the event + DDLs []string `json:"ddls"` // DDLs, these ddls are in the same QueryEvent + Source string `json:"source"` // source table ID +} + +// NewDDLItem creates a new DDLItem +func NewDDLItem(pos mysql.Position, ddls []string, source string) *DDLItem { + return &DDLItem{ + FirstPos: pos, + DDLs: ddls, + Source: source, + } +} + +// String returns the item's format string value +func (item *DDLItem) String() string { + return fmt.Sprintf("first-pos: %s ddls: %+v source: %s", item.FirstPos, item.DDLs, item.Source) +} + +// ShardingSequence records a list of DDLItem +type ShardingSequence struct { + Items []*DDLItem `json:"items"` +} + +// IsPrefixSequence checks whether a ShardingSequence is the prefix sequence of other. 
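For illustration (example sequences not taken from the patch): [["ddl1"], ["ddl2"]] is a prefix of [["ddl1"], ["ddl2"], ["ddl3"]], while [["ddl1"], ["ddl3"]] is not; AddItem further below relies on this check to reject a source table whose DDL order diverges from the merged global sequence.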
+func (seq *ShardingSequence) IsPrefixSequence(other *ShardingSequence) bool { + if len(seq.Items) > len(other.Items) { + return false + } + for idx := range seq.Items { + if !utils.CompareShardingDDLs(seq.Items[idx].DDLs, other.Items[idx].DDLs) { + return false + } + } + return true +} + +// String returns the ShardingSequence's json string +func (seq *ShardingSequence) String() string { + jsonSeq, err := json.Marshal(seq.Items) + if err != nil { + log.Errorf("marshal ShardingSequence to json error %v", err) + } + return string(jsonSeq) +} + +// ShardingMeta stores sharding ddl sequence +// including global sequence and each source's own sequence +// NOTE: sharding meta is not thread safe, it must be used in thread safe context +type ShardingMeta struct { + activeIdx int // the first unsynced DDL index + global *ShardingSequence // merged sharding sequence of all source tables + sources map[string]*ShardingSequence // source table ID -> its sharding sequence + schema string // schema name in downstream meta db + table string // table name used in downstream meta db +} + +// NewShardingMeta creates a new ShardingMeta +func NewShardingMeta(schema, table string) *ShardingMeta { + return &ShardingMeta{ + schema: schema, + table: table, + global: &ShardingSequence{Items: make([]*DDLItem, 0)}, + sources: make(map[string]*ShardingSequence), + } +} + +// RestoreFromData restores ShardingMeta from given data +func (meta *ShardingMeta) RestoreFromData(sourceTableID string, activeIdx int, isGlobal bool, data []byte) error { + items := make([]*DDLItem, 0) + err := json.Unmarshal(data, &items) + if err != nil { + return errors.Trace(err) + } + if isGlobal { + meta.global = &ShardingSequence{Items: items} + } else { + meta.sources[sourceTableID] = &ShardingSequence{Items: items} + } + meta.activeIdx = activeIdx + return nil +} + +// ActiveIdx returns the activeIdx of sharding meta +func (meta *ShardingMeta) ActiveIdx() int { + return meta.activeIdx +} + +func (meta *ShardingMeta) reinitialize() { + meta.activeIdx = 0 + meta.global = &ShardingSequence{make([]*DDLItem, 0)} + meta.sources = make(map[string]*ShardingSequence) +} + +// checkItemExists checks whether DDLItem exists in its source sequence +// if exists, return the index of DDLItem in source sequence. +// if not exists, return the next index in source sequence. +func (meta *ShardingMeta) checkItemExists(item *DDLItem) (int, bool) { + source, ok := meta.sources[item.Source] + if !ok { + return 0, false + } + for idx, ddlItem := range source.Items { + if item.FirstPos.Compare(ddlItem.FirstPos) == 0 { + return idx, true + } + } + return len(source.Items), false +} + +// AddItem adds a new comming DDLItem into ShardingMeta +// 1. if DDLItem already exists in source sequence, check whether it is active DDL only +// 2. add the DDLItem into its related source sequence +// 3. if it is a new DDL in global sequence, add it into global sequence +// 4. 
check the source sequence is the prefix-sequence of global sequence, if not, return an error +// returns: +// active: whether the DDL will be processed in this round +func (meta *ShardingMeta) AddItem(item *DDLItem) (active bool, err error) { + index, exists := meta.checkItemExists(item) + if exists { + return index == meta.activeIdx, nil + } + + if source, ok := meta.sources[item.Source]; !ok { + meta.sources[item.Source] = &ShardingSequence{Items: []*DDLItem{item}} + } else { + source.Items = append(source.Items, item) + } + + found := false + for _, globalItem := range meta.global.Items { + if utils.CompareShardingDDLs(item.DDLs, globalItem.DDLs) { + found = true + break + } + } + if !found { + meta.global.Items = append(meta.global.Items, item) + } + + global, source := meta.global, meta.sources[item.Source] + if !source.IsPrefixSequence(global) { + return false, errors.Errorf("detect inconsistent DDL sequence from source %+v, right DDL sequence should be %+v", source.Items, global.Items) + } + + return index == meta.activeIdx, nil +} + +// GetGlobalActiveDDL returns activeDDL in global sequence +func (meta *ShardingMeta) GetGlobalActiveDDL() *DDLItem { + if meta.activeIdx < len(meta.global.Items) { + return meta.global.Items[meta.activeIdx] + } + return nil +} + +// GetGlobalItems returns global DDLItems +func (meta *ShardingMeta) GetGlobalItems() []*DDLItem { + return meta.global.Items +} + +// GetActiveDDLItem returns the source table's active DDLItem +// if in DDL unsynced procedure, the active DDLItem means the syncing DDL +// if in re-sync procedure, the active DDLItem means the next syncing DDL in DDL syncing sequence, may be nil +func (meta *ShardingMeta) GetActiveDDLItem(tableSource string) *DDLItem { + source, ok := meta.sources[tableSource] + if !ok { + return nil + } + if meta.activeIdx < len(source.Items) { + return source.Items[meta.activeIdx] + } + return nil +} + +// InSequenceSharding returns whether in sequence sharding +func (meta *ShardingMeta) InSequenceSharding() bool { + globalItemCount := len(meta.global.Items) + return globalItemCount > 0 && meta.activeIdx < globalItemCount +} + +// ResolveShardingDDL resolves one sharding DDL and increase activeIdx +// if activeIdx equals to the length of global sharding sequence, it means all +// sharding DDL in this ShardingMeta sequence is resolved and will reinitialize +// the ShardingMeta, return true if all DDLs are resolved. +func (meta *ShardingMeta) ResolveShardingDDL() bool { + meta.activeIdx++ + if meta.activeIdx == len(meta.global.Items) { + meta.reinitialize() + return true + } + return false +} + +// ActiveDDLFirstPos returns the first binlog position of active DDL +func (meta *ShardingMeta) ActiveDDLFirstPos() (mysql.Position, error) { + if meta.activeIdx >= len(meta.global.Items) { + return mysql.Position{}, errors.Errorf("activeIdx %d larger than length of global DDLItems: %v", meta.activeIdx, meta.global.Items) + } + return meta.global.Items[meta.activeIdx].FirstPos, nil +} + +// FlushData returns sharding meta flush SQL and args +func (meta *ShardingMeta) FlushData(sourceID, tableID string) ([]string, [][]interface{}) { + if len(meta.global.Items) == 0 { + sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` where source_id=? 
and target_table_id=?", meta.schema, meta.table) + args2 := []interface{}{sourceID, tableID} + return []string{sql2}, [][]interface{}{args2} + } + var ( + sqls = make([]string, 1+len(meta.sources)) + args = make([][]interface{}, 0, 1+len(meta.sources)) + baseSQL = fmt.Sprintf("INSERT INTO `%s`.`%s` (`source_id`, `target_table_id`, `source_table_id`, `active_index`, `is_global`, `data`) VALUES(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `data`=?, `active_index`=?", meta.schema, meta.table) + ) + for i := range sqls { + sqls[i] = baseSQL + } + args = append(args, []interface{}{sourceID, tableID, "", meta.activeIdx, true, meta.global.String(), meta.global.String(), meta.activeIdx}) + for source, seq := range meta.sources { + args = append(args, []interface{}{sourceID, tableID, source, meta.activeIdx, false, seq.String(), seq.String(), meta.activeIdx}) + } + return sqls, args +} diff --git a/syncer/sharding-meta/shardmeta_test.go b/syncer/sharding-meta/shardmeta_test.go new file mode 100644 index 0000000000..5812e4f671 --- /dev/null +++ b/syncer/sharding-meta/shardmeta_test.go @@ -0,0 +1,272 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package shardmeta + +import ( + "fmt" + "testing" + + "github.com/pingcap/check" + "github.com/siddontang/go-mysql/mysql" +) + +var _ = check.Suite(&testShardMetaSuite{}) + +func TestSuite(t *testing.T) { + check.TestingT(t) +} + +type testShardMetaSuite struct { +} + +func (t *testShardMetaSuite) TestShardingMeta(c *check.C) { + var ( + active bool + err error + sqls []string + args [][]interface{} + pos mysql.Position + filename = "mysql-bin.000001" + table1 = "table1" + table2 = "table2" + table3 = "table3" + metaSchema = "dm_meta" + metaTable = "test_syncer_sharding_meta" + sourceID = "mysql-replica-01" + tableID = "`target_db`.`target_table`" + meta = NewShardingMeta(metaSchema, metaTable) + items = []*DDLItem{ + NewDDLItem(mysql.Position{filename, 1000}, []string{"ddl1"}, table1), + NewDDLItem(mysql.Position{filename, 1200}, []string{"ddl2-1,ddl2-2"}, table1), + NewDDLItem(mysql.Position{filename, 1400}, []string{"ddl3"}, table1), + NewDDLItem(mysql.Position{filename, 1600}, []string{"ddl1"}, table2), + NewDDLItem(mysql.Position{filename, 1800}, []string{"ddl2-1,ddl2-2"}, table2), + NewDDLItem(mysql.Position{filename, 2000}, []string{"ddl3"}, table2), + NewDDLItem(mysql.Position{filename, 2200}, []string{"ddl1"}, table3), + NewDDLItem(mysql.Position{filename, 2400}, []string{"ddl2-1,ddl2-2"}, table3), + NewDDLItem(mysql.Position{filename, 2600}, []string{"ddl3"}, table3), + } + ) + + // 1st round sharding DDL sync + for i := 0; i < 7; i++ { + active, err = meta.AddItem(items[i]) + c.Assert(err, check.IsNil) + if i%3 == 0 { + c.Assert(active, check.IsTrue) + } else { + c.Assert(active, check.IsFalse) + } + } + + c.Assert(meta.GetGlobalItems(), check.DeepEquals, []*DDLItem{items[0], items[1], items[2]}) + c.Assert(meta.GetGlobalActiveDDL(), check.DeepEquals, items[0]) + c.Assert(meta.GetActiveDDLItem(table1), check.DeepEquals, items[0]) + c.Assert(meta.GetActiveDDLItem(table2), 
check.DeepEquals, items[3]) + c.Assert(meta.GetActiveDDLItem(table3), check.DeepEquals, items[6]) + c.Assert(meta.InSequenceSharding(), check.IsTrue) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.IsNil) + c.Assert(pos, check.DeepEquals, items[0].FirstPos) + + // find synced in shrading group, and call ShardingMeta.ResolveShardingDDL + c.Assert(meta.ResolveShardingDDL(), check.IsFalse) + + c.Assert(meta.GetGlobalActiveDDL(), check.DeepEquals, items[1]) + c.Assert(meta.GetActiveDDLItem(table1), check.DeepEquals, items[1]) + c.Assert(meta.GetActiveDDLItem(table2), check.DeepEquals, items[4]) + c.Assert(meta.GetActiveDDLItem(table3), check.IsNil) + c.Assert(meta.InSequenceSharding(), check.IsTrue) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.IsNil) + c.Assert(pos, check.DeepEquals, items[1].FirstPos) + + sqls, args = meta.FlushData(sourceID, tableID) + c.Assert(sqls, check.HasLen, 4) + c.Assert(args, check.HasLen, 4) + for _, stmt := range sqls { + c.Assert(stmt, check.Matches, fmt.Sprintf("INSERT INTO .*")) + } + for _, arg := range args { + c.Assert(arg, check.HasLen, 8) + c.Assert(arg[3], check.Equals, 1) + } + + // 2nd round sharding DDL sync + for i := 0; i < 8; i++ { + if i%3 == 0 { + continue + } + active, err = meta.AddItem(items[i]) + c.Assert(err, check.IsNil) + if i%3 == 1 { + c.Assert(active, check.IsTrue) + } else { + c.Assert(active, check.IsFalse) + } + } + + c.Assert(meta.GetGlobalActiveDDL(), check.DeepEquals, items[1]) + c.Assert(meta.GetActiveDDLItem(table1), check.DeepEquals, items[1]) + c.Assert(meta.GetActiveDDLItem(table2), check.DeepEquals, items[4]) + c.Assert(meta.GetActiveDDLItem(table3), check.DeepEquals, items[7]) + c.Assert(meta.InSequenceSharding(), check.IsTrue) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.IsNil) + c.Assert(pos, check.DeepEquals, items[1].FirstPos) + + // find synced in shrading group, and call ShardingMeta.ResolveShardingDDL + c.Assert(meta.ResolveShardingDDL(), check.IsFalse) + + c.Assert(meta.GetGlobalActiveDDL(), check.DeepEquals, items[2]) + c.Assert(meta.GetActiveDDLItem(table1), check.DeepEquals, items[2]) + c.Assert(meta.GetActiveDDLItem(table2), check.DeepEquals, items[5]) + c.Assert(meta.GetActiveDDLItem(table3), check.IsNil) + c.Assert(meta.InSequenceSharding(), check.IsTrue) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.IsNil) + c.Assert(pos, check.DeepEquals, items[2].FirstPos) + + sqls, args = meta.FlushData(sourceID, tableID) + c.Assert(sqls, check.HasLen, 4) + c.Assert(args, check.HasLen, 4) + for _, stmt := range sqls { + c.Assert(stmt, check.Matches, fmt.Sprintf("INSERT INTO .*")) + } + for _, arg := range args { + c.Assert(arg, check.HasLen, 8) + c.Assert(arg[3], check.Equals, 2) + } + + // 3rd round sharding DDL sync + for i := 0; i < 9; i++ { + if i%3 != 2 { + continue + } + active, err = meta.AddItem(items[i]) + c.Assert(err, check.IsNil) + if i%3 == 2 { + c.Assert(active, check.IsTrue) + } else { + c.Assert(active, check.IsFalse) + } + } + c.Assert(meta.GetGlobalActiveDDL(), check.DeepEquals, items[2]) + c.Assert(meta.GetActiveDDLItem(table1), check.DeepEquals, items[2]) + c.Assert(meta.GetActiveDDLItem(table2), check.DeepEquals, items[5]) + c.Assert(meta.GetActiveDDLItem(table3), check.DeepEquals, items[8]) + c.Assert(meta.InSequenceSharding(), check.IsTrue) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.IsNil) + c.Assert(pos, check.DeepEquals, items[2].FirstPos) + + // find synced in shrading group, and call ShardingMeta.ResolveShardingDDL + 
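The test above drives ShardingMeta through successive rounds of sequence sharding via AddItem and ResolveShardingDDL. As a standalone sketch of the same shardmeta API outside the test harness (table IDs, binlog positions and DDL text below are made up for illustration):

    package main

    import (
        "fmt"

        shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
        "github.com/siddontang/go-mysql/mysql"
    )

    func main() {
        meta := shardmeta.NewShardingMeta("dm_meta", "task_syncer_sharding_meta")
        // two source tables reach the same sharding DDL at different binlog positions
        items := []*shardmeta.DDLItem{
            shardmeta.NewDDLItem(mysql.Position{Name: "mysql-bin.000001", Pos: 1000}, []string{"ddl1"}, "`db`.`tbl1`"),
            shardmeta.NewDDLItem(mysql.Position{Name: "mysql-bin.000001", Pos: 2000}, []string{"ddl1"}, "`db`.`tbl2`"),
        }
        for _, item := range items {
            // active == true means this item belongs to the DDL currently being synced
            active, err := meta.AddItem(item)
            fmt.Println(active, err) // true <nil> for both sources
        }
        // every member has reached the active DDL; ResolveShardingDDL advances activeIdx
        // and returns true because this single-DDL sequence is now fully resolved
        fmt.Println(meta.ResolveShardingDDL())
    }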
c.Assert(meta.ResolveShardingDDL(), check.IsTrue) + + c.Assert(meta.GetGlobalActiveDDL(), check.IsNil) + c.Assert(meta.GetActiveDDLItem(table1), check.IsNil) + c.Assert(meta.GetActiveDDLItem(table2), check.IsNil) + c.Assert(meta.GetActiveDDLItem(table3), check.IsNil) + c.Assert(meta.InSequenceSharding(), check.IsFalse) + pos, err = meta.ActiveDDLFirstPos() + c.Assert(err, check.ErrorMatches, fmt.Sprintf("activeIdx %d larger than length of global DDLItems: .*", meta.ActiveIdx())) + + sqls, args = meta.FlushData(sourceID, tableID) + c.Assert(sqls, check.HasLen, 1) + c.Assert(args, check.HasLen, 1) + c.Assert(sqls[0], check.Matches, fmt.Sprintf("DELETE FROM .*")) + c.Assert(args[0], check.DeepEquals, []interface{}{sourceID, tableID}) +} + +func (t *testShardMetaSuite) TestShardingMetaWrongSequence(c *check.C) { + var ( + active bool + err error + filename = "mysql-bin.000001" + table1 = "table1" + table2 = "table2" + meta = NewShardingMeta("", "") + items = []*DDLItem{ + NewDDLItem(mysql.Position{filename, 1000}, []string{"ddl1"}, table1), + NewDDLItem(mysql.Position{filename, 1200}, []string{"ddl2"}, table1), + NewDDLItem(mysql.Position{filename, 1400}, []string{"ddl3"}, table1), + NewDDLItem(mysql.Position{filename, 1600}, []string{"ddl1"}, table2), + NewDDLItem(mysql.Position{filename, 1800}, []string{"ddl3"}, table2), + NewDDLItem(mysql.Position{filename, 2000}, []string{"ddl2"}, table2), + } + ) + + // 1st round sharding DDL sync + for i := 0; i < 4; i++ { + active, err = meta.AddItem(items[i]) + c.Assert(err, check.IsNil) + if i%3 == 0 { + c.Assert(active, check.IsTrue) + } else { + c.Assert(active, check.IsFalse) + } + } + // find synced in shrading group, and call ShardingMeta.ResolveShardingDDL + c.Assert(meta.ResolveShardingDDL(), check.IsFalse) + + // 2nd round sharding DDL sync + for i := 0; i < 4; i++ { + if i%3 == 0 { + continue + } + active, err = meta.AddItem(items[i]) + c.Assert(err, check.IsNil) + if i%3 == 1 { + c.Assert(active, check.IsTrue) + } else { + c.Assert(active, check.IsFalse) + } + } + active, err = meta.AddItem(items[4]) + c.Assert(active, check.IsFalse) + c.Assert(err, check.ErrorMatches, "detect inconsistent DDL sequence from source .*, right DDL sequence should be .*") +} + +func (t *testShardMetaSuite) TestFlushLoadMeta(c *check.C) { + var ( + active bool + err error + filename = "mysql-bin.000001" + table1 = "table1" + table2 = "table2" + metaSchema = "dm_meta" + metaTable = "test_syncer_sharding_meta" + sourceID = "mysql-replica-01" + tableID = "`target_db`.`target_table`" + meta = NewShardingMeta(metaSchema, metaTable) + loadedMeta = NewShardingMeta(metaSchema, metaTable) + items = []*DDLItem{ + NewDDLItem(mysql.Position{filename, 1000}, []string{"ddl1"}, table1), + NewDDLItem(mysql.Position{filename, 1200}, []string{"ddl1"}, table2), + } + ) + for _, item := range items { + active, err = meta.AddItem(item) + c.Assert(err, check.IsNil) + c.Assert(active, check.IsTrue) + } + sqls, args := meta.FlushData(sourceID, tableID) + c.Assert(sqls, check.HasLen, 3) + c.Assert(args, check.HasLen, 3) + for _, arg := range args { + c.Assert(arg, check.HasLen, 8) + loadedMeta.RestoreFromData(arg[2].(string), arg[3].(int), arg[4].(bool), []byte(arg[5].(string))) + } + c.Assert(loadedMeta, check.DeepEquals, meta) +} diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go index e63a5dc224..68a23a7fb1 100644 --- a/syncer/sharding_group.go +++ b/syncer/sharding_group.go @@ -48,16 +48,14 @@ package syncer * save this DDL's next binlog pos as last pos for the sharding 
group * execute this DDL * reset the sharding group - * 7. create a new streamer to re-sync ignored binlog events in the sharding group before - * start re-syncing from first DDL's binlog pos in step.2 - * pause the global streamer's syncing - * 8. continue the syncing with the sharding group special streamer + * 7. redirect global streamer to the first DDL's binlog pos in step.2 + * 8. continue the syncing with the global streamer * ignore binlog events which not belong to the sharding group * ignore binlog events have synced (obsolete) for the sharding group * synced ignored binlog events in the sharding group from step.3 to step.5 * update per-table's and global checkpoint * 9. last pos in step.6 arrived - * 10. close the sharding group special streamer + * 10. redirect global streamer to the active DDL in sequence sharding if needed * 11. use the global streamer to continue the syncing * * all binlogs executed at least once: @@ -80,8 +78,10 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" - "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/dm/pkg/log" + shardmeta "github.com/pingcap/dm/syncer/sharding-meta" ) // ShardingGroup represents a sharding DDL sync group @@ -93,24 +93,32 @@ type ShardingGroup struct { // (0, len(sources)): waiting for syncing // NOTE: we can make remain to be configurable if needed remain int - sources map[string]bool // source table ID -> whether source table's DDL synced - sourceDDLs map[string][]string // source table ID -> ddl text; detect multiple ddl for on table in a sharding ddl - IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later - firstPos *mysql.Position // first DDL's binlog pos, used to restrain the global checkpoint when un-resolved - firstEndPos *mysql.Position // first DDL's binlog End_log_pos, used to re-direct binlog streamer after synced - ddls []string // DDL which current in syncing + sources map[string]bool // source table ID -> whether source table's DDL synced + IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later + + sourceID string // associate dm-worker source ID + meta *shardmeta.ShardingMeta // sharding sequence meta storage + + firstPos *mysql.Position // first DDL's binlog pos, used to restrain the global checkpoint when un-resolved + firstEndPos *mysql.Position // first DDL's binlog End_log_pos, used to re-direct binlog streamer after synced + ddls []string // DDL which current in syncing } // NewShardingGroup creates a new ShardingGroup -func NewShardingGroup(sources []string, isSchemaOnly bool) *ShardingGroup { +func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources []string, meta *shardmeta.ShardingMeta, isSchemaOnly bool) *ShardingGroup { sg := &ShardingGroup{ remain: len(sources), sources: make(map[string]bool, len(sources)), - sourceDDLs: make(map[string][]string), IsSchemaOnly: isSchemaOnly, + sourceID: sourceID, firstPos: nil, firstEndPos: nil, } + if meta != nil { + sg.meta = meta + } else { + sg.meta = shardmeta.NewShardingMeta(shardMetaSchema, shardMetaTable) + } for _, source := range sources { sg.sources[source] = false } @@ -119,40 +127,28 @@ func NewShardingGroup(sources []string, isSchemaOnly bool) *ShardingGroup { // Merge merges new sources to exists // used cases -// * add a new database / table to exists sharding group +// * add a new table to exists sharding group // * 
add new table(s) to parent database's sharding group -// if group is un-resolved, we add it in sources and set it true +// if group is in sequence sharding, return error directly // othereise add it in source, set it false and increment remain func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error) { sg.Lock() defer sg.Unlock() - // need to check whether source is exist? but we maybe re-sync more times - isResolving := sg.remain != len(sg.sources) - ddls := []string{"create table"} + // NOTE: we don't support add shard table when in sequence sharding + if sg.meta.InSequenceSharding() { + return true, sg.remain <= 0, sg.remain, errors.NotSupportedf("in sequence sharding, can't add table, activeDDL: %s, sharding sequence: %s", sg.meta.GetGlobalActiveDDL(), sg.meta.GetGlobalItems()) + } for _, source := range sources { - synced, exist := sg.sources[source] - if isResolving && !synced { - if exist { - if !synced { - sg.remain-- - } else if !utils.CompareShardingDDLs(sg.sourceDDLs[source], ddls) { - return isResolving, sg.remain <= 0, sg.remain, errors.NotSupportedf("execute multiple ddls: previous ddl %s and current ddls %q for source table %s", sg.sourceDDLs[source], ddls, source) - } - } - - sg.sources[source] = true - sg.sourceDDLs[source] = ddls // fake create table ddl - } else { - if !exist { - sg.remain++ - } + _, exist := sg.sources[source] + if !exist { + sg.remain++ sg.sources[source] = false } } - return isResolving, sg.remain <= 0, sg.remain, nil + return false, sg.remain <= 0, sg.remain, nil } // Leave leaves from sharding group @@ -164,9 +160,9 @@ func (sg *ShardingGroup) Leave(sources []string) error { sg.Lock() defer sg.Unlock() - // if group is un-resolved, we can't do drop (DROP DATABASE / TABLE) - if sg.remain != len(sg.sources) { - return errors.NotSupportedf("group's sharding DDL %v is un-resolved, try drop sources %v", sg.ddls, sources) + // NOTE: if group is in sequence sharding, we can't do drop (DROP DATABASE / TABLE) + if sg.meta.InSequenceSharding() { + return errors.NotSupportedf("in sequence sharding, try drop sources %v, activeDDL: %s, sharding sequence: %s", sources, sg.meta.GetGlobalActiveDDL(), sg.meta.GetGlobalItems()) } for _, source := range sources { @@ -174,7 +170,6 @@ func (sg *ShardingGroup) Leave(sources []string) error { sg.remain-- } delete(sg.sources, source) - delete(sg.sourceDDLs, source) } return nil @@ -196,22 +191,22 @@ func (sg *ShardingGroup) Reset() { } // TrySync tries to sync the sharding group -// if source not in sharding group before, it will be added -func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, int, error) { +// returns +// synced: whether the source table's sharding group synced +// active: whether the DDL will be processed in this round +// remain: remain un-synced source table's count +func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, bool, int, error) { sg.Lock() defer sg.Unlock() - synced, ok := sg.sources[source] - if !ok { - // new source added, sg.remain unchanged + ddlItem := shardmeta.NewDDLItem(pos, ddls, source) + active, err := sg.meta.AddItem(ddlItem) + if err != nil { + return sg.remain <= 0, active, sg.remain, errors.Trace(err) + } + if active && !sg.sources[source] { sg.sources[source] = true - sg.sourceDDLs[source] = ddls - } else if !synced { sg.remain-- - sg.sources[source] = true - sg.sourceDDLs[source] = ddls - } else if !utils.CompareShardingDDLs(sg.sourceDDLs[source], ddls) { - return sg.remain 
<= 0, sg.remain, errors.NotSupportedf("execute multiple ddls: previous ddl %s and current ddls %q for source table %s", sg.sourceDDLs[source], ddls, source) } if sg.firstPos == nil { @@ -219,20 +214,20 @@ func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls sg.firstEndPos = &endPos sg.ddls = ddls } - return sg.remain <= 0, sg.remain, nil + return sg.remain <= 0, active, sg.remain, nil } -// InSyncing checks whether the source is in syncing -func (sg *ShardingGroup) InSyncing(source string) bool { +// CheckSyncing checks the source table syncing status +// returns +// beforeActiveDDL: whether the position is before active DDL +func (sg *ShardingGroup) CheckSyncing(source string, pos mysql.Position) (beforeActiveDDL bool) { sg.RLock() defer sg.RUnlock() - synced, ok := sg.sources[source] - if !ok { - return false + activeDDLItem := sg.meta.GetActiveDDLItem(source) + if activeDDLItem == nil { + return true } - // the group not synced, but the source synced - // so, the source is in syncing and waiting for other sources to sync - return sg.remain > 0 && synced + return activeDDLItem.FirstPos.Compare(pos) > 0 } // UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil @@ -295,6 +290,8 @@ func (sg *ShardingGroup) UnresolvedTables() [][]string { sg.RLock() defer sg.RUnlock() + // TODO: if we have sharding ddl sequence, and partial ddls synced, we treat + // all the of the tables as unresolved if sg.remain == len(sg.sources) { return nil } @@ -318,6 +315,14 @@ func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position { Pos: sg.firstPos.Pos, } } + item := sg.meta.GetGlobalActiveDDL() + if item != nil { + // make a new copy + return &mysql.Position{ + Name: item.FirstPos.Name, + Pos: item.FirstPos.Pos, + } + } return nil } @@ -340,6 +345,37 @@ func (sg *ShardingGroup) String() string { return fmt.Sprintf("IsSchemaOnly:%v remain:%d, sources:%+v", sg.IsSchemaOnly, sg.remain, sg.sources) } +// InSequenceSharding returns whether this sharding group is in sequence sharding +func (sg *ShardingGroup) InSequenceSharding() bool { + sg.RLock() + defer sg.RUnlock() + return sg.meta.InSequenceSharding() +} + +// ResolveShardingDDL resolves sharding DDL in sharding group +func (sg *ShardingGroup) ResolveShardingDDL() bool { + sg.Lock() + defer sg.Unlock() + reset := sg.meta.ResolveShardingDDL() + // reset sharding group after DDL is exexuted + return reset +} + +// ActiveDDLFirstPos returns the first binlog position of active DDL +func (sg *ShardingGroup) ActiveDDLFirstPos() (mysql.Position, error) { + sg.RLock() + defer sg.RUnlock() + pos, err := sg.meta.ActiveDDLFirstPos() + return pos, errors.Trace(err) +} + +// FlushData returns sharding meta flush SQLs and args +func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{}) { + sg.RLock() + defer sg.RUnlock() + return sg.meta.FlushData(sg.sourceID, targetTableID) +} + // GenTableID generates table ID func GenTableID(schema, table string) (ID string, isSchemaOnly bool) { if len(table) == 0 { @@ -360,48 +396,72 @@ func UnpackTableID(id string) (string, string) { type ShardingGroupKeeper struct { sync.RWMutex groups map[string]*ShardingGroup // target table ID -> ShardingGroup + cfg *config.SubTaskConfig + + shardMetaSchema string + shardMetaTable string + db *Conn } // NewShardingGroupKeeper creates a new ShardingGroupKeeper -func NewShardingGroupKeeper() *ShardingGroupKeeper { +func NewShardingGroupKeeper(cfg *config.SubTaskConfig) *ShardingGroupKeeper { k := &ShardingGroupKeeper{ 
groups: make(map[string]*ShardingGroup), + cfg: cfg, } + k.shardMetaSchema = cfg.MetaSchema + k.shardMetaTable = fmt.Sprintf(shardmeta.MetaTableFormat, cfg.Name) return k } // AddGroup adds new group(s) according to target schema, table and source IDs -func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) { +func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) { // if need to support target table-level sharding DDL // we also need to support target schema-level sharding DDL schemaID, _ := GenTableID(targetSchema, "") - tableID, _ := GenTableID(targetSchema, targetTable) + targetTableID, _ := GenTableID(targetSchema, targetTable) k.Lock() defer k.Unlock() if schemaGroup, ok := k.groups[schemaID]; !ok { - k.groups[schemaID] = NewShardingGroup(sourceIDs, true) + k.groups[schemaID] = NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, sourceIDs, meta, true) } else { schemaGroup.Merge(sourceIDs) } var ok bool - if group, ok = k.groups[tableID]; !ok { - group = NewShardingGroup(sourceIDs, false) - k.groups[tableID] = group + if group, ok = k.groups[targetTableID]; !ok { + group = NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, sourceIDs, meta, false) + k.groups[targetTableID] = group } else if merge { - needShardingHandle, synced, remain, err = k.groups[tableID].Merge(sourceIDs) + needShardingHandle, synced, remain, err = k.groups[targetTableID].Merge(sourceIDs) } else { - err = errors.AlreadyExistsf("table group %s", tableID) + err = errors.AlreadyExistsf("table group %s", targetTableID) return } return } -// Clear clears all sharding groups -func (k *ShardingGroupKeeper) Clear() { +// Init does initialization staff +func (k *ShardingGroupKeeper) Init(conn *Conn) error { + k.clear() + if conn != nil { + k.db = conn + } else { + db, err := createDB(k.cfg, k.cfg.To, maxDDLConnectionTimeout) + if err != nil { + return errors.Trace(err) + } + k.db = db + } + err := k.prepare() + return errors.Trace(err) +} + +// clear clears all sharding groups +func (k *ShardingGroupKeeper) clear() { k.Lock() defer k.Unlock() k.groups = make(map[string]*ShardingGroup) @@ -420,10 +480,10 @@ func (k *ShardingGroupKeeper) ResetGroups() { // LeaveGroup doesn't affect in syncing process func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error { schemaID, _ := GenTableID(targetSchema, "") - tableID, _ := GenTableID(targetSchema, targetTable) + targetTableID, _ := GenTableID(targetSchema, targetTable) k.Lock() defer k.Unlock() - if group, ok := k.groups[tableID]; ok { + if group, ok := k.groups[targetTableID]; ok { if err := group.Leave(sources); err != nil { return errors.Trace(err) } @@ -441,57 +501,65 @@ func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sourc // isSharding: whether the source table is in a sharding group // group: the sharding group // synced: whether the source table's sharding group synced +// active: whether is active DDL in sequence sharding DDL // remain: remain un-synced source table's count -func (k *ShardingGroupKeeper) TrySync(targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) 
{ - tableID, schemaOnly := GenTableID(targetSchema, targetTable) +func (k *ShardingGroupKeeper) TrySync( + targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) ( + needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error) { + + targetTableID, schemaOnly := GenTableID(targetSchema, targetTable) if schemaOnly { // NOTE: now we don't support syncing for schema only sharding DDL - return false, nil, true, 0, nil + return false, nil, true, false, 0, nil } k.Lock() defer k.Unlock() - group, ok := k.groups[tableID] + group, ok := k.groups[targetTableID] if !ok { - return false, group, true, 0, nil + return false, group, true, false, 0, nil } - synced, remain, err = group.TrySync(source, pos, endPos, ddls) - return true, group, synced, remain, errors.Trace(err) + synced, active, remain, err = group.TrySync(source, pos, endPos, ddls) + return true, group, synced, active, remain, errors.Trace(err) } -// InSyncing checks whether the source table is in syncing -func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string) bool { - tableID, _ := GenTableID(targetSchema, targetTable) - k.RLock() - defer k.RUnlock() - group, ok := k.groups[tableID] - if !ok { +// InSyncing checks whether the source is in sharding syncing +func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string, pos mysql.Position) bool { + group := k.Group(targetSchema, targetTable) + if group == nil { return false } - return group.InSyncing(source) + return !group.CheckSyncing(source, pos) } -// UnresolvedTables returns all source tables which with DDLs are un-resolved +// UnresolvedTables returns +// all `target-schema.target-table` that has unresolved sharding DDL +// all source tables which with DDLs are un-resolved // NOTE: this func only ensure the returned tables are current un-resolved // if passing the returned tables to other func (like checkpoint), // must ensure their sync state not changed in this progress -func (k *ShardingGroupKeeper) UnresolvedTables() [][]string { +func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) { + ids := make(map[string]bool) tables := make([][]string, 0, 10) k.RLock() defer k.RUnlock() - for _, group := range k.groups { - tables = append(tables, group.UnresolvedTables()...) + for id, group := range k.groups { + unresolved := group.UnresolvedTables() + if len(unresolved) > 0 { + ids[id] = true + tables = append(tables, unresolved...) 
+ } } - return tables + return ids, tables } // Group returns target table's group, nil if not exist func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup { - tableID, _ := GenTableID(targetSchema, targetTable) + targetTableID, _ := GenTableID(targetSchema, targetTable) k.RLock() defer k.RUnlock() - return k.groups[tableID] + return k.groups[targetTableID] } // lowestFirstPosInGroups returns the lowest pos in all groups which are unresolved @@ -552,10 +620,144 @@ func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup { return groups } +// InSequenceSharding returns whether exists sharding group in unfinished sequence sharding +func (k *ShardingGroupKeeper) InSequenceSharding() bool { + k.RLock() + defer k.RUnlock() + for _, group := range k.groups { + if group.InSequenceSharding() { + return true + } + } + return false +} + +// ResolveShardingDDL resolves one sharding DDL in specific group +func (k *ShardingGroupKeeper) ResolveShardingDDL(targetSchema, targetTable string) (bool, error) { + group := k.Group(targetSchema, targetTable) + if group != nil { + return group.ResolveShardingDDL(), nil + } + return false, errors.NotFoundf("sharding group for `%s`.`%s`", targetSchema, targetTable) +} + +// ActiveDDLFirstPos returns the binlog postion of active DDL +func (k *ShardingGroupKeeper) ActiveDDLFirstPos(targetSchema, targetTable string) (mysql.Position, error) { + group := k.Group(targetSchema, targetTable) + k.Lock() + defer k.Unlock() + if group != nil { + pos, err := group.ActiveDDLFirstPos() + return pos, errors.Trace(err) + } + return mysql.Position{}, errors.NotFoundf("sharding group for `%s`.`%s`", targetSchema, targetTable) +} + +// PrepareFlushSQLs returns all sharding meta flushed SQLs execpt for given table IDs +func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{}) { + k.RLock() + defer k.RUnlock() + var ( + sqls = make([]string, 0, len(k.groups)) + args = make([][]interface{}, 0, len(k.groups)) + ) + for id, group := range k.groups { + if group.IsSchemaOnly { + continue + } + _, ok := exceptTableIDs[id] + if ok { + continue + } + sqls2, args2 := group.FlushData(id) + sqls = append(sqls, sqls2...) + args = append(args, args2...) 
+ } + return sqls, args +} + +// Prepare inits sharding meta schema and tables if not exists +func (k *ShardingGroupKeeper) prepare() error { + if err := k.createSchema(); err != nil { + return errors.Trace(err) + } + + if err := k.createTable(); err != nil { + return errors.Trace(err) + } + + return nil +} + +// Close closes sharding group keeper +func (k *ShardingGroupKeeper) Close() { + closeDBs(k.db) +} + +func (k *ShardingGroupKeeper) createSchema() error { + stmt := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", k.shardMetaSchema) + args := make([]interface{}, 0) + err := k.db.executeSQL([]string{stmt}, [][]interface{}{args}, maxRetryCount) + log.Infof("[ShardingGroupKeeper] execute sql %s", stmt) + return errors.Trace(err) +} + +func (k *ShardingGroupKeeper) createTable() error { + tableName := fmt.Sprintf("`%s`.`%s`", k.shardMetaSchema, k.shardMetaTable) + stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( + source_id VARCHAR(32) NOT NULL COMMENT 'replica source id, defined in task.yaml', + target_table_id VARCHAR(144) NOT NULL, + source_table_id VARCHAR(144) NOT NULL, + active_index INT, + is_global BOOLEAN, + data JSON, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uk_source_id_table_id_source (source_id, target_table_id, source_table_id) + )`, tableName) + err := k.db.executeSQL([]string{stmt}, [][]interface{}{{}}, maxRetryCount) + log.Infof("[ShardingGroupKeeper] execute sql %s", stmt) + return errors.Trace(err) +} + +// LoadShardMeta implements CheckPoint.LoadShardMeta +func (k *ShardingGroupKeeper) LoadShardMeta() (map[string]*shardmeta.ShardingMeta, error) { + query := fmt.Sprintf("SELECT `target_table_id`, `source_table_id`, `active_index`, `is_global`, `data` FROM `%s`.`%s` WHERE `source_id`='%s'", k.shardMetaSchema, k.shardMetaTable, k.cfg.SourceID) + rows, err := k.db.querySQL(query, maxRetryCount) + if err != nil { + return nil, errors.Trace(err) + } + defer rows.Close() + + var ( + targetTableID string + sourceTableID string + activeIndex int + isGlobal bool + data string + meta = make(map[string]*shardmeta.ShardingMeta) + ) + for rows.Next() { + err := rows.Scan(&targetTableID, &sourceTableID, &activeIndex, &isGlobal, &data) + if err != nil { + return nil, errors.Trace(err) + } + if _, ok := meta[targetTableID]; !ok { + meta[targetTableID] = shardmeta.NewShardingMeta(k.shardMetaSchema, k.shardMetaTable) + } + err = meta[targetTableID].RestoreFromData(sourceTableID, activeIndex, isGlobal, []byte(data)) + if err != nil { + return nil, errors.Trace(err) + } + } + return meta, errors.Trace(rows.Err()) +} + // ShardingReSync represents re-sync info for a sharding DDL group type ShardingReSync struct { currPos mysql.Position // current DDL's binlog pos, initialize to first DDL's pos latestPos mysql.Position // latest DDL's binlog pos targetSchema string targetTable string + allResolved bool } diff --git a/syncer/sharding_group_test.go b/syncer/sharding_group_test.go index 3f47d5f75c..f97a5b6e4d 100644 --- a/syncer/sharding_group_test.go +++ b/syncer/sharding_group_test.go @@ -16,6 +16,8 @@ package syncer import ( . 
"github.com/pingcap/check" "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/dm/config" ) var _ = Suite(&testShardingGroupSuite{}) @@ -24,28 +26,35 @@ type testShardingGroupSuite struct { } func (t *testShardingGroupSuite) TestLowestFirstPosInGroups(c *C) { + cfg := &config.SubTaskConfig{ + SourceID: "mysql-replica-01", + MetaSchema: "test", + Name: "checkpoint_ut", + } + ddls := []string{"DUMMY DDL"} - g1 := NewShardingGroup([]string{"db1.tbl1", "db1.tbl2"}, false) + k := NewShardingGroupKeeper(cfg) + + g1 := NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, []string{"db1.tbl1", "db1.tbl2"}, nil, false) pos1 := mysql.Position{Name: "mysql-bin.000002", Pos: 123} endPos1 := mysql.Position{Name: "mysql-bin.000002", Pos: 456} - _, _, err := g1.TrySync("db1.tbl1", pos1, endPos1, ddls) + _, _, _, err := g1.TrySync("db1.tbl1", pos1, endPos1, ddls) c.Assert(err, IsNil) // lowest - g2 := NewShardingGroup([]string{"db2.tbl1", "db2.tbl2"}, false) + g2 := NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, []string{"db2.tbl1", "db2.tbl2"}, nil, false) pos2 := mysql.Position{Name: "mysql-bin.000001", Pos: 123} endPos2 := mysql.Position{Name: "mysql-bin.000001", Pos: 456} - _, _, err = g2.TrySync("db2.tbl1", pos2, endPos2, ddls) + _, _, _, err = g2.TrySync("db2.tbl1", pos2, endPos2, ddls) c.Assert(err, IsNil) - g3 := NewShardingGroup([]string{"db3.tbl1", "db3.tbl2"}, false) + g3 := NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, []string{"db3.tbl1", "db3.tbl2"}, nil, false) pos3 := mysql.Position{Name: "mysql-bin.000003", Pos: 123} endPos3 := mysql.Position{Name: "mysql-bin.000003", Pos: 456} - _, _, err = g3.TrySync("db3.tbl1", pos3, endPos3, ddls) + _, _, _, err = g3.TrySync("db3.tbl1", pos3, endPos3, ddls) c.Assert(err, IsNil) - k := NewShardingGroupKeeper() k.groups["db1.tbl"] = g1 k.groups["db2.tbl"] = g2 k.groups["db3.tbl"] = g3 diff --git a/syncer/syncer.go b/syncer/syncer.go index 93dd1eedbf..cf3c63c803 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "math" - "os" "runtime/debug" "strconv" "strings" @@ -96,6 +95,7 @@ type Syncer struct { syncer *replication.BinlogSyncer localReader *streamer.BinlogReader binlogType BinlogType + streamer streamer.Streamer wg sync.WaitGroup jobWg sync.WaitGroup @@ -214,7 +214,7 @@ func NewSyncer(cfg *config.SubTaskConfig) *Syncer { // maybe we can refactor to remove RemoteBinlog support in DM syncer.shardingSyncCfg = syncer.syncCfg syncer.shardingSyncCfg.ServerID = math.MaxUint32 - syncer.syncCfg.ServerID - syncer.sgk = NewShardingGroupKeeper() + syncer.sgk = NewShardingGroupKeeper(cfg) syncer.ddlInfoCh = make(chan *pb.DDLInfo, 1) syncer.ddlExecInfo = NewDDLExecInfo() } @@ -299,6 +299,7 @@ func (s *Syncer) Init() (err error) { if err != nil { return errors.Trace(err) } + rollbackHolder.Add(fr.FuncRollback{Name: "close-sharding-group-keeper", Fn: s.sgk.Close}) } err = s.checkpoint.Init(nil) @@ -367,8 +368,11 @@ func (s *Syncer) initShardingGroups() error { return errors.Trace(err) } - // clear old sharding group - s.sgk.Clear() + // clear old sharding group and initials some needed data + err = s.sgk.Init(nil) + if err != nil { + return errors.Trace(err) + } // convert according to router rules // target-schema -> target-table -> source-IDs @@ -390,10 +394,16 @@ func (s *Syncer) initShardingGroups() error { } } + loadMeta, err2 := s.sgk.LoadShardMeta() + if err2 != nil { + return errors.Trace(err2) + } + // add sharding group for 
targetSchema, mSchema := range mapper { for targetTable, sourceIDs := range mSchema { - _, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, false) + tableID, _ := GenTableID(targetSchema, targetTable) + _, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, loadMeta[tableID], false) if err != nil { return errors.Trace(err) } @@ -411,13 +421,7 @@ func (s *Syncer) IsFreshTask() (bool, error) { return globalPoint.Compare(minCheckpoint) <= 0, nil } -// Process implements the dm.Unit interface. -func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { - syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Add(0) - - newCtx, cancel := context.WithCancel(ctx) - defer cancel() - +func (s *Syncer) resetReplicationSyncer() { if s.binlogType == RemoteBinlog { // create new binlog-syncer if s.syncer != nil { @@ -425,11 +429,22 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { } s.syncer = replication.NewBinlogSyncer(s.syncCfg) } else if s.binlogType == LocalBinlog { + // TODO: close old local reader before creating a new one s.localReader = streamer.NewBinlogReader(&streamer.BinlogReaderConfig{ RelayDir: s.cfg.RelayDir, Timezone: s.timezone, }) } +} + +// Process implements the dm.Unit interface. +func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { + syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Add(0) + + newCtx, cancel := context.WithCancel(ctx) + defer cancel() + + s.resetReplicationSyncer() // create new done chan s.done = make(chan struct{}) // create new job chans @@ -731,13 +746,21 @@ func (s *Syncer) flushCheckPoints() error { return nil } - var exceptTables [][]string + var ( + exceptTableIDs map[string]bool + exceptTables [][]string + shardMetaSQLs []string + shardMetaArgs [][]interface{} + ) if s.cfg.IsSharding { // flush all checkpoints except tables which are unresolved for sharding DDL - exceptTables = s.sgk.UnresolvedTables() + exceptTableIDs, exceptTables = s.sgk.UnresolvedTables() log.Infof("[syncer] flush checkpoints except for tables %v", exceptTables) + + shardMetaSQLs, shardMetaArgs = s.sgk.PrepareFlushSQLs(exceptTableIDs) + log.Debugf("shardMetaSQLs: %+v shardMetaArgs: %+v", shardMetaSQLs, shardMetaArgs) } - err := s.checkpoint.FlushPointsExcept(exceptTables) + err := s.checkpoint.FlushPointsExcept(exceptTables, shardMetaSQLs, shardMetaArgs) if err != nil { return errors.Annotatef(err, "flush checkpoint %s", s.checkpoint) } @@ -890,6 +913,19 @@ func (s *Syncer) sync(ctx context.Context, queueBucket string, db *Conn, jobChan } } +// redirectStreamer redirects binlog stream to given position +func (s *Syncer) redirectStreamer(pos mysql.Position) error { + var err error + log.Infof("reset global streamer to position: %v", pos) + s.resetReplicationSyncer() + if s.binlogType == RemoteBinlog { + s.streamer, err = s.getBinlogStreamer(s.syncer, pos) + } else if s.binlogType == LocalBinlog { + s.streamer, err = s.getBinlogStreamer(s.localReader, pos) + } + return errors.Trace(err) +} + // Run starts running for sync, we should guarantee it can rerun when paused. 
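flushCheckPoints above now carries the sharding meta along with the checkpoint: PrepareFlushSQLs collects ShardingMeta.FlushData for every group not excluded by the unresolved-table set, and those statements are handed to FlushPointsExcept so they are written together with the checkpoint rows. A standalone sketch of what FlushData hands back (meta schema/table, source ID and table IDs below are illustrative):

    package main

    import (
        "fmt"

        shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
        "github.com/siddontang/go-mysql/mysql"
    )

    func main() {
        meta := shardmeta.NewShardingMeta("dm_meta", "task_syncer_sharding_meta")
        item := shardmeta.NewDDLItem(mysql.Position{Name: "mysql-bin.000001", Pos: 4},
            []string{"ALTER TABLE tbl ADD COLUMN c1 INT"}, "`shard_db`.`shard_tbl_1`")
        meta.AddItem(item)

        // while a sequence is unresolved: one INSERT ... ON DUPLICATE KEY UPDATE per
        // sequence, i.e. the merged global one plus one per source table; once the whole
        // sequence is resolved, FlushData returns a single DELETE for this group instead
        sqls, args := meta.FlushData("mysql-replica-01", "`target_db`.`target_table`")
        for i := range sqls {
            fmt.Println(sqls[i], args[i])
        }
    }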
func (s *Syncer) Run(ctx context.Context) (err error) { defer func() { @@ -922,11 +958,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { ) log.Infof("replicate binlog from latest checkpoint %+v", lastPos) - var globalStreamer streamer.Streamer if s.binlogType == RemoteBinlog { - globalStreamer, err = s.getBinlogStreamer(s.syncer, lastPos) + s.streamer, err = s.getBinlogStreamer(s.syncer, lastPos) } else if s.binlogType == LocalBinlog { - globalStreamer, err = s.getBinlogStreamer(s.localReader, lastPos) + s.streamer, err = s.getBinlogStreamer(s.localReader, lastPos) } if err != nil { return errors.Trace(err) @@ -998,9 +1033,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // * compare last pos with current binlog's pos to determine whether re-sync completed // 6. use the global streamer to continue the syncing var ( - shardingSyncer *replication.BinlogSyncer - shardingReader *streamer.BinlogReader - shardingStreamer streamer.Streamer shardingReSyncCh = make(chan *ShardingReSync, 10) shardingReSync *ShardingReSync savedGlobalLastPos mysql.Position @@ -1010,44 +1042,46 @@ func (s *Syncer) Run(ctx context.Context) (err error) { traceID string ) - closeShardingSyncer := func() { - if shardingSyncer != nil { - s.closeBinlogSyncer(shardingSyncer) - shardingSyncer = nil + closeShardingResync := func() error { + if shardingReSync == nil { + return nil } - if shardingReader != nil { - shardingReader.Close() - shardingReader = nil + + // if remaining DDLs in sequence, redirect global stream to the next sharding DDL position + if !shardingReSync.allResolved { + nextPos, err2 := s.sgk.ActiveDDLFirstPos(shardingReSync.targetSchema, shardingReSync.targetTable) + if err2 != nil { + return errors.Trace(err2) + } + + err2 = s.redirectStreamer(nextPos) + if err2 != nil { + return errors.Trace(err2) + } } - shardingStreamer = nil shardingReSync = nil lastPos = savedGlobalLastPos // restore global last pos + return nil } - defer closeShardingSyncer() for { s.currentPosMu.Lock() s.currentPosMu.currentPos = currentPos s.currentPosMu.Unlock() - // if there are sharding groups need to re-sync previous ignored DMLs, we use another special streamer - if shardingStreamer == nil && len(shardingReSyncCh) > 0 { + // fetch from sharding resync channel if needed, and redirect global + // stream to current binlog position recorded by ShardingReSync + if shardingReSync == nil && len(shardingReSyncCh) > 0 { // some sharding groups need to re-syncing shardingReSync = <-shardingReSyncCh savedGlobalLastPos = lastPos // save global last pos lastPos = shardingReSync.currPos - if s.binlogType == RemoteBinlog { - shardingSyncer = replication.NewBinlogSyncer(s.shardingSyncCfg) - shardingStreamer, err = s.getBinlogStreamer(shardingSyncer, shardingReSync.currPos) - } else if s.binlogType == LocalBinlog { - shardingReader = streamer.NewBinlogReader(&streamer.BinlogReaderConfig{ - RelayDir: s.cfg.RelayDir, - Timezone: s.timezone, - }) - shardingStreamer, err = s.getBinlogStreamer(shardingReader, shardingReSync.currPos) + err = s.redirectStreamer(shardingReSync.currPos) + if err != nil { + return errors.Trace(err) } - log.Debugf("[syncer] start using a special streamer to re-sync DMLs for sharding group %+v", shardingReSync) + failpoint.Inject("ReSyncExit", func() { log.Warn("[failpoint] exit triggered by ReSyncExit") utils.OsExit(1) @@ -1069,12 +1103,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } }) ctx2, cancel := context.WithTimeout(ctx, eventTimeout) - if shardingStreamer != nil { - // use 
sharding group's special streamer to get binlog event - e, err = shardingStreamer.GetEvent(ctx2) - } else { - e, err = globalStreamer.GetEvent(ctx2) - } + e, err = s.streamer.GetEvent(ctx2) cancel() } @@ -1096,11 +1125,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { eventTimeoutCounter = 0 if s.needResync() { log.Info("timeout, resync") - if shardingStreamer != nil { - shardingStreamer, err = s.reopenWithRetry(s.shardingSyncCfg) - } else { - globalStreamer, err = s.reopenWithRetry(s.syncCfg) - } + err = s.reopenWithRetry(s.syncCfg) if err != nil { return errors.Trace(err) } @@ -1113,11 +1138,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // try to re-sync in gtid mode if tryReSync && s.cfg.EnableGTID && isBinlogPurgedError(err) && s.cfg.AutoFixGTID { time.Sleep(retryTimeout) - if shardingStreamer != nil { - shardingStreamer, err = s.reSyncBinlog(s.shardingSyncCfg) - } else { - globalStreamer, err = s.reSyncBinlog(s.syncCfg) - } + err = s.reSyncBinlog(s.syncCfg) if err != nil { return errors.Trace(err) } @@ -1147,7 +1168,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { lastPos: &lastPos, shardingReSync: shardingReSync, latestOp: &latestOp, - closeShardingSyncer: closeShardingSyncer, + closeShardingResync: closeShardingResync, traceSource: traceSource, safeMode: safeMode, tryReSync: tryReSync, @@ -1158,17 +1179,20 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } switch ev := e.Event.(type) { case *replication.RotateEvent: - s.handleRotateEvent(ev, ec) + err = s.handleRotateEvent(ev, ec) + if err != nil { + return errors.Trace(err) + } case *replication.RowsEvent: err = s.handleRowsEvent(ev, ec) if err != nil { - return err + return errors.Trace(err) } case *replication.QueryEvent: err = s.handleQueryEvent(ev, ec) if err != nil { - return err + return errors.Trace(err) } case *replication.XIDEvent: @@ -1177,7 +1201,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { lastPos = shardingReSync.currPos if shardingReSync.currPos.Compare(shardingReSync.latestPos) >= 0 { log.Infof("[syncer] sharding group %v re-syncing completed", shardingReSync) - closeShardingSyncer() + err = closeShardingResync() + if err != nil { + return errors.Trace(err) + } continue } } @@ -1202,7 +1229,7 @@ type eventContext struct { lastPos *mysql.Position shardingReSync *ShardingReSync latestOp *opType - closeShardingSyncer func() + closeShardingResync func() error traceSource string safeMode *sm.SafeMode tryReSync bool @@ -1215,7 +1242,7 @@ type eventContext struct { // TODO: Further split into smaller functions and group common arguments into // a context struct. 
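The Run loop above no longer keeps a dedicated sharding streamer; a re-sync is just a redirect of the single global streamer. The control flow is: save the current global position, jump back to the re-sync start position, replay the previously ignored DMLs, and once the stream catches up to latestPos either redirect to the first position of the next unresolved DDL (allResolved == false) or fall back to the saved global position. The sketch below models only that decision with invented types (Position, resyncState); it is an illustration of the flow, not the syncer's code.

package resync

// Position is a simplified binlog position.
type Position struct {
	Name string
	Pos  uint32
}

// Compare orders positions first by file name, then by offset.
func (p Position) Compare(o Position) int {
	if p.Name != o.Name {
		if p.Name < o.Name {
			return -1
		}
		return 1
	}
	switch {
	case p.Pos < o.Pos:
		return -1
	case p.Pos > o.Pos:
		return 1
	}
	return 0
}

// resyncState mirrors the ShardingReSync bookkeeping: how far the re-sync has
// replayed, where it must stop, and whether every DDL in the sequence has
// already been resolved.
type resyncState struct {
	currPos     Position
	latestPos   Position
	allResolved bool
}

// nextRedirect is called with each event position seen during a re-sync and
// returns the position the stream should be redirected to, or nil while the
// re-sync is still replaying ignored DMLs.
func nextRedirect(st *resyncState, eventPos, savedGlobalPos, nextDDLPos Position) *Position {
	st.currPos = eventPos
	if st.currPos.Compare(st.latestPos) < 0 {
		return nil // still behind the point where the re-sync was triggered
	}
	if !st.allResolved {
		// more DDLs remain in the sequence: continue from the next active DDL
		return &nextDDLPos
	}
	// the whole sequence is resolved: resume from the saved global position
	return &savedGlobalPos
}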
-func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) { +func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) error { *ec.currentPos = mysql.Position{ Name: string(ev.NextLogName), Pos: uint32(ev.Position), @@ -1231,11 +1258,14 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) if ec.shardingReSync.currPos.Compare(ec.shardingReSync.latestPos) >= 0 { log.Infof("[syncer] sharding group %+v re-syncing completed", ec.shardingReSync) - ec.closeShardingSyncer() + err := ec.closeShardingResync() + if err != nil { + return errors.Trace(err) + } } else { log.Debugf("[syncer] rotate binlog to %v when re-syncing sharding group %+v", ec.currentPos, ec.shardingReSync) } - return + return nil } *ec.latestOp = rotate @@ -1250,6 +1280,7 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext) } log.Infof("rotate binlog to %v", *ec.currentPos) + return nil } func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) error { @@ -1264,8 +1295,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err ec.shardingReSync.currPos.Pos = ec.header.LogPos if ec.shardingReSync.currPos.Compare(ec.shardingReSync.latestPos) >= 0 { log.Infof("[syncer] sharding group %v re-syncing completed", ec.shardingReSync) - ec.closeShardingSyncer() - return nil + return errors.Trace(ec.closeShardingResync()) } if ec.shardingReSync.targetSchema != schemaName || ec.shardingReSync.targetTable != tableName { // in re-syncing, ignore non current sharding group's events @@ -1274,6 +1304,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } } + // DML position before table checkpoint, ignore it if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentPos) { log.Debugf("[syncer] ignore obsolete row event in %s that is old than checkpoint of table %s.%s", *ec.currentPos, originSchema, originTable) return nil @@ -1297,8 +1328,9 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err if s.cfg.IsSharding { source, _ := GenTableID(originSchema, originTable) - if s.sgk.InSyncing(schemaName, tableName, source) { - // current source is in sharding DDL syncing, ignore DML + if s.sgk.InSyncing(schemaName, tableName, source, *ec.currentPos) { + // if in unsync stage and not before active DDL, ignore it + // if in sharding re-sync stage and not before active DDL (the next DDL to be synced), ignore it log.Debugf("[syncer] source %s is in sharding DDL syncing, ignore Rows event %v", source, ec.currentPos) return nil } @@ -1431,9 +1463,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e ec.shardingReSync.currPos.Pos = ec.header.LogPos if ec.shardingReSync.currPos.Compare(ec.shardingReSync.latestPos) >= 0 { log.Infof("[syncer] sharding group %v re-syncing completed", ec.shardingReSync) - ec.closeShardingSyncer() + err2 := ec.closeShardingResync() + if err2 != nil { + return errors.Trace(err2) + } } else { - // in re-syncing, we can simply skip all DDLs + // in re-syncing, we can simply skip all DDLs, + // as they have been added to sharding DDL sequence // only update lastPos when the query is a real DDL *ec.lastPos = ec.shardingReSync.currPos log.Debugf("[syncer] skip query event when re-syncing sharding group %+v", ec.shardingReSync) @@ -1596,6 +1632,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e needShardingHandle bool group *ShardingGroup 
synced bool + active bool remain int source string ddlExecItem *DDLExecItem @@ -1613,16 +1650,21 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // for CREATE DATABASE, we do nothing. when CREATE TABLE under this DATABASE, sharding groups will be added case *ast.CreateTableStmt: // for CREATE TABLE, we add it to group - needShardingHandle, group, synced, remain, err = s.sgk.AddGroup(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, []string{source}, true) + needShardingHandle, group, synced, remain, err = s.sgk.AddGroup(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, []string{source}, nil, true) if err != nil { return errors.Trace(err) } log.Infof("[syncer] add table %s to shard group (%v)", source, needShardingHandle) default: - needShardingHandle, group, synced, remain, err = s.sgk.TrySync(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, source, startPos, *ec.currentPos, needHandleDDLs) + needShardingHandle, group, synced, active, remain, err = s.sgk.TrySync(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, source, startPos, *ec.currentPos, needHandleDDLs) if err != nil { return errors.Trace(err) } + // meets DDL that will not be processed in sequence sharding + if !active { + log.Infof("[syncer] skip in-activeDDL %v from source %s", needHandleDDLs, source) + return nil + } log.Infof("[syncer] try to sync table %s to shard group (%v)", source, needShardingHandle) } @@ -1657,11 +1699,17 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if firstEndPos == nil { return errors.Errorf("no valid End_log_pos of the first DDL exists for sharding group with source %s", source) } + + allResolved, err2 := s.sgk.ResolveShardingDDL(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + if err2 != nil { + return errors.Trace(err2) + } *ec.shardingReSyncCh <- &ShardingReSync{ currPos: *firstEndPos, latestPos: *ec.currentPos, targetSchema: ddlInfo.tableNames[1][0].Schema, targetTable: ddlInfo.tableNames[1][0].Name, + allResolved: allResolved, } // Don't send new DDLInfo to dm-master until all local sql jobs finished @@ -1692,7 +1740,17 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if ddlExecItem.req.Exec { failpoint.Inject("ShardSyncedExecutionExit", func() { log.Warn("[failpoint] exit triggered by ShardSyncedExecutionExit") - os.Exit(1) + utils.OsExit(1) + }) + failpoint.Inject("SequenceShardSyncedExecutionExit", func() { + group := s.sgk.Group(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + if group != nil { + // exit in the first round sequence sharding DDL only + if group.meta.ActiveIdx() == 1 { + log.Warn("[failpoint] exit triggered by SequenceShardSyncedExecutionExit") + utils.OsExit(1) + } + } }) log.Infof("[syncer] add DDL %v to job, request is %+v", ddlInfo1.DDLs, ddlExecItem.req) @@ -1938,20 +1996,21 @@ func (s *Syncer) flushJobs() error { return errors.Trace(err) } -func (s *Syncer) reSyncBinlog(cfg replication.BinlogSyncerConfig) (streamer.Streamer, error) { +func (s *Syncer) reSyncBinlog(cfg replication.BinlogSyncerConfig) error { err := s.retrySyncGTIDs() if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } // close still running sync - return s.reopenWithRetry(cfg) + return errors.Trace(s.reopenWithRetry(cfg)) } -func (s *Syncer) reopenWithRetry(cfg replication.BinlogSyncerConfig) (streamer streamer.Streamer, err error) { +func (s *Syncer) reopenWithRetry(cfg 
replication.BinlogSyncerConfig) error { + var err error for i := 0; i < maxRetryCount; i++ { - streamer, err = s.reopen(cfg) + s.streamer, err = s.reopen(cfg) if err == nil { - return + return nil } if needRetryReplicate(err) { log.Infof("[syncer] retry open binlog streamer %v", err) @@ -1960,7 +2019,7 @@ func (s *Syncer) reopenWithRetry(cfg replication.BinlogSyncerConfig) (streamer s } break } - return nil, errors.Trace(err) + return errors.Trace(err) } func (s *Syncer) reopen(cfg replication.BinlogSyncerConfig) (streamer.Streamer, error) { @@ -2112,7 +2171,7 @@ func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult) { // now no config diff implemented, so simply re-init use new config func (s *Syncer) Update(cfg *config.SubTaskConfig) error { if s.cfg.IsSharding { - tables := s.sgk.UnresolvedTables() + _, tables := s.sgk.UnresolvedTables() if len(tables) > 0 { return errors.NotSupportedf("try update config when some tables' (%v) sharding DDL not synced", tables) } diff --git a/tests/safe_mode/check_safe_mode.go b/tests/_dmctl_tools/check_safe_mode.go similarity index 100% rename from tests/safe_mode/check_safe_mode.go rename to tests/_dmctl_tools/check_safe_mode.go diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 3292de58f4..6ef9386f8d 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -1 +1,3 @@ all_mode +sequence_sharding +sequence_safe_mode diff --git a/tests/safe_mode/run.sh b/tests/safe_mode/run.sh index 73450a66ec..c6049a8b25 100755 --- a/tests/safe_mode/run.sh +++ b/tests/safe_mode/run.sh @@ -74,8 +74,7 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - cd $cur && GO111MODULE=on go build -o bin/check_safe_mode check_safe_mode.go && cd - - $cur/bin/check_safe_mode + $cur/../bin/check_safe_mode } cleanup1 safe_mode_target diff --git a/tests/sequence_safe_mode/conf/diff_config.toml b/tests/sequence_safe_mode/conf/diff_config.toml new file mode 100644 index 0000000000..3251130109 --- /dev/null +++ b/tests/sequence_safe_mode/conf/diff_config.toml @@ -0,0 +1,57 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-rowid = false + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] +schema = "sequence_safe_mode_target" +tables = ["t_target"] + +[[table-config]] +schema = "sequence_safe_mode_target" +table = "t_target" +ignore-columns = ["id"] +is-sharding = true +index-field = "uid" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "sequence_safe_mode_test" +table = "~t.*" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "sequence_safe_mode_test" +table = "~t.*" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" diff --git a/tests/sequence_safe_mode/conf/dm-master.toml b/tests/sequence_safe_mode/conf/dm-master.toml new file mode 100644 index 0000000000..334e0de993 --- /dev/null +++ b/tests/sequence_safe_mode/conf/dm-master.toml @@ -0,0 +1,9 @@ +# Master Configuration. 
+ +[[deploy]] +source-id = "mysql-replica-01" +dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-replica-02" +dm-worker = "127.0.0.1:8263" diff --git a/tests/sequence_safe_mode/conf/dm-task.yaml b/tests/sequence_safe_mode/conf/dm-task.yaml new file mode 100644 index 0000000000..772c4db954 --- /dev/null +++ b/tests/sequence_safe_mode/conf/dm-task.yaml @@ -0,0 +1,87 @@ +--- +name: test +task-mode: all +is-sharding: true +meta-schema: "dm_meta" +remove-meta: false +disable-heartbeat: true +timezone: "Asia/Shanghai" + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + server-id: 101 + black-white-list: "instance" + route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"] + column-mapping-rules: ["instance-1"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + server-id: 102 + black-white-list: "instance" + route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"] + column-mapping-rules: ["instance-2"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: + instance: + do-dbs: ["sequence_safe_mode_test"] + do-tables: + - db-name: "sequence_safe_mode_test" + tbl-name: "~^t[\\d]+" + +routes: + sharding-route-rules-table: + schema-pattern: sequence_safe_mode_test + table-pattern: t* + target-schema: sequence_safe_mode_target + target-table: t_target + + sharding-route-rules-schema: + schema-pattern: sequence_safe_mode_test + target-schema: sequence_safe_mode_target + +column-mappings: + instance-1: + schema-pattern: "sequence_safe_mode_test" + table-pattern: "t*" + expression: "partition id" + source-column: "id" + target-column: "id" + arguments: ["1", "", "t"] + + instance-2: + schema-pattern: "sequence_safe_mode_test" + table-pattern: "t*" + expression: "partition id" + source-column: "id" + target-column: "id" + arguments: ["2", "", "t"] + +mydumpers: + global: + mydumper-path: "./bin/mydumper" + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + extra-args: "-B sequence_safe_mode_test" + +syncers: + global: + worker-count: 16 + batch: 100 + max-retry: 100 diff --git a/tests/sequence_safe_mode/conf/dm-tracer.toml b/tests/sequence_safe_mode/conf/dm-tracer.toml new file mode 100644 index 0000000000..8585c34182 --- /dev/null +++ b/tests/sequence_safe_mode/conf/dm-tracer.toml @@ -0,0 +1,4 @@ +# Tracer Configuration. + +log-level = "debug" +enable = true diff --git a/tests/sequence_safe_mode/conf/dm-worker1.toml b/tests/sequence_safe_mode/conf/dm-worker1.toml new file mode 100644 index 0000000000..94634a7159 --- /dev/null +++ b/tests/sequence_safe_mode/conf/dm-worker1.toml @@ -0,0 +1,21 @@ +# Worker Configuration. + +server-id = 101 +source-id = "mysql-replica-01" +flavor = "mysql" +meta-file = "relay.meta" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3306 + +[tracer] +enable = true +tracer-addr = "127.0.0.1:8264" +batch-size = 1 +checksum = true diff --git a/tests/sequence_safe_mode/conf/dm-worker2.toml b/tests/sequence_safe_mode/conf/dm-worker2.toml new file mode 100644 index 0000000000..828985a571 --- /dev/null +++ b/tests/sequence_safe_mode/conf/dm-worker2.toml @@ -0,0 +1,21 @@ +# Worker Configuration. 
+ +server-id = 102 +source-id = "mysql-replica-02" +flavor = "mysql" +meta-file = "relay.meta" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3307 + +[tracer] +enable = true +tracer-addr = "127.0.0.1:8264" +batch-size = 1 +checksum = true diff --git a/tests/sequence_safe_mode/data/db1.increment.sql b/tests/sequence_safe_mode/data/db1.increment.sql new file mode 100644 index 0000000000..1eec206bac --- /dev/null +++ b/tests/sequence_safe_mode/data/db1.increment.sql @@ -0,0 +1,18 @@ +use sequence_safe_mode_test; +insert into t1 (uid, name) values (10003, 'Buenos Aires'); +alter table t1 add column age int; +alter table t1 add index age(age); +alter table t1 add column level int; +alter table t1 add index level(level); +insert into t1 (uid, name, age) values (10005, 'Buenos Aires', 200); +insert into t2 (uid, name) values (20005, 'Aureliano José'); +insert into t1 (uid, name, age) values (10006, 'Buenos Aires', 200); +alter table t2 add column age int; +alter table t2 add index age(age); +alter table t2 add column level int; +alter table t2 add index level(level); +insert into t1 (uid, name, age) values (10007, 'Buenos Aires', 300); +insert into t2 (uid, name, age) values (20006, 'Colonel Aureliano Buendía', 301); +insert into t1 (uid, name, age) values (10008, 'Buenos Aires', 400); +insert into t2 (uid, name, age) values (20007, 'Colonel Aureliano Buendía', 401); +update t1 set age = 404 where uid = 10005; diff --git a/tests/sequence_safe_mode/data/db1.increment2.sql b/tests/sequence_safe_mode/data/db1.increment2.sql new file mode 100644 index 0000000000..40fa9c8d9c --- /dev/null +++ b/tests/sequence_safe_mode/data/db1.increment2.sql @@ -0,0 +1,16 @@ +use sequence_safe_mode_test; +insert into t1 (uid, name, age) values (10009, 'Buenos Aires', 100); +insert into t2 (uid, name, age) values (20008, 'Colonel Aureliano Buendía', 402); +alter table t1 add column age2 int; +alter table t1 add index age2(age2); +insert into t1 (uid, name, age, age2) values (10010, 'Buenos Aires', 200, 404); +insert into t2 (uid, name, age) values (20009, 'Colonel Aureliano Buendía', 100); +update t2 set age = age + 1 where uid = 20008; +update t2 set age = age + 2 where uid = 20007; +update t2 set age = age + 1 where uid = 20009; +update t1 set age = age + 1 where uid = 10008; +update t1 set age = age + 1 where uid = 10009; +alter table t2 add column age2 int; +alter table t2 add index age2(age2); +update t1 set age = age + 1 where uid in (10010, 10011); +update t2 set age = age + 1 where uid in (20009, 20010); diff --git a/tests/sequence_safe_mode/data/db1.prepare.sql b/tests/sequence_safe_mode/data/db1.prepare.sql new file mode 100644 index 0000000000..cd99922c77 --- /dev/null +++ b/tests/sequence_safe_mode/data/db1.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `sequence_safe_mode_test`; +create database `sequence_safe_mode_test`; +use `sequence_safe_mode_test`; +create table t1 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t2 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +insert into t1 (uid, name) values (10001, 'Gabriel García Márquez'), (10002, 'Cien años de soledad'); +insert into t2 (uid, name) values (20001, 'José Arcadio Buendía'), (20002, 'Úrsula Iguarán'), (20003, 'José Arcadio'); diff --git a/tests/sequence_safe_mode/data/db2.increment.sql 
b/tests/sequence_safe_mode/data/db2.increment.sql new file mode 100644 index 0000000000..a64470de11 --- /dev/null +++ b/tests/sequence_safe_mode/data/db2.increment.sql @@ -0,0 +1,13 @@ +use sequence_safe_mode_test; +delete from t3 where name = 'Santa Sofía de la Piedad'; +alter table t2 add column age int; +alter table t2 add index age(age); +alter table t2 add column level int; +alter table t2 add index level(level); +insert into t2 (uid, name, age) values (40002, 'Remedios Moscote', 100), (40003, 'Amaranta', 103); +insert into t3 (uid, name) values (30004, 'Aureliano José'), (30005, 'Santa Sofía de la Piedad'), (30006, '17 Aurelianos'); +alter table t3 add column age int; +alter table t3 add index age(age); +alter table t3 add column level int; +alter table t3 add index level(level); +update t3 set age = 1; diff --git a/tests/sequence_safe_mode/data/db2.increment2.sql b/tests/sequence_safe_mode/data/db2.increment2.sql new file mode 100644 index 0000000000..e911d71ade --- /dev/null +++ b/tests/sequence_safe_mode/data/db2.increment2.sql @@ -0,0 +1,10 @@ +use sequence_safe_mode_test; +alter table t2 add column age2 int; +alter table t2 add index age2(age2); +insert into t2 (uid, name, age, age2) values (40004, 'Remedios Moscote', 100, 300), (40005, 'Amaranta', 103, 301); +insert into t3 (uid, name, age) values (30007, 'Aureliano José', 99), (30008, 'Santa Sofía de la Piedad', 999), (30009, '17 Aurelianos', 9999); +update t2 set age = age + 33 where uid = 40004; +update t3 set age = age + 44 where uid > 30006 and uid < 30010; +alter table t3 add column age2 int; +alter table t3 add index age2(age2); +update t3 set age2 = 100; diff --git a/tests/sequence_safe_mode/data/db2.prepare.sql b/tests/sequence_safe_mode/data/db2.prepare.sql new file mode 100644 index 0000000000..fedddf1dfa --- /dev/null +++ b/tests/sequence_safe_mode/data/db2.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `sequence_safe_mode_test`; +create database `sequence_safe_mode_test`; +use `sequence_safe_mode_test`; +create table t2 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t3 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +insert into t2 (uid, name) values (40000, 'Remedios Moscote'), (40001, 'Amaranta'); +insert into t3 (uid, name) values (30001, 'Aureliano José'), (30002, 'Santa Sofía de la Piedad'), (30003, '17 Aurelianos'); diff --git a/tests/sequence_safe_mode/run.sh b/tests/sequence_safe_mode/run.sh new file mode 100755 index 0000000000..9c2602893c --- /dev/null +++ b/tests/sequence_safe_mode/run.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 + check_contains 'Query OK, 3 rows affected' + + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/ReSyncExit=return(true)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive 
$cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_tracer $WORK_DIR/tracer $TRACER_PORT $cur/conf/dm-tracer.toml + check_port_alive $TRACER_PORT + + dmctl_start_task + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # DM-worker exit during re-sync after sharding group synced + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 + + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + export GO_FAILPOINTS='' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + pkill -hup dm-worker.test 2>/dev/null || true + wait_process_exit dm-worker.test + + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SequenceShardSyncedExecutionExit=return(true);github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)' + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # DM-worker exit when waiting for sharding group synced + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 + + i=0 + while [ $i -lt 10 ]; do + # we can't determine which DM-worker is the sharding lock owner, so we try both of them + # DM-worker1 is sharding lock owner and exits + if [ "$(check_port_return $WORKER1_PORT)" == "0" ]; then + echo "DM-worker1 is sharding lock owner and detects it offline" + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + break + fi + # DM-worker2 is sharding lock owner and exits + if [ "$(check_port_return $WORKER2_PORT)" == "0" ]; then + echo "DM-worker2 is sharding lock owner and detects it offline" + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)' + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + break + fi + + ((i+=1)) + echo "wait for one of DM-worker offine failed, retry later" && sleep 1 + done + if [ $i -ge 10 ]; then + echo "wait DM-worker offline timeout" + exit 1 + fi + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + $cur/../bin/check_safe_mode +} + +cleanup1 sequence_safe_mode_target +# also cleanup dm processes in case of last run failed +cleanup2 $* +run $* +cleanup2 $* + +wait_process_exit dm-master.test +wait_process_exit dm-worker.test +wait_process_exit dm-tracer.test + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/sequence_sharding/conf/diff_config.toml b/tests/sequence_sharding/conf/diff_config.toml new file mode 100644 index 0000000000..a6dff60f77 --- /dev/null +++ b/tests/sequence_sharding/conf/diff_config.toml @@ -0,0 +1,58 @@ +# diff Configuration. 
+ +log-level = "info" + +chunk-size = 10 + +check-thread-count = 4 + +sample-percent = 100 + +use-rowid = false + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] +schema = "sharding_target2" +tables = ["t_target"] + +[[table-config]] +schema = "sharding_target2" +table = "t_target" +remove-columns = ["id"] +is-sharding = true +index-fields = "uid" +# range-placeholder + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "sharding_seq" +table = "~t.*" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "sharding_seq" +table = "~t.*" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" diff --git a/tests/sequence_sharding/conf/dm-master.toml b/tests/sequence_sharding/conf/dm-master.toml new file mode 100644 index 0000000000..334e0de993 --- /dev/null +++ b/tests/sequence_sharding/conf/dm-master.toml @@ -0,0 +1,9 @@ +# Master Configuration. + +[[deploy]] +source-id = "mysql-replica-01" +dm-worker = "127.0.0.1:8262" + +[[deploy]] +source-id = "mysql-replica-02" +dm-worker = "127.0.0.1:8263" diff --git a/tests/sequence_sharding/conf/dm-task.yaml b/tests/sequence_sharding/conf/dm-task.yaml new file mode 100644 index 0000000000..78b309d032 --- /dev/null +++ b/tests/sequence_sharding/conf/dm-task.yaml @@ -0,0 +1,87 @@ +--- +name: sequence_sharding +task-mode: all +is-sharding: true +meta-schema: "dm_meta" +remove-meta: false +enable-heartbeat: true +timezone: "Asia/Shanghai" + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + server-id: 101 + black-white-list: "instance" + route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"] + column-mapping-rules: ["instance-1"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + server-id: 102 + black-white-list: "instance" + route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"] + column-mapping-rules: ["instance-2"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: + instance: + do-dbs: ["sharding_seq"] + do-tables: + - db-name: "sharding_seq" + tbl-name: "~^t[\\d]+" + +routes: + sharding-route-rules-table: + schema-pattern: sharding_seq* + table-pattern: t* + target-schema: sharding_target2 + target-table: t_target + + sharding-route-rules-schema: + schema-pattern: sharding_seq* + target-schema: sharding_target2 + +column-mappings: + instance-1: + schema-pattern: "sharding_seq*" + table-pattern: "t*" + expression: "partition id" + source-column: "id" + target-column: "id" + arguments: ["1", "", "t"] + + instance-2: + schema-pattern: "sharding_seq*" + table-pattern: "t*" + expression: "partition id" + source-column: "id" + target-column: "id" + arguments: ["2", "", "t"] + +mydumpers: + global: + mydumper-path: "./bin/mydumper" + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "--regex '^sharding_seq.*'" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 + max-retry: 100 diff --git a/tests/sequence_sharding/conf/dm-worker1.toml b/tests/sequence_sharding/conf/dm-worker1.toml new file mode 100644 
index 0000000000..62157e6cd2 --- /dev/null +++ b/tests/sequence_sharding/conf/dm-worker1.toml @@ -0,0 +1,15 @@ +# Worker Configuration. + +server-id = 101 +source-id = "mysql-replica-01" +flavor = "mysql" +meta-file = "relay.meta" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3306 diff --git a/tests/sequence_sharding/conf/dm-worker2.toml b/tests/sequence_sharding/conf/dm-worker2.toml new file mode 100644 index 0000000000..7edb4c5415 --- /dev/null +++ b/tests/sequence_sharding/conf/dm-worker2.toml @@ -0,0 +1,15 @@ +# Worker Configuration. + +server-id = 102 +source-id = "mysql-replica-02" +flavor = "mysql" +meta-file = "relay.meta" +enable-gtid = false +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "" +port = 3307 diff --git a/tests/sequence_sharding/data/db1.increment.sql b/tests/sequence_sharding/data/db1.increment.sql new file mode 100644 index 0000000000..801c992577 --- /dev/null +++ b/tests/sequence_sharding/data/db1.increment.sql @@ -0,0 +1,22 @@ +use sharding_seq; +insert into t1 (uid,name) values (100003,'NR'); +update t1 set name = 'uxoKehvqWg' where `uid` = 100001; +update t1 set name = 'bapYymrtfT' where name = 'igvApUx'; +insert into t2 (uid,name) values (200004,'CXDvoltoliUINgo'),(200005,'188689130'); +alter table t1 add column c int; +alter table t1 add index c(c); +update t1 set c = 100; +alter table t1 add column d int; +alter table t1 add index d(d); +alter table t1 add column e int, add index e(e); +update t1 set d = 200; +alter table t2 add column c int; +alter table t2 add index c(c); +update t2 set c = 100; +alter table t2 add column d int; +alter table t2 add index d(d); +alter table t2 add column e int, add index e(e); +update t2 set d = 200; +update t1 set c = 101; +update t2 set c = 102; +insert into t1 (uid,name,c) values(100004,'VALUES',191472878),(100005,'jAPlnzXli',1091218279); diff --git a/tests/sequence_sharding/data/db1.prepare.sql b/tests/sequence_sharding/data/db1.prepare.sql new file mode 100644 index 0000000000..21b580481b --- /dev/null +++ b/tests/sequence_sharding/data/db1.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `sharding_seq`; +create database `sharding_seq`; +use `sharding_seq`; +create table t1 (id bigint auto_increment,uid int,name varchar(20),primary key (`id`),unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t2 (id bigint auto_increment,uid int,name varchar(20),primary key (`id`),unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +insert into t1 (uid,name) values (100001,'igvApUx'),(100002,'qUyrcOBkwDK'); +insert into t2 (uid,name) values (200001,'EycletJHetWHMfH'),(200002,'ytkIaCOwXnWmy'),(200003,'MWQeWw""''rNmtGxzGp'); diff --git a/tests/sequence_sharding/data/db2.increment.sql b/tests/sequence_sharding/data/db2.increment.sql new file mode 100644 index 0000000000..6bd92ca190 --- /dev/null +++ b/tests/sequence_sharding/data/db2.increment.sql @@ -0,0 +1,27 @@ +use sharding_seq; +delete from t3 where id = 400002; +insert into t4 (uid,name) values(500005,'`.`'),(500006,'exit'); +alter table t2 add column c int; +alter table t2 add index c(c); +update t2 set c = 100; +alter table t2 add column d int; +alter table t2 add index d(d); +alter table t2 add column e int, add index e(e); +update t2 set d = 200; +alter table t3 add column c int; +alter table t3 add index c(c); +update t3 set c = 100; +alter table t3 add column d int; +alter table t3 add index d(d); +alter table t3 add column e 
int, add index e(e); +update t3 set d = 200; +alter table t4 add column c int; +alter table t4 add index c(c); +update t4 set c = 100; +alter table t4 add column d int; +alter table t4 add index d(d); +alter table t4 add column e int, add index e(e); +update t4 set d = 200; +update t4 set uid=uid+100000; +insert into t2 (uid,name,c) values(300003,'nvWgBf',73),(300004,'nD1000',4029); +insert into t3 (uid,name,c) values(400004,'1000',1000); diff --git a/tests/sequence_sharding/data/db2.prepare.sql b/tests/sequence_sharding/data/db2.prepare.sql new file mode 100644 index 0000000000..23223d148c --- /dev/null +++ b/tests/sequence_sharding/data/db2.prepare.sql @@ -0,0 +1,9 @@ +drop database if exists `sharding_seq`; +create database `sharding_seq`; +use `sharding_seq`; +create table t2 (id bigint auto_increment,uid int,name varchar(20),primary key (`id`),unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t3 (id bigint auto_increment,uid int,name varchar(20),primary key (`id`),unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t4 (id bigint auto_increment,uid int,name varchar(20),primary key (`id`),unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +insert into t2 (uid,name) values (300001,'io'),(300002,'xOKvsDsofmAzEF'); +insert into t3 (uid,name) values (400001,'eXcRSo'),(400002,'QOP'),(400003,'DUotcCayM'); +insert into t4 (uid,name) values (500001,'`id` = 15'),(500002,'942032497'),(500003,'UrhcHUbwsDMZrvJxM'); diff --git a/tests/sequence_sharding/run.sh b/tests/sequence_sharding/run.sh new file mode 100755 index 0000000000..78f795b7c2 --- /dev/null +++ b/tests/sequence_sharding/run.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + # start DM task only + dmctl_start_task + + # use sync_diff_inspector to check full dump loader + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 + + # use sync_diff_inspector to check data now! + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +cleanup1 sharding_target2 +# also cleanup dm processes in case of last run failed +cleanup2 $* +run $* +cleanup2 $* + +wait_process_exit dm-master.test +wait_process_exit dm-worker.test + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
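Both new integration tests exercise the same scenario from the upstream side: every sharded table applies the same series of ALTER TABLE statements, but the statements from different sources interleave with DML in different orders. The syncer changes above handle this by keeping a per-source DDL sequence and only executing the "active" DDL downstream once every source has reached it. The sketch below is a rough, invented model of that bookkeeping; names such as sequence and trySync are hypothetical and do not match DM's sharding-meta code.

package shardseq

// ddlItem is one DDL statement together with the binlog position it was seen at.
type ddlItem struct {
	ddl string
	pos uint32
}

// sequence tracks the ordered DDLs seen from each upstream table and the
// index of the currently active DDL (the next one to execute downstream).
type sequence struct {
	sources   map[string][]ddlItem
	total     int // number of tables in the sharding group
	activeIdx int
}

func newSequence(total int) *sequence {
	return &sequence{sources: make(map[string][]ddlItem), total: total}
}

// trySync records a DDL from one source table and reports whether it is the
// active DDL for that source and whether every source has now reached it.
func (s *sequence) trySync(source, ddl string, pos uint32) (active, synced bool) {
	items := append(s.sources[source], ddlItem{ddl: ddl, pos: pos})
	s.sources[source] = items

	if len(items)-1 != s.activeIdx {
		return false, false // a later DDL in the sequence; skip it for now
	}
	reached := 0
	for _, its := range s.sources {
		if len(its) > s.activeIdx {
			reached++
		}
	}
	return true, reached == s.total
}

// resolve marks the active DDL as executed downstream, advances the active
// index and reports whether the whole sequence has been drained.
func (s *sequence) resolve() (allResolved bool) {
	s.activeIdx++
	allResolved = true
	for _, its := range s.sources {
		if len(its) > s.activeIdx {
			allResolved = false
		}
	}
	return allResolved
}

For instance, in sequence_sharding's db1.increment.sql above, t1 reaches "alter table ... add column c" before t2 does; under this rule the downstream ALTER would only run after both tables have recorded it, while the later "add column d" stays queued as the next item in the sequence.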