Use an incremental integer as ID for metrics
The ID allows using it as an index for slice lookups, removing the need for a map.
codebien committed Feb 10, 2023
1 parent 2ad5e2e commit 908a243
Showing 6 changed files with 130 additions and 125 deletions.
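For context, here is a minimal sketch of the lookup pattern this commit enables. It is illustrative only: the stripped-down metric type below is a hypothetical stand-in for k6's metrics.Metric, and it assumes, as the new code's nil entry at index 0 reflects, that the registry assigns IDs incrementally starting at 1.

package main

import "fmt"

// metric is a hypothetical stand-in for k6's metrics.Metric, reduced
// to the two fields this sketch needs.
type metric struct {
	ID   uint64 // assigned incrementally by the registry, starting at 1
	Name string
}

func main() {
	reqs := &metric{ID: 1, Name: "http_reqs"}

	// Before this commit: a map keyed by metric name, so every lookup
	// pays for hashing and bucket probing.
	byName := map[string]*metric{reqs.Name: reqs}
	fmt.Println(byName["http_reqs"].Name)

	// After: a slice indexed by metric ID. Index 0 stays nil as a
	// sentinel because IDs start at 1, and a lookup is a single
	// bounds-checked index operation.
	byID := []*metric{nil, reqs}
	fmt.Println(byID[reqs.ID].Name)
}

The trade-off is that the slice must stay dense, which is why trackMetric below grows it in ID order and the tests seed it with a leading nil.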
110 changes: 63 additions & 47 deletions metrics/engine/engine.go
@@ -54,8 +54,8 @@ type MetricsEngine struct {
// they can be either top-level metrics or sub-metrics
//
// TODO: remove the tracked map using the sequence number
metricsWithThresholds map[string]metrics.Thresholds
trackedMetrics map[string]*trackedMetric
metricsWithThresholds map[uint64]metrics.Thresholds
trackedMetrics []*trackedMetric

breachedThresholdsCount uint32
}
@@ -65,23 +65,27 @@ func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
me := &MetricsEngine{
test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
metricsWithThresholds: make(map[string]metrics.Thresholds),
trackedMetrics: make(map[string]*trackedMetric),
metricsWithThresholds: make(map[uint64]metrics.Thresholds),
}

if me.test.RuntimeOptions.NoSummary.Bool &&
me.test.RuntimeOptions.NoThresholds.Bool {
return me, nil
}

for _, registered := range me.test.Registry.All() {
typ := registered.Type
me.trackedMetrics[registered.Name] = &trackedMetric{
Metric: registered,
sink: metrics.NewSinkByType(typ),
}
// Add all the registered metrics as tracked.
// Custom metrics are included as well, because they have
// already been seen and registered during the initEnv run,
// which must run before this constructor is called.
registered := me.test.Registry.All()
me.trackedMetrics = make([]*trackedMetric, len(registered)+1)
for _, mreg := range registered {
me.trackMetric(mreg)
}

// Register and track all the metrics defined by the thresholds.
// They are also marked as observed, so the end-of-test summary
// includes them even if they never receive a sample.
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
@@ -156,27 +160,19 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
}

// TODO: check and confirm that this check is not an issue
if len(thresholds.Thresholds) > 0 {
me.metricsWithThresholds[metric.Name] = thresholds
me.metricsWithThresholds[metric.ID] = thresholds
}

// Mark the metric (and the parent metric, if we're dealing with a
// submetric) as observed, so they are shown in the end-of-test summary,
// even if they don't have any metric samples during the test run

me.trackedMetrics[metric.Name] = &trackedMetric{
Metric: metric,
sink: metrics.NewSinkByType(metric.Type),
observed: true,
}
// even if they don't have any metric samples during the test run.
me.trackMetric(metric)
me.trackedMetrics[metric.ID].observed = true

if metric.Sub != nil {
me.trackedMetrics[metric.Sub.Parent.Name] = &trackedMetric{
Metric: metric.Sub.Parent,
sink: metrics.NewSinkByType(metric.Sub.Parent.Type),
observed: true,
}
me.trackMetric(metric.Sub.Parent)
me.trackedMetrics[metric.Sub.Parent.ID].observed = true
}
}

@@ -187,15 +183,30 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
}
me.trackedMetrics[expResMetric.Name] = &trackedMetric{
Metric: expResMetric,
sink: metrics.NewSinkByType(expResMetric.Type),
}
me.trackMetric(expResMetric)
}

return nil
}

func (me *MetricsEngine) trackMetric(m *metrics.Metric) {
tm := &trackedMetric{
Metric: m,
sink: metrics.NewSinkByType(m.Type),
}

if me.trackedMetrics == nil {
me.trackedMetrics = []*trackedMetric{nil}
}

if m.ID >= uint64(len(me.trackedMetrics)) {
me.trackedMetrics = append(me.trackedMetrics, tm)
return
}

me.trackedMetrics[m.ID] = tm
}

// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalize calculations.
func (me *MetricsEngine) StartThresholdCalculations(
@@ -261,40 +272,40 @@ func (me *MetricsEngine) evaluateThresholds(
) (breachedThresholds []string, shouldAbort bool) {
t := getCurrentTestRunDuration()

computeThresholds := func(metricName string, ths metrics.Thresholds) {
observedMetric, ok := me.trackedMetrics[metricName]
if !ok {
panic(fmt.Sprintf("observed metric %q not found for the threhsolds", metricName))
}
computeThresholds := func(tm *trackedMetric, ths metrics.Thresholds) {
//if !ok {
//panic(fmt.Sprintf("observed metric %q not found for the thresholds", metricName))
//}

observedMetric.m.Lock()
defer observedMetric.m.Unlock()
tm.m.Lock()
defer tm.m.Unlock()

// If the metric has no thresholds defined, or its sinks are empty,
// skip its threshold evaluation at this point.
if len(ths.Thresholds) == 0 || (ignoreEmptySinks && observedMetric.sink.IsEmpty()) {
if len(ths.Thresholds) == 0 || (ignoreEmptySinks && tm.sink.IsEmpty()) {
return
}
observedMetric.tainted = false
tm.tainted = false

succ, err := ths.Run(observedMetric.sink, t)
succ, err := ths.Run(tm.sink, t)
if err != nil {
me.logger.WithField("metric_name", metricName).WithError(err).Error("Threshold error")
me.logger.WithField("metric_name", tm.Name).WithError(err).Error("Threshold error")
return
}
if succ {
return // threshold passed
}
breachedThresholds = append(breachedThresholds, metricName)
observedMetric.tainted = true
breachedThresholds = append(breachedThresholds, tm.Name)
tm.tainted = true
if ths.Abort {
shouldAbort = true
}
}

me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
for m, ths := range me.metricsWithThresholds {
computeThresholds(m, ths)
for mid, ths := range me.metricsWithThresholds {
tracked := me.trackedMetrics[mid]
computeThresholds(tracked, ths)
}

if len(breachedThresholds) > 0 {
@@ -308,7 +319,11 @@ func (me *MetricsEngine) evaluateThresholds(
// ObservedMetrics returns all observed metrics.
func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {
ometrics := make(map[string]metrics.ObservedMetric, len(me.trackedMetrics))
for _, tm := range me.trackedMetrics {
for i := 1; i < len(me.trackedMetrics); i++ {
//if me.trackedMetrics[i] == nil {
//continue
//}
tm := me.trackedMetrics[i]
tm.m.Lock()
if !tm.observed {
tm.m.Unlock()
@@ -322,11 +337,12 @@ func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {

// ObservedMetricByID returns the observed metric by the provided id.
func (me *MetricsEngine) ObservedMetricByID(id string) (metrics.ObservedMetric, bool) {
tm, ok := me.trackedMetrics[id]
if !ok {
m := me.test.Registry.Get(id)
if m == nil {
return metrics.ObservedMetric{}, false
}

tm := me.trackedMetrics[m.ID]
tm.m.Lock()
defer tm.m.Unlock()

@@ -361,7 +377,7 @@ func (me *MetricsEngine) trackedToObserved(tm *trackedMetric) metrics.ObservedMe
Tainted: null.BoolFrom(tm.tainted), // TODO: if null it's required then add to trackedMetric
}

definedThs, ok := me.metricsWithThresholds[tm.Name]
definedThs, ok := me.metricsWithThresholds[tm.ID]
if !ok || len(definedThs.Thresholds) < 1 {
return om
}
57 changes: 35 additions & 22 deletions metrics/engine/engine_test.go
@@ -117,22 +117,26 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
t.Run(tc.threshold, func(t *testing.T) {
t.Parallel()
me := newTestMetricsEngine(t)
m1 := me.test.Registry.MustNewMetric("m1", metrics.Counter)
m2 := me.test.Registry.MustNewMetric("m2", metrics.Counter)

ths := metrics.NewThresholds([]string{tc.threshold})
require.NoError(t, ths.Parse())
ths.Thresholds[0].AbortOnFail = tc.abortOnFail

me.metricsWithThresholds["m1"] = ths
me.metricsWithThresholds["m2"] = metrics.Thresholds{}
me.metricsWithThresholds[1] = ths
me.metricsWithThresholds[2] = metrics.Thresholds{}

csink := &metrics.CounterSink{}
csink.Add(metrics.Sample{Value: 6.0})
me.trackedMetrics["m1"] = &trackedMetric{
sink: csink,
}
me.trackedMetrics["m2"] = &trackedMetric{
sink: &metrics.CounterSink{},
}
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m1,
sink: csink,
})
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m2,
sink: &metrics.CounterSink{},
})

breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
require.Equal(t, tc.abortOnFail, abort)
@@ -155,18 +159,18 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
require.NoError(t, ths.Parse())
ths.Thresholds[0].AbortOnFail = true

me.metricsWithThresholds["m1"] = ths
me.metricsWithThresholds["m2"] = metrics.Thresholds{}
me.metricsWithThresholds[1] = ths
me.metricsWithThresholds[2] = metrics.Thresholds{}

me.trackedMetrics["m1"] = &trackedMetric{
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m1,
sink: &metrics.CounterSink{},
}
})

me.trackedMetrics["m2"] = &trackedMetric{
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m2,
sink: &metrics.CounterSink{},
}
})

breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
require.True(t, abort)
@@ -191,23 +195,32 @@ func TestMetricsEngineObserveMetricByID(t *testing.T) {
require.NoError(t, ths.Parse())
ths.Thresholds[0].AbortOnFail = true

me.metricsWithThresholds["m1"] = metrics.Thresholds{}
me.metricsWithThresholds["m2"] = ths
me.metricsWithThresholds[1] = metrics.Thresholds{}
me.metricsWithThresholds[2] = ths

me.trackedMetrics["m1"] = &trackedMetric{
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m1,
}
me.trackedMetrics["m2"] = &trackedMetric{
})
me.trackedMetrics = append(me.trackedMetrics, &trackedMetric{
Metric: m2,
observed: true,
}
})

ometric, found := me.ObservedMetricByID("m2")
require.True(t, found)
assert.Equal(t, m2, ometric.Metric)
assert.Len(t, ometric.Thresholds, 1)
}

func TestMetricsEngineTrackMetric(t *testing.T) {
me := newTestMetricsEngine(t)
m := me.test.Registry.MustNewMetric("my_counter", metrics.Counter)
me.trackMetric(m)
require.Len(t, me.trackedMetrics, 2)
assert.Equal(t, m, me.trackedMetrics[1].Metric)
assert.IsType(t, &metrics.CounterSink{}, me.trackedMetrics[1].sink)
}

func newTestMetricsEngine(t *testing.T) MetricsEngine {
trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
@@ -219,8 +232,8 @@ func newTestMetricsEngine(t *testing.T) MetricsEngine {
return MetricsEngine{
logger: trs.Logger,
test: trs,
metricsWithThresholds: make(map[string]metrics.Thresholds),
trackedMetrics: make(map[string]*trackedMetric),
metricsWithThresholds: make(map[uint64]metrics.Thresholds),
trackedMetrics: []*trackedMetric{nil},
}
}

35 changes: 8 additions & 27 deletions metrics/engine/ingester.go
@@ -4,7 +4,6 @@ import (
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
)

@@ -60,46 +59,28 @@ func (oi *outputIngester) flushMetrics() {
// allow us to have a per-bucket lock, instead of one global one, and it
// will allow us to split apart the metric Name and Type from its Sink and
// Observed fields...
//
// TODO: And, to further optimize things, if every metric (and sub-metric) had a
// sequential integer ID, we would be able to use a slice for these buckets
// and eliminate the map lookups altogether!

samplesByMetric := make(map[*metrics.Metric][]metrics.Sample)

for _, sampleContainer := range sampleContainers {
samples := sampleContainer.GetSamples()

if len(samples) == 0 {
continue
}

for _, sample := range samples {
m := sample.Metric
samples := samplesByMetric[m]
samples = append(samples, sample)
samplesByMetric[m] = samples
// Mark it as observed so it shows in the end-of-test summary
// and add its value to its own sink.
om := oi.metricsEngine.trackedMetrics[sample.Metric.ID]
om.AddSamples(sample)

// and also do the same for any submetrics that match the metric sample
for _, sm := range m.Submetrics {
for _, sm := range sample.Metric.Submetrics {
if !sample.Tags.Contains(sm.Tags) {
continue
}
samples := samplesByMetric[sm.Metric]
samples = append(samples, sample)
samplesByMetric[sm.Metric] = samples
}
}
}

for m, samples := range samplesByMetric {
om, ok := oi.metricsEngine.trackedMetrics[m.Name]
if !ok {
// if they are not pre-defined then
// it is not required to sink them
continue
om := oi.metricsEngine.trackedMetrics[sm.Metric.ID]
om.AddSamples(sample)
}
}

om.AddSamples(samples...)
}
}