Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/JackL9u/pd into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackl9u committed Aug 5, 2024
2 parents 66c0b74 + e05a2bf commit 3d48ee0
Show file tree
Hide file tree
Showing 29 changed files with 253 additions and 108 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,6 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
include:
# remove the comment after the path is ready
# - EXC0012
77 changes: 66 additions & 11 deletions pkg/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,33 @@ type saveCacheStats struct {

// RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat.
type RegionHeartbeatProcessTracer interface {
// Begin starts the tracing.
Begin()
// OnPreCheckFinished will be called when the pre-check is finished.
OnPreCheckFinished()
// OnAsyncHotStatsFinished will be called when the async hot stats is finished.
OnAsyncHotStatsFinished()
// OnRegionGuideFinished will be called when the region guide is finished.
OnRegionGuideFinished()
// OnSaveCacheBegin will be called when the save cache begins.
OnSaveCacheBegin()
// OnSaveCacheFinished will be called when the save cache is finished.
OnSaveCacheFinished()
// OnCheckOverlapsFinished will be called when the check overlaps is finished.
OnCheckOverlapsFinished()
// OnValidateRegionFinished will be called when the validate region is finished.
OnValidateRegionFinished()
// OnSetRegionFinished will be called when the set region is finished.
OnSetRegionFinished()
// OnUpdateSubTreeFinished will be called when the update sub tree is finished.
OnUpdateSubTreeFinished()
// OnCollectRegionStatsFinished will be called when the collect region stats is finished.
OnCollectRegionStatsFinished()
// OnAllStageFinished will be called when all stages are finished.
OnAllStageFinished()
// LogFields returns the log fields.
LogFields() []zap.Field
// Release releases the tracer.
Release()
}

Expand All @@ -131,21 +145,48 @@ func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &noopHeartbeatProcessTracer{}
}

func (*noopHeartbeatProcessTracer) Begin() {}
func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {}
func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}
func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {}
func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {}
func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {}
func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}
func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {}
func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {}
func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}
// Begin implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) Begin() {}

// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {}

// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}

// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {}

// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {}

// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {}

// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}

// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {}

// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {}

// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}

// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {}
func (*noopHeartbeatProcessTracer) OnAllStageFinished() {}

// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) OnAllStageFinished() {}

// LogFields implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) LogFields() []zap.Field {
return nil
}

// Release implements the RegionHeartbeatProcessTracer interface.
func (*noopHeartbeatProcessTracer) Release() {}

type regionHeartbeatProcessTracer struct {
Expand All @@ -163,12 +204,14 @@ func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return tracerPool.Get().(*regionHeartbeatProcessTracer)
}

// Begin implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) Begin() {
now := time.Now()
h.startTime = now
h.lastCheckTime = now
}

// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() {
now := time.Now()
h.preCheckDuration = now.Sub(h.lastCheckTime)
Expand All @@ -177,6 +220,7 @@ func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() {
preCheckCount.Inc()
}

// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() {
now := time.Now()
h.asyncHotStatsDuration = now.Sub(h.lastCheckTime)
Expand All @@ -185,6 +229,7 @@ func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() {
asyncHotStatsCount.Inc()
}

// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() {
now := time.Now()
h.regionGuideDuration = now.Sub(h.lastCheckTime)
Expand All @@ -193,25 +238,29 @@ func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() {
regionGuideCount.Inc()
}

// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() {
now := time.Now()
h.saveCacheStats.startTime = now
h.saveCacheStats.lastCheckTime = now
h.lastCheckTime = now
}

// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() {
// update the outer checkpoint time
h.lastCheckTime = time.Now()
}

// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() {
now := time.Now()
regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds())
regionCollectCount.Inc()
h.lastCheckTime = now
}

// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
now := time.Now()
h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime)
Expand All @@ -220,6 +269,7 @@ func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
checkOverlapsCount.Inc()
}

// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() {
now := time.Now()
h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
Expand All @@ -228,6 +278,7 @@ func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() {
validateRegionCount.Inc()
}

// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() {
now := time.Now()
h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
Expand All @@ -236,6 +287,7 @@ func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() {
setRegionCount.Inc()
}

// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() {
now := time.Now()
h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime)
Expand All @@ -244,13 +296,15 @@ func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() {
updateSubTreeCount.Inc()
}

// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) OnAllStageFinished() {
now := time.Now()
h.OtherDuration = now.Sub(h.lastCheckTime)
otherDurationSum.Add(h.OtherDuration.Seconds())
otherCount.Inc()
}

// LogFields implements the RegionHeartbeatProcessTracer interface.
func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field {
return []zap.Field{
zap.Duration("pre-check-duration", h.preCheckDuration),
Expand All @@ -264,6 +318,7 @@ func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field {
}
}

// Release implements the RegionHeartbeatProcessTracer interface.
// Release puts the tracer back into the pool.
func (h *regionHeartbeatProcessTracer) Release() {
// Reset the fields of h to their zero values.
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,16 @@ func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo {
return regions
}

// TotalSize returns the total size of all regions.
func (t *regionTree) TotalSize() int64 {
if t.length() == 0 {
return 0
}
return t.totalSize
}

// TotalWriteRate returns the total write bytes rate and the total write keys
// rate of all regions.
func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
if t.length() == 0 {
return 0, 0
Expand Down
1 change: 1 addition & 0 deletions pkg/core/storelimit/store_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (l *limit) Take(count int64) bool {
return l.limiter.AllowN(int(count))
}

// GetRatePerSec returns the rate per second.
func (l *limit) GetRatePerSec() float64 {
l.ratePerSecMutex.RLock()
defer l.ratePerSecMutex.RUnlock()
Expand Down
1 change: 1 addition & 0 deletions pkg/id/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (alloc *allocatorImpl) Alloc() (uint64, error) {
return alloc.base, nil
}

// SetBase sets the base.
func (alloc *allocatorImpl) SetBase(newBase uint64) error {
alloc.mu.Lock()
defer alloc.mu.Unlock()
Expand Down
20 changes: 10 additions & 10 deletions pkg/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,35 @@ const (
)

var (
RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
runnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})
RunnerPendingTasks = prometheus.NewGaugeVec(
runnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerFailedTasks = prometheus.NewCounterVec(
runnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerSucceededTasks = prometheus.NewCounterVec(
runnerSucceededTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_success_tasks_total",
Help: "The number of tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
runnerTaskExecutionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Expand All @@ -63,9 +63,9 @@ var (
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerPendingTasks)
prometheus.MustRegister(RunnerFailedTasks)
prometheus.MustRegister(RunnerTaskExecutionDuration)
prometheus.MustRegister(RunnerSucceededTasks)
prometheus.MustRegister(runnerTaskMaxWaitingDuration)
prometheus.MustRegister(runnerPendingTasks)
prometheus.MustRegister(runnerFailedTasks)
prometheus.MustRegister(runnerTaskExecutionDuration)
prometheus.MustRegister(runnerSucceededTasks)
}
12 changes: 6 additions & 6 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int),
existTasks: make(map[taskID]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
maxWaitingDuration: runnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (cr *ConcurrentRunner) Start(ctx context.Context) {
maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
}
for taskName, cnt := range cr.pendingTaskCount {
RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt))
runnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt))
}
cr.pendingMu.Unlock()
cr.maxWaitingDuration.Set(maxDuration.Seconds())
Expand All @@ -157,8 +157,8 @@ func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToke
cr.limiter.ReleaseToken(token)
cr.processPendingTasks()
}
RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds())
RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc()
runnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds())
runnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc()
}

func (cr *ConcurrentRunner) processPendingTasks() {
Expand Down Expand Up @@ -214,12 +214,12 @@ func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(context.Conte
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/filter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func NewCounter(scope string) *Counter {
return &Counter{counter: counter, scope: scope}
}

// SetScope sets the scope for the counter.
func (c *Counter) SetScope(scope string) {
c.scope = scope
}
Expand Down
Loading

0 comments on commit 3d48ee0

Please sign in to comment.