Skip to content

Commit

Permalink
metrics: add col/idx name(s) for BackfillProgressGauge and BackfillTo…
Browse files Browse the repository at this point in the history
…talCounter (#58380) (#58530)

close #58114
  • Loading branch information
ti-chi-bot authored Dec 26, 2024
1 parent 80db670 commit f3f44da
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 52 deletions.
31 changes: 20 additions & 11 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,30 @@ type backfillCtx struct {
metricCounter prometheus.Counter
}

func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context,
schemaName string, tbl table.Table, jobCtx *JobContext, label string, isDistributed bool) *backfillCtx {
func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context, rInfo *reorgInfo, schemaName string, tbl table.Table,
jobCtx *JobContext, label string, isDistributed bool) *backfillCtx {
if isDistributed {
id = int(backfillContextID.Add(1))
}
colOrIdxName := ""
switch rInfo.Job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
colOrIdxName = getIndexNamesFromJobArgs(rInfo)
case model.ActionModifyColumn:
oldCol, _ := getOldAndNewColumnsForUpdateColumn(tbl, rInfo.currElement.ID)
if oldCol != nil {
colOrIdxName = oldCol.Name.String()
}
}
return &backfillCtx{
id: id,
ddlCtx: ctx,
sessCtx: sessCtx,
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
id: id,
ddlCtx: ctx,
sessCtx: sessCtx,
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
jobContext: jobCtx,
metricCounter: metrics.GetBackfillTotalByLabel(label, schemaName, tbl.Meta().Name.String(), colOrIdxName),
}
}

Expand Down
21 changes: 16 additions & 5 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -227,17 +228,21 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
}
d := r.d
engines := make([]ingest.Engine, 0, len(r.indexes))
for _, index := range r.indexes {
var idxNames strings.Builder
for i, index := range r.indexes {
ei, err := r.bc.Register(r.job.ID, index.ID, r.job.SchemaName, r.job.TableName)
if err != nil {
tidblogutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err),
zap.Int64("job ID", r.job.ID), zap.Int64("index ID", index.ID))
return nil, err
}
engines = append(engines, ei)
if i > 0 {
idxNames.WriteByte('+')
}
idxNames.WriteString(index.Name.O)
}
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
counter := metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
return NewAddIndexIngestPipeline(
opCtx,
d.store,
Expand Down Expand Up @@ -283,8 +288,14 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
kvMeta.MergeSummary(summary)
s.mu.Unlock()
}
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
var idxNames strings.Builder
for _, idx := range r.indexes {
if idxNames.Len() > 0 {
idxNames.WriteByte('+')
}
idxNames.WriteString(idx.Name.O)
}
counter := metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
return NewWriteIndexToExternalStoragePipeline(
opCtx,
d.store,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblAddIdxRate, false)
idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
job.ID, reorgInfo.elements, reorgInfo.currElement.TypeKey)
if err != nil {
Expand All @@ -252,7 +252,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblMergeTmpIdxRate, false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.elements)
runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker)
worker = tmpIdxWorker
Expand Down
27 changes: 17 additions & 10 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func (w *worker) doModifyColumnTypeWithData(
// Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
job.SchemaState = model.StateDeleteOnly
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0)
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String(), oldCol.Name.O).Set(0)
job.Args = append(job.Args, changingCol, changingIdxs, rmIdxIDs)
case model.StateDeleteOnly:
// Column from null to not null.
Expand Down Expand Up @@ -1211,20 +1211,27 @@ type updateColumnWorker struct {
checksumNeeded bool
}

func getOldAndNewColumnsForUpdateColumn(t table.Table, currElementID int64) (oldCol, newCol *model.ColumnInfo) {
for _, col := range t.WritableCols() {
if col.ID == currElementID {
changeColumnOrigName := table.FindCol(t.Cols(), getChangingColumnOriginName(col.ColumnInfo))
if changeColumnOrigName != nil {
newCol = col.ColumnInfo
oldCol = changeColumnOrigName.ColumnInfo
return
}
}
}
return
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.Stringer("reorgInfo", reorgInfo))
return nil
}
var oldCol, newCol *model.ColumnInfo
for _, col := range t.WritableCols() {
if col.ID == reorgInfo.currElement.ID {
newCol = col.ColumnInfo
oldCol = table.FindCol(t.Cols(), getChangingColumnOriginName(newCol)).ColumnInfo
break
}
}
oldCol, newCol := getOldAndNewColumnsForUpdateColumn(t, reorgInfo.currElement.ID)
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
checksumNeeded := false
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() {
Expand All @@ -1247,7 +1254,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
}
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblUpdateColRate, false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
Expand Down
16 changes: 10 additions & 6 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ func newAddIndexIngestWorker(
checkpointMgr *ingest.CheckpointManager,
) (*addIndexIngestWorker, error) {
indexes := make([]table.Index, 0, len(indexIDs))
var indexNames strings.Builder
writers := make([]ingest.Writer, 0, len(indexIDs))
for i, indexID := range indexIDs {
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
Expand All @@ -1688,14 +1689,17 @@ func newAddIndexIngestWorker(
}
indexes = append(indexes, index)
writers = append(writers, lw)
if i > 0 {
indexNames.WriteString("+")
}
indexNames.WriteString(index.Meta().Name.O)
}

return &addIndexIngestWorker{
ctx: ctx,
d: d,
sessCtx: sessCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
ctx: ctx,
d: d,
sessCtx: sessCtx,
metricCounter: metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, schemaName, t.Meta().Name.O, indexNames.String()),
tbl: t,
indexes: indexes,
writers: writers,
Expand Down Expand Up @@ -2427,7 +2431,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
return &cleanUpIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblCleanupIdxRate, false),
indexes: indexes,
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2927,7 +2927,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
}

// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
Expand Down Expand Up @@ -2982,15 +2982,15 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
}

tblInfo.Partition.DDLState = model.StateWriteOnly
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.2 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// Insert this state to confirm all servers can see the new partitions when reorg is running,
// so that new data will be updated in both old and new partitions when reorganizing.
job.SnapshotVer = 0
tblInfo.Partition.DDLState = model.StateWriteReorganization
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.3 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
Expand Down Expand Up @@ -3262,7 +3262,7 @@ func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.Physical
maxOffset = mathutil.Max[int](maxOffset, offset)
}
return &reorgPartitionWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false),
backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblReorgPartitionRate, false),
rowDecoder: decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
writeColOffsetMap: writeColOffsetMap,
Expand Down
46 changes: 43 additions & 3 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
Expand Down Expand Up @@ -383,13 +384,52 @@ func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.Tabl
} else {
label = metrics.LblAddIndex
}
metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String(), getIndexNamesFromJobArgs(reorgInfo)).Set(progress * 100)
case model.ActionModifyColumn:
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
colName := ""
modifyInfo := &modifyingColInfo{pos: &ast.ColumnPosition{}}
err := reorgInfo.Job.DecodeArgs(&modifyInfo.newCol, &modifyInfo.oldColName, modifyInfo.pos, &modifyInfo.modifyColumnTp,
&modifyInfo.updatedAutoRandomBits, &modifyInfo.changingCol, &modifyInfo.changingIdxs, &modifyInfo.removedIdxs)
if err == nil {
colName = modifyInfo.oldColName.O
}
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String(), colName).Set(progress * 100)
case model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String(), "").Set(progress * 100)
}
}

func getIndexNamesFromJobArgs(reorgInfo *reorgInfo) string {
var err error
var idxNames strings.Builder
unique := make([]bool, 1)
global := make([]bool, 1)
indexNames := make([]model.CIStr, 1)
indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1)
indexOption := make([]*ast.IndexOption, 1)
var sqlMode mysql.SQLMode
var warnings []string
hiddenCols := make([][]*model.ColumnInfo, 1)

if reorgInfo.Type == model.ActionAddPrimaryKey {
// Notice: sqlMode and warnings is used to support non-strict mode.
err = reorgInfo.Job.DecodeArgs(&unique[0], &indexNames[0], &indexPartSpecifications[0], &indexOption[0], &sqlMode, &warnings, &global[0])
} else if reorgInfo.Type == model.ActionAddIndex {
err = reorgInfo.Job.DecodeArgs(&unique[0], &indexNames[0], &indexPartSpecifications[0], &indexOption[0], &hiddenCols[0], &global[0])
if err != nil {
err = reorgInfo.Job.DecodeArgs(&unique, &indexNames, &indexPartSpecifications, &indexOption, &hiddenCols, &global)
}
}
if err == nil {
for i, idxName := range indexNames {
if i > 0 {
idxNames.WriteString("+")
}
idxNames.WriteString(idxName.O)
}
}
return idxNames.String()
}

func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
Expand Down
43 changes: 32 additions & 11 deletions pkg/metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,26 +180,47 @@ func InitDDLMetrics() {
const (
LblAction = "action"

LblAddIndex = "add_index"
LblAddIndexMerge = "add_index_merge_tmp"
LblModifyColumn = "modify_column"

// Used by BackfillProgressGauge
LblAddIndex = "add_index"
LblAddIndexMerge = "add_index_merge_tmp"
LblModifyColumn = "modify_column"
LblReorgPartition = "reorganize_partition"

// Used by BackfillTotalCounter
LblAddIdxRate = "add_idx_rate"
LblMergeTmpIdxRate = "merge_tmp_idx_rate"
LblCleanupIdxRate = "cleanup_idx_rate"
LblUpdateColRate = "update_col_rate"
LblReorgPartitionRate = "reorg_partition_rate"
)

// GenerateReorgLabel returns the label with schema name and table name.
func GenerateReorgLabel(label string, schemaName string, tableName string) string {
// generateReorgLabel returns the label with schema name, table name and optional column/index names.
// Multiple columns/indexes can be concatenated with "+".
func generateReorgLabel(label, schemaName, tableName, colOrIdxNames string) string {
var stringBuilder strings.Builder
stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + 2)
if len(colOrIdxNames) == 0 {
stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + 2)
} else {
stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + len(colOrIdxNames) + 3)
}
stringBuilder.WriteString(label)
stringBuilder.WriteString("_")
stringBuilder.WriteString("-")
stringBuilder.WriteString(schemaName)
stringBuilder.WriteString("_")
stringBuilder.WriteString("-")
stringBuilder.WriteString(tableName)
if len(colOrIdxNames) > 0 {
stringBuilder.WriteString("-")
stringBuilder.WriteString(colOrIdxNames)
}
return stringBuilder.String()
}

// GetBackfillTotalByLabel returns the Counter showing the speed of backfilling for the given type label.
func GetBackfillTotalByLabel(label, schemaName, tableName, optionalColOrIdxName string) prometheus.Counter {
return BackfillTotalCounter.WithLabelValues(generateReorgLabel(label, schemaName, tableName, optionalColOrIdxName))
}

// GetBackfillProgressByLabel returns the Gauge showing the percentage progress for the given type label.
func GetBackfillProgressByLabel(label string, schemaName string, tableName string) prometheus.Gauge {
return BackfillProgressGauge.WithLabelValues(GenerateReorgLabel(label, schemaName, tableName))
func GetBackfillProgressByLabel(label, schemaName, tableName, optionalColOrIdxName string) prometheus.Gauge {
return BackfillProgressGauge.WithLabelValues(generateReorgLabel(label, schemaName, tableName, optionalColOrIdxName))
}

0 comments on commit f3f44da

Please sign in to comment.