Skip to content

Commit

Permalink
schemastore: fix rename table bug and add more unit tests (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Dec 30, 2024
1 parent 3be4f05 commit e78f686
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 355 deletions.
2 changes: 1 addition & 1 deletion logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ func updateRegisteredTableInfoStore(
case model.ActionRebaseAutoID:
// TODO: verify can be ignored
case model.ActionRenameTable:
// ignore
tryApplyDDLToStore()
case model.ActionSetDefaultValue:
tryApplyDDLToStore()
case model.ActionShardRowID,
Expand Down
368 changes: 94 additions & 274 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,280 +1568,100 @@ func TestReadWriteMeta(t *testing.T) {
}
}

// func TestBuildVersionedTableInfoStore(t *testing.T) {
// dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
// err := os.RemoveAll(dbPath)
// require.Nil(t, err)

// gcTs := uint64(1000)
// schemaID := int64(50)
// tableID := int64(99)
// databaseInfo := make(map[int64]*model.DBInfo)
// databaseInfo[schemaID] = &model.DBInfo{
// ID: schemaID,
// Name: model.NewCIStr("test"),
// Tables: []*model.TableInfo{
// {
// ID: tableID,
// Name: model.NewCIStr("t1"),
// },
// },
// }
// pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo)

// require.Equal(t, 1, len(pStorage.databaseMap))
// require.Equal(t, "test", pStorage.databaseMap[schemaID].Name)

// {
// store := newEmptyVersionedTableInfoStore(tableID)
// pStorage.buildVersionedTableInfoStore(store)
// tableInfo, err := store.getTableInfo(gcTs)
// require.Nil(t, err)
// require.Equal(t, "t1", tableInfo.Name.O)
// require.Equal(t, tableID, tableInfo.ID)
// }

// // rename table
// renameVersion := uint64(1500)
// {
// job := &model.Job{
// Type: model.ActionRenameTable,
// SchemaID: schemaID,
// TableID: tableID,
// BinlogInfo: &model.HistoryInfo{
// SchemaVersion: 3000,
// TableInfo: &model.TableInfo{
// ID: tableID,
// Name: model.NewCIStr("t2"),
// },
// FinishedTS: renameVersion,
// },
// }
// err = pStorage.handleDDLJob(job)
// require.Nil(t, err)
// }

// // create another table
// tableID2 := tableID + 1
// createVersion := renameVersion + 200
// {
// job := &model.Job{
// Type: model.ActionCreateTable,
// SchemaID: schemaID,
// TableID: tableID2,
// BinlogInfo: &model.HistoryInfo{
// SchemaVersion: 3500,
// TableInfo: &model.TableInfo{
// ID: tableID2,
// Name: model.NewCIStr("t3"),
// },
// FinishedTS: createVersion,
// },
// }
// err = pStorage.handleDDLJob(job)
// require.Nil(t, err)
// }

// upperBound := UpperBoundMeta{
// FinishedDDLTs: 3000,
// SchemaVersion: 4000,
// ResolvedTs: 2000,
// }
// pStorage = loadPersistentStorageForTest(pStorage.db, gcTs, upperBound)
// {
// store := newEmptyVersionedTableInfoStore(tableID)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 2, len(store.infos))
// tableInfo, err := store.getTableInfo(gcTs)
// require.Nil(t, err)
// require.Equal(t, "t1", tableInfo.Name.O)
// require.Equal(t, tableID, tableInfo.ID)
// tableInfo2, err := store.getTableInfo(renameVersion)
// require.Nil(t, err)
// require.Equal(t, "t2", tableInfo2.Name.O)

// renameVersion2 := uint64(3000)
// store.applyDDL(&PersistedDDLEvent{
// Type: byte(model.ActionRenameTable),
// CurrentSchemaID: schemaID,
// CurrentTableID: tableID,
// SchemaVersion: 3000,
// TableInfo: &model.TableInfo{
// ID: tableID,
// Name: model.NewCIStr("t3"),
// },
// FinishedTs: renameVersion2,
// })
// tableInfo3, err := store.getTableInfo(renameVersion2)
// require.Nil(t, err)
// require.Equal(t, "t3", tableInfo3.Name.O)
// }

// {
// store := newEmptyVersionedTableInfoStore(tableID2)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// tableInfo, err := store.getTableInfo(createVersion)
// require.Nil(t, err)
// require.Equal(t, "t3", tableInfo.Name.O)
// require.Equal(t, tableID2, tableInfo.ID)
// }

// // truncate table
// tableID3 := tableID2 + 1
// truncateVersion := createVersion + 200
// {
// job := &model.Job{
// Type: model.ActionTruncateTable,
// SchemaID: schemaID,
// TableID: tableID2,
// BinlogInfo: &model.HistoryInfo{
// SchemaVersion: 3600,
// TableInfo: &model.TableInfo{
// ID: tableID3,
// Name: model.NewCIStr("t4"),
// },
// FinishedTS: truncateVersion,
// },
// }
// err = pStorage.handleDDLJob(job)
// require.Nil(t, err)
// }

// {
// store := newEmptyVersionedTableInfoStore(tableID2)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// require.Equal(t, truncateVersion, store.deleteVersion)
// }

// {
// store := newEmptyVersionedTableInfoStore(tableID3)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// tableInfo, err := store.getTableInfo(truncateVersion)
// require.Nil(t, err)
// require.Equal(t, "t4", tableInfo.Name.O)
// require.Equal(t, tableID3, tableInfo.ID)
// }
// }

// func TestBuildVersionedTableInfoStoreWithPartitionTable(t *testing.T) {
// dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
// err := os.RemoveAll(dbPath)
// require.Nil(t, err)

// gcTs := uint64(1000)
// schemaID := int64(50)
// tableID := int64(99)
// databaseInfo := make(map[int64]*model.DBInfo)
// databaseInfo[schemaID] = &model.DBInfo{
// ID: schemaID,
// Name: model.NewCIStr("test"),
// }
// pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo)

// // create a partition table
// partitionID1 := tableID + 100
// partitionID2 := tableID + 200
// {
// job := &model.Job{
// Type: model.ActionCreateTable,
// SchemaID: schemaID,
// TableID: tableID,
// BinlogInfo: &model.HistoryInfo{
// SchemaVersion: 2000,
// TableInfo: &model.TableInfo{
// ID: tableID,
// Name: model.NewCIStr("t"),
// Partition: &model.PartitionInfo{
// Definitions: []model.PartitionDefinition{
// {
// ID: partitionID1,
// },
// {
// ID: partitionID2,
// },
// },
// },
// },
// FinishedTS: 1200,
// },
// }
// pStorage.handleDDLJob(job)
// }

// upperBound := UpperBoundMeta{
// FinishedDDLTs: 3000,
// SchemaVersion: 4000,
// ResolvedTs: 2000,
// }
// pStorage = loadPersistentStorageForTest(pStorage.db, gcTs, upperBound)
// {
// store := newEmptyVersionedTableInfoStore(partitionID1)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// }
// {
// store := newEmptyVersionedTableInfoStore(partitionID2)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// }

// // truncate the partition table
// tableID2 := tableID + 500
// partitionID3 := tableID2 + 100
// partitionID4 := tableID2 + 200
// {
// job := &model.Job{
// Type: model.ActionTruncateTable,
// SchemaID: schemaID,
// TableID: tableID,
// BinlogInfo: &model.HistoryInfo{
// SchemaVersion: 2100,
// TableInfo: &model.TableInfo{
// ID: tableID2,
// Name: model.NewCIStr("t"),
// Partition: &model.PartitionInfo{
// Definitions: []model.PartitionDefinition{
// {
// ID: partitionID3,
// },
// {
// ID: partitionID4,
// },
// },
// },
// },
// FinishedTS: 1300,
// },
// }
// pStorage.handleDDLJob(job)
// }

// {
// store := newEmptyVersionedTableInfoStore(partitionID1)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// require.Equal(t, uint64(1300), store.deleteVersion)
// }
// {
// store := newEmptyVersionedTableInfoStore(partitionID2)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// require.Equal(t, uint64(1300), store.deleteVersion)
// }
// {
// store := newEmptyVersionedTableInfoStore(partitionID3)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// }
// {
// store := newEmptyVersionedTableInfoStore(partitionID4)
// pStorage.buildVersionedTableInfoStore(store)
// require.Equal(t, 1, len(store.infos))
// }
// }
func TestRegisterTable(t *testing.T) {
type QueryTableInfoTestCase struct {
tableID int64
snapTs uint64
name string
}
var testCases = []struct {
initailDBInfos []mockDBInfo // tables in initailDBInfos will be registered before apply ddl
ddlJobs []*model.Job
queryCases []QueryTableInfoTestCase
}{
{
initailDBInfos: []mockDBInfo{
{
dbInfo: &model.DBInfo{
ID: 50,
Name: pmodel.NewCIStr("test"),
},
tables: []*model.TableInfo{
{
ID: 99,
Name: pmodel.NewCIStr("t1"),
},
},
},
},
ddlJobs: func() []*model.Job {
return []*model.Job{
buildRenameTableJobForTest(50, 99, "t2", 1000), // rename table 99 to t2
buildCreateTableJobForTest(50, 100, "t3", 1010), // create table 100
buildTruncateTableJobForTest(50, 100, 101, "t3", 1020), // truncate table 100 to 101
buildCreatePartitionTableJobForTest(50, 102, "t4", []int64{201, 202, 203}, 1030), // create partition table 102
}
}(),
queryCases: []QueryTableInfoTestCase{
{
tableID: 99,
snapTs: 990,
name: "t1",
},
{
tableID: 99,
snapTs: 1000,
name: "t2",
},
{
tableID: 100,
snapTs: 1010,
name: "t3",
},
{
tableID: 101,
snapTs: 1020,
name: "t3",
},
{
tableID: 201,
snapTs: 1030,
name: "t4",
},
{
tableID: 202,
snapTs: 1030,
name: "t4",
},
{
tableID: 203,
snapTs: 1030,
name: "t4",
},
},
},
}
for _, tt := range testCases {
dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
pStorage := newPersistentStorageForTest(dbPath, tt.initailDBInfos)
for _, db := range tt.initailDBInfos {
for _, table := range db.tables {
pStorage.registerTable(table.ID, 0) // second arguments is not important
}
}
for _, job := range tt.ddlJobs {
err := pStorage.handleDDLJob(job)
require.Nil(t, err)
}
for _, testCase := range tt.queryCases {
pStorage.registerTable(testCase.tableID, 0) // second arguments is not important
tableInfo, err := pStorage.getTableInfo(testCase.tableID, testCase.snapTs)
require.Nil(t, err)
require.Equal(t, testCase.name, tableInfo.TableName.Table)
}
pStorage.close()
}
}

func TestGCPersistStorage(t *testing.T) {
dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
Expand Down
Loading

0 comments on commit e78f686

Please sign in to comment.