Skip to content

Commit

Permalink
dump goroutine profile in receive
Browse files Browse the repository at this point in the history
  • Loading branch information
yuchen-db committed Dec 18, 2024
1 parent 95cb360 commit 1b1fbc8
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -546,6 +549,75 @@ func runReceive(
})
}

if receiveMode == receive.IngestorOnly {
level.Debug(logger).Log("msg", "setting up periodic profiling")
ctx, cancel := context.WithCancel(context.Background())

// checkProfileLimit reports whether profileDir already holds at least
// maxFiles non-directory entries whose extension is ".prof".
//
// A missing directory is treated as "limit not reached" with a nil error,
// since nothing has been written there yet. Any other error from reading
// the directory is returned unchanged alongside false.
checkProfileLimit := func(profileDir string, maxFiles int) (bool, error) {
	entries, readErr := os.ReadDir(profileDir)
	switch {
	case readErr == nil:
		// Directory listed successfully; count below.
	case os.IsNotExist(readErr):
		// Nothing has been dumped yet, so the limit cannot have been hit.
		return false, nil
	default:
		return false, readErr
	}

	profiles := 0
	for i := range entries {
		entry := entries[i]
		if entry.IsDir() || filepath.Ext(entry.Name()) != ".prof" {
			continue
		}
		profiles++
	}
	return profiles >= maxFiles, nil
}

dumpGoroutineProfile := func() error {
numGoroutines := runtime.NumGoroutine()
if numGoroutines < conf.goroutineProfileThreshold {
return nil
}

profileDir := filepath.Join(conf.dataDir, "profiles")
limitReached, err := checkProfileLimit(profileDir, 10)
if err != nil {
level.Error(logger).Log("msg", "failed to check profile directory", "err", err)
return err
}
if limitReached {
level.Warn(logger).Log("msg", "profile limit reached, skipping profiling")
return nil
}

if err := os.MkdirAll(profileDir, os.ModePerm); err != nil {
level.Error(logger).Log("msg", "failed to create profiles directory", "err", err)
return err
}

timestamp := time.Now().UTC().Format("20060102_150405")
fileName := filepath.Join(profileDir, fmt.Sprintf("goroutine_UTC%s.prof", timestamp))

file, err := os.Create(fileName)
if err != nil {
level.Error(logger).Log("msg", "failed to create profile file", "err", err)
return err
}
defer file.Close()

if err := pprof.Lookup("goroutine").WriteTo(file, 1); err != nil {
level.Error(logger).Log("msg", "failed to write goroutine profile", "err", err)
return err
}

level.Info(logger).Log("msg", "Goroutine profile dumped", "num_goroutines", numGoroutines)
return nil
}

g.Add(func() error {
return runutil.Repeat(conf.profilingInterval, ctx.Done(), dumpGoroutineProfile)
}, func(err error) {
cancel()
})
}

{
if limiter.CanReload() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -988,6 +1060,8 @@ type receiveConfig struct {
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcWriteRequests int
profilingInterval time.Duration
goroutineProfileThreshold int

featureList *[]string
}
Expand Down Expand Up @@ -1154,6 +1228,9 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
cmd.Flag("receive.profiling-interval", "The interval at which profiling data is collected.").
Default("1m").DurationVar(&rc.profilingInterval)
cmd.Flag("receive.goroutine-profile-threshold", "Dump a profile if more goroutine than this threshold").Default("100000").IntVar(&rc.goroutineProfileThreshold)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

Expand Down

0 comments on commit 1b1fbc8

Please sign in to comment.