Use an incremental integer as ID for metrics
The ID can be used as an index for slice lookups, removing the need for a map.
codebien committed Feb 10, 2023
1 parent 2ad5e2e commit b25d18d
Showing 7 changed files with 216 additions and 183 deletions.
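
The core of the change, sketched below with simplified stand-in types rather than the real k6 ones: metric IDs are assigned incrementally starting from 1, so a slice indexed by ID can replace a map keyed by metric name, with position 0 kept as a nil sentinel. This is only an illustration of the idea, not the engine's actual code.

package main

import "fmt"

// Simplified stand-ins for the real k6 metric types.
type Metric struct {
    ID   uint64
    Name string
}

type trackedMetric struct {
    *Metric
    observed bool
}

// trackedMetrics[0] stays nil because metric IDs start at 1.
var trackedMetrics = []*trackedMetric{nil}

// trackMetric stores a metric at the index given by its ID,
// growing the slice as metrics are registered in ID order.
func trackMetric(m *Metric) {
    tm := &trackedMetric{Metric: m}
    if m.ID >= uint64(len(trackedMetrics)) {
        trackedMetrics = append(trackedMetrics, tm)
        return
    }
    trackedMetrics[m.ID] = tm
}

func main() {
    trackMetric(&Metric{ID: 1, Name: "http_reqs"})
    trackMetric(&Metric{ID: 2, Name: "vus"})

    // Direct slice index by ID, no hashing involved.
    fmt.Println(trackedMetrics[2].Name) // vus
}
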
138 changes: 81 additions & 57 deletions js/summary_test.go
@@ -138,63 +138,87 @@ func TestTextSummaryWithThresholdNoData(t *testing.T) {
assert.Equal(t, expected, string(summaryOut))
}

// func TestTextSummaryWithSubMetrics(t *testing.T) {
// t.Parallel()

// registry := metrics.NewRegistry()
// parentMetric, err := registry.NewMetric("my_parent", metrics.Counter)
// require.NoError(t, err)
// parentMetric.Sink.Add(metrics.Sample{Value: 11})

// parentMetricPost, err := registry.NewMetric("my_parent_post", metrics.Counter)
// require.NoError(t, err)
// parentMetricPost.Sink.Add(metrics.Sample{Value: 22})

// subMetric, err := parentMetric.AddSubmetric("sub:1")
// require.NoError(t, err)
// subMetric.Metric.Sink.Add(metrics.Sample{Value: 1})

// subMetricPost, err := parentMetricPost.AddSubmetric("sub:2")
// require.NoError(t, err)
// subMetricPost.Metric.Sink.Add(metrics.Sample{Value: 2})

// metrics := map[string]*metrics.Metric{
// parentMetric.Name: parentMetric,
// parentMetricPost.Name: parentMetricPost,
// subMetric.Name: subMetric.Metric,
// subMetricPost.Name: subMetricPost.Metric,
// }

// summary := &lib.Summary{
// Metrics: metrics,
// RootGroup: &lib.Group{},
// TestRunDuration: time.Second,
// }

// runner, err := getSimpleRunner(
// t,
// "/script.js",
// "exports.default = function() {/* we don't run this, metrics are mocked */};",
// lib.RuntimeOptions{CompatibilityMode: null.NewString("base", true)},
// )
// require.NoError(t, err)

// result, err := runner.HandleSummary(context.Background(), summary)
// require.NoError(t, err)

// require.Len(t, result, 1)
// stdout := result["stdout"]
// require.NotNil(t, stdout)

// summaryOut, err := ioutil.ReadAll(stdout)
// require.NoError(t, err)

// expected := " my_parent........: 11 11/s\n" +
// " { sub:1 }......: 1 1/s\n" +
// " my_parent_post...: 22 22/s\n" +
// " { sub:2 }......: 2 2/s\n"
// assert.Equal(t, "\n"+expected+"\n", string(summaryOut))
// }
func TestTextSummaryWithSubMetrics(t *testing.T) {
t.Parallel()

registry := metrics.NewRegistry()
parentMetric, err := registry.NewMetric("my_parent", metrics.Counter)
require.NoError(t, err)

parentMetricPost, err := registry.NewMetric("my_parent_post", metrics.Counter)
require.NoError(t, err)

subMetric, err := parentMetric.AddSubmetric("sub:1")
require.NoError(t, err)

subMetricPost, err := parentMetricPost.AddSubmetric("sub:2")
require.NoError(t, err)

metrics := map[string]metrics.ObservedMetric{
parentMetric.Name: {
Metric: parentMetric,
Sink: func() metrics.Sink {
s := metrics.NewSinkByType(parentMetric.Type)
s.Add(metrics.Sample{Value: 11})
return s
}(),
},
parentMetricPost.Name: {
Metric: parentMetricPost,
Sink: func() metrics.Sink {
s := metrics.NewSinkByType(parentMetricPost.Type)
s.Add(metrics.Sample{Value: 22})
return s
}(),
},
subMetric.Name: {
Metric: subMetric.Metric,
Sink: func() metrics.Sink {
s := metrics.NewSinkByType(subMetric.Metric.Type)
s.Add(metrics.Sample{Value: 1})
return s
}(),
},
subMetricPost.Name: {
Metric: subMetricPost.Metric,
Sink: func() metrics.Sink {
s := metrics.NewSinkByType(subMetricPost.Metric.Type)
s.Add(metrics.Sample{Value: 2})
return s
}(),
},
}

summary := &lib.Summary{
Metrics: metrics,
RootGroup: &lib.Group{},
TestRunDuration: time.Second,
}

runner, err := getSimpleRunner(
t,
"/script.js",
"exports.default = function() {/* we don't run this, metrics are mocked */};",
lib.RuntimeOptions{CompatibilityMode: null.NewString("base", true)},
)
require.NoError(t, err)

result, err := runner.HandleSummary(context.Background(), summary)
require.NoError(t, err)

require.Len(t, result, 1)
stdout := result["stdout"]
require.NotNil(t, stdout)

summaryOut, err := ioutil.ReadAll(stdout)
require.NoError(t, err)

expected := " my_parent........: 11 11/s\n" +
" { sub:1 }......: 1 1/s\n" +
" my_parent_post...: 22 22/s\n" +
" { sub:2 }......: 2 2/s\n"
assert.Equal(t, "\n"+expected+"\n", string(summaryOut))
}

func createTestMetrics(t *testing.T) (map[string]metrics.ObservedMetric, *lib.Group) {
registry := metrics.NewRegistry()
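
The four inline sink-building closures in the new TestTextSummaryWithSubMetrics all follow the same pattern. As a sketch only, and not part of this commit, a small hypothetical helper in the test file could express it once, using the metrics package API already shown above:

// observedWithValue is a hypothetical test helper: it builds a sink of the
// metric's type, records a single sample, and wraps both in an ObservedMetric.
func observedWithValue(m *metrics.Metric, value float64) metrics.ObservedMetric {
    s := metrics.NewSinkByType(m.Type)
    s.Add(metrics.Sample{Value: value})
    return metrics.ObservedMetric{Metric: m, Sink: s}
}

With it, each entry of the summary map would reduce to a call such as observedWithValue(parentMetric, 11).
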
114 changes: 66 additions & 48 deletions metrics/engine/engine.go
@@ -54,8 +54,8 @@ type MetricsEngine struct {
// they can be both top-level metrics or sub-metrics
//
// TODO: remove the tracked map using the sequence number
metricsWithThresholds map[string]metrics.Thresholds
trackedMetrics map[string]*trackedMetric
metricsWithThresholds map[uint64]metrics.Thresholds
trackedMetrics []*trackedMetric

breachedThresholdsCount uint32
}
@@ -65,23 +65,27 @@ func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
me := &MetricsEngine{
test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
metricsWithThresholds: make(map[string]metrics.Thresholds),
trackedMetrics: make(map[string]*trackedMetric),
metricsWithThresholds: make(map[uint64]metrics.Thresholds),
}

if me.test.RuntimeOptions.NoSummary.Bool &&
me.test.RuntimeOptions.NoThresholds.Bool {
return me, nil
}

for _, registered := range me.test.Registry.All() {
typ := registered.Type
me.trackedMetrics[registered.Name] = &trackedMetric{
Metric: registered,
sink: metrics.NewSinkByType(typ),
}
// Add all registered metrics as tracked. Custom metrics are included
// as well, because they have already been seen and registered during
// the initEnv run, which must happen before this constructor is called.
registered := me.test.Registry.All()
me.trackedMetrics = make([]*trackedMetric, len(registered)+1)
for _, mreg := range registered {
me.trackMetric(mreg)
}

// Register and track all the metrics defined by the thresholds.
// They are also marked as observed, because the summary wants them
// even if they didn't receive any samples.
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
@@ -156,27 +160,19 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
}

// TODO: check and confirm that this check is not an issue
if len(thresholds.Thresholds) > 0 {
me.metricsWithThresholds[metric.Name] = thresholds
me.metricsWithThresholds[metric.ID] = thresholds
}

// Mark the metric (and the parent metric, if we're dealing with a
// submetric) as observed, so they are shown in the end-of-test summary,
// even if they don't have any metric samples during the test run

me.trackedMetrics[metric.Name] = &trackedMetric{
Metric: metric,
sink: metrics.NewSinkByType(metric.Type),
observed: true,
}
// even if they don't have any metric samples during the test run.
me.trackMetric(metric)
me.trackedMetrics[metric.ID].observed = true

if metric.Sub != nil {
me.trackedMetrics[metric.Sub.Parent.Name] = &trackedMetric{
Metric: metric.Sub.Parent,
sink: metrics.NewSinkByType(metric.Sub.Parent.Type),
observed: true,
}
me.trackMetric(metric.Sub.Parent)
me.trackedMetrics[metric.Sub.Parent.ID].observed = true
}
}

Expand All @@ -187,15 +183,37 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
}
me.trackedMetrics[expResMetric.Name] = &trackedMetric{
Metric: expResMetric,
sink: metrics.NewSinkByType(expResMetric.Type),
}
me.trackMetric(expResMetric)
}

// TODO: the trackedMetrics slice is fixed now
// to be optimal we could shrink the slice cap

return nil
}

func (me *MetricsEngine) trackMetric(m *metrics.Metric) {
tm := &trackedMetric{
Metric: m,
sink: metrics.NewSinkByType(m.Type),
}

if me.trackedMetrics == nil {
// the Metric ID starts from one
// so it skips the zero-th position
// to simplify the access operations.
me.trackedMetrics = []*trackedMetric{nil}
}

if m.ID >= uint64(len(me.trackedMetrics)) {
// expand the slice
me.trackedMetrics = append(me.trackedMetrics, tm)
return
}

me.trackedMetrics[m.ID] = tm
}

// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(
@@ -261,40 +279,36 @@ func (me *MetricsEngine) evaluateThresholds(
) (breachedThresholds []string, shouldAbort bool) {
t := getCurrentTestRunDuration()

computeThresholds := func(metricName string, ths metrics.Thresholds) {
observedMetric, ok := me.trackedMetrics[metricName]
if !ok {
panic(fmt.Sprintf("observed metric %q not found for the threhsolds", metricName))
}

observedMetric.m.Lock()
defer observedMetric.m.Unlock()
computeThresholds := func(tm *trackedMetric, ths metrics.Thresholds) {
tm.m.Lock()
defer tm.m.Unlock()

// If either the metric has no thresholds defined, or its sinks
// are empty, let's ignore its thresholds execution at this point.
if len(ths.Thresholds) == 0 || (ignoreEmptySinks && observedMetric.sink.IsEmpty()) {
if len(ths.Thresholds) == 0 || (ignoreEmptySinks && tm.sink.IsEmpty()) {
return
}
observedMetric.tainted = false
tm.tainted = false

succ, err := ths.Run(observedMetric.sink, t)
succ, err := ths.Run(tm.sink, t)
if err != nil {
me.logger.WithField("metric_name", metricName).WithError(err).Error("Threshold error")
me.logger.WithField("metric_name", tm.Name).WithError(err).Error("Threshold error")
return
}
if succ {
return // threshold passed
}
breachedThresholds = append(breachedThresholds, metricName)
observedMetric.tainted = true
breachedThresholds = append(breachedThresholds, tm.Name)
tm.tainted = true
if ths.Abort {
shouldAbort = true
}
}

me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
for m, ths := range me.metricsWithThresholds {
computeThresholds(m, ths)
for mid, ths := range me.metricsWithThresholds {
tracked := me.trackedMetrics[mid]
computeThresholds(tracked, ths)
}

if len(breachedThresholds) > 0 {
Expand All @@ -308,7 +322,10 @@ func (me *MetricsEngine) evaluateThresholds(
// ObservedMetrics returns all observed metrics.
func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {
ometrics := make(map[string]metrics.ObservedMetric, len(me.trackedMetrics))
for _, tm := range me.trackedMetrics {

// it skips the first item as it is nil by definition
for i := 1; i < len(me.trackedMetrics); i++ {
tm := me.trackedMetrics[i]
tm.m.Lock()
if !tm.observed {
tm.m.Unlock()
Expand All @@ -322,11 +339,12 @@ func (me *MetricsEngine) ObservedMetrics() map[string]metrics.ObservedMetric {

// ObservedMetricByID returns the observed metric by the provided id.
func (me *MetricsEngine) ObservedMetricByID(id string) (metrics.ObservedMetric, bool) {
tm, ok := me.trackedMetrics[id]
if !ok {
m := me.test.Registry.Get(id)
if m == nil {
return metrics.ObservedMetric{}, false
}

tm := me.trackedMetrics[m.ID]
tm.m.Lock()
defer tm.m.Unlock()

Expand Down Expand Up @@ -361,7 +379,7 @@ func (me *MetricsEngine) trackedToObserved(tm *trackedMetric) metrics.ObservedMe
Tainted: null.BoolFrom(tm.tainted), // TODO: if null it's required then add to trackedMetric
}

definedThs, ok := me.metricsWithThresholds[tm.Name]
definedThs, ok := me.metricsWithThresholds[tm.ID]
if !ok || len(definedThs.Thresholds) < 1 {
return om
}
