From 37f61af6e19881e1c93b35f7996f783536dd0e1f Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Tue, 14 Jan 2025 17:53:29 -0500 Subject: [PATCH] fix base merge Signed-off-by: Wenqi Mou --- br/pkg/restore/internal/prealloc_db/db.go | 2 +- br/pkg/stream/table_mapping.go | 59 ++++++++++++++--------- br/pkg/task/restore.go | 4 +- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/br/pkg/restore/internal/prealloc_db/db.go b/br/pkg/restore/internal/prealloc_db/db.go index af5cbb30ac402..53bb9cf1dfd85 100644 --- a/br/pkg/restore/internal/prealloc_db/db.go +++ b/br/pkg/restore/internal/prealloc_db/db.go @@ -287,7 +287,7 @@ func (db *DB) canReuseTableID(ti *model.TableInfo) bool { } prealloced := db.preallocedIDs.PreallocedFor(ti) if prealloced { - log.Info("reusing table ID", zap.Stringer("table", ti.Name)) + log.Info("reusing table ID", zap.Stringer("table", ti.Name), zap.Int64("tableID", ti.ID)) } return prealloced } diff --git a/br/pkg/stream/table_mapping.go b/br/pkg/stream/table_mapping.go index 484bb6e9bcc9a..c106662777ed8 100644 --- a/br/pkg/stream/table_mapping.go +++ b/br/pkg/stream/table_mapping.go @@ -136,37 +136,48 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t } func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace) { - // merge baseMap to DBReplaceMap - // also updating globalIdMap, not going to be used in prod but convenient in test + // first pass: update all global IDs for upstreamID, baseDBReplace := range baseMap { tm.globalIdMap[upstreamID] = baseDBReplace.DbID - if existingDBReplace, exists := tm.DBReplaceMap[upstreamID]; exists { - existingDBReplace.DbID = baseDBReplace.DbID + for tableUpID, baseTableReplace := range baseDBReplace.TableMap { + tm.globalIdMap[tableUpID] = baseTableReplace.TableID - for tableUpID, baseTableReplace := range baseDBReplace.TableMap { - tm.globalIdMap[tableUpID] = baseTableReplace.TableID + for partUpID, basePartDownID := range baseTableReplace.PartitionMap { + tm.globalIdMap[partUpID] = basePartDownID + } + } + } - if existingTableReplace, tableExists := existingDBReplace.TableMap[tableUpID]; tableExists { - existingTableReplace.TableID = baseTableReplace.TableID + // second pass: update the DBReplaceMap + // first update all existing entries using the global ID map + for _, existingDBReplace := range tm.DBReplaceMap { + if newID, exists := tm.globalIdMap[existingDBReplace.DbID]; exists { + existingDBReplace.DbID = newID + } - for partUpID, basePartDownID := range baseTableReplace.PartitionMap { - tm.globalIdMap[partUpID] = basePartDownID - existingTableReplace.PartitionMap[partUpID] = basePartDownID - } - } else { - existingDBReplace.TableMap[tableUpID] = baseTableReplace - for partUpID, basePartDownID := range baseTableReplace.PartitionMap { - tm.globalIdMap[partUpID] = basePartDownID - } + for _, existingTableReplace := range existingDBReplace.TableMap { + if newID, exists := tm.globalIdMap[existingTableReplace.TableID]; exists { + existingTableReplace.TableID = newID + } + + for partUpID, partDownID := range existingTableReplace.PartitionMap { + if newID, exists := tm.globalIdMap[partDownID]; exists { + existingTableReplace.PartitionMap[partUpID] = newID } } - } else { + } + } + + // then add any new entries from the base map + for upstreamID, baseDBReplace := range baseMap { + if _, exists := tm.DBReplaceMap[upstreamID]; !exists { tm.DBReplaceMap[upstreamID] = baseDBReplace + } else { + existingDBReplace := tm.DBReplaceMap[upstreamID] for tableUpID, baseTableReplace := range baseDBReplace.TableMap { - tm.globalIdMap[tableUpID] = baseTableReplace.TableID - for partUpID, basePartDownID := range baseTableReplace.PartitionMap { - tm.globalIdMap[partUpID] = basePartDownID + if _, exists := existingDBReplace.TableMap[tableUpID]; !exists { + existingDBReplace.TableMap[tableUpID] = baseTableReplace } } } @@ -274,18 +285,18 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs( return nil } -func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker) { +func (tm *TableMappingManager) FilterDBReplaceMap(tracker *utils.PiTRTableTracker) { // iterate through existing DBReplaceMap for dbID, dbReplace := range tm.DBReplaceMap { // remove entire database if not in filter - if !filter.ContainsDB(dbID) { + if !tracker.ContainsDB(dbID) { delete(tm.DBReplaceMap, dbID) continue } // filter tables in this database for tableID := range dbReplace.TableMap { - if !filter.ContainsTable(dbID, tableID) { + if !tracker.ContainsTable(dbID, tableID) { delete(dbReplace.TableMap, tableID) } } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index e1c2abc25edda..22180bd0bb613 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -1462,7 +1462,7 @@ func AdjustTablesToRestoreAndCreateTableTracker( // build tracker for pitr restore to use later piTRTableTracker := utils.NewPiTRTableTracker() - // put all the newly created db that matches the filter during log backup into the pitr filter + // put all the newly created db that matches the filter during log backup into the pitr table tracker newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory() for dbId, dbName := range newlyCreatedDBs { if utils.MatchSchema(cfg.TableFilter, dbName) { @@ -1562,7 +1562,6 @@ func AdjustTablesToRestoreAndCreateTableTracker( } } // store the tracker into config - log.Info("pitr table tracker", zap.String("map", piTRTableTracker.String())) cfg.PiTRTableTracker = piTRTableTracker return } @@ -1571,6 +1570,7 @@ func UpdatePiTRTableTracker(cfg *RestoreConfig, tableMap map[int64]*metautil.Tab for _, table := range tableMap { cfg.PiTRTableTracker.AddTable(table.DB.ID, table.Info.ID) } + log.Info("pitr table tracker", zap.String("map", cfg.PiTRTableTracker.String())) } // tweakLocalConfForRestore tweaks some of configs of TiDB to make the restore progress go well.