From d2051ed75511e069a512f54fce25a02f6d5a73fd Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Fri, 13 Aug 2021 05:05:02 -0400 Subject: [PATCH] cherrypick #1971 and #1990 to release-2.0.6 (#1991) --- CHANGELOG.md | 10 +++++ pkg/shardddl/optimism/keeper.go | 26 ++++++++--- pkg/shardddl/optimism/lock.go | 14 ++++++ tests/shardddl1/run.sh | 78 +++++++++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a572ca3cda..91323f1de6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,16 @@ # DM Changelog All notable changes to this project will be documented in this file. +## [2.0.6] 2021-08-13 + +### Bug fixes + +- Fix the issue that the metadata inconsistency between ddl infos and upstream tables in the optimistic sharding DDL mode causes DM-master panic [#1971](https://github.com/pingcap/dm/pull/1971) + +### Known issues + +[GitHub issues](https://github.com/pingcap/dm/issues?q=is%3Aissue+label%3Aaffected-v2.0.6) + ## [2.0.5] 2021-07-30 ### Improvements diff --git a/pkg/shardddl/optimism/keeper.go b/pkg/shardddl/optimism/keeper.go index 5521e8b0e8..08ed293d90 100644 --- a/pkg/shardddl/optimism/keeper.go +++ b/pkg/shardddl/optimism/keeper.go @@ -51,15 +51,23 @@ func (lk *LockKeeper) RebuildLocksAndTables( lock *Lock ok bool ) - for _, taskInfos := range ifm { - for _, sourceInfos := range taskInfos { - for _, schemaInfos := range sourceInfos { - for _, info := range schemaInfos { + for task, taskInfos := range ifm { + for source, sourceInfos := range taskInfos { + for schema, schemaInfos := range sourceInfos { + for table, info := range schemaInfos { lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable) if lock, ok = lk.locks[lockID]; !ok { - lk.locks[lockID] = NewLock(cli, lockID, info.Task, info.DownSchema, info.DownTable, lockJoined[lockID], lockTTS[lockID]) - lock = lk.locks[lockID] + lock = NewLock(cli, lockID, info.Task, info.DownSchema, info.DownTable, lockJoined[lockID], lockTTS[lockID]) } + // filter info which doesn't have SourceTable + // SourceTable will be changed after user update block-allow-list + // But old infos still remain in etcd. + // TODO: add a mechanism to remove all outdated infos in etcd. + if !lock.TableExist(info.Source, info.UpSchema, info.UpTable) { + delete(ifm[task][source][schema], table) + continue + } + lk.locks[lockID] = lock lock.tables[info.Source][info.UpSchema][info.UpTable] = schemacmp.Encode(info.TableInfoBefore) if columns, ok := colm[lockID]; ok { lock.columns = columns @@ -74,6 +82,12 @@ func (lk *LockKeeper) RebuildLocksAndTables( for source, sourceTable := range lockTable { for schema, schemaTable := range sourceTable { for table, tableinfo := range schemaTable { + if _, ok := lk.locks[lockID]; !ok { + continue + } + if !lk.locks[lockID].TableExist(source, schema, table) { + continue + } lk.locks[lockID].tables[source][schema][table] = tableinfo } } diff --git a/pkg/shardddl/optimism/lock.go b/pkg/shardddl/optimism/lock.go index 9c5682948e..683de1bc81 100644 --- a/pkg/shardddl/optimism/lock.go +++ b/pkg/shardddl/optimism/lock.go @@ -685,6 +685,20 @@ func (l *Lock) DeleteColumnsByOp(op Operation) error { return nil } +// TableExist check whether table exists. +func (l *Lock) TableExist(source, schema, table string) bool { + if _, ok := l.tables[source]; !ok { + return false + } + if _, ok := l.tables[source][schema]; !ok { + return false + } + if _, ok := l.tables[source][schema][table]; !ok { + return false + } + return true +} + // AddDifferentFieldLenColumns checks whether dm adds columns with different field lengths. func AddDifferentFieldLenColumns(lockID, ddl string, oldJoined, newJoined schemacmp.Table) (string, error) { col, err := GetColumnName(lockID, ddl, ast.AlterTableAddColumns) diff --git a/tests/shardddl1/run.sh b/tests/shardddl1/run.sh index bab3f984a1..6fd3bda88c 100644 --- a/tests/shardddl1/run.sh +++ b/tests/shardddl1/run.sh @@ -497,10 +497,88 @@ function DM_DropAddColumn() { done } +function DM_UpdateBARule_CASE() { + run_sql_source1 "insert into ${shardddl1}.${tb1} values(1);" + run_sql_source1 "insert into ${shardddl2}.${tb1} values(2);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(3);" + run_sql_source2 "insert into ${shardddl2}.${tb1} values(4);" + + run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int" + run_sql_source1 "alter table ${shardddl2}.${tb1} add column new_col1 int" + run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int" + run_sql_source2 "alter table ${shardddl2}.${tb1} add column new_col1 int" + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(5,5);" + run_sql_source1 "insert into ${shardddl2}.${tb1} values(6,6);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(7,7);" + run_sql_source2 "insert into ${shardddl2}.${tb1} values(8,8);" + + # source1 db2.tb1 add column and then drop column + run_sql_source1 "alter table ${shardddl2}.${tb1} add column new_col2 int" + run_sql_source1 "insert into ${shardddl2}.${tb1} values(9,9,9);" + run_sql_source1 "alter table ${shardddl2}.${tb1} drop column new_col2" + run_sql_source1 "insert into ${shardddl2}.${tb1} values(10,10);" + + # source1 db1.tb1 add column + run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col3 int" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(11,11,11);" + + # source2 db1.tb1 drop column + run_sql_source2 "alter table ${shardddl1}.${tb1} drop column new_col1" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(12);" + + # source2 db2.tb1 do a unsupported DDL + run_sql_source2 "alter table ${shardddl2}.${tb1} rename column id to new_id;" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "because schema conflict detected" 1 + + # user found error and then change block-allow-list, restart task + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 3 + + cp $cur/conf/double-source-optimistic.yaml $WORK_DIR/task.yaml + sed -i 's/do-dbs: \["shardddl1","shardddl2"\]/do-dbs: \["shardddl1"\]/g' $WORK_DIR/task.yaml + echo 'ignore-checking-items: ["schema_of_shard_tables"]' >>$WORK_DIR/task.yaml + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/task.yaml" \ + "\"result\": true" 3 + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,13,13);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(14);" + run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 14" + + restart_master + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "show-ddl-locks" \ + "\"ID\": \"test-\`shardddl\`.\`tb\`\"" 1 + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column new_col1" + run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col3 int" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(15,15);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(16,16);" + run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 16" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "show-ddl-locks" \ + "no DDL lock exists" 1 +} + +function DM_UpdateBARule() { + run_case UpdateBARule "double-source-optimistic" "init_table 111 121 211 221" "clean_table" "optimistic" +} + function run() { init_cluster init_database + DM_UpdateBARule DM_DropAddColumn DM_RENAME_TABLE DM_RENAME_COLUMN_OPTIMISTIC