Skip to content

Commit

Permalink
feat(metrics): support OpenMetrics from applications (#7125)
Browse files Browse the repository at this point in the history
Signed-off-by: AyushSenapati <[email protected]>
  • Loading branch information
AyushSenapati authored Jul 18, 2023
1 parent c3b98e7 commit 7f7f96a
Show file tree
Hide file tree
Showing 8 changed files with 681 additions and 17 deletions.
192 changes: 175 additions & 17 deletions app/kuma-dp/pkg/dataplane/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"fmt"
"io"
"math"
"mime"
"net"
"net/http"
"net/url"
Expand All @@ -28,6 +30,23 @@ var (
var (
prometheusRequestHeaders = []string{"accept", "accept-encoding", "user-agent", "x-prometheus-scrape-timeout-seconds"}
logger = core.Log.WithName("metrics-hijacker")

// holds prometheus content types in order of priority.
prometheusPriorityContentType = []expfmt.Format{
expfmt.FmtOpenMetrics_1_0_0,
expfmt.FmtOpenMetrics_0_0_1,
expfmt.FmtText,
expfmt.FmtUnknown,
}

// Reverse mapping of prometheusPriorityContentType for faster lookup.
prometheusPriorityContentTypeLookup = func(expformats []expfmt.Format) map[expfmt.Format]int32 {
reverseMapping := map[expfmt.Format]int32{}
for priority, format := range expformats {
reverseMapping[format] = int32(priority)
}
return reverseMapping
}(prometheusPriorityContentType)
)

var _ component.Component = &Hijacker{}
Expand Down Expand Up @@ -151,44 +170,135 @@ func rewriteMetricsURL(address string, port uint32, path string, queryModifier Q
func (s *Hijacker) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
out := make(chan []byte, len(s.applicationsToScrape))
contentTypes := make(chan expfmt.Format, len(s.applicationsToScrape))
var wg sync.WaitGroup
done := make(chan []byte)
wg.Add(len(s.applicationsToScrape))
go func() {
wg.Wait()
close(done)
close(out)
close(contentTypes)
close(done)
}()

for _, app := range s.applicationsToScrape {
go func(app ApplicationToScrape) {
defer wg.Done()
out <- s.getStats(ctx, req, app)
content, contentType := s.getStats(ctx, req, app)
out <- content

// It's possible to track the highest priority content type seen,
// but that would require mutex.
// I would prefer to calculate it later at one go
contentTypes <- contentType
}(app)
}

select {
case <-ctx.Done():
return
case <-done:
// default format returned by prometheus
writer.Header().Set("content-type", string(expfmt.FmtText))
for resp := range out {
if _, err := writer.Write(resp); err != nil {
logger.Error(err, "error while writing the response")
}
if _, err := writer.Write([]byte("\n")); err != nil {
logger.Error(err, "error while writing the response")
}
selectedCt := selectContentType(contentTypes, req.Header)
writer.Header().Set(hdrContentType, string(selectedCt))

// aggregate metrics of target applications and attempt to make them
// compatible with FmtOpenMetrics if it is the selected content type.
metrics := processMetrics(out, selectedCt)
if _, err := writer.Write(metrics); err != nil {
logger.Error(err, "error while writing the response")
}
}
}

func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) []byte {
func processMetrics(contents <-chan []byte, contentType expfmt.Format) []byte {
buf := new(bytes.Buffer)

for metrics := range contents {
// remove the EOF marker from the metrics, because we are
// merging multiple metrics into one response.
metrics = bytes.ReplaceAll(metrics, []byte("# EOF"), []byte(""))

buf.Write(metrics)
buf.Write([]byte("\n"))
}

processedMetrics := append(processNewlineChars(buf.Bytes()), '\n')
buf.Reset()
buf.Write(processedMetrics)

if contentType == expfmt.FmtOpenMetrics_1_0_0 || contentType == expfmt.FmtOpenMetrics_0_0_1 {
// make metrics OpenMetrics compliant
buf.Write([]byte("# EOF\n"))
}

return buf.Bytes()
}

// processNewlineChars takes byte data and returns a new byte slice
// after trimming and deduplicating the newline characters.
func processNewlineChars(input []byte) []byte {
var deduped []byte

if len(input) == 0 {
return nil
}
last := input[0]

for i := 1; i < len(input); i++ {
if last == '\n' && input[i] == last {
continue
}
deduped = append(deduped, last)
last = input[i]
}
deduped = append(deduped, last)

return bytes.TrimSpace(deduped)
}

// selectContentType selects the highest priority content type supported by the applications.
// If no valid content type is returned by the applications, it negotiates content type based
// on Accept header of the scraper.
func selectContentType(contentTypes <-chan expfmt.Format, reqHeader http.Header) expfmt.Format {
// Tracks highest negotiated content type priority.
// Lower number means higher priority
//
// We can not simply use the highest priority content type i.e. `application/openmetrics-text`
// and try to mutate the metrics to make it compatible with this type,
// because:
// - if the application is not supporting this type,
// custom metrics might not be compatible (more prone to failure).
// - the user might be using older prom scraper.
//
// So it's better to choose the highest negotiated content type between the
// target apps and the scraper.
var ctPriority int32 = math.MaxInt32
ct := expfmt.FmtUnknown
for contentType := range contentTypes {
priority, valid := prometheusPriorityContentTypeLookup[contentType]
if !valid {
continue
}
if priority < ctPriority {
ctPriority = priority
ct = contentType
}
}

// If no valid content type is returned by the target applications,
// negotitate content type based on Accept header of the scraper.
if ct == expfmt.FmtUnknown {
ct = expfmt.Negotiate(reqHeader)
}

return ct
}

func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) ([]byte, expfmt.Format) {
req, err := http.NewRequest("GET", rewriteMetricsURL(app.Address, app.Port, app.Path, app.QueryModifier, initReq.URL), nil)
if err != nil {
logger.Error(err, "failed to create request")
return nil
return nil, ""
}
s.passRequestHeaders(req.Header, initReq.Header)
req = req.WithContext(ctx)
Expand All @@ -206,25 +316,27 @@ func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app Appl
}
if err != nil {
logger.Error(err, "failed call", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}

respContentType := responseFormat(resp.Header)

var bodyBytes []byte
if app.Mutator != nil {
buf := new(bytes.Buffer)
if err := app.Mutator(resp.Body, buf); err != nil {
logger.Error(err, "failed while mutating data", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}
bodyBytes = buf.Bytes()
} else {
bodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
logger.Error(err, "failed while writing", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}
}
return bodyBytes
return bodyBytes, respContentType
}

func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
Expand All @@ -241,3 +353,49 @@ func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
func (s *Hijacker) NeedLeaderElection() bool {
return false
}

const (
hdrContentType = "Content-Type"
textType = "text/plain"
)

// responseFormat extracts the correct format from a HTTP response header.
// If no matching format can be found FormatUnknown is returned.
func responseFormat(h http.Header) expfmt.Format {
ct := h.Get(hdrContentType)

mediatype, params, err := mime.ParseMediaType(ct)
if err != nil {
return expfmt.FmtUnknown
}

version := params["version"]

switch mediatype {
case expfmt.ProtoType:
p := params["proto"]
e := params["encoding"]
// only delimited encoding is supported by prometheus scraper
if p == expfmt.ProtoProtocol && e == "delimited" {
return expfmt.FmtProtoDelim
}

// if mediatype is `text/plain`, return Prometheus text format
// without checking the version, as there are few exporters
// which don't set the version param in the content-type header. ex: Envoy
case textType:
return expfmt.FmtText

// if mediatype is OpenMetricsType, return FmtUnknown for any version
// other than "0.0.1", "1.0.0" and "".
case expfmt.OpenMetricsType:
if version == expfmt.OpenMetricsVersion_0_0_1 || version == "" {
return expfmt.FmtOpenMetrics_0_0_1
}
if version == expfmt.OpenMetricsVersion_1_0_0 {
return expfmt.FmtOpenMetrics_1_0_0
}
}

return expfmt.FmtUnknown
}
Loading

0 comments on commit 7f7f96a

Please sign in to comment.