Skip to content

Commit

Permalink
Merge pull request #1471 from fatsheep9146/native-histogram-exemplar
Browse files Browse the repository at this point in the history
add native histogram exemplar support
  • Loading branch information
beorn7 authored May 16, 2024
2 parents cb57abb + 2754a4c commit 542f7e6
Show file tree
Hide file tree
Showing 2 changed files with 335 additions and 4 deletions.
184 changes: 180 additions & 4 deletions prometheus/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ type HistogramOpts struct {
// constant (or any negative float value).
NativeHistogramZeroThreshold float64

// The remaining fields define a strategy to limit the number of
// The next three fields define a strategy to limit the number of
// populated sparse buckets. If NativeHistogramMaxBucketNumber is left
// at zero, the number of buckets is not limited. (Note that this might
// lead to unbounded memory consumption if the values observed by the
Expand Down Expand Up @@ -473,6 +473,22 @@ type HistogramOpts struct {
NativeHistogramMinResetDuration time.Duration
NativeHistogramMaxZeroThreshold float64

// NativeHistogramMaxExemplars limits the number of exemplars
// that are kept in memory for each native histogram. If you leave it at
// zero, a default value of 10 is used. If no exemplars should be kept specifically
// for native histograms, set it to a negative value. (Scrapers can
// still use the exemplars exposed for classic buckets, which are managed
// independently.)
NativeHistogramMaxExemplars int
// NativeHistogramExemplarTTL is only checked once
// NativeHistogramMaxExemplars is exceeded. In that case, the
// oldest exemplar is removed if it is older than NativeHistogramExemplarTTL.
// Otherwise, the older exemplar in the pair of exemplars that are closest
// together (on an exponential scale) is removed.
// If NativeHistogramExemplarTTL is left at its zero value, a default value of
// 5m is used. To always delete the oldest exemplar, set it to a negative value.
NativeHistogramExemplarTTL time.Duration

// now is for testing purposes, by default it's time.Now.
now func() time.Time

Expand Down Expand Up @@ -532,6 +548,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
if opts.afterFunc == nil {
opts.afterFunc = time.AfterFunc
}

h := &histogram{
desc: desc,
upperBounds: opts.Buckets,
Expand All @@ -556,6 +573,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold
} // Leave h.nativeHistogramZeroThreshold at 0 otherwise.
h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor)
h.nativeExemplars = makeNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplars)
}
for i, upperBound := range h.upperBounds {
if i < len(h.upperBounds)-1 {
Expand Down Expand Up @@ -725,7 +743,8 @@ type histogram struct {
// resetScheduled is protected by mtx. It is true if a reset is
// scheduled for a later time (when nativeHistogramMinResetDuration has
// passed).
resetScheduled bool
resetScheduled bool
nativeExemplars nativeExemplars

// now is for testing purposes, by default it's time.Now.
now func() time.Time
Expand All @@ -742,6 +761,9 @@ func (h *histogram) Observe(v float64) {
h.observe(v, h.findBucket(v))
}

// ObserveWithExemplar should not be called in a high-frequency setting
// for a native histogram with configured exemplars. For this case,
// the implementation isn't lock-free and might suffer from lock contention.
func (h *histogram) ObserveWithExemplar(v float64, e Labels) {
i := h.findBucket(v)
h.observe(v, i)
Expand Down Expand Up @@ -821,6 +843,15 @@ func (h *histogram) Write(out *dto.Metric) error {
Length: proto.Uint32(0),
}}
}

// If exemplars are not configured, the cap will be 0.
// So append is not needed in this case.
if cap(h.nativeExemplars.exemplars) > 0 {
h.nativeExemplars.Lock()
his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...)
h.nativeExemplars.Unlock()
}

}
addAndResetCounts(hotCounts, coldCounts)
return nil
Expand Down Expand Up @@ -1091,8 +1122,10 @@ func (h *histogram) resetCounts(counts *histogramCounts) {
deleteSyncMap(&counts.nativeHistogramBucketsPositive)
}

// updateExemplar replaces the exemplar for the provided bucket. With empty
// labels, it's a no-op. It panics if any of the labels is invalid.
// updateExemplar replaces the exemplar for the provided classic bucket.
// With empty labels, it's a no-op. It panics if any of the labels is invalid.
// If histogram is native, the exemplar will be cached into nativeExemplars,
// which has a limit, and will remove one exemplar when limit is reached.
func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
if l == nil {
return
Expand All @@ -1102,6 +1135,10 @@ func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
panic(err)
}
h.exemplars[bucket].Store(e)
doSparse := h.nativeHistogramSchema > math.MinInt32 && !math.IsNaN(v)
if doSparse {
h.nativeExemplars.addExemplar(e)
}
}

// HistogramVec is a Collector that bundles a set of Histograms that all share the
Expand Down Expand Up @@ -1575,3 +1612,142 @@ func addAndResetCounts(hot, cold *histogramCounts) {
atomic.AddUint64(&hot.nativeHistogramZeroBucket, atomic.LoadUint64(&cold.nativeHistogramZeroBucket))
atomic.StoreUint64(&cold.nativeHistogramZeroBucket, 0)
}

type nativeExemplars struct {
sync.Mutex

ttl time.Duration
exemplars []*dto.Exemplar
}

func makeNativeExemplars(ttl time.Duration, maxCount int) nativeExemplars {
if ttl == 0 {
ttl = 5 * time.Minute
}

if maxCount == 0 {
maxCount = 10
}

if maxCount < 0 {
maxCount = 0
}

return nativeExemplars{
ttl: ttl,
exemplars: make([]*dto.Exemplar, 0, maxCount),
}
}

func (n *nativeExemplars) addExemplar(e *dto.Exemplar) {
if cap(n.exemplars) == 0 {
return
}

n.Lock()
defer n.Unlock()

// The index where to insert the new exemplar.
var nIdx int = -1

// When the number of exemplars has not yet exceeded or
// is equal to cap(n.exemplars), then
// insert the new exemplar directly.
if len(n.exemplars) < cap(n.exemplars) {
for nIdx = 0; nIdx < len(n.exemplars); nIdx++ {
if *e.Value < *n.exemplars[nIdx].Value {
break
}
}
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)
return
}

// When the number of exemplars exceeds the limit, remove one exemplar.
var (
rIdx int // The index where to remove the old exemplar.

ot = time.Now() // Oldest timestamp seen.
otIdx = -1 // Index of the exemplar with the oldest timestamp.

md = -1.0 // Logarithm of the delta of the closest pair of exemplars.
mdIdx = -1 // Index of the older exemplar within the closest pair.
cLog float64 // Logarithm of the current exemplar.
pLog float64 // Logarithm of the previous exemplar.
)

for i, exemplar := range n.exemplars {
// Find the exemplar with the oldest timestamp.
if otIdx == -1 || exemplar.Timestamp.AsTime().Before(ot) {
ot = exemplar.Timestamp.AsTime()
otIdx = i
}

// Find the index at which to insert new the exemplar.
if *e.Value <= *exemplar.Value && nIdx == -1 {
nIdx = i
}

// Find the two closest exemplars and pick the one the with older timestamp.
pLog = cLog
cLog = math.Log(exemplar.GetValue())
if i == 0 {
continue
}
diff := math.Abs(cLog - pLog)
if md == -1 || diff < md {
md = diff
if n.exemplars[i].Timestamp.AsTime().Before(n.exemplars[i-1].Timestamp.AsTime()) {
mdIdx = i
} else {
mdIdx = i - 1
}
}

}

// If all existing exemplar are smaller than new exemplar,
// then the exemplar should be inserted at the end.
if nIdx == -1 {
nIdx = len(n.exemplars)
}

if otIdx != -1 && e.Timestamp.AsTime().Sub(ot) > n.ttl {
rIdx = otIdx
} else {
// In the previous for loop, when calculating the closest pair of exemplars,
// we did not take into account the newly inserted exemplar.
// So we need to calculate with the newly inserted exemplar again.
elog := math.Log(e.GetValue())
if nIdx > 0 {
diff := math.Abs(elog - math.Log(n.exemplars[nIdx-1].GetValue()))
if diff < md {
md = diff
mdIdx = nIdx
if n.exemplars[nIdx-1].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
mdIdx = nIdx - 1
}
}
}
if nIdx < len(n.exemplars) {
diff := math.Abs(math.Log(n.exemplars[nIdx].GetValue()) - elog)
if diff < md {
mdIdx = nIdx
if n.exemplars[nIdx].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
mdIdx = nIdx
}
}
}
rIdx = mdIdx
}

// Adjust the slice according to rIdx and nIdx.
switch {
case rIdx == nIdx:
n.exemplars[nIdx] = e
case rIdx < nIdx:
n.exemplars = append(n.exemplars[:rIdx], append(n.exemplars[rIdx+1:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)...)
case rIdx > nIdx:
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...)
}
}
Loading

0 comments on commit 542f7e6

Please sign in to comment.