Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving code for cache functions to utils file #48

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 30 additions & 124 deletions internal/cortex/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ import (
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/internal/cortex/frontend/transport/utils"
querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
"github.com/thanos-io/thanos/internal/cortex/tenant"
"github.com/thanos-io/thanos/internal/cortex/util"
Expand All @@ -40,7 +39,6 @@ var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
)

// HandlerConfig Config for a Handler.
Expand All @@ -55,12 +53,10 @@ type HandlerConfig struct {
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
lruCache *lru.Cache
regex *regexp.Regexp
errorExtract *regexp.Regexp
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
failedQueryCache *utils.FailedQueryCache

// Metrics.
querySeconds *prometheus.CounterVec
Expand All @@ -72,26 +68,18 @@ type Handler struct {

// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
var (
LruCache *lru.Cache
err error
)

if cfg.FailedQueryCacheCapacity > 0 {
LruCache, err = lru.New(cfg.FailedQueryCacheCapacity)
if err != nil {
LruCache = nil
level.Warn(log).Log("msg", "Failed to create LruCache", "error", err)
}
}

h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
lruCache: LruCache,
regex: regexp.MustCompile(`[\s\n\t]+`),
errorExtract: regexp.MustCompile(`Code\((\d+)\)`),
}

if cfg.FailedQueryCacheCapacity > 0 {
FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity)
if errQueryCache != nil {
level.Warn(log).Log(errQueryCache.Error())
}
h.failedQueryCache = FailedQueryCache
}

if cfg.QueryStatsEnabled {
Expand Down Expand Up @@ -130,10 +118,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
queryExpressionNormalized string
queryExpressionRangeLength int
stats *querier_stats.Stats
queryString url.Values
urlQuery url.Values
)

// Initialise the stats in the context and make sure it's propagated
Expand All @@ -153,22 +140,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))

// Check if caching is enabled.
if f.lruCache != nil {
// Store query expression.
queryExpressionNormalized = f.regex.ReplaceAllString(r.URL.Query().Get("query"), " ")

// Store query time range length.
queryExpressionRangeLength = getQueryRangeSeconds(r)
urlQuery = r.URL.Query()

// Check if query in cache and whether value exceeds time range length.
if value, ok := f.lruCache.Get(queryExpressionNormalized); ok && value.(int) >= queryExpressionRangeLength {
hczhu-db marked this conversation as resolved.
Show resolved Hide resolved
// Check if query is cached
if f.failedQueryCache != nil {
cached, message := f.failedQueryCache.QueryHitCache(urlQuery)
if cached {
w.WriteHeader(http.StatusForbidden)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "Retrieved query from cache",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
f.cachedHits.Inc()
return
}
Expand All @@ -182,9 +161,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, err)
queryString = f.parseRequestQueryString(r, buf)
hczhu-db marked this conversation as resolved.
Show resolved Hide resolved

// Check if caching is enabled.
if f.lruCache != nil {
f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, r)
// Update cache for failed queries.
if f.failedQueryCache != nil {
success, message := f.failedQueryCache.UpdateFailedQueryCache(err, urlQuery)
if success {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
} else {
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(message)
}
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}

if f.cfg.LogFailedQueries {
Expand Down Expand Up @@ -223,84 +207,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (f *Handler) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, r *http.Request) {
// Extracting error code from error string.
codeExtract := f.errorExtract.FindStringSubmatch(err.Error())

// Checking if error code extracted successfully.
if codeExtract == nil || len(codeExtract) < 2 {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "Error string regex conversion error",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
"error", err)
return
}

// Converting error code to int.
errCode, strConvError := strconv.Atoi(codeExtract[1])

// Checking if error code extracted properly from string.
if strConvError != nil {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "String to int conversion error",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
"error", err)
return
}

// If error should be cached, store it in cache.
if !isCacheableError(errCode) {
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "Query not cached due to non-cacheable error code",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
"error", err,
)
return
}

// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
}
f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
}

level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "Cached a failed query",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
"error", err,
)

}

// isCacheableError Returns true if response code is in pre-defined cacheable errors list, else returns false.
func isCacheableError(statusCode int) bool {
for _, errStatusCode := range cacheableResponseCodes {
if errStatusCode == statusCode {
return true
}
}
return false
}

// Time range length for queries, if either of "start" or "end" are not present, return 0.
func getQueryRangeSeconds(r *http.Request) int {
start, err := strconv.Atoi(r.URL.Query().Get("start"))
if err != nil {
return 0
}
end, err := strconv.Atoi(r.URL.Query().Get("end"))
if err != nil {
return 0
}
return end - start
}

func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
grafanaDashboardUID := "-"
Expand Down
145 changes: 145 additions & 0 deletions internal/cortex/frontend/transport/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved

// Package utils contains the monitoring platform team's helper resources for the frontend.
package utils

import (
"fmt"
"net/http"
"net/url"
"regexp"
"strconv"

lru "github.com/hashicorp/golang-lru"
)

// HTTP status codes for which a failed query is eligible for caching;
// isCacheableError checks candidate codes against this list.
var (
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
)

// FailedQueryCache remembers failed queries in an LRU cache. Handler holds an
// instance of FailedQueryCache and calls its methods.
//
// Fields:
//   - regex: pattern used to normalize the "query" parameter; every match is
//     replaced by a single space before the expression is used as a cache key.
//   - errorExtract: pattern that captures the numeric code from error strings
//     of the form "Code(<digits>)".
//   - lruCache: maps a normalized query expression to the range length
//     (end - start) of the failed query.
type FailedQueryCache struct {
regex *regexp.Regexp
errorExtract *regexp.Regexp
lruCache *lru.Cache
}

// NewFailedQueryCache builds a FailedQueryCache whose LRU cache is created
// with the given capacity.
//
// It returns a nil cache and a non-nil error when lru.New fails. The returned
// error wraps the underlying one, so callers can inspect it with errors.Is or
// errors.As; its text is unchanged from the previous %s-formatted message.
func NewFailedQueryCache(capacity int) (*FailedQueryCache, error) {
	lruCache, err := lru.New(capacity)
	if err != nil {
		return nil, fmt.Errorf("Failed to create lru cache: %w", err)
	}

	return &FailedQueryCache{
		// Collapses runs of whitespace when normalizing query expressions.
		regex: regexp.MustCompile(`[\s\n\t]+`),
		// Captures the numeric status code from "Code(<digits>)".
		errorExtract: regexp.MustCompile(`Code\((\d+)\)`),
		lruCache:     lruCache,
	}, nil
}

// updateFailedQueryCache stores a failed query in lruCache when err carries a
// cacheable status code.
//
// The status code is read from the "Code(<digits>)" fragment of err.Error()
// and must be accepted by isCacheableError. The cache key is
// queryExpressionNormalized and the value is queryExpressionRangeLength; when
// the key is already present, the smaller of the stored and the new range
// length is kept.
//
// It returns true when the query was cached, so that the call site can
// increase a counter, together with a message describing the outcome for the
// call site to log. A nil err is never cached.
func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
	// Without an error there is nothing to cache, and calling err.Error()
	// below would panic.
	if err == nil {
		return false, fmt.Sprintf(
			`%s: %s, %s: %s, %s: %d`, "msg", "Query not cached because error is nil",
			"cached_query", queryExpressionNormalized,
			"query_range_seconds", queryExpressionRangeLength)
	}

	// Extract the error code from the error string. A nil result (no match)
	// has length 0, so the length check alone covers both cases.
	codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
	if len(codeExtract) < 2 {
		return false, createLogMessage("String to regex conversion error", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
	}

	// Convert the captured digits to an int; this can still fail on overflow.
	errCode, strConvError := strconv.Atoi(codeExtract[1])
	if strConvError != nil {
		return false, createLogMessage("String to int conversion error", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
	}

	// Only errors with a cacheable status code are stored.
	if !isCacheableError(errCode) {
		return false, createLogMessage("Query not cached due to non-cacheable error code", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
	}

	// If the expression is already cached, keep the minimum of the stored and
	// the new range length; otherwise ContainsOrAdd has already inserted it.
	if contains, _ := lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
		if oldValue, ok := lruCache.Get(queryExpressionNormalized); ok {
			queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
		}
		lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
	}

	return true, createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
}

// queryHitCache reports whether the normalized query is present in the lru cache with a cached range length no greater than the requested range length, and returns a log message describing the hit (empty on a miss).
func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bug here. The condition should be value.(int) >= queryExpressionRangeLength such that the cached range is longer than the current query range.
@pranavmishradatabricks Can you fix it and add a unit test case like the following one?

1. Cache failed query("xxx", range=1000) 
2. query("xxx", range=999) shouldn't be banned.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition should be value.(int) <= queryExpressionRangeLength since queryExpressionRangeLength is the range length of the new query and value.(int) is the range length of the old query. From my understanding, it should only hit the cache and block the query if the old query's range length <= the new query's range length (new query is longer range length).

I have a test case showing that it does not hit the cache here: Test File Line 153

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

cachedQueryRangeSeconds := value.(int)
message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
return true, message
}
return false, ""
}

// isCacheableError reports whether statusCode appears in the pre-defined
// cacheableResponseCodes list.
func isCacheableError(statusCode int) bool {
	cacheable := false
	for _, code := range cacheableResponseCodes {
		if code == statusCode {
			cacheable = true
			break
		}
	}
	return cacheable
}

// getQueryRangeSeconds returns the length of the query's time range, computed
// as end - start from the "start" and "end" parameters. It returns 0 when
// either parameter is missing or cannot be parsed as an integer.
func getQueryRangeSeconds(query url.Values) int {
	rangeStart, startErr := strconv.Atoi(query.Get("start"))
	rangeEnd, endErr := strconv.Atoi(query.Get("end"))
	if startErr != nil || endErr != nil {
		return 0
	}
	return rangeEnd - rangeStart
}

// normalizeQueryString returns the "query" parameter with every match of
// f.regex replaced by a single space.
func (f *FailedQueryCache) normalizeQueryString(query url.Values) string {
	expression := query.Get("query")
	return f.regex.ReplaceAllString(expression, " ")
}

// createLogMessage renders a single-line "key: value" log message.
//
// When err is non-nil the message carries msg, cached_query,
// query_range_seconds and cached_error; cachedQueryRangeSeconds is not used.
// When err is nil it carries msg, cached_query, cached_range_seconds and
// query_range_seconds.
func createLogMessage(message string, queryExpressionNormalized string, cachedQueryRangeSeconds int, queryExpressionRangeLength int, err error) string {
	if err != nil {
		return fmt.Sprintf(
			`%s: %s, %s: %s, %s: %d, %s: %s`, "msg", message,
			"cached_query", queryExpressionNormalized,
			"query_range_seconds", queryExpressionRangeLength,
			"cached_error", err)
	}

	return fmt.Sprintf(
		`%s: %s, %s: %s, %s: %d, %s: %d`, "msg", message,
		"cached_query", queryExpressionNormalized,
		"cached_range_seconds", cachedQueryRangeSeconds,
		"query_range_seconds", queryExpressionRangeLength)
}

// UpdateFailedQueryCache normalizes the "query" parameter, derives the range
// length from the "start" and "end" parameters, and attempts to record the
// failed query in the cache. It returns whether the query was cached and a
// message for the caller to log.
func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values) (bool, string) {
	return f.updateFailedQueryCache(
		err,
		f.normalizeQueryString(query),
		getQueryRangeSeconds(query),
		f.lruCache,
	)
}

// QueryHitCache normalizes the "query" parameter, derives the range length
// from the "start" and "end" parameters, and looks the query up in the cache.
// It returns whether the cache was hit and a message for the caller to log.
func (f *FailedQueryCache) QueryHitCache(query url.Values) (bool, string) {
	return queryHitCache(
		f.normalizeQueryString(query),
		getQueryRangeSeconds(query),
		f.lruCache,
	)
}
Loading
Loading