Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving code for cache functions to utils file #48

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 35 additions & 114 deletions internal/cortex/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ import (
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/internal/cortex/frontend/transport/utils"
querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
"github.com/thanos-io/thanos/internal/cortex/tenant"
"github.com/thanos-io/thanos/internal/cortex/util"
Expand All @@ -40,7 +39,6 @@ var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
)

// HandlerConfig Config for a Handler.
Expand All @@ -55,12 +53,10 @@ type HandlerConfig struct {
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
lruCache *lru.Cache
regex *regexp.Regexp
errorExtract *regexp.Regexp
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
failedQueryCache *utils.FailedQueryCache

// Metrics.
querySeconds *prometheus.CounterVec
Expand All @@ -73,25 +69,22 @@ type Handler struct {
// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
var (
LruCache *lru.Cache
err error
FailedQueryCache *utils.FailedQueryCache
message string
)

if cfg.FailedQueryCacheCapacity > 0 {
LruCache, err = lru.New(cfg.FailedQueryCacheCapacity)
if err != nil {
LruCache = nil
level.Warn(log).Log("msg", "Failed to create LruCache", "error", err)
}
FailedQueryCache, message = utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity)
level.Warn(log).Log(message)
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
} else {
FailedQueryCache = nil
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}

h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
lruCache: LruCache,
regex: regexp.MustCompile(`[\s\n\t]+`),
errorExtract: regexp.MustCompile(`Code\((\d+)\)`),
cfg: cfg,
log: log,
roundTripper: roundTripper,
failedQueryCache: FailedQueryCache,
}

if cfg.QueryStatsEnabled {
Expand Down Expand Up @@ -154,21 +147,19 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))

// Check if caching is enabled.
if f.lruCache != nil {
if f.failedQueryCache != nil {
//Store query.
query := r.URL.Query()
// Store query expression.
queryExpressionNormalized = f.regex.ReplaceAllString(r.URL.Query().Get("query"), " ")

queryExpressionNormalized = f.failedQueryCache.NormalizeQueryString(query)
// Store query time range length.
queryExpressionRangeLength = getQueryRangeSeconds(r)
queryExpressionRangeLength = utils.GetQueryRangeSeconds(query)

// Check if query in cache and whether value exceeds time range length.
if value, ok := f.lruCache.Get(queryExpressionNormalized); ok && value.(int) >= queryExpressionRangeLength {
hczhu-db marked this conversation as resolved.
Show resolved Hide resolved
// Check if query in cache and whether value exceeds time range length. Log and increment counter appropriately.
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
cached, message := f.failedQueryCache.QueryHitCache(queryExpressionNormalized, queryExpressionRangeLength, f.failedQueryCache.LruCache)
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
if cached {
w.WriteHeader(http.StatusForbidden)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(
"msg", "Retrieved query from cache",
"normalized_query", queryExpressionNormalized,
"range_seconds", queryExpressionRangeLength,
)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
f.cachedHits.Inc()
return
}
Expand All @@ -182,9 +173,17 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, err)
queryString = f.parseRequestQueryString(r, buf)
hczhu-db marked this conversation as resolved.
Show resolved Hide resolved

// Check if caching is enabled.
if f.lruCache != nil {
f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, r)
// Try to update cache.
success, message := f.failedQueryCache.CallUpdateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength)

if success {
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
} else {
if message == "Failed query cache is not enabled" {
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(message)
} else {
level.Error(util_log.WithContext(r.Context(), f.log)).Log(message)
}
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}

if f.cfg.LogFailedQueries {
Expand Down Expand Up @@ -223,84 +222,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

// updateFailedQueryCache inspects err for a "Code(<digits>)" status code and,
// when that code is one of the cacheable response codes, stores the normalized
// query together with its range length in the handler's LRU cache. If the query
// is already cached, the smaller of the stored and the new range length is kept.
// Every outcome is logged against the request's context.
func (f *Handler) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, r *http.Request) {
	requestLogger := util_log.WithContext(r.Context(), f.log)

	// The status code is recovered from the textual form of the error.
	matches := f.errorExtract.FindStringSubmatch(err.Error())
	if len(matches) < 2 {
		level.Error(requestLogger).Log(
			"msg", "Error string regex conversion error",
			"normalized_query", queryExpressionNormalized,
			"range_seconds", queryExpressionRangeLength,
			"error", err)
		return
	}

	statusCode, convErr := strconv.Atoi(matches[1])
	if convErr != nil {
		level.Error(requestLogger).Log(
			"msg", "String to int conversion error",
			"normalized_query", queryExpressionNormalized,
			"range_seconds", queryExpressionRangeLength,
			"error", err)
		return
	}

	// Only errors with a cacheable status code are remembered.
	if cacheable := isCacheableError(statusCode); !cacheable {
		level.Debug(requestLogger).Log(
			"msg", "Query not cached due to non-cacheable error code",
			"normalized_query", queryExpressionNormalized,
			"range_seconds", queryExpressionRangeLength,
			"error", err,
		)
		return
	}

	// ContainsOrAdd inserts the entry when it is new; for an existing entry the
	// stored range is lowered to the new one if the new one is smaller.
	alreadyCached, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength)
	if alreadyCached {
		if previous, found := f.lruCache.Get(queryExpressionNormalized); found {
			if stored := previous.(int); stored < queryExpressionRangeLength {
				queryExpressionRangeLength = stored
			}
		}
		f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
	}

	level.Debug(requestLogger).Log(
		"msg", "Cached a failed query",
		"normalized_query", queryExpressionNormalized,
		"range_seconds", queryExpressionRangeLength,
		"error", err,
	)
}

// isCacheableError reports whether statusCode is listed in
// cacheableResponseCodes.
func isCacheableError(statusCode int) bool {
	for i := range cacheableResponseCodes {
		if cacheableResponseCodes[i] == statusCode {
			return true
		}
	}
	return false
}

// getQueryRangeSeconds returns end - start, both read from the request's URL
// query parameters. It returns 0 when either "start" or "end" is absent or is
// not parseable as an integer.
func getQueryRangeSeconds(r *http.Request) int {
	params := r.URL.Query()

	rangeStart, startErr := strconv.Atoi(params.Get("start"))
	if startErr != nil {
		return 0
	}

	rangeEnd, endErr := strconv.Atoi(params.Get("end"))
	if endErr != nil {
		return 0
	}

	return rangeEnd - rangeStart
}

func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
grafanaDashboardUID := "-"
Expand Down
149 changes: 149 additions & 0 deletions internal/cortex/frontend/transport/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved

// Package utils provides the monitoring platform team's helper resources for the frontend.
package utils

import (
"fmt"
"net/http"
"net/url"
"regexp"
"strconv"

lru "github.com/hashicorp/golang-lru"
)

var (
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
)

// FailedQueryCache Handler holds an instance of FailedQueryCache and calls its methods
type FailedQueryCache struct {
regex *regexp.Regexp
errorExtract *regexp.Regexp
LruCache *lru.Cache
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}

func NewFailedQueryCache(capacity int) (*FailedQueryCache, string) {
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
regex := regexp.MustCompile(`[\s\n\t]+`)
errorExtract := regexp.MustCompile(`Code\((\d+)\)`)
lruCache, err := lru.New(capacity)
message := ""
if err != nil {
lruCache = nil
message = fmt.Sprintf("Failed to create lru cache: %s", err)
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}
return &FailedQueryCache{regex, errorExtract, lruCache}, message
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved
}

// UpdateFailedQueryCache tries to record a failed query in lruCache.
//
// The status code is extracted from err's text (form "Code(<digits>)"). When
// the code is cacheable, the normalized query is stored with its range length;
// if the query is already present, the smaller of the stored and the new range
// length is kept.
//
// It returns true when the query was cached, so the call site can react (for
// example by incrementing a counter), together with a message describing the
// outcome for the call site to log. It returns false with
// "Failed query cache is not enabled" when lruCache is nil, and false with a
// descriptive message when err is nil, when no status code can be extracted, or
// when the code is not cacheable.
func (f *FailedQueryCache) UpdateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
	// A nil cache means the feature is effectively off; calling methods on it
	// would panic.
	if lruCache == nil {
		return false, "Failed query cache is not enabled"
	}

	// Without an error there is nothing to classify, and err.Error() would panic.
	if err == nil {
		return false, fmt.Sprintf(
			`%s: %s, %s: %s, %s: %d`,
			"msg", "No error supplied, nothing to cache",
			"normalized query", queryExpressionNormalized,
			"query range seconds", queryExpressionRangeLength,
		)
	}

	// describe renders the outcome message. It reads queryExpressionRangeLength
	// at call time, so the final message reflects any adjustment made below.
	describe := func(outcome, rangeLabel string) string {
		return fmt.Sprintf(
			`%s: %s, %s: %s, %s: %d, %s: %s`,
			"msg", outcome,
			"normalized query", queryExpressionNormalized,
			rangeLabel, queryExpressionRangeLength,
			"updating cache for error", err,
		)
	}

	// Extracting error code from error string. len() of a nil slice is 0, so a
	// separate nil check is unnecessary.
	codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
	if len(codeExtract) < 2 {
		return false, describe("String regex conversion error", "query range seconds")
	}

	// Converting error code to int.
	errCode, strConvError := strconv.Atoi(codeExtract[1])
	if strConvError != nil {
		return false, describe("String to int conversion error", "query range seconds")
	}

	// Only cacheable error codes are stored.
	if !isCacheableError(errCode) {
		return false, describe("Query not cached due to non-cacheable error code", "query range seconds")
	}

	// ContainsOrAdd inserts new entries directly (its second result is
	// discarded). For an existing entry, keep the minimum of the stored and
	// the new range length.
	if contains, _ := lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
		if oldValue, ok := lruCache.Get(queryExpressionNormalized); ok {
			// A stored value that is not an int is ignored and overwritten.
			if oldRange, isInt := oldValue.(int); isInt && oldRange < queryExpressionRangeLength {
				queryExpressionRangeLength = oldRange
			}
		}
		lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
	}

	return true, describe("Cached a failed query", "range seconds")
}

// QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
func (f *FailedQueryCache) QueryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) >= queryExpressionRangeLength {
message := fmt.Sprintf(
`%s: %s, %s: %s, %s: %d`, "msg", "Retrieved query from cache",
"normalized query", queryExpressionNormalized,
"range seconds", queryExpressionRangeLength)
pranavmishradatabricks marked this conversation as resolved.
Show resolved Hide resolved

return true, message

}
return false, ""
}

// isCacheableError reports whether statusCode is one of the pre-defined
// cacheable response codes.
func isCacheableError(statusCode int) bool {
	found := false
	for idx := 0; idx < len(cacheableResponseCodes) && !found; idx++ {
		found = cacheableResponseCodes[idx] == statusCode
	}
	return found
}

// GetQueryRangeSeconds returns end - start using the "start" and "end" values
// of query. It returns 0 when either value is missing or is not parseable as
// an integer.
func GetQueryRangeSeconds(query url.Values) int {
	rangeStart, startErr := strconv.Atoi(query.Get("start"))
	rangeEnd, endErr := 0, error(nil)
	if startErr == nil {
		rangeEnd, endErr = strconv.Atoi(query.Get("end"))
	}

	if startErr != nil || endErr != nil {
		return 0
	}
	return rangeEnd - rangeStart
}

// NormalizeQueryString returns the "query" value of query with every match of
// the cache's whitespace pattern replaced by a single space.
func (f *FailedQueryCache) NormalizeQueryString(query url.Values) string {
	expression := query.Get("query")
	normalized := f.regex.ReplaceAllString(expression, " ")
	return normalized
}

// CallUpdateFailedQueryCache is the nil-safe entry point for updating the
// failed query cache. When the receiver is nil, or its LruCache is nil, it
// returns false with the message "Failed query cache is not enabled";
// otherwise it forwards to UpdateFailedQueryCache using the receiver's own
// LruCache and returns that result unchanged.
func (f *FailedQueryCache) CallUpdateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int) (bool, string) {
	// A cache object without a backing LRU is treated as disabled, since
	// passing a nil LRU cache on would lead to a nil dereference.
	if f == nil || f.LruCache == nil {
		return false, "Failed query cache is not enabled"
	}
	return f.UpdateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, f.LruCache)
}
Loading
Loading