remove filter and use tracker
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Jan 3, 2025
1 parent db655d2 commit ee5d347
Showing 8 changed files with 46 additions and 47 deletions.
1 change: 0 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -49,7 +49,6 @@ go_library(
"//pkg/util/codec",
"//pkg/util/redact",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
30 changes: 13 additions & 17 deletions br/pkg/restore/log_client/client.go
@@ -65,7 +65,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/sqlexec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
@@ -856,10 +855,12 @@ func (rc *LogClient) loadSchemasMap(
func readFilteredFullBackupTables(
ctx context.Context,
s storage.ExternalStorage,
tableFilter filter.Filter,
piTRTableFilter *utils.PiTRTableTracker,
cipherInfo *backuppb.CipherInfo,
) (map[int64]*metautil.Table, error) {
if piTRTableFilter == nil {
return nil, errors.Errorf("missing pitr table tracker information")
}
metaData, err := s.ReadFile(ctx, metautil.MetaFile)
if err != nil {
return nil, errors.Trace(err)
@@ -885,11 +886,7 @@ func readFilteredFullBackupTables(

tables := make(map[int64]*metautil.Table)
for _, db := range databases {
dbName := db.Info.Name.O
if name, ok := utils.StripTempTableNamePrefixIfNeeded(db.Info.Name.O); utils.IsSysDB(name) && ok {
dbName = name
}
if !tableFilter.MatchSchema(dbName) && !(piTRTableFilter != nil && piTRTableFilter.ContainsDB(db.Info.ID)) {
if !piTRTableFilter.ContainsDB(db.Info.ID) {
continue
}

@@ -901,8 +898,7 @@
tableAdded = true
continue
}
if !tableFilter.MatchTable(dbName, table.Info.Name.O) &&
!(piTRTableFilter != nil && piTRTableFilter.ContainsTable(db.Info.ID, table.Info.ID)) {
if !piTRTableFilter.ContainsTable(db.Info.ID, table.Info.ID) {
continue
}
tables[table.Info.ID] = table
@@ -929,12 +925,12 @@ type FullBackupStorageConfig struct {
type GetIDMapConfig struct {
// required
LoadSavedIDMap bool
TableFilter filter.Filter // original table filter from user

// optional
FullBackupStorage *FullBackupStorageConfig
CipherInfo *backuppb.CipherInfo
PiTRTableFilter *utils.PiTRTableTracker // generated table filter that contain all the table id that needs to restore
FullBackupStorageConfig *FullBackupStorageConfig
CipherInfo *backuppb.CipherInfo
// generated at full restore step that contains all the table ids that need to restore
PiTRTableTracker *utils.PiTRTableTracker
}

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
@@ -947,19 +943,19 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
cfg *GetIDMapConfig,
) (map[stream.UpstreamID]*stream.DBReplace, error) {
dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace)
if cfg.FullBackupStorage == nil {
if cfg.FullBackupStorageConfig == nil {
envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
if ok && len(envVal) > 0 {
log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
return dbReplaces, nil
}
return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
}
s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
s, err := storage.New(ctx, cfg.FullBackupStorageConfig.Backend, cfg.FullBackupStorageConfig.Opts)
if err != nil {
return nil, errors.Trace(err)
}
filteredFullBackupTables, err := readFilteredFullBackupTables(ctx, s, cfg.TableFilter, cfg.PiTRTableFilter, cfg.CipherInfo)
filteredFullBackupTables, err := readFilteredFullBackupTables(ctx, s, cfg.PiTRTableTracker, cfg.CipherInfo)
if err != nil {
return nil, errors.Trace(err)
}
@@ -1023,7 +1019,7 @@ func (rc *LogClient) GetBaseIDMap(

// a new task, but without full snapshot restore, tries to load
// schemas map whose `restore-ts`` is the task's `start-ts`.
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
if len(dbMaps) <= 0 && cfg.FullBackupStorageConfig == nil {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
dbMaps, err = rc.loadSchemasMap(ctx, rc.startTS)
if err != nil {
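For context on the client.go change: readFilteredFullBackupTables now requires a non-nil PiTRTableTracker and selects databases and tables purely by the upstream IDs recorded in it, whereas the old path also matched schema and table names through filter.Filter. A minimal, self-contained sketch of the ID-based selection, using simplified stand-in types rather than the real metautil/utils definitions:

package main

import "fmt"

// trackedIDs stands in for utils.PiTRTableTracker: upstream db ID -> set of upstream table IDs.
type trackedIDs map[int64]map[int64]struct{}

type backupTable struct {
    dbID, tableID int64
    name          string
}

// keepTracked mirrors the new selection in readFilteredFullBackupTables: a table
// survives only if its upstream db ID and table ID were recorded by the tracker
// during the snapshot-restore phase; names are no longer consulted here.
func keepTracked(all []backupTable, tracker trackedIDs) []backupTable {
    kept := make([]backupTable, 0, len(all))
    for _, tbl := range all {
        tables, dbTracked := tracker[tbl.dbID]
        if !dbTracked {
            continue // the old code additionally checked tableFilter.MatchSchema(dbName) here
        }
        if _, ok := tables[tbl.tableID]; !ok {
            continue // the old code additionally checked tableFilter.MatchTable(dbName, tableName) here
        }
        kept = append(kept, tbl)
    }
    return kept
}

func main() {
    tracker := trackedIDs{1: {100: {}}}
    all := []backupTable{{1, 100, "t1"}, {1, 101, "t2"}, {2, 200, "t3"}}
    fmt.Println(keepTracked(all, tracker)) // [{1 100 t1}]: only the tracked db/table pair survives
}
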
4 changes: 2 additions & 2 deletions br/pkg/task/common.go
@@ -254,9 +254,9 @@ type Config struct {

FilterStr []string `json:"filter-strings" toml:"filter-strings"`
TableFilter filter.Filter `json:"-" toml:"-"`
// PiTRTableFilter generated from TableFilter during snapshot restore, it has all the db id and table id that needs
// PiTRTableTracker generated from TableFilter during snapshot restore, it has all the db id and table id that needs
// to be restored
PiTRTableFilter *utils.PiTRTableTracker `json:"-" toml:"-"`
PiTRTableTracker *utils.PiTRTableTracker `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
// Schemas is a database name set, to check whether the restore database has been backup
Schemas map[string]struct{}
31 changes: 16 additions & 15 deletions br/pkg/task/restore.go
@@ -862,7 +862,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
if cfg.logTableHistoryManager != nil {
// adjust tables to restore in the snapshot restore phase since it will later be renamed during
// log restore and will fall into or out of the filter range.
err := adjustTablesToRestoreAndCreateFilter(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap)
err := adjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap)
if err != nil {
return errors.Trace(err)
}
@@ -873,7 +873,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
zap.Int("db", len(dbMap)))

// need to update to include all eligible table id from snapshot restore
UpdatePiTRFilter(cfg.RestoreConfig, tableMap)
UpdatePiTRTableTracker(cfg.RestoreConfig, tableMap)
}
files, tables, dbs := convertMapsToSlices(fileMap, tableMap, dbMap)

@@ -1452,7 +1452,7 @@ func filterRestoreFiles(
return
}

func adjustTablesToRestoreAndCreateFilter(
func adjustTablesToRestoreAndCreateTableTracker(
logBackupTableHistory *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
client *snapclient.SnapClient,
@@ -1461,14 +1461,14 @@
) (err error) {
snapshotDBMap := client.GetDatabaseMap()

// build filter for pitr restore to use later
piTRTableFilter := utils.NewPiTRTableFilter()
// build tracker for pitr restore to use later
piTRTableTracker := utils.NewPiTRTableTracker()

// put all the newly created db that matches the filter during log backup into the pitr filter
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
for dbId, dbName := range newlyCreatedDBs {
if utils.MatchSchema(cfg.TableFilter, dbName) {
piTRTableFilter.AddDB(dbId)
piTRTableTracker.AddDB(dbId)
}
}

@@ -1480,10 +1480,11 @@
end := dbIDAndTableName[1]

var dbName string
// check in snapshot
if snapDb, exists := snapshotDBMap[end.DbID]; exists {
dbName = snapDb.Info.Name.O
} else if name, exists := logBackupTableHistory.GetDBNameByID(end.DbID); exists {
// if db id does not exist in the snapshot, meaning it's created during log backup
// check during log backup
dbName = name
} else {
log.Warn("did not find db id in full/log backup, "+
@@ -1502,10 +1503,10 @@
// 2. original has been renamed and current is in the filter range
// we need to restore original table
if utils.MatchTable(cfg.TableFilter, dbName, end.TableName) {
// put this db/table id into pitr filter as it matches with user's filter
// put this db/table id into pitr tracker as it matches with user's filter
// have to update filter here since table might be empty or not in snapshot so nothing will be returned .
// but we still need to capture this table id to restore during log restore.
piTRTableFilter.AddTable(end.DbID, tableID)
piTRTableTracker.AddTable(end.DbID, tableID)

// check if snapshot contains the original db/table
originalDB, exists := snapshotDBMap[start.DbID]
@@ -1533,7 +1534,7 @@
// restoring
} else if utils.MatchTable(cfg.TableFilter, dbName, start.TableName) {
// remove it from the filter, will not remove db even table size becomes 0
_ = piTRTableFilter.Remove(start.DbID, tableID)
_ = piTRTableTracker.Remove(start.DbID, tableID)

// check if snapshot contains the original db/table
originalDB, exists := snapshotDBMap[start.DbID]
@@ -1556,15 +1557,15 @@
}
}
}
// store the filter into config
log.Info("pitr table filter", zap.String("map", piTRTableFilter.String()))
cfg.PiTRTableFilter = piTRTableFilter
// store the tracker into config
log.Info("pitr table tracker", zap.String("map", piTRTableTracker.String()))
cfg.PiTRTableTracker = piTRTableTracker
return
}

func UpdatePiTRFilter(cfg *RestoreConfig, tableMap map[int64]*metautil.Table) {
func UpdatePiTRTableTracker(cfg *RestoreConfig, tableMap map[int64]*metautil.Table) {
for _, table := range tableMap {
cfg.PiTRTableFilter.AddTable(table.DB.ID, table.Info.ID)
cfg.PiTRTableTracker.AddTable(table.DB.ID, table.Info.ID)
}
}

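The restore.go hunks build the tracker during snapshot restore: databases created during log backup are added when they match the user's filter, tables whose final name falls inside the filter are added under their upstream IDs, tables renamed out of the filter are removed, and UpdatePiTRTableTracker finally adds every table restored from the snapshot. A toy sketch of just the rename decision, with a hypothetical matcher standing in for utils.MatchTable and cfg.TableFilter:

package main

import (
    "fmt"
    "strings"
)

// action is what adjustTablesToRestoreAndCreateTableTracker effectively decides per table,
// based on the table's name at the start and at the end of log backup.
type action string

const (
    ignore  action = "ignore"
    track   action = "track"   // piTRTableTracker.AddTable(end.DbID, tableID)
    untrack action = "untrack" // piTRTableTracker.Remove(start.DbID, tableID)
)

// matches is a toy stand-in for utils.MatchTable(cfg.TableFilter, dbName, tableName).
func matches(tableName string) bool { return strings.HasPrefix(tableName, "order_") }

func decide(startName, endName string) action {
    switch {
    case matches(endName): // final name is inside the filter: restore and track the table
        return track
    case matches(startName): // only the pre-rename name matched: drop it from the restore set
        return untrack
    default:
        return ignore
    }
}

func main() {
    fmt.Println(decide("tmp_orders", "order_2024")) // track: renamed into the filter range
    fmt.Println(decide("order_old", "archive_old")) // untrack: renamed out of the filter range
    fmt.Println(decide("foo", "bar"))               // ignore
}
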
17 changes: 10 additions & 7 deletions br/pkg/task/stream.go
@@ -2014,19 +2014,18 @@ func buildSchemaReplace(
func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient, cfg *LogRestoreConfig,
tableMappingManager *stream.TableMappingManager) error {
// get full backup meta storage if needed.
fullBackupStorage, err := parseFullBackupTablesStorage(cfg.RestoreConfig)
fullBackupStorageConfig, err := parseFullBackupTablesStorage(cfg.RestoreConfig)
if err != nil {
return errors.Trace(err)
}

// get the schemas ID replace information.
saved := isCurrentIdMapSaved(cfg.checkpointTaskInfo)
dbReplaces, err := client.GetBaseIDMap(ctx, &logclient.GetIDMapConfig{
LoadSavedIDMap: saved,
TableFilter: cfg.TableFilter,
PiTRTableFilter: cfg.PiTRTableFilter,
FullBackupStorage: fullBackupStorage,
CipherInfo: &cfg.Config.CipherInfo,
LoadSavedIDMap: saved,
PiTRTableTracker: cfg.PiTRTableTracker,
FullBackupStorageConfig: fullBackupStorageConfig,
CipherInfo: &cfg.Config.CipherInfo,
})
if err != nil {
return errors.Trace(err)
@@ -2039,7 +2038,11 @@
}
} else {
tableMappingManager.MergeBaseDBReplace(dbReplaces)
tableMappingManager.FilterDBReplaceMap(cfg.PiTRTableFilter)
if cfg.PiTRTableTracker != nil {
tableMappingManager.FilterDBReplaceMap(cfg.PiTRTableTracker)
} else {
log.Warn("pitr table tracker is nil, base map is not from full backup")
}
err = tableMappingManager.ReplaceTemporaryIDs(ctx, client.GenGlobalIDs)
if err != nil {
return errors.Trace(err)
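In stream.go the tracker is now passed to tableMappingManager.FilterDBReplaceMap only when it is non-nil, since the base map may come from a previous task's saved ID map rather than from a full backup. FilterDBReplaceMap itself is not shown in this commit; the following is only a rough sketch of the pruning its name and arguments suggest, with plain maps standing in for stream.DBReplace and the tracker:

package main

import "fmt"

// pruneToTracked drops every database and table the tracker did not record.
// dbReplaces: upstream db ID -> upstream table ID -> downstream table ID (simplified).
// tracked:    upstream db ID -> set of upstream table IDs to restore.
func pruneToTracked(dbReplaces map[int64]map[int64]int64, tracked map[int64]map[int64]struct{}) {
    for dbID, tables := range dbReplaces {
        trackedTables, ok := tracked[dbID]
        if !ok {
            delete(dbReplaces, dbID) // the whole database is outside the tracker
            continue
        }
        for tableID := range tables {
            if _, ok := trackedTables[tableID]; !ok {
                delete(tables, tableID)
            }
        }
    }
}

func main() {
    dbReplaces := map[int64]map[int64]int64{
        1: {100: 1100, 101: 1101},
        2: {200: 1200},
    }
    tracked := map[int64]map[int64]struct{}{1: {100: {}}}
    pruneToTracked(dbReplaces, tracked)
    fmt.Println(dbReplaces) // map[1:map[100:1100]]
}
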
2 changes: 1 addition & 1 deletion br/pkg/utils/filter.go
@@ -27,7 +27,7 @@ type PiTRTableTracker struct {
DBIdToTable map[int64]map[int64]struct{}
}

func NewPiTRTableFilter() *PiTRTableTracker {
func NewPiTRTableTracker() *PiTRTableTracker {
return &PiTRTableTracker{
DBIdToTable: make(map[int64]map[int64]struct{}),
}
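Only the constructor rename appears in this hunk; the tracker's methods are unchanged and therefore not shown. For reference, a self-contained sketch consistent with how the commit uses the type elsewhere (AddDB, AddTable, ContainsDB, ContainsTable, Remove, String); the method bodies are plausible reconstructions, not the verbatim br/pkg/utils code:

package main

import "fmt"

// PiTRTableTracker mirrors the struct shown above: upstream db ID -> set of upstream table IDs.
type PiTRTableTracker struct {
    DBIdToTable map[int64]map[int64]struct{}
}

func NewPiTRTableTracker() *PiTRTableTracker {
    return &PiTRTableTracker{DBIdToTable: make(map[int64]map[int64]struct{})}
}

func (t *PiTRTableTracker) AddDB(dbID int64) {
    if _, ok := t.DBIdToTable[dbID]; !ok {
        t.DBIdToTable[dbID] = make(map[int64]struct{})
    }
}

func (t *PiTRTableTracker) AddTable(dbID, tableID int64) {
    t.AddDB(dbID)
    t.DBIdToTable[dbID][tableID] = struct{}{}
}

func (t *PiTRTableTracker) ContainsDB(dbID int64) bool {
    _, ok := t.DBIdToTable[dbID]
    return ok
}

func (t *PiTRTableTracker) ContainsTable(dbID, tableID int64) bool {
    tables, ok := t.DBIdToTable[dbID]
    if !ok {
        return false
    }
    _, ok = tables[tableID]
    return ok
}

// Remove reports whether the table was tracked; the database entry is kept even if it
// becomes empty, matching the "will not remove db even table size becomes 0" comment in restore.go.
func (t *PiTRTableTracker) Remove(dbID, tableID int64) bool {
    if tables, ok := t.DBIdToTable[dbID]; ok {
        if _, found := tables[tableID]; found {
            delete(tables, tableID)
            return true
        }
    }
    return false
}

func (t *PiTRTableTracker) String() string {
    return fmt.Sprintf("PiTRTableTracker%v", t.DBIdToTable)
}

func main() {
    tracker := NewPiTRTableTracker()
    tracker.AddDB(1)
    tracker.AddTable(1, 100)
    fmt.Println(tracker.ContainsTable(1, 100), tracker.Remove(1, 100), tracker.ContainsDB(1)) // true true true
}
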
6 changes: 3 additions & 3 deletions br/pkg/utils/filter_test.go
@@ -8,14 +8,14 @@ import (

func TestPiTRTableTracker(t *testing.T) {
t.Run("test new tracker", func(t *testing.T) {
tracker := NewPiTRTableFilter()
tracker := NewPiTRTableTracker()
require.NotNil(t, tracker)
require.NotNil(t, tracker.DBIdToTable)
require.Empty(t, tracker.DBIdToTable)
})

t.Run("test update and contains table", func(t *testing.T) {
tracker := NewPiTRTableFilter()
tracker := NewPiTRTableTracker()

tracker.AddDB(1)
tracker.AddTable(1, 100)
@@ -38,7 +38,7 @@ func TestPiTRTableTracker(t *testing.T) {
})

t.Run("test remove table", func(t *testing.T) {
tracker := NewPiTRTableFilter()
tracker := NewPiTRTableTracker()

tracker.AddTable(1, 100)
tracker.AddTable(1, 101)
2 changes: 1 addition & 1 deletion br/tests/config/tikv.toml
@@ -37,5 +37,5 @@ path = "/tmp/backup_restore_test/master-key-file"
[log-backup]
max-flush-interval = "50s"
[gc]
ratio-threshold = 1.1
ratio-threshold = -1.0
