Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: don't handle live updates of column size #58596

Merged
merged 10 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
Expand Down Expand Up @@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
Expand Down Expand Up @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand All @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
Expand Down Expand Up @@ -251,7 +250,6 @@ func (s *importStepExecutor) onFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
"progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
Expand Down
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -65,7 +64,7 @@ func ProcessChunk(
}
}()

return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
Expand All @@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Affected uint64
Warnings []contextutil.SQLWarn
ColSizeMap variable.DeltaColsMap
Affected uint64
Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError get msg from BR error.
Expand Down
8 changes: 2 additions & 6 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,9 @@ func TestProcessChunkWith(t *testing.T) {
defer ti.Backend().CloseEngineMgr()
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
Expand Down Expand Up @@ -343,11 +341,9 @@ func TestProcessChunkWith(t *testing.T) {
ti.SetSelectedRowCh(rowsCh)
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
// KVEncoder encodes a row of data into a KV pair.
type KVEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
}

Expand Down Expand Up @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
Expand Down
49 changes: 0 additions & 49 deletions pkg/executor/importer/progress.go

This file was deleted.

16 changes: 5 additions & 11 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,25 +655,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

var (
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
colSizeMap = make(map[int64]int64)
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
)
eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
for i := 0; i < ti.ThreadCnt; i++ {
eg.Go(func() error {
chunkCheckpoint := checkpoints.ChunkCheckpoint{}
chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
progress := NewProgress()
defer func() {
mu.Lock()
defer mu.Unlock()
checksum.Add(chunkChecksum)
for k, v := range progress.GetColSize() {
colSizeMap[k] += v
}
}()
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum)
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum)
})
}
if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -717,8 +712,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

return &JobImportResult{
Affected: uint64(dataKVCount),
ColSizeMap: colSizeMap,
Affected: uint64(dataKVCount),
}, nil
}

Expand Down Expand Up @@ -977,7 +971,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64,
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected))
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
2 changes: 1 addition & 1 deletion pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (e *BaseExecutor) Ctx() sessionctx.Context {
// UpdateDeltaForTableID updates the delta info for the table with tableID.
func (e *BaseExecutor) UpdateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.UpdateDeltaForTable(id, 0, 0, nil)
txnCtx.UpdateDeltaForTable(id, 0, 0)
}

// GetSysSession gets a system session context from executor.
Expand Down
25 changes: 1 addition & 24 deletions pkg/lightning/backend/kv/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package kv

import (
"maps"
"math/rand"
"sync"
"time"

"github.com/pingcap/tidb/pkg/errctx"
Expand Down Expand Up @@ -122,11 +120,6 @@ type litTableMutateContext struct {
reservedRowIDAlloc stmtctx.ReservedRowIDAlloc
enableMutationChecker bool
assertionLevel variable.AssertionLevel
tableDelta struct {
sync.Mutex
// tblID -> (colID -> deltaSize)
m map[int64]map[int64]int64
}
}

// AlternativeAllocators implements the `table.MutateContext` interface.
Expand Down Expand Up @@ -189,24 +182,8 @@ func (ctx *litTableMutateContext) GetStatisticsSupport() (tblctx.StatisticsSuppo

// UpdatePhysicalTableDelta implements the `table.StatisticsSupport` interface.
func (ctx *litTableMutateContext) UpdatePhysicalTableDelta(
physicalTableID int64, _ int64,
_ int64, cols variable.DeltaCols,
_, _, _ int64,
) {
ctx.tableDelta.Lock()
defer ctx.tableDelta.Unlock()
if ctx.tableDelta.m == nil {
ctx.tableDelta.m = make(map[int64]map[int64]int64)
}
tableMap := ctx.tableDelta.m
colSize := tableMap[physicalTableID]
tableMap[physicalTableID] = cols.UpdateColSizeMap(colSize)
}

// GetColumnSize returns the colum size map (colID -> deltaSize) for the given table ID.
func (ctx *litTableMutateContext) GetColumnSize(tblID int64) (ret map[int64]int64) {
ctx.tableDelta.Lock()
defer ctx.tableDelta.Unlock()
return maps.Clone(ctx.tableDelta.m[tblID])
}

// GetCachedTableSupport implements the `table.MutateContext` interface.
Expand Down
13 changes: 2 additions & 11 deletions pkg/lightning/backend/kv/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,8 @@ func TestLitTableMutateContext(t *testing.T) {
stats, ok := tblCtx.GetStatisticsSupport()
require.True(t, ok)
// test for `UpdatePhysicalTableDelta` and `GetColumnSize`
stats.UpdatePhysicalTableDelta(123, 5, 2, variable.DeltaColsMap{1: 2, 3: 4})
r := tblCtx.GetColumnSize(123)
require.Equal(t, map[int64]int64{1: 2, 3: 4}, r)
stats.UpdatePhysicalTableDelta(123, 8, 2, variable.DeltaColsMap{3: 5, 4: 3})
r = tblCtx.GetColumnSize(123)
require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, r)
// the result should be a cloned value
r[1] = 100
require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, tblCtx.GetColumnSize(123))
// test gets a non-existed table
require.Empty(t, tblCtx.GetColumnSize(456))
stats.UpdatePhysicalTableDelta(123, 5, 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can delete them at all.

stats.UpdatePhysicalTableDelta(123, 8, 2)
}

// test for default
Expand Down
5 changes: 0 additions & 5 deletions pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,6 @@ func (s *Session) UnsetUserVar(varName string) {
s.exprCtx.unsetUserVar(varName)
}

// GetColumnSize returns the size of each column.
func (s *Session) GetColumnSize(tblID int64) (ret map[int64]int64) {
return s.tblCtx.GetColumnSize(tblID)
}

// Close closes the session
func (s *Session) Close() {
memBuf := &s.txn.MemBuf
Expand Down
20 changes: 0 additions & 20 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5380,7 +5380,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete(
// Columns can be seen by DELETE are the deletable columns.
deletableCols := t.DeletableCols()
deletableIdxs := t.DeletableIndices()
publicCols := t.Cols()
tblLen := len(deletableCols)

// Fix the start position of the columns.
Expand Down Expand Up @@ -5415,25 +5414,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete(
fixedPos[i] = i - pruned
}

// Fill in the ColumnSizes.
colPosInfo.ColumnsSizeHelper = &table.ColumnsSizeHelper{
NotPruned: bitset.New(uint(len(publicCols))),
AvgSizes: make([]float64, 0, len(publicCols)),
PublicColsLayout: make([]int, 0, len(publicCols)),
}
colPosInfo.ColumnsSizeHelper.NotPruned.SetAll()
for i, col := range publicCols {
// If the column is not pruned, we can use the column data to get a more accurate size.
// We just need to record its position info.
if _, ok := fixedPos[col.Offset]; ok {
colPosInfo.ColumnsSizeHelper.PublicColsLayout = append(colPosInfo.ColumnsSizeHelper.PublicColsLayout, fixedPos[col.Offset])
continue
}
// Otherwise we need to get the average size of the column by its field type.
// TODO: use statistics to get a maybe more accurate size.
colPosInfo.ColumnsSizeHelper.NotPruned.Clear(uint(i))
colPosInfo.ColumnsSizeHelper.AvgSizes = append(colPosInfo.ColumnsSizeHelper.AvgSizes, float64(chunk.EstimateTypeWidth(&col.FieldType)))
}
// Fix the index layout and fill in table.IndexRowLayoutOption.
indexColMap := make(map[int64]table.IndexRowLayoutOption, len(deletableIdxs))
for _, idx := range deletableIdxs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ func (s *session) updateStatsDeltaToCollector() {
if s.statsCollector != nil && mapper != nil {
for _, item := range mapper {
if item.TableID > 0 {
s.statsCollector.Update(item.TableID, item.Delta, item.Count, &item.ColSize)
s.statsCollector.Update(item.TableID, item.Delta, item.Count)
}
}
}
Expand Down
Loading