Skip to content

Commit

Permalink
Merge pull request #35 from zapier/add-passed-job-label
Browse files Browse the repository at this point in the history
add quick logic for jobs to be passed in the uri path
  • Loading branch information
mplachter authored Dec 8, 2022
2 parents c54732c + 735c913 commit 9ea76ef
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 52 deletions.
4 changes: 2 additions & 2 deletions aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (a *aggregate) saveFamily(familyName string, family *dto.MetricFamily) erro
return nil
}

func (a *aggregate) parseAndMerge(r io.Reader) error {
func (a *aggregate) parseAndMerge(r io.Reader, job string) error {
var parser expfmt.TextParser
inFamilies, err := parser.TextToMetricFamilies(r)
if err != nil {
Expand All @@ -110,7 +110,7 @@ func (a *aggregate) parseAndMerge(r io.Reader) error {
for name, family := range inFamilies {
// Sort labels in case source sends them inconsistently
for _, m := range family.Metric {
a.formatLabels(m)
a.formatLabels(m, job)
}

if err := validateFamily(family); err != nil {
Expand Down
81 changes: 41 additions & 40 deletions aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,25 @@ histogram_count 1
`
want = `# HELP counter A counter
# TYPE counter counter
counter 60
counter{job="test"} 60
# HELP gauge A gauge
# TYPE gauge gauge
gauge 99
gauge{job="test"} 99
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 3
histogram_bucket{le="4"} 8
histogram_bucket{le="5"} 9
histogram_bucket{le="6"} 9
histogram_bucket{le="7"} 9
histogram_bucket{le="8"} 9
histogram_bucket{le="9"} 9
histogram_bucket{le="10"} 9
histogram_bucket{le="+Inf"} 9
histogram_sum 7
histogram_count 2
histogram_bucket{job="test",le="1"} 0
histogram_bucket{job="test",le="2"} 0
histogram_bucket{job="test",le="3"} 3
histogram_bucket{job="test",le="4"} 8
histogram_bucket{job="test",le="5"} 9
histogram_bucket{job="test",le="6"} 9
histogram_bucket{job="test",le="7"} 9
histogram_bucket{job="test",le="8"} 9
histogram_bucket{job="test",le="9"} 9
histogram_bucket{job="test",le="10"} 9
histogram_bucket{job="test",le="+Inf"} 9
histogram_sum{job="test"} 7
histogram_count{job="test"} 2
`

multilabel1 = `# HELP counter A counter
Expand All @@ -91,7 +91,7 @@ counter{a="a",b="b", ignore_label="ignore_value"} 2
`
multilabelResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b",job="test"} 3
`
labelFields1 = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
Expand All @@ -104,8 +104,8 @@ ui_page_render_errors{path="/prom/:orgId"} 1
`
labelFieldResult = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/org/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 2
ui_page_render_errors{job="test",path="/org/:orgId"} 1
ui_page_render_errors{job="test",path="/prom/:orgId"} 2
`
gaugeInput = `
# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
Expand All @@ -116,17 +116,17 @@ ui_external_lib_loaded{name="mixpanel",loaded="true"} 1
`
gaugeOutput = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{loaded="true",name="Intercom"} 2
ui_external_lib_loaded{loaded="true",name="ga"} 2
ui_external_lib_loaded{loaded="true",name="mixpanel"} 2
ui_external_lib_loaded{job="test",loaded="true",name="Intercom"} 2
ui_external_lib_loaded{job="test",loaded="true",name="ga"} 2
ui_external_lib_loaded{job="test",loaded="true",name="mixpanel"} 2
`
duplicateLabels = `
# HELP ui_external_lib_loaded Test with duplicate values
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{name="Munchkin",loaded="true"} 15171
ui_external_lib_loaded{name="Munchkin",loaded="true"} 1
`
duplicateError = `duplicate labels: {__name__="ui_external_lib_loaded", loaded="true", name="Munchkin"}`
duplicateError = `duplicate labels: {__name__="ui_external_lib_loaded", job="test", loaded="true", name="Munchkin"}`

reorderedLabels1 = `# HELP counter A counter
# TYPE counter counter
Expand All @@ -138,20 +138,20 @@ counter{b="b",a="a"} 2
`
reorderedLabelsResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b",job="test"} 3
`

ignoredLabels1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b", ignore_me="ignored"} 1
counter{a="a",b="b",ignore_me="ignored"} 1
`
ignoredLabels2 = `# HELP counter A counter
# TYPE counter counter
counter{b="b",a="a", ignore_me="ignored"} 2
counter{b="b",a="a",ignore_me="ignored"} 2
`
ignoredLabelsResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b",job="test"} 3
`
)

Expand All @@ -163,34 +163,35 @@ func TestNewAggregate(t *testing.T) {
func TestAggregate(t *testing.T) {
metricMiddleware := newMetricMiddleware(nil)
for _, c := range []struct {
testName string
a, b string
want string
ignoredLabels []string
err1 error
err2 error
}{
{gaugeInput, gaugeInput, gaugeOutput, []string{}, nil, nil},
{in1, in2, want, []string{}, nil, nil},
{multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}, nil, nil},
{labelFields1, labelFields2, labelFieldResult, []string{}, nil, nil},
{duplicateLabels, "", "", []string{}, fmt.Errorf("%s", duplicateError), nil},
{reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}, nil, nil},
{ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}, nil, nil},
{"simpleGauge", gaugeInput, gaugeInput, gaugeOutput, []string{}, nil, nil},
{"in", in1, in2, want, []string{}, nil, nil},
{"multilabel", multilabel1, multilabel2, multilabelResult, []string{"ignore_label"}, nil, nil},
{"labelFields", labelFields1, labelFields2, labelFieldResult, []string{}, nil, nil},
{"duplicateLabels", duplicateLabels, "", "", []string{}, fmt.Errorf("%s", duplicateError), nil},
{"reorderedLabels", reorderedLabels1, reorderedLabels2, reorderedLabelsResult, []string{}, nil, nil},
{"ignoredLabels", ignoredLabels1, ignoredLabels2, ignoredLabelsResult, []string{"ignore_me"}, nil, nil},
} {
rc := &RouterConfig{
MetricsMiddleware: &metricMiddleware,
Aggregate: newAggregate(AddIgnoredLabels(c.ignoredLabels...)),
}
router := setupRouter(rc)

if err := rc.Aggregate.parseAndMerge(strings.NewReader(c.a)); err != nil {
if err := rc.Aggregate.parseAndMerge(strings.NewReader(c.a), "test"); err != nil {
if c.err1 == nil {
t.Fatalf("Unexpected error: %s", err)
} else if c.err1.Error() != err.Error() {
t.Fatalf("Expected %s, got %s", c.err1, err)
}
}
if err := rc.Aggregate.parseAndMerge(strings.NewReader(c.b)); err != c.err2 {
if err := rc.Aggregate.parseAndMerge(strings.NewReader(c.b), "test"); err != c.err2 {
t.Fatalf("Expected %s, got %s", c.err2, err)
}

Expand All @@ -207,7 +208,7 @@ func TestAggregate(t *testing.T) {
ToFile: "want",
Context: 3,
})
t.Fatal(text)
t.Fatalf("%s: %s", c.testName, text)
}
}
}
Expand All @@ -232,10 +233,10 @@ func BenchmarkAggregate(b *testing.B) {
a.options.ignoredLabels = v.ignoredLabels
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
for n := 0; n < b.N; n++ {
if err := a.parseAndMerge(strings.NewReader(v.input1)); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input1), "test"); err != nil {
b.Fatalf("unexpected error %s", err)
}
if err := a.parseAndMerge(strings.NewReader(v.input2)); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input2), "test"); err != nil {
b.Fatalf("unexpected error %s", err)
}
}
Expand All @@ -248,15 +249,15 @@ func BenchmarkConcurrentAggregate(b *testing.B) {
for _, v := range testMetricTable {
a.options.ignoredLabels = v.ignoredLabels
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
if err := a.parseAndMerge(strings.NewReader(v.input1)); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input1), "test"); err != nil {
b.Fatalf("unexpected error %s", err)
}

for n := 0; n < b.N; n++ {
g, _ := errgroup.WithContext(context.Background())
for tN := 0; tN < 10; tN++ {
g.Go(func() error {
return a.parseAndMerge(strings.NewReader(v.input2))
return a.parseAndMerge(strings.NewReader(v.input2), "test")
})
}

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.19

require (
github.com/gin-gonic/gin v1.8.1
github.com/go-playground/assert/v2 v2.0.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.3.0
Expand Down
17 changes: 16 additions & 1 deletion labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,22 @@ func strPtr(s string) *string {

}

func (a *aggregate) formatLabels(m *dto.Metric) {
var JobLabel = strPtr("job")

func addJobLabel(m *dto.Metric, job string) {
if len(m.Label) > 0 {
for _, l := range m.Label {
if l.GetName() == "job" {
l.Value = strPtr(job)
return
}
}
}
m.Label = append(m.Label, &dto.LabelPair{Name: JobLabel, Value: strPtr(job)})
}

func (a *aggregate) formatLabels(m *dto.Metric, job string) {
addJobLabel(m, job)
sort.Sort(byName(m.Label))

if len(a.options.ignoredLabels) > 0 {
Expand Down
11 changes: 6 additions & 5 deletions labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ func TestFormatLabels(t *testing.T) {
{},
},
}
a.formatLabels(m)
a.formatLabels(m, "test")

assert.Equal(t, &dto.LabelPair{Name: strPtr("thing1"), Value: strPtr("value1")}, m.Label[0])
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing2"), Value: strPtr("value2")}, m.Label[1])
assert.Len(t, m.Label, 2)
assert.Equal(t, &dto.LabelPair{Name: strPtr("job"), Value: strPtr("test")}, m.Label[0])
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing1"), Value: strPtr("value1")}, m.Label[1])
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing2"), Value: strPtr("value2")}, m.Label[2])
assert.Len(t, m.Label, 3)

}

Expand Down Expand Up @@ -83,7 +84,7 @@ func BenchmarkFormatLabels(b *testing.B) {
a := newAggregate(AddIgnoredLabels(v.ignoredLabels...))
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
for n := 0; n < b.N; n++ {
a.formatLabels(v.m)
a.formatLabels(v.m, "test")
}
})
}
Expand Down
13 changes: 10 additions & 3 deletions router.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"log"
"net/http"

Expand Down Expand Up @@ -57,9 +58,15 @@ func setupRouter(cfg *RouterConfig) *gin.Engine {
r.GET("/metrics", mGin.Handler("metrics", *cfg.MetricsMiddleware), cfg.Aggregate.handler)
r.POST("/metrics/job/:job", mGin.Handler("/metrics/job", *cfg.MetricsMiddleware), func(c *gin.Context) {
c.Header("Access-Control-Allow-Origin", *cfg.AllowedCORS)
// TODO: job work just place holder for now
// job := c.Param("job")
if err := cfg.Aggregate.parseAndMerge(c.Request.Body); err != nil {
job := c.Param("job")
// TODO: add logic to verify correct format of job label
if job == "" {
err := fmt.Errorf("must send in a valid job name, sent: %s", job)
log.Println(err)
http.Error(c.Writer, err.Error(), http.StatusBadRequest)
return
}
if err := cfg.Aggregate.parseAndMerge(c.Request.Body, job); err != nil {
log.Println(err)
http.Error(c.Writer, err.Error(), http.StatusBadRequest)
return
Expand Down

0 comments on commit 9ea76ef

Please sign in to comment.