Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): support OpenMetrics from applications #7103

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions app/kuma-dp/pkg/dataplane/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,29 +200,60 @@ func (s *Hijacker) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
case <-ctx.Done():
return
case <-done:
writer.Header().Set(hdrContentType, string(selectContentType(contentTypes, req.Header)))
for resp := range out {
if _, err := writer.Write(resp); err != nil {
logger.Error(err, "error while writing the response")
}
if _, err := writer.Write([]byte("\n")); err != nil {
logger.Error(err, "error while writing the response")
}
selectedCt := selectContentType(contentTypes, req.Header)
writer.Header().Set(hdrContentType, string(selectedCt))

// aggregate metrics of target applications and attempt to make them
// compatible with FmtOpenMetrics if it is the selected content type.
metrics := processMetrics(out, selectedCt)
if _, err := writer.Write(metrics); err != nil {
logger.Error(err, "error while writing the response")
}
}
}

func processMetrics(metrices <-chan []byte, contentType expfmt.Format) []byte {
buf := new(bytes.Buffer)

for metrics := range metrices {
// remove the EOF marker from the metrics, because we are
// merging multiple metrics into one response.
metrics = bytes.ReplaceAll(metrics, []byte("# EOF"), []byte(""))

if _, err := buf.Write(metrics); err != nil {
logger.Error(err, "error while writing the response to temporary buffer")
}
if _, err := buf.Write([]byte("\n")); err != nil {
logger.Error(err, "error while writing the response")
}
}

// if the content type is not OpenMetrics, we don't need to do any processing
if !(contentType == expfmt.FmtOpenMetrics_1_0_0 || contentType == expfmt.FmtOpenMetrics_0_0_1) {
return buf.Bytes()
}

// make metrics OpenMetrics compliant
expfmt.FinalizeOpenMetrics(buf)
return bytes.ReplaceAll(buf.Bytes(), []byte("\n\n"), []byte("\n"))
}

// selectContentType selects the highest priority content type supported by the applications.
// If no valid content type is returned by the applications, it returns the highest priority
// content type supported by the scraper.
// If no valid content type is returned by the applications, it negotiates content type based
// on Accept header of the scraper.
func selectContentType(contentTypes <-chan expfmt.Format, reqHeader http.Header) expfmt.Format {
// Tracks highest negotiated content type priority.
// Lower number means higher priority
//
// We should not simply use the highest priority content type even if `application/openmetrics-text`
// is the superset of `text/plain`, as it might not be
// supported by the applications or the user might be using older prom scraper.
// So it's better to choose the highest negotiated content type between the apps and the scraper.
// We can not simply use the highest priority content type i.e. `application/openmetrics-text`
// and try to mutate the metrics to make it compatible with this type,
// because:
// - if the application is not supporting this type,
// custom metrics might not be compatible (more prone to failure).
// - the user might be using older prom scraper.
//
// So it's better to choose the highest negotiated content type between the
// target apps and the scraper.
var ctPriority int32 = math.MaxInt32
ct := expfmt.FmtUnknown
for contentType := range contentTypes {
Expand All @@ -236,10 +267,13 @@ func selectContentType(contentTypes <-chan expfmt.Format, reqHeader http.Header)
}
}

// If no valid content type is returned by the applications,
// use the highest priority content type supported by the scraper.
// If no valid content type is returned by the target applications,
// negotitate content type based on Accept header of the scraper.
//
// Note: NegotiateIncludingOpenMetrics is not yet fully supported.
// So invoke expfmt.Negotiate() instead
if ct == expfmt.FmtUnknown {
ct = expfmt.NegotiateIncludingOpenMetrics(reqHeader)
ct = expfmt.Negotiate(reqHeader)
}

return ct
Expand Down Expand Up @@ -342,6 +376,7 @@ func responseFormat(h http.Header) expfmt.Format {
case textType:
return expfmt.FmtText

// TODO: if version does not match or set, should we return FmtUnknown?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can expect the open metric format to have a version I think.

case expfmt.OpenMetricsType:
if version == openmetricsVersion_0_0_1 {
return expfmt.FmtOpenMetrics_0_0_1
Expand Down
8 changes: 4 additions & 4 deletions app/kuma-dp/pkg/dataplane/metrics/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,24 @@ var _ = Describe("Select Content Type", func() {
Expect(actualContentType).To(Equal(expfmt.FmtOpenMetrics_0_0_1))
})

It("should honor max supported accept type when app returns invalid content-type", func() {
It("should negotiate content-type based on Accept header", func() {
contentTypes := make(chan expfmt.Format, 1)
contentTypes <- expfmt.Format("invalid_content_type")
close(contentTypes)
reqHeader.Add("Accept", "application/openmetrics-text;version=1.0.0,application/openmetrics-text;version=0.0.1;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1")

actualContentType := selectContentType(contentTypes, reqHeader)
Expect(actualContentType).To(Equal(expfmt.FmtOpenMetrics_1_0_0))
Expect(actualContentType).To(Equal(expfmt.Negotiate(reqHeader)))
})

It("should use highest priority content-type available", func() {
It("should negotiate content-type based on Accept header", func() {
contentTypes := make(chan expfmt.Format, 1)
contentTypes <- expfmt.Format("invalid_content_type")
close(contentTypes)
reqHeader.Add("Accept", "*/*")

actualContentType := selectContentType(contentTypes, reqHeader)
Expect(actualContentType).To(Equal(expfmt.FmtText))
Expect(actualContentType).To(Equal(expfmt.Negotiate(reqHeader)))
})
})

Expand Down