Skip to content

Commit

Permalink
schemastore: fix rename table query (#769)
Browse files Browse the repository at this point in the history
* add rename test

* fix rename table query
  • Loading branch information
lidezhu authored Jan 2, 2025
1 parent ce2e018 commit 25ccc62
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
16 changes: 16 additions & 0 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package schemastore

import (
"fmt"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -563,6 +564,20 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
// get the table's current table name from the ddl job
event.CurrentTableName = event.TableInfo.Name.O
// Note: event.PrevSchemaID may not be accurate for rename table in some case.
// after pr: https://github.com/pingcap/tidb/pull/43341,
// assume there is a table `test.t` and a ddl: `rename table t to test2.t;`, and its commit ts is `100`.
// if you get a ddl snapshot at ts `99`, table `t` is already in `test2`.
// so event.PrevSchemaName will also be `test2`.
// And because SchemaStore is the source of truth of cdc,
// we can use event.PrevSchemaID(even it is wrong) to update the internal state of the cdc.
// TODO: not sure whether kafka sink will use event.PrevSchemaName and event.PrevTableName
// But event.Query will be emit to downstream(out of cdc), we must make it correct.
if args.job.Query != "" {
event.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
args.job.InvolvingSchemaInfo[0].Database, args.job.InvolvingSchemaInfo[0].Table,
event.CurrentSchemaName, event.CurrentTableName)
}
return event
}

Expand Down Expand Up @@ -1311,6 +1326,7 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
}
} else if !ignoreCurrentTable {
// TODO: this should report an error as old cdc behaviour
ddlEvent.BlockedTables = &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{heartbeatpb.DDLSpan.TableID},
Expand Down
20 changes: 14 additions & 6 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,11 @@ func TestApplyDDLJobs(t *testing.T) {
},
func() []*model.Job {
return []*model.Job{
buildCreateTableJobForTest(100, 300, "t1", 1010), // create table 300
buildRenameTableJobForTest(105, 300, "t2", 1020), // rename table 300 to schema 105
buildCreateTableJobForTest(100, 300, "t1", 1010), // create table 300
buildRenameTableJobForTest(105, 300, "t2", 1020, "rename table t1 to test2.t2", "test", "t1"), // rename table 300 to schema 105
// rename table 300 to schema 105 with the same name again
// check comments in buildPersistedDDLEventForRenameTable to see why this would happen
buildRenameTableJobForTest(105, 300, "t2", 1030, "", "", ""),
}
}(),
map[int64]*BasicTableInfo{
Expand All @@ -724,9 +727,9 @@ func TestApplyDDLJobs(t *testing.T) {
},
},
map[int64][]uint64{
300: {1010, 1020},
300: {1010, 1020, 1030},
},
[]uint64{1010, 1020},
[]uint64{1010, 1020, 1030},
nil,
[]FetchTableDDLEventsTestCase{
{
Expand All @@ -736,6 +739,7 @@ func TestApplyDDLJobs(t *testing.T) {
result: []commonEvent.DDLEvent{
{
Type: byte(model.ActionRenameTable),
Query: "RENAME TABLE `test`.`t1` TO `test2`.`t2`",
FinishedTs: 1020,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
Expand Down Expand Up @@ -800,7 +804,7 @@ func TestApplyDDLJobs(t *testing.T) {
{
tableFilter: buildTableFilterByNameForTest("test2", "*"),
startTs: 1010,
limit: 10,
limit: 1,
result: []commonEvent.DDLEvent{
{
Type: byte(model.ActionRenameTable),
Expand Down Expand Up @@ -1457,6 +1461,10 @@ func TestApplyDDLJobs(t *testing.T) {
if expectedDDLEvent.Type != actualDDLEvent.Type || expectedDDLEvent.FinishedTs != actualDDLEvent.FinishedTs {
return false
}
// check query
if expectedDDLEvent.Query != "" && expectedDDLEvent.Query != actualDDLEvent.Query {
return false
}
// check BlockedTables
if expectedDDLEvent.BlockedTables == nil && actualDDLEvent.BlockedTables != nil {
return false
Expand Down Expand Up @@ -1670,7 +1678,7 @@ func TestRegisterTable(t *testing.T) {
},
ddlJobs: func() []*model.Job {
return []*model.Job{
buildRenameTableJobForTest(50, 99, "t2", 1000), // rename table 99 to t2
buildRenameTableJobForTest(50, 99, "t2", 1000, "", "", ""), // rename table 99 to t2
buildCreateTableJobForTest(50, 100, "t3", 1010), // create table 100
buildTruncateTableJobForTest(50, 100, 101, "t3", 1020), // truncate table 100 to 101
buildCreatePartitionTableJobForTest(50, 102, "t4", []int64{201, 202, 203}, 1030), // create partition table 102
Expand Down
12 changes: 10 additions & 2 deletions logservice/schemastore/persist_storage_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ func formatDDLEventsForTest(events []commonEvent.DDLEvent) string {
if event.NeedDroppedTables != nil {
needDroppedTableIDs = fmt.Sprintf("type: %v, schemaID: %d, tableIDs: %v", event.NeedDroppedTables.InfluenceType, event.NeedDroppedTables.SchemaID, event.NeedDroppedTables.TableIDs)
}
res = append(res, fmt.Sprintf("type: %s, finishedTs: %d, blocked tables: %s, updated schemas %v, need dropped tables: %s, need added tables: %v, table name change %v",
res = append(res, fmt.Sprintf("type: %s, finishedTs: %d, query %s, blocked tables: %s, updated schemas %v, need dropped tables: %s, need added tables: %v, table name change %v",
model.ActionType(event.Type),
event.FinishedTs,
event.Query,
blockedTableIDs,
event.UpdatedSchemas,
needDroppedTableIDs,
Expand Down Expand Up @@ -245,18 +246,25 @@ func buildCreatePartitionTablesJobForTest(schemaID int64, tableIDs []int64, tabl
}
}

func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64) *model.Job {
func buildRenameTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64, query, prevSchemaName, prevTableName string) *model.Job {
return &model.Job{
Type: model.ActionRenameTable,
SchemaID: schemaID,
TableID: tableID,
Query: query,
BinlogInfo: &model.HistoryInfo{
TableInfo: &model.TableInfo{
ID: tableID,
Name: pmodel.NewCIStr(tableName),
},
FinishedTS: finishedTs,
},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{
Database: prevSchemaName,
Table: prevTableName,
},
},
}
}

Expand Down

0 comments on commit 25ccc62

Please sign in to comment.