Downsampling based on MaxResolutionWindow (#68)
eswdd authored Dec 19, 2019
1 parent 74787fc commit b6a1fcb
Showing 7 changed files with 564 additions and 50 deletions.
6 changes: 5 additions & 1 deletion cmd/geras/main.go
@@ -183,7 +183,11 @@ func main() {
prometheus.DefaultRegisterer.MustRegister(version.NewCollector("geras"))

// create openTSDBStore and expose its api on a grpc server
srv := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric)
srv, err := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric)
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
grpcSrv := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
2 changes: 1 addition & 1 deletion go.mod
@@ -1,7 +1,7 @@
module github.com/G-Research/geras

require (
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a
github.com/go-kit/kit v0.9.0
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117
github.com/pkg/errors v0.8.1
6 changes: 4 additions & 2 deletions go.sum
@@ -11,8 +11,10 @@ github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 h1:LCY7S5FtlqIWHXV0HBX4yGMelG7DCZX90gSPD1TrmqU=
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d h1:gsMpaf4Y8Zm70kWestox0+M6jmThzV0zXH78wYQVCuE=
github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a h1:T3zoIUSc6BzpUMsiyw7afr3IiTrELVX4u+tkzk7RWbQ=
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
132 changes: 120 additions & 12 deletions pkg/store/store.go
@@ -38,9 +38,11 @@ type OpenTSDBStore struct {
enableMetricSuggestions bool
storeLabels []storepb.Label
healthcheckMetric string
aggregateToDownsample map[storepb.Aggr]string
downsampleToAggregate map[string]storepb.Aggr
}

func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) *OpenTSDBStore {
func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) (*OpenTSDBStore, error) {
store := &OpenTSDBStore{
logger: log.With(logger, "component", "opentsdb"),
openTSDBClient: client,
@@ -52,8 +54,30 @@ func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prom
blockedMetricNames: blockedMetricNames,
healthcheckMetric: healthcheckMetric,
}
err := store.populateMaps()
if err != nil {
return nil, err
}
store.updateMetrics(context.Background(), logger)
return store
return store, nil
}

func (store *OpenTSDBStore) populateMaps() error {
store.aggregateToDownsample = map[storepb.Aggr]string{
storepb.Aggr_COUNT: "count",
storepb.Aggr_SUM: "sum",
storepb.Aggr_MIN: "min",
storepb.Aggr_MAX: "max",
storepb.Aggr_COUNTER: "avg",
}
store.downsampleToAggregate = map[string]storepb.Aggr{}
for a, d := range store.aggregateToDownsample {
if _, exists := store.downsampleToAggregate[d]; exists {
return errors.New(fmt.Sprintf("Invalid aggregate/downsample mapping - not reversible for downsample function %s", d))
}
store.downsampleToAggregate[d] = a
}
return nil
}

type internalMetrics struct {
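The new populateMaps sets up a fixed mapping from Thanos aggregate hints to OpenTSDB downsample functions and then inverts it, returning an error if two aggregates would map to the same downsample function (the reverse lookup in convertOpenTSDBResultsToSeriesResponse further down depends on the mapping being one-to-one). Below is a minimal, self-contained sketch of that idea; the Aggr type, constant names, and buildMaps helper are stand-ins for the real storepb.Aggr enum and method, not the geras code itself.

```go
package main

import "fmt"

// Aggr is a stand-in for storepb.Aggr, just to keep the sketch self-contained.
type Aggr int

const (
	AggrRaw Aggr = iota
	AggrCount
	AggrSum
	AggrMin
	AggrMax
	AggrCounter
)

// buildMaps mirrors populateMaps: a forward map plus a validated inverse.
func buildMaps() (map[Aggr]string, map[string]Aggr, error) {
	toDownsample := map[Aggr]string{
		AggrCount:   "count",
		AggrSum:     "sum",
		AggrMin:     "min",
		AggrMax:     "max",
		AggrCounter: "avg", // COUNTER is approximated with OpenTSDB's "avg", as in the diff above
	}
	toAggregate := map[string]Aggr{}
	for a, d := range toDownsample {
		if _, exists := toAggregate[d]; exists {
			return nil, nil, fmt.Errorf("mapping not reversible for downsample function %s", d)
		}
		toAggregate[d] = a
	}
	return toDownsample, toAggregate, nil
}

func main() {
	toDownsample, toAggregate, err := buildMaps()
	if err != nil {
		panic(err)
	}
	fmt.Println(toDownsample[AggrSum]) // "sum"
	fmt.Println(toAggregate["avg"])    // AggrCounter, printed as its integer value
}
```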
@@ -209,7 +233,7 @@ func (store *OpenTSDBStore) Series(
if respI.Error != nil {
return respI.Error
}
res, count, err := convertOpenTSDBResultsToSeriesResponse(respI)
res, count, err := convertOpenTSDBResultsToSeriesResponse(respI, store.downsampleToAggregate)
if err != nil {
return err
}
@@ -417,19 +441,75 @@ func (store *OpenTSDBStore) composeOpenTSDBQuery(req *storepb.SeriesRequest) (op
return opentsdb.QueryParam{}, nil, nil
}

subQueries := make([]opentsdb.SubQuery, len(metricNames))
aggregationCount := 0
needRawAggregation := true
var downsampleSecs int64
if req.MaxResolutionWindow != 0 {
needRawAggregation = false
for _, agg := range req.Aggregates {
switch agg {
case storepb.Aggr_RAW:
needRawAggregation = true
break
case storepb.Aggr_COUNT:
fallthrough
case storepb.Aggr_SUM:
fallthrough
case storepb.Aggr_MIN:
fallthrough
case storepb.Aggr_MAX:
fallthrough
case storepb.Aggr_COUNTER:
aggregationCount++
break
default:
level.Info(store.logger).Log("err", fmt.Sprintf("Unrecognised series aggregator: %v", agg))
needRawAggregation = true
break
}
}
downsampleSecs = req.MaxResolutionWindow / 1000
}
if needRawAggregation {
aggregationCount++
}
subQueries := make([]opentsdb.SubQuery, len(metricNames)*aggregationCount)
for i, mn := range metricNames {
subQueries[i] = opentsdb.SubQuery{
Aggregator: "none",
Metric: mn,
Fiters: tagFilters,
aggregationIndex := 0
if req.MaxResolutionWindow != 0 {
for _, agg := range req.Aggregates {
addAgg := true
var downsample string
if ds, exists := store.aggregateToDownsample[agg]; exists {
downsample = ds
} else {
addAgg = false
}
if addAgg {
subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{
Aggregator: "none",
Downsample: fmt.Sprintf("%vs-%s", downsampleSecs, downsample),
Metric: mn,
Fiters: tagFilters,
}
aggregationIndex++
}
}
}
if needRawAggregation {
subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{
Aggregator: "none",
Metric: mn,
Fiters: tagFilters,
}
}
}
query := opentsdb.QueryParam{
Start: req.MinTime,
End: req.MaxTime,
Queries: subQueries,
MsResolution: true,
ShowQuery: true,
}
level.Debug(store.logger).Log("tsdb-query", query.String())
return query, warnings, nil
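For reference, the fan-out above issues one OpenTSDB sub-query per requested aggregate and metric, plus a raw (un-downsampled) sub-query when MaxResolutionWindow is zero, RAW is requested, or an aggregate is unrecognised; MaxResolutionWindow, which Thanos supplies in milliseconds, becomes an OpenTSDB downsample spec of the form "<seconds>s-<function>". The sketch below shows only the spec construction and the fan-out shape; the downsampleSpec helper and the metric names are illustrative, not part of the diff.

```go
package main

import "fmt"

// downsampleSpec mirrors how composeOpenTSDBQuery turns Thanos's
// MaxResolutionWindow (milliseconds) and a downsample function name
// into an OpenTSDB downsample specification such as "300s-sum".
func downsampleSpec(maxResolutionWindowMs int64, fn string) string {
	return fmt.Sprintf("%vs-%s", maxResolutionWindowMs/1000, fn)
}

func main() {
	// A request for SUM and MAX at 5-minute resolution over two metrics
	// fans out to len(metrics) * len(aggregates) sub-queries.
	metrics := []string{"sys.cpu.user", "sys.cpu.system"} // example metric names
	aggregates := []string{"sum", "max"}                  // already mapped from storepb.Aggr

	for _, m := range metrics {
		for _, fn := range aggregates {
			fmt.Printf("metric=%s aggregator=none downsample=%s\n",
				m, downsampleSpec(300000, fn))
		}
	}
	// Output: four sub-queries, each with downsample "300s-sum" or "300s-max".
}
```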
@@ -464,7 +544,7 @@ func (store *OpenTSDBStore) checkMetricNames(metricNames []string, fullBlock boo
return allowed, warnings, nil
}

func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*storepb.SeriesResponse, int, error) {
func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem, downsampleToAggregate map[string]storepb.Aggr) (*storepb.SeriesResponse, int, error) {
seriesLabels := make([]storepb.Label, len(respI.Tags))
i := 0
for k, v := range respI.Tags {
@@ -473,6 +553,17 @@ func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*sto
}
seriesLabels = append(seriesLabels, storepb.Label{Name: "__name__", Value: respI.Metric})

downsampleFunction := "none"
if hyphenIndex := strings.Index(respI.Query.Downsample, "-"); hyphenIndex >= 0 {
downsampleFunction = respI.Query.Downsample[hyphenIndex+1:]
}
var aggregate storepb.Aggr
if v, exists := downsampleToAggregate[downsampleFunction]; exists {
aggregate = v
} else {
aggregate = storepb.Aggr_RAW
}

// Turn datapoints into chunks (Prometheus's tsdb encoding)
dps := respI.GetDataPoints()
chunks := []storepb.AggrChunk{}
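Because the query is composed with ShowQuery set, OpenTSDB echoes each sub-query back in its response, and the conversion above recovers which aggregate a result belongs to by taking the part of the Downsample spec after the hyphen and looking it up in downsampleToAggregate, falling back to RAW when nothing matches. A standalone sketch of that parse; downsampleFunction is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strings"
)

// downsampleFunction extracts the function name from an OpenTSDB downsample
// spec, e.g. "300s-sum" -> "sum". An empty or hyphen-free spec (a raw
// sub-query) yields "none", as in convertOpenTSDBResultsToSeriesResponse.
func downsampleFunction(downsample string) string {
	if i := strings.Index(downsample, "-"); i >= 0 {
		return downsample[i+1:]
	}
	return "none"
}

func main() {
	fmt.Println(downsampleFunction("300s-sum")) // "sum"
	fmt.Println(downsampleFunction(""))         // "none" -> treated as Aggr_RAW
}
```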
@@ -493,11 +584,28 @@
}
a.Append(int64(dp.Timestamp), dp.Value.(float64))
}
chunks = append(chunks, storepb.AggrChunk{
chunk := &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}
aggrChunk := storepb.AggrChunk{
MinTime: minTime,
MaxTime: int64(dps[i-1].Timestamp),
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()},
})
}
switch aggregate {
case storepb.Aggr_COUNT:
aggrChunk.Count = chunk
case storepb.Aggr_SUM:
aggrChunk.Sum = chunk
case storepb.Aggr_MIN:
aggrChunk.Min = chunk
case storepb.Aggr_MAX:
aggrChunk.Max = chunk
case storepb.Aggr_COUNTER:
aggrChunk.Counter = chunk
case storepb.Aggr_RAW:
fallthrough
default:
aggrChunk.Raw = chunk
}
chunks = append(chunks, aggrChunk)
}
return storepb.NewSeriesResponse(&storepb.Series{
Labels: seriesLabels,
Expand Down
3 more changed files not shown.