Skip to content

Commit

Permalink
Add bytes fetched in query frontend (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuchen-db authored May 18, 2024
2 parents ca8da96 + b6336d8 commit 9299649
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 8 deletions.
54 changes: 54 additions & 0 deletions internal/cortex/querier/queryrange/query_bytes_fetched.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"strconv"
)

// QueryBytesFetchedHeaderName is the http header name of number of bytes fetched by a query from m3readcoord.
// This name is compatible with M3 and rule manager code
const QueryBytesFetchedHeaderName = "M3-Fetched-Bytes-Estimate"

func sumQueryBytesFetched(responses ...Response) uint64 {
var result uint64
result = 0
for _, resp := range responses {
for _, hdr := range resp.GetHeaders() {
if hdr.GetName() == QueryBytesFetchedHeaderName {
for _, v := range hdr.GetValues() {
n, err := strconv.ParseUint(v, 10, 64)
if err != nil {
continue
}
result += n
}
break
}
}
}
return result
}

func QueryBytesFetchedPrometheusResponseHeaders(responses ...Response) []*PrometheusResponseHeader {
n := sumQueryBytesFetched(responses...)
if n == 0 {
return nil
}
return []*PrometheusResponseHeader{{
Name: QueryBytesFetchedHeaderName,
Values: []string{strconv.FormatUint(n, 10)},
}}
}

func QueryBytesFetchedHttpHeaderValue(response Response) []string {
var result []string
for _, hdr := range response.GetHeaders() {
if hdr.GetName() == QueryBytesFetchedHeaderName {
result = hdr.GetValues()
break
}
}
return result
}
20 changes: 20 additions & 0 deletions internal/cortex/querier/queryrange/query_bytes_fetched_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestQueryBytesFetchedPrometheusResponseHeaders(t *testing.T) {
resp1 := PrometheusResponse{Headers: []*PrometheusResponseHeader{&PrometheusResponseHeader{Name: "M3-Fetched-Bytes-Estimate", Values: []string{"100"}}}}
resp2 := PrometheusResponse{Headers: []*PrometheusResponseHeader{&PrometheusResponseHeader{Name: "M3-Fetched-Bytes-Estimate", Values: []string{"1000"}}}}
resp3 := PrometheusResponse{}
hdrs := QueryBytesFetchedPrometheusResponseHeaders(&resp1, &resp2, &resp3)
expected := []*PrometheusResponseHeader{{Name: QueryBytesFetchedHeaderName,
Values: []string{"1100"}}}
require.Equal(t, hdrs, expected)
}
17 changes: 10 additions & 7 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
stdjson "encoding/json"
"fmt"
io "io"
"io"
"math"
"net/http"
"net/url"
Expand Down Expand Up @@ -278,12 +278,12 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
Analysis: AnalyzesMerge(analyzes...),
},
}

response.Headers = QueryBytesFetchedPrometheusResponseHeaders(responses...)
if len(resultsCacheGenNumberHeaderValues) != 0 {
response.Headers = []*PrometheusResponseHeader{{
response.Headers = append(response.Headers, &PrometheusResponseHeader{
Name: ResultsCacheGenNumberHeaderName,
Values: resultsCacheGenNumberHeaderValues,
}}
})
}

return &response, nil
Expand Down Expand Up @@ -447,10 +447,13 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.

sp.LogFields(otlog.Int("bytes", len(b)))

httpHeader := http.Header{
"Content-Type": []string{"application/json"}}
if queryBytesFetchedHttpHeaderValue := QueryBytesFetchedHttpHeaderValue(res); queryBytesFetchedHttpHeaderValue != nil {
httpHeader[QueryBytesFetchedHeaderName] = queryBytesFetchedHttpHeaderValue
}
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Header: httpHeader,
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
Expand Down
5 changes: 4 additions & 1 deletion pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu
Analysis: queryrange.AnalyzesMerge(analyzes...),
Stats: queryrange.StatsMerge(responses),
},
Headers: queryrange.QueryBytesFetchedPrometheusResponseHeaders(responses...),
}
default:
v, err := vectorMerge(req, promResponses)
Expand All @@ -96,6 +97,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu
Analysis: queryrange.AnalyzesMerge(analyzes...),
Stats: queryrange.StatsMerge(responses),
},
Headers: queryrange.QueryBytesFetchedPrometheusResponseHeaders(responses...),
}
}

Expand Down Expand Up @@ -248,7 +250,8 @@ func (c queryInstantCodec) EncodeResponse(ctx context.Context, res queryrange.Re

resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
"Content-Type": []string{"application/json"},
queryrange.QueryBytesFetchedHeaderName: queryrange.QueryBytesFetchedHttpHeaderValue(res),
},
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
Expand Down

0 comments on commit 9299649

Please sign in to comment.