Skip to content

Commit

Permalink
fix base merge
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jan 14, 2025
1 parent 1eee968 commit 37f61af
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/internal/prealloc_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (db *DB) canReuseTableID(ti *model.TableInfo) bool {
}
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
log.Info("reusing table ID", zap.Stringer("table", ti.Name), zap.Int64("tableID", ti.ID))
}
return prealloced
}
Expand Down
59 changes: 35 additions & 24 deletions br/pkg/stream/table_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,37 +136,48 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t
}

func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace) {
// merge baseMap to DBReplaceMap
// also updating globalIdMap, not going to be used in prod but convenient in test
// first pass: update all global IDs
for upstreamID, baseDBReplace := range baseMap {
tm.globalIdMap[upstreamID] = baseDBReplace.DbID

if existingDBReplace, exists := tm.DBReplaceMap[upstreamID]; exists {
existingDBReplace.DbID = baseDBReplace.DbID
for tableUpID, baseTableReplace := range baseDBReplace.TableMap {
tm.globalIdMap[tableUpID] = baseTableReplace.TableID

for tableUpID, baseTableReplace := range baseDBReplace.TableMap {
tm.globalIdMap[tableUpID] = baseTableReplace.TableID
for partUpID, basePartDownID := range baseTableReplace.PartitionMap {
tm.globalIdMap[partUpID] = basePartDownID
}
}
}

if existingTableReplace, tableExists := existingDBReplace.TableMap[tableUpID]; tableExists {
existingTableReplace.TableID = baseTableReplace.TableID
// second pass: update the DBReplaceMap
// first update all existing entries using the global ID map
for _, existingDBReplace := range tm.DBReplaceMap {
if newID, exists := tm.globalIdMap[existingDBReplace.DbID]; exists {
existingDBReplace.DbID = newID
}

for partUpID, basePartDownID := range baseTableReplace.PartitionMap {
tm.globalIdMap[partUpID] = basePartDownID
existingTableReplace.PartitionMap[partUpID] = basePartDownID
}
} else {
existingDBReplace.TableMap[tableUpID] = baseTableReplace
for partUpID, basePartDownID := range baseTableReplace.PartitionMap {
tm.globalIdMap[partUpID] = basePartDownID
}
for _, existingTableReplace := range existingDBReplace.TableMap {
if newID, exists := tm.globalIdMap[existingTableReplace.TableID]; exists {
existingTableReplace.TableID = newID
}

for partUpID, partDownID := range existingTableReplace.PartitionMap {
if newID, exists := tm.globalIdMap[partDownID]; exists {
existingTableReplace.PartitionMap[partUpID] = newID
}
}
} else {
}
}

// then add any new entries from the base map
for upstreamID, baseDBReplace := range baseMap {
if _, exists := tm.DBReplaceMap[upstreamID]; !exists {
tm.DBReplaceMap[upstreamID] = baseDBReplace
} else {
existingDBReplace := tm.DBReplaceMap[upstreamID]
for tableUpID, baseTableReplace := range baseDBReplace.TableMap {
tm.globalIdMap[tableUpID] = baseTableReplace.TableID
for partUpID, basePartDownID := range baseTableReplace.PartitionMap {
tm.globalIdMap[partUpID] = basePartDownID
if _, exists := existingDBReplace.TableMap[tableUpID]; !exists {
existingDBReplace.TableMap[tableUpID] = baseTableReplace
}
}
}
Expand Down Expand Up @@ -274,18 +285,18 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
return nil
}

func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker) {
func (tm *TableMappingManager) FilterDBReplaceMap(tracker *utils.PiTRTableTracker) {
// iterate through existing DBReplaceMap
for dbID, dbReplace := range tm.DBReplaceMap {
// remove entire database if not in filter
if !filter.ContainsDB(dbID) {
if !tracker.ContainsDB(dbID) {
delete(tm.DBReplaceMap, dbID)
continue
}

// filter tables in this database
for tableID := range dbReplace.TableMap {
if !filter.ContainsTable(dbID, tableID) {
if !tracker.ContainsTable(dbID, tableID) {
delete(dbReplace.TableMap, tableID)
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ func AdjustTablesToRestoreAndCreateTableTracker(
// build tracker for pitr restore to use later
piTRTableTracker := utils.NewPiTRTableTracker()

// put all the newly created db that matches the filter during log backup into the pitr filter
// put all the newly created db that matches the filter during log backup into the pitr table tracker
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
for dbId, dbName := range newlyCreatedDBs {
if utils.MatchSchema(cfg.TableFilter, dbName) {
Expand Down Expand Up @@ -1562,7 +1562,6 @@ func AdjustTablesToRestoreAndCreateTableTracker(
}
}
// store the tracker into config
log.Info("pitr table tracker", zap.String("map", piTRTableTracker.String()))
cfg.PiTRTableTracker = piTRTableTracker
return
}
Expand All @@ -1571,6 +1570,7 @@ func UpdatePiTRTableTracker(cfg *RestoreConfig, tableMap map[int64]*metautil.Tab
for _, table := range tableMap {
cfg.PiTRTableTracker.AddTable(table.DB.ID, table.Info.ID)
}
log.Info("pitr table tracker", zap.String("map", cfg.PiTRTableTracker.String()))
}

// tweakLocalConfForRestore tweaks some of configs of TiDB to make the restore progress go well.
Expand Down

0 comments on commit 37f61af

Please sign in to comment.