diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go
index 332603c3..fc60fa2d 100644
--- a/logservice/schemastore/persist_storage.go
+++ b/logservice/schemastore/persist_storage.go
@@ -1371,7 +1371,7 @@ func updateRegisteredTableInfoStore(
 	case model.ActionRebaseAutoID:
 		// TODO: verify can be ignored
 	case model.ActionRenameTable:
-		// ignore
+		tryApplyDDLToStore()
 	case model.ActionSetDefaultValue:
 		tryApplyDDLToStore()
 	case model.ActionShardRowID,
diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go
index 5dc6a358..b6f2724d 100644
--- a/logservice/schemastore/persist_storage_test.go
+++ b/logservice/schemastore/persist_storage_test.go
@@ -1568,280 +1568,100 @@ func TestReadWriteMeta(t *testing.T) {
 	}
 }
 
-// func TestBuildVersionedTableInfoStore(t *testing.T) {
-// 	dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
-// 	err := os.RemoveAll(dbPath)
-// 	require.Nil(t, err)
-
-// 	gcTs := uint64(1000)
-// 	schemaID := int64(50)
-// 	tableID := int64(99)
-// 	databaseInfo := make(map[int64]*model.DBInfo)
-// 	databaseInfo[schemaID] = &model.DBInfo{
-// 		ID:   schemaID,
-// 		Name: model.NewCIStr("test"),
-// 		Tables: []*model.TableInfo{
-// 			{
-// 				ID:   tableID,
-// 				Name: model.NewCIStr("t1"),
-// 			},
-// 		},
-// 	}
-// 	pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo)
-
-// 	require.Equal(t, 1, len(pStorage.databaseMap))
-// 	require.Equal(t, "test", pStorage.databaseMap[schemaID].Name)
-
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(tableID)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		tableInfo, err := store.getTableInfo(gcTs)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t1", tableInfo.Name.O)
-// 		require.Equal(t, tableID, tableInfo.ID)
-// 	}
-
-// 	// rename table
-// 	renameVersion := uint64(1500)
-// 	{
-// 		job := &model.Job{
-// 			Type:     model.ActionRenameTable,
-// 			SchemaID: schemaID,
-// 			TableID:  tableID,
-// 			BinlogInfo: &model.HistoryInfo{
-// 				SchemaVersion: 3000,
-// 				TableInfo: &model.TableInfo{
-// 					ID:   tableID,
-// 					Name: model.NewCIStr("t2"),
-// 				},
-// 				FinishedTS: renameVersion,
-// 			},
-// 		}
-// 		err = pStorage.handleDDLJob(job)
-// 		require.Nil(t, err)
-// 	}
-
-// 	// create another table
-// 	tableID2 := tableID + 1
-// 	createVersion := renameVersion + 200
-// 	{
-// 		job := &model.Job{
-// 			Type:     model.ActionCreateTable,
-// 			SchemaID: schemaID,
-// 			TableID:  tableID2,
-// 			BinlogInfo: &model.HistoryInfo{
-// 				SchemaVersion: 3500,
-// 				TableInfo: &model.TableInfo{
-// 					ID:   tableID2,
-// 					Name: model.NewCIStr("t3"),
-// 				},
-// 				FinishedTS: createVersion,
-// 			},
-// 		}
-// 		err = pStorage.handleDDLJob(job)
-// 		require.Nil(t, err)
-// 	}
-
-// 	upperBound := UpperBoundMeta{
-// 		FinishedDDLTs: 3000,
-// 		SchemaVersion: 4000,
-// 		ResolvedTs:    2000,
-// 	}
-// 	pStorage = loadPersistentStorageForTest(pStorage.db, gcTs, upperBound)
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(tableID)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 2, len(store.infos))
-// 		tableInfo, err := store.getTableInfo(gcTs)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t1", tableInfo.Name.O)
-// 		require.Equal(t, tableID, tableInfo.ID)
-// 		tableInfo2, err := store.getTableInfo(renameVersion)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t2", tableInfo2.Name.O)
-
-// 		renameVersion2 := uint64(3000)
-// 		store.applyDDL(&PersistedDDLEvent{
-// 			Type:            byte(model.ActionRenameTable),
-// 			CurrentSchemaID: schemaID,
-// 			CurrentTableID:  tableID,
-// 			SchemaVersion:   3000,
-// 			TableInfo: &model.TableInfo{
-// 				ID:   tableID,
-// 				Name: model.NewCIStr("t3"),
-// 			},
-// 			FinishedTs: renameVersion2,
-// 		})
-// 		tableInfo3, err := store.getTableInfo(renameVersion2)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t3", tableInfo3.Name.O)
-// 	}
-
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(tableID2)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 		tableInfo, err := store.getTableInfo(createVersion)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t3", tableInfo.Name.O)
-// 		require.Equal(t, tableID2, tableInfo.ID)
-// 	}
-
-// 	// truncate table
-// 	tableID3 := tableID2 + 1
-// 	truncateVersion := createVersion + 200
-// 	{
-// 		job := &model.Job{
-// 			Type:     model.ActionTruncateTable,
-// 			SchemaID: schemaID,
-// 			TableID:  tableID2,
-// 			BinlogInfo: &model.HistoryInfo{
-// 				SchemaVersion: 3600,
-// 				TableInfo: &model.TableInfo{
-// 					ID:   tableID3,
-// 					Name: model.NewCIStr("t4"),
-// 				},
-// 				FinishedTS: truncateVersion,
-// 			},
-// 		}
-// 		err = pStorage.handleDDLJob(job)
-// 		require.Nil(t, err)
-// 	}
-
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(tableID2)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 		require.Equal(t, truncateVersion, store.deleteVersion)
-// 	}
-
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(tableID3)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 		tableInfo, err := store.getTableInfo(truncateVersion)
-// 		require.Nil(t, err)
-// 		require.Equal(t, "t4", tableInfo.Name.O)
-// 		require.Equal(t, tableID3, tableInfo.ID)
-// 	}
-// }
-
-// func TestBuildVersionedTableInfoStoreWithPartitionTable(t *testing.T) {
-// 	dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
-// 	err := os.RemoveAll(dbPath)
-// 	require.Nil(t, err)
-
-// 	gcTs := uint64(1000)
-// 	schemaID := int64(50)
-// 	tableID := int64(99)
-// 	databaseInfo := make(map[int64]*model.DBInfo)
-// 	databaseInfo[schemaID] = &model.DBInfo{
-// 		ID:   schemaID,
-// 		Name: model.NewCIStr("test"),
-// 	}
-// 	pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo)
-
-// 	// create a partition table
-// 	partitionID1 := tableID + 100
-// 	partitionID2 := tableID + 200
-// 	{
-// 		job := &model.Job{
-// 			Type:     model.ActionCreateTable,
-// 			SchemaID: schemaID,
-// 			TableID:  tableID,
-// 			BinlogInfo: &model.HistoryInfo{
-// 				SchemaVersion: 2000,
-// 				TableInfo: &model.TableInfo{
-// 					ID:   tableID,
-// 					Name: model.NewCIStr("t"),
-// 					Partition: &model.PartitionInfo{
-// 						Definitions: []model.PartitionDefinition{
-// 							{
-// 								ID: partitionID1,
-// 							},
-// 							{
-// 								ID: partitionID2,
-// 							},
-// 						},
-// 					},
-// 				},
-// 				FinishedTS: 1200,
-// 			},
-// 		}
-// 		pStorage.handleDDLJob(job)
-// 	}
-
-// 	upperBound := UpperBoundMeta{
-// 		FinishedDDLTs: 3000,
-// 		SchemaVersion: 4000,
-// 		ResolvedTs:    2000,
-// 	}
-// 	pStorage = loadPersistentStorageForTest(pStorage.db, gcTs, upperBound)
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID1)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 	}
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID2)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 	}
-
-// 	// truncate the partition table
-// 	tableID2 := tableID + 500
-// 	partitionID3 := tableID2 + 100
-// 	partitionID4 := tableID2 + 200
-// 	{
-// 		job := &model.Job{
-// 			Type:     model.ActionTruncateTable,
-// 			SchemaID: schemaID,
-// 			TableID:  tableID,
-// 			BinlogInfo: &model.HistoryInfo{
-// 				SchemaVersion: 2100,
-// 				TableInfo: &model.TableInfo{
-// 					ID:   tableID2,
-// 					Name: model.NewCIStr("t"),
-// 					Partition: &model.PartitionInfo{
-// 						Definitions: []model.PartitionDefinition{
-// 							{
-// 								ID: partitionID3,
-// 							},
-// 							{
-// 								ID: partitionID4,
-// 							},
-// 						},
-// 					},
-// 				},
-// 				FinishedTS: 1300,
-// 			},
-// 		}
-// 		pStorage.handleDDLJob(job)
-// 	}
-
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID1)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 		require.Equal(t, uint64(1300), store.deleteVersion)
-// 	}
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID2)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 		require.Equal(t, uint64(1300), store.deleteVersion)
-// 	}
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID3)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 	}
-// 	{
-// 		store := newEmptyVersionedTableInfoStore(partitionID4)
-// 		pStorage.buildVersionedTableInfoStore(store)
-// 		require.Equal(t, 1, len(store.infos))
-// 	}
-// }
+func TestRegisterTable(t *testing.T) {
+	type QueryTableInfoTestCase struct {
+		tableID int64
+		snapTs  uint64
+		name    string
+	}
+	var testCases = []struct {
+		initialDBInfos []mockDBInfo // tables in initialDBInfos are registered before the DDL jobs are applied
+		ddlJobs        []*model.Job
+		queryCases     []QueryTableInfoTestCase
+	}{
+		{
+			initialDBInfos: []mockDBInfo{
+				{
+					dbInfo: &model.DBInfo{
+						ID:   50,
+						Name: pmodel.NewCIStr("test"),
+					},
+					tables: []*model.TableInfo{
+						{
+							ID:   99,
+							Name: pmodel.NewCIStr("t1"),
+						},
+					},
+				},
+			},
+			ddlJobs: func() []*model.Job {
+				return []*model.Job{
+					buildRenameTableJobForTest(50, 99, "t2", 1000),                                   // rename table 99 to t2
+					buildCreateTableJobForTest(50, 100, "t3", 1010),                                  // create table 100
+					buildTruncateTableJobForTest(50, 100, 101, "t3", 1020),                           // truncate table 100 to 101
+					buildCreatePartitionTableJobForTest(50, 102, "t4", []int64{201, 202, 203}, 1030), // create partition table 102
+				}
+			}(),
+			queryCases: []QueryTableInfoTestCase{
+				{
+					tableID: 99,
+					snapTs:  990,
+					name:    "t1",
+				},
+				{
+					tableID: 99,
+					snapTs:  1000,
+					name:    "t2",
+				},
+				{
+					tableID: 100,
+					snapTs:  1010,
+					name:    "t3",
+				},
+				{
+					tableID: 101,
+					snapTs:  1020,
+					name:    "t3",
+				},
+				{
+					tableID: 201,
+					snapTs:  1030,
+					name:    "t4",
+				},
+				{
+					tableID: 202,
+					snapTs:  1030,
+					name:    "t4",
+				},
+				{
+					tableID: 203,
+					snapTs:  1030,
+					name:    "t4",
+				},
+			},
+		},
+	}
+	for _, tt := range testCases {
+		dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
+		pStorage := newPersistentStorageForTest(dbPath, tt.initialDBInfos)
+		for _, db := range tt.initialDBInfos {
+			for _, table := range db.tables {
+				pStorage.registerTable(table.ID, 0) // the second argument is not important for this test
+			}
+		}
+		for _, job := range tt.ddlJobs {
+			err := pStorage.handleDDLJob(job)
+			require.Nil(t, err)
+		}
+		for _, testCase := range tt.queryCases {
+			pStorage.registerTable(testCase.tableID, 0) // the second argument is not important for this test
+			tableInfo, err := pStorage.getTableInfo(testCase.tableID, testCase.snapTs)
+			require.Nil(t, err)
+			require.Equal(t, testCase.name, tableInfo.TableName.Table)
+		}
+		pStorage.close()
+	}
+}
 
 func TestGCPersistStorage(t *testing.T) {
 	dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
diff --git a/logservice/schemastore/schema_store_test.go b/logservice/schemastore/schema_store_test.go
deleted file mode 100644
index 926b0ec3..00000000
--- a/logservice/schemastore/schema_store_test.go
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright 2024 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package schemastore
-
-// import (
-// 	"context"
-// 	"fmt"
-// 	"testing"
-// 	"time"
-
-// 	"github.com/pingcap/log"
-// 	"github.com/pingcap/ticdc/logservice/upstream"
-// 	"github.com/pingcap/tiflow/pkg/security"
-// 	"github.com/stretchr/testify/require"
-// 	"github.com/tikv/client-go/v2/oracle"
-// 	pd "github.com/tikv/pd/client"
-// 	"go.uber.org/zap"
-// 	"google.golang.org/grpc"
-// 	"google.golang.org/grpc/backoff"
-// 	"google.golang.org/grpc/credentials/insecure"
-// )

-// // TODO: this is not a really unit test, more like a usage example.
-// // 1. deploy a tidb cluster
-// // 2. set global tidb_gc_life_time="100h";
-// // 3. begin; select @@tidb_current_ts; and set it to gcTs
-// // 4. create some tables;
-// // 3. begin; select @@tidb_current_ts; and set it to snapTs;
-// func TestBasicDDLJob(t *testing.T) {
-// 	ctx := context.Background()
-// 	upstreamManager := upstream.NewManager(ctx)
-// 	pdEndpoints := []string{"http://127.0.0.1:2379"}
-// 	pdClient, err := pd.NewClientWithContext(
-// 		ctx, pdEndpoints, pd.SecurityOption{},
-// 		// the default `timeout` is 3s, maybe too small if the pd is busy,
-// 		// set to 10s to avoid frequent timeout.
-// 		pd.WithCustomTimeoutOption(10*time.Second),
-// 		pd.WithGRPCDialOptions(
-// 			grpc.WithConnectParams(grpc.ConnectParams{
-// 				Backoff: backoff.Config{
-// 					BaseDelay:  time.Second,
-// 					Multiplier: 1.1,
-// 					Jitter:     0.1,
-// 					MaxDelay:   3 * time.Second,
-// 				},
-// 				MinConnectTimeout: 3 * time.Second,
-// 			}),
-// 		))
-// 	require.Nil(t, err)

-// 	etcdCli, err := upstream.CreateRawEtcdClient(&security.Credential{}, grpc.WithTransportCredentials(insecure.NewCredentials()), pdEndpoints...)
-// 	require.Nil(t, err)

-// 	upstream, err := upstreamManager.AddDefaultUpstream(pdEndpoints, &security.Credential{}, pdClient, etcdCli)
-// 	require.Nil(t, err)

-// 	schemaStore := New(ctx, "/tmp/cdc", upstream.PDClient, upstream.RegionCache, upstream.PDClock, upstream.KVStorage)
-// 	ctx, cancel := context.WithCancel(context.Background())
-// 	defer cancel()
-// 	go schemaStore.Run(ctx)
-// 	time.Sleep(3 * time.Second)
-// 	phy, logic, err := upstream.PDClient.GetTS(ctx)
-// 	require.Nil(t, err)
-// 	snapTs := oracle.ComposeTS(phy, logic)
-// 	tables, err := schemaStore.GetAllPhysicalTables(uint64(snapTs), nil)
-// 	require.Nil(t, err)
-// 	log.Info("schema store get all tables", zap.Any("tables", tables))
-// 	fmt.Printf("all tables")
-// }
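
Note: the new TestRegisterTable relies on job-builder helpers (buildRenameTableJobForTest, buildCreateTableJobForTest, buildTruncateTableJobForTest, buildCreatePartitionTableJobForTest) and the mockDBInfo type, which are defined elsewhere in the test package and are not part of this diff. As a rough, non-authoritative sketch only, assuming the same model.Job / model.HistoryInfo layout used by the deleted tests above, one such helper might look like:

// Hypothetical illustration; the real helper lives outside this diff and may differ.
func buildCreateTableJobForTest(schemaID, tableID int64, tableName string, finishedTs uint64) *model.Job {
	return &model.Job{
		Type:     model.ActionCreateTable,
		SchemaID: schemaID,
		TableID:  tableID,
		BinlogInfo: &model.HistoryInfo{
			TableInfo: &model.TableInfo{
				ID:   tableID,
				Name: pmodel.NewCIStr(tableName),
			},
			FinishedTS: finishedTs,
		},
	}
}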