Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: don't handle live updates of column size #58596

Merged
merged 10 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
Expand Down Expand Up @@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
Expand Down Expand Up @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand All @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
Expand Down Expand Up @@ -251,7 +250,6 @@ func (s *importStepExecutor) onFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}

err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
"progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
Expand Down
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -65,7 +64,7 @@ func ProcessChunk(
}
}()

return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
Expand All @@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Affected uint64
Warnings []contextutil.SQLWarn
ColSizeMap variable.DeltaColsMap
Affected uint64
Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError get msg from BR error.
Expand Down
8 changes: 2 additions & 6 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,9 @@ func TestProcessChunkWith(t *testing.T) {
defer ti.Backend().CloseEngineMgr()
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
Expand Down Expand Up @@ -343,11 +341,9 @@ func TestProcessChunkWith(t *testing.T) {
ti.SetSelectedRowCh(rowsCh)
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
// KVEncoder encodes a row of data into a KV pair.
type KVEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
}

Expand Down Expand Up @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
Expand Down
49 changes: 0 additions & 49 deletions pkg/executor/importer/progress.go

This file was deleted.

16 changes: 5 additions & 11 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,25 +655,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

var (
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
colSizeMap = make(map[int64]int64)
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
)
eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
for i := 0; i < ti.ThreadCnt; i++ {
eg.Go(func() error {
chunkCheckpoint := checkpoints.ChunkCheckpoint{}
chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
progress := NewProgress()
defer func() {
mu.Lock()
defer mu.Unlock()
checksum.Add(chunkChecksum)
for k, v := range progress.GetColSize() {
colSizeMap[k] += v
}
}()
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum)
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum)
})
}
if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -717,8 +712,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

return &JobImportResult{
Affected: uint64(dataKVCount),
ColSizeMap: colSizeMap,
Affected: uint64(dataKVCount),
}, nil
}

Expand Down Expand Up @@ -977,7 +971,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64,
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected))
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
39 changes: 23 additions & 16 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,23 @@ func TestDataForTableStatsField(t *testing.T) {
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
testkit.Rows("3 16 48 0"))
tk.MustExec(`insert into t(c, d, e) values(4, 5, "f")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("4 18 72 8"))
testkit.Rows("4 16 64 0"))
tk.MustExec("delete from t where c >= 3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))
testkit.Rows("2 16 32 0"))
tk.MustExec("delete from t where c=3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 16 32 0"))
tk.MustExec("analyze table t all columns")
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))

Expand All @@ -209,6 +212,9 @@ func TestDataForTableStatsField(t *testing.T) {
tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 16 48 0"))
tk.MustExec("analyze table t all columns")
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
}
Expand All @@ -229,23 +235,24 @@ func TestPartitionsTable(t *testing.T) {
tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)

tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"p0 6]\n" +
"[p1 11]\n" +
"[p2 16"))
testkit.Rows("p0 6", "p1 11", "p2 16"))

tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"0 0 0 0]\n" +
"[0 0 0 0]\n" +
"[0 0 0 0"))
testkit.Rows(
"0 0 0 0",
"0 0 0 0",
"0 0 0 0",
),
)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"1 18 18 2]\n" +
"[1 18 18 2]\n" +
"[1 18 18 2"))
testkit.Rows(
"1 16 16 0",
"1 16 16 0",
"1 16 16 0",
),
)
})

// Test for table has no partitions.
Expand All @@ -257,7 +264,7 @@ func TestPartitionsTable(t *testing.T) {
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select PARTITION_NAME, TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH from information_schema.PARTITIONS where table_name='test_partitions_1';").Check(
testkit.Rows("<nil> 3 18 54 6"))
testkit.Rows("<nil> 3 16 48 0"))

tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;")
tk.MustExec(`CREATE TABLE test_partitions1 (id int, b int, c varchar(5), primary key(id), index idx(c)) PARTITION BY RANGE COLUMNS(id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (e *BaseExecutor) Ctx() sessionctx.Context {
// UpdateDeltaForTableID updates the delta info for the table with tableID.
func (e *BaseExecutor) UpdateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.UpdateDeltaForTable(id, 0, 0, nil)
txnCtx.UpdateDeltaForTable(id, 0, 0)
}

// GetSysSession gets a system session context from executor.
Expand Down
Loading