Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherrypick #1971 and #1990 to release-2.0.6 (#1991)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Aug 13, 2021
1 parent 92baadb commit d2051ed
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 6 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# DM Changelog

All notable changes to this project will be documented in this file.
## [2.0.6] 2021-08-13

### Bug fixes

- Fix the issue that the metadata inconsistency between ddl infos and upstream tables in the optimistic sharding DDL mode causes DM-master panic [#1971](https://github.com/pingcap/dm/pull/1971)

### Known issues

[GitHub issues](https://github.com/pingcap/dm/issues?q=is%3Aissue+label%3Aaffected-v2.0.6)

## [2.0.5] 2021-07-30

### Improvements
Expand Down
26 changes: 20 additions & 6 deletions pkg/shardddl/optimism/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,23 @@ func (lk *LockKeeper) RebuildLocksAndTables(
lock *Lock
ok bool
)
for _, taskInfos := range ifm {
for _, sourceInfos := range taskInfos {
for _, schemaInfos := range sourceInfos {
for _, info := range schemaInfos {
for task, taskInfos := range ifm {
for source, sourceInfos := range taskInfos {
for schema, schemaInfos := range sourceInfos {
for table, info := range schemaInfos {
lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable)
if lock, ok = lk.locks[lockID]; !ok {
lk.locks[lockID] = NewLock(cli, lockID, info.Task, info.DownSchema, info.DownTable, lockJoined[lockID], lockTTS[lockID])
lock = lk.locks[lockID]
lock = NewLock(cli, lockID, info.Task, info.DownSchema, info.DownTable, lockJoined[lockID], lockTTS[lockID])
}
// filter info which doesn't have SourceTable
// SourceTable will be changed after user update block-allow-list
// But old infos still remain in etcd.
// TODO: add a mechanism to remove all outdated infos in etcd.
if !lock.TableExist(info.Source, info.UpSchema, info.UpTable) {
delete(ifm[task][source][schema], table)
continue
}
lk.locks[lockID] = lock
lock.tables[info.Source][info.UpSchema][info.UpTable] = schemacmp.Encode(info.TableInfoBefore)
if columns, ok := colm[lockID]; ok {
lock.columns = columns
Expand All @@ -74,6 +82,12 @@ func (lk *LockKeeper) RebuildLocksAndTables(
for source, sourceTable := range lockTable {
for schema, schemaTable := range sourceTable {
for table, tableinfo := range schemaTable {
if _, ok := lk.locks[lockID]; !ok {
continue
}
if !lk.locks[lockID].TableExist(source, schema, table) {
continue
}
lk.locks[lockID].tables[source][schema][table] = tableinfo
}
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,20 @@ func (l *Lock) DeleteColumnsByOp(op Operation) error {
return nil
}

// TableExist check whether table exists.
func (l *Lock) TableExist(source, schema, table string) bool {
if _, ok := l.tables[source]; !ok {
return false
}
if _, ok := l.tables[source][schema]; !ok {
return false
}
if _, ok := l.tables[source][schema][table]; !ok {
return false
}
return true
}

// AddDifferentFieldLenColumns checks whether dm adds columns with different field lengths.
func AddDifferentFieldLenColumns(lockID, ddl string, oldJoined, newJoined schemacmp.Table) (string, error) {
col, err := GetColumnName(lockID, ddl, ast.AlterTableAddColumns)
Expand Down
78 changes: 78 additions & 0 deletions tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,88 @@ function DM_DropAddColumn() {
done
}

function DM_UpdateBARule_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1);"
run_sql_source1 "insert into ${shardddl2}.${tb1} values(2);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(3);"
run_sql_source2 "insert into ${shardddl2}.${tb1} values(4);"

run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int"
run_sql_source1 "alter table ${shardddl2}.${tb1} add column new_col1 int"
run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col1 int"
run_sql_source2 "alter table ${shardddl2}.${tb1} add column new_col1 int"

run_sql_source1 "insert into ${shardddl1}.${tb1} values(5,5);"
run_sql_source1 "insert into ${shardddl2}.${tb1} values(6,6);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(7,7);"
run_sql_source2 "insert into ${shardddl2}.${tb1} values(8,8);"

# source1 db2.tb1 add column and then drop column
run_sql_source1 "alter table ${shardddl2}.${tb1} add column new_col2 int"
run_sql_source1 "insert into ${shardddl2}.${tb1} values(9,9,9);"
run_sql_source1 "alter table ${shardddl2}.${tb1} drop column new_col2"
run_sql_source1 "insert into ${shardddl2}.${tb1} values(10,10);"

# source1 db1.tb1 add column
run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col3 int"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(11,11,11);"

# source2 db1.tb1 drop column
run_sql_source2 "alter table ${shardddl1}.${tb1} drop column new_col1"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(12);"

# source2 db2.tb1 do a unsupported DDL
run_sql_source2 "alter table ${shardddl2}.${tb1} rename column id to new_id;"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"because schema conflict detected" 1

# user found error and then change block-allow-list, restart task
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 3

cp $cur/conf/double-source-optimistic.yaml $WORK_DIR/task.yaml
sed -i 's/do-dbs: \["shardddl1","shardddl2"\]/do-dbs: \["shardddl1"\]/g' $WORK_DIR/task.yaml
echo 'ignore-checking-items: ["schema_of_shard_tables"]' >>$WORK_DIR/task.yaml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/task.yaml" \
"\"result\": true" 3

run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,13,13);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(14);"
run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 14"

restart_master

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"show-ddl-locks" \
"\"ID\": \"test-\`shardddl\`.\`tb\`\"" 1

run_sql_source1 "alter table ${shardddl1}.${tb1} drop column new_col1"
run_sql_source2 "alter table ${shardddl1}.${tb1} add column new_col3 int"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(15,15);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(16,16);"
run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 16"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"show-ddl-locks" \
"no DDL lock exists" 1
}

function DM_UpdateBARule() {
run_case UpdateBARule "double-source-optimistic" "init_table 111 121 211 221" "clean_table" "optimistic"
}

function run() {
init_cluster
init_database

DM_UpdateBARule
DM_DropAddColumn
DM_RENAME_TABLE
DM_RENAME_COLUMN_OPTIMISTIC
Expand Down

0 comments on commit d2051ed

Please sign in to comment.