From 9546c684387ca672b53e299338c169173b46e4fb Mon Sep 17 00:00:00 2001 From: bom-d-van Date: Fri, 20 Dec 2019 13:29:27 +0100 Subject: [PATCH] Support a new aggregation policy: mix (with percentiles) --- carbonserver/carbonserver.go | 14 + carbonserver/fetchfromdisk.go | 22 +- deploy/storage-aggregation.conf | 5 + helper/aggregation.go | 37 ++ persister/whisper.go | 16 +- persister/whisper_aggregation.go | 34 +- .../go-graphite/go-whisper/compress.go | 440 ++++++++++++++++-- .../go-graphite/go-whisper/debug.go | 59 ++- .../go-graphite/go-whisper/whisper.go | 339 ++++++++++---- 9 files changed, 788 insertions(+), 178 deletions(-) create mode 100644 helper/aggregation.go diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index bd444be04..ba5a94726 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -783,6 +783,13 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str var useGlob bool + var aggregationSpec string + if strings.Contains(query, "@") { + a := strings.Split(query, "@") + query = a[0] + aggregationSpec = a[1] + } + // TODO: Find out why we have set 'useGlob' if 'star == -1' if star := strings.IndexByte(query, '*'); strings.IndexByte(query, '[') == -1 && strings.IndexByte(query, '?') == -1 && (star == -1 || star == len(query)-1) { useGlob = true @@ -880,6 +887,13 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str files[i] = strings.Replace(p, "/", ".", -1) } + if aggregationSpec != "" { + for i := range files { + files[i] += "@" + aggregationSpec + } + query += "@" + aggregationSpec + } + matchedCount = len(files) resultCh <- &ExpandedGlobResponse{query, files, leafs, nil} } diff --git a/carbonserver/fetchfromdisk.go b/carbonserver/fetchfromdisk.go index 50a144a9d..b891d2bdb 100644 --- a/carbonserver/fetchfromdisk.go +++ b/carbonserver/fetchfromdisk.go @@ -2,6 +2,7 @@ package carbonserver import ( "errors" + "fmt" _ "net/http/pprof" "strings" "sync/atomic" @@ -10,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/go-graphite/go-whisper" + "github.com/lomik/go-carbon/helper" "github.com/lomik/go-carbon/points" ) @@ -28,6 +30,13 @@ type metricFromDisk struct { func (listener *CarbonserverListener) fetchFromDisk(metric string, fromTime, untilTime int32) (*metricFromDisk, error) { var step int32 + var aggregationSpec string + if strings.Contains(metric, "@") { + a := strings.Split(metric, "@") + metric = a[0] + aggregationSpec = a[1] + } + // We need to obtain the metadata from whisper file anyway. path := listener.whisperData + "/" + strings.Replace(metric, ".", "/", -1) + ".wsp" w, err := whisper.OpenWithOptions(path, &whisper.Options{ @@ -93,11 +102,20 @@ func (listener *CarbonserverListener) fetchFromDisk(metric string, fromTime, unt listener.prometheus.diskRequest() res.DiskStartTime = time.Now() - points, err := w.Fetch(int(fromTime), int(untilTime)) + var points *whisper.TimeSeries + if aggregationSpec == "" { + points, err = w.Fetch(int(fromTime), int(untilTime)) + } else { + var spec whisper.MixAggregationSpec + spec.Method, spec.Percentile, err = helper.ParseAggregationMethod(aggregationSpec) + if err == nil { + points, err = w.FetchByAggregation(int(fromTime), int(untilTime), &spec) + } + } w.Close() if err != nil { logger.Warn("failed to fetch points", zap.Error(err)) - return nil, errors.New("failed to fetch points") + return nil, fmt.Errorf("failed to fetch points: %s", err) } // Should never happen, because we have a check for proper archive now diff --git a/deploy/storage-aggregation.conf b/deploy/storage-aggregation.conf index 733e72c40..b1ad91cb7 100644 --- a/deploy/storage-aggregation.conf +++ b/deploy/storage-aggregation.conf @@ -5,3 +5,8 @@ pattern = .* xFilesFactor = 0.5 aggregationMethod = average + +[mix] +pattern = mix.* +xFilesFactor = 0.5 +aggregationMethod = average,sum,average,median,p90,p95 diff --git a/helper/aggregation.go b/helper/aggregation.go new file mode 100644 index 000000000..e55e68bb7 --- /dev/null +++ b/helper/aggregation.go @@ -0,0 +1,37 @@ +package helper + +import ( + "errors" + "strconv" + "strings" + + whisper "github.com/go-graphite/go-whisper" +) + +func ParseAggregationMethod(str string) (method whisper.AggregationMethod, percentile float32, err error) { + switch str { + case "average", "avg": + method = whisper.Average + case "sum": + method = whisper.Sum + case "last": + method = whisper.Last + case "max": + method = whisper.Max + case "min": + method = whisper.Min + case "median": + str = "p50" + fallthrough + default: + if strings.HasPrefix(str, "p") { + method = whisper.Percentile + var percentile64 float64 + percentile64, err = strconv.ParseFloat(strings.TrimLeft(str, "p"), 32) + percentile = float32(percentile64) + } else { + err = errors.New("unknown aggregation method") + } + } + return +} diff --git a/persister/whisper.go b/persister/whisper.go index 912acf588..e91cd9187 100644 --- a/persister/whisper.go +++ b/persister/whisper.go @@ -253,7 +253,6 @@ func (p *Whisper) store(metric string) { p.logger.Error("no storage aggregation defined for metric", zap.String("metric", metric)) return } - if err = os.MkdirAll(filepath.Dir(path), os.ModeDir|os.ModePerm); err != nil { p.logger.Error("mkdir failed", zap.String("dir", filepath.Dir(path)), @@ -267,10 +266,19 @@ func (p *Whisper) store(metric string) { if schema.Compressed != nil { compressed = *schema.Compressed } + if !compressed && aggr.aggregationMethod == whisper.Mix { + // intended to be less intrusive, just logging an error + p.logger.Error("bad aggregation match", + zap.String("path", path), + zap.Error(fmt.Errorf("mix aggregation currently only support cwhisper format, resetting it to default aggregation policy %s", p.aggregation.Default.aggregationMethod)), + ) + aggr = p.aggregation.Default + } w, err = whisper.CreateWithOptions(path, schema.Retentions, aggr.aggregationMethod, float32(aggr.xFilesFactor), &whisper.Options{ - Sparse: p.sparse, - FLock: p.flock, - Compressed: compressed, + Sparse: p.sparse, + FLock: p.flock, + Compressed: compressed, + MixAggregationSpecs: aggr.mixAggregationSpecs, }) if err != nil { p.logger.Error("create new whisper file failed", diff --git a/persister/whisper_aggregation.go b/persister/whisper_aggregation.go index 71fbdaf0f..c6855ff4d 100644 --- a/persister/whisper_aggregation.go +++ b/persister/whisper_aggregation.go @@ -8,8 +8,10 @@ import ( "fmt" "regexp" "strconv" + "strings" whisper "github.com/go-graphite/go-whisper" + "github.com/lomik/go-carbon/helper" ) // WhisperAggregationItem ... @@ -19,6 +21,7 @@ type WhisperAggregationItem struct { xFilesFactor float64 aggregationMethodStr string aggregationMethod whisper.AggregationMethod + mixAggregationSpecs []whisper.MixAggregationSpec } // WhisperAggregation ... @@ -88,21 +91,24 @@ func ReadWhisperAggregation(filename string) (*WhisperAggregation, error) { section["xfilesfactor"], item.name, err.Error()) } + var err error item.aggregationMethodStr = section["aggregationmethod"] - switch item.aggregationMethodStr { - case "average", "avg": - item.aggregationMethod = whisper.Average - case "sum": - item.aggregationMethod = whisper.Sum - case "last": - item.aggregationMethod = whisper.Last - case "max": - item.aggregationMethod = whisper.Max - case "min": - item.aggregationMethod = whisper.Min - default: - return nil, fmt.Errorf("unknown aggregation method '%s'", - section["aggregationmethod"]) + if strings.Contains(item.aggregationMethodStr, ",") { + item.aggregationMethod = whisper.Mix + specStrs := strings.Split(item.aggregationMethodStr, ",") + for _, specStr := range specStrs { + var spec whisper.MixAggregationSpec + spec.Method, spec.Percentile, err = helper.ParseAggregationMethod(specStr) + if err != nil { + break + } + item.mixAggregationSpecs = append(item.mixAggregationSpecs, spec) + } + } else { + item.aggregationMethod, _, err = helper.ParseAggregationMethod(item.aggregationMethodStr) + } + if err != nil { + return nil, fmt.Errorf("failed to parse aggregation method '%s': %s", section["aggregationmethod"], err) } result.Data = append(result.Data, item) diff --git a/vendor/github.com/go-graphite/go-whisper/compress.go b/vendor/github.com/go-graphite/go-whisper/compress.go index dae018499..ce19d5ef1 100644 --- a/vendor/github.com/go-graphite/go-whisper/compress.go +++ b/vendor/github.com/go-graphite/go-whisper/compress.go @@ -2,6 +2,7 @@ package whisper import ( "encoding/binary" + "errors" "fmt" "io" "io/ioutil" @@ -9,6 +10,7 @@ import ( "math/bits" "os" "sort" + "strconv" "sync" "time" "unsafe" @@ -88,6 +90,14 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) { i += packInt(b, archive.blockCount, i) i += packFloat32(b, archive.avgCompressedPointSize, i) + var mixSpecSize int + if archive.aggregationSpec != nil { + b[i] = byte(archive.aggregationSpec.Method) + i += ByteSize + i += packFloat32(b, archive.aggregationSpec.Percentile, i) + mixSpecSize = ByteSize + FloatSize + } + i += packInt(b, archive.cblock.index, i) i += packInt(b, archive.cblock.p0.interval, i) i += packFloat64(b, archive.cblock.p0.value, i) @@ -104,7 +114,11 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) { i += packInt(b, int(archive.stats.discard.oldInterval), i) i += packInt(b, int(archive.stats.extended), i) - i += FreeCompressedArchiveInfoSize + i += FreeCompressedArchiveInfoSize - mixSpecSize + + if FreeCompressedArchiveInfoSize < mixSpecSize { + panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happens + } } // write block_range_info and buffer @@ -195,6 +209,14 @@ func (whisper *Whisper) readHeaderCompressed() (err error) { arc.avgCompressedPointSize = unpackFloat32(b[offset : offset+FloatSize]) offset += FloatSize + if whisper.aggregationMethod == Mix && i > 0 { + arc.aggregationSpec = &MixAggregationSpec{} + arc.aggregationSpec.Method = AggregationMethod(b[offset]) + offset += ByteSize + arc.aggregationSpec.Percentile = unpackFloat32(b[offset : offset+FloatSize]) + offset += FloatSize + } + arc.cblock.index = unpackInt(b[offset : offset+IntSize]) offset += IntSize arc.cblock.p0.interval = unpackInt(b[offset : offset+IntSize]) @@ -270,27 +292,12 @@ func (whisper *Whisper) readHeaderCompressed() (err error) { return nil } -type blockInfo struct { - index int - crc32 uint32 - p0, pn1, pn2 dataPoint // pn1/pn2: points at len(block_points) - 1/2 - lastByte byte - lastByteOffset int - lastByteBitPos int - count int -} - -type blockRange struct { - index int - start, end int // start and end timestamps - count int - crc32 uint32 -} - func (a *archiveInfo) blockOffset(blockIndex int) int { return a.offset + blockIndex*a.blockSize } +const maxInt = 1< 0 +func (archive *archiveInfo) getOverallRange() (from, until int) { + for _, b := range archive.blockRanges { + if from == 0 || from > b.start { + from = b.start + } + if b.end > until { + until = b.end + } + } + return } +func (archive *archiveInfo) hasBuffer() bool { return archive.bufferSize > 0 } + func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo) ([]dataPoint, error) { var dst []dataPoint + var buf = make([]byte, archive.blockSize) for _, block := range archive.getSortedBlockRanges() { if block.end >= int(start) && int(end) >= block.start { - buf := make([]byte, archive.blockSize) if err := whisper.fileReadAt(buf, int64(archive.blockOffset(block.index))); err != nil { - return nil, fmt.Errorf("fetchCompressed: %s", err) + return nil, fmt.Errorf("fetchCompressed.%d.%d: %s", archive.numberOfPoints, block.index, err) } var err error @@ -328,8 +345,13 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo) if err != nil { return dst, err } + + for i := 0; i < archive.blockSize; i++ { + buf[i] = 0 + } } } + if archive.hasBuffer() { dps := unpackDataPoints(archive.buffer) for _, p := range dps { @@ -338,14 +360,105 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo) } } } + + // Start live aggregation. This probably has a read peformance hit. + if base := whisper.archives[0]; base != archive { + var dps []dataPoint + + // Mix aggregation is triggered when block in base archive is rotated and also + // depends on the sufficiency of data points. This could results to a over + // long gap when fetching data from higer archives, depending on different + // retention policy. Therefore cwhisper needs to do live aggregation. + if whisper.aggregationMethod == Mix { + var baseLookupNeeded = len(dst) == 0 || dst[len(dst)-1].interval < int(end) + var inBase bool + if baseLookupNeeded { + bstart, bend := base.getOverallRange() + inBase = int64(bstart) <= end || end <= int64(bend) + } + + if inBase { + nstart := start + if len(dst) > 0 { + // TODO: invest why shifting the last data point interval is wrong + nstart = int64(archive.Interval(dst[len(dst)-1].interval)) // + archive.secondsPerPoint + } + var err error + dps, err = whisper.fetchCompressed(nstart, end, base) + if err != nil { + return dst, err + } + } + } + + // This would benefits both mix and no-mix aggregations. + if base.hasBuffer() { + for _, p := range unpackDataPoints(base.buffer) { + if p.interval != 0 && int(start) <= p.interval && p.interval <= int(end) { + dps = append(dps, p) + } + } + } + + var pinterval int + var vals []float64 + for i, dp := range dps { + // same as archiveInfo.AggregateInterval + interval := dp.interval - mod(dp.interval, archive.secondsPerPoint) + if pinterval == 0 || pinterval == interval { + pinterval = interval + vals = append(vals, dp.value) + + if i < len(dps)-1 { + continue + } + } + + // check we have enough data points to propagate a value + knownPercent := float32(len(vals)) / float32(archive.secondsPerPoint/base.secondsPerPoint) + if len(vals) > 0 && knownPercent >= whisper.xFilesFactor { + var ndp dataPoint + ndp.interval = pinterval + if whisper.aggregationMethod == Mix { + if archive.aggregationSpec.Method == Percentile { + ndp.value = aggregatePercentile(archive.aggregationSpec.Percentile, vals) + } else { + ndp.value = aggregate(archive.aggregationSpec.Method, vals) + } + } else { + ndp.value = aggregate(whisper.aggregationMethod, vals) + } + dst = append(dst, ndp) + } + + vals = vals[:0] + vals = append(vals, dp.value) + pinterval = interval + } + } + return dst, nil } +// NOTE: this method assumes data saved in higer archives are fixed. If +// we mvoe to allowing data/intervals coming in non-monotonic order, we +// need to rethink the implementation here as well. func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points []*TimeSeriesPoint) error { alignedPoints := alignPoints(archive, points) + // Note: in the current design, mix aggregation doesn't have any buffer in + // higer archives if !archive.hasBuffer() { - return archive.appendToBlockAndRotate(alignedPoints) + rotated, err := archive.appendToBlockAndRotate(alignedPoints) + if err != nil { + return err + } + + if !(whisper.aggregationMethod == Mix && rotated) { + return nil + } + + return whisper.propagateToMixedArchivesCompressed() } baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo() @@ -388,7 +501,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points continue } - if err := archive.appendToBlockAndRotate(dps); err != nil { + if _, err := archive.appendToBlockAndRotate(dps); err != nil { // TODO: record and continue? return err } @@ -408,6 +521,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points aggregateValue := aggregate(whisper.aggregationMethod, knownValues) point := &TimeSeriesPoint{lowerIntervalStart, aggregateValue} + // TODO: consider migrating to a non-recursive propagation implementation like mix policy if err := whisper.archiveUpdateManyCompressed(lower, []*TimeSeriesPoint{point}); err != nil { return err } @@ -445,7 +559,7 @@ func (archive *archiveInfo) getBufferByUnit(unit int) []byte { return archive.buffer[lb:ub] } -func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error { +func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) (rotated bool, err error) { whisper := archive.whisper // TODO: optimize away? blockBuffer := make([]byte, len(dps)*(MaxCompressedPointSize)+endOfBlockSize) @@ -460,7 +574,7 @@ func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error { size = len(blockBuffer) } if err := whisper.fileWriteAt(blockBuffer[:size], int64(offset)); err != nil { - return err + return rotated, err } if len(left) == 0 { @@ -484,13 +598,17 @@ func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) error { archive.cblock = nblock archive.blockRanges[nblock.index].start = 0 archive.blockRanges[nblock.index].end = 0 + + rotated = true } - return nil + return rotated, nil } func (whisper *Whisper) extendIfNeeded() error { var rets []*Retention + var mixSpecs []MixAggregationSpec + var mixSizes = make(map[int][]float32) var extend bool var msg string for _, arc := range whisper.archives { @@ -540,10 +658,20 @@ func (whisper *Whisper) extendIfNeeded() error { filename := whisper.file.Name() os.Remove(whisper.file.Name() + ".extend") + if whisper.aggregationMethod == Mix && len(rets) > 1 { + rets, mixSpecs, mixSizes = extractMixSpecs(rets, whisper.archives) + } + nwhisper, err := CreateWithOptions( whisper.file.Name()+".extend", rets, whisper.aggregationMethod, whisper.xFilesFactor, - &Options{Compressed: true, PointsPerBlock: DefaultPointsPerBlock, InMemory: whisper.opts.InMemory}, + &Options{ + Compressed: true, + PointsPerBlock: DefaultPointsPerBlock, + InMemory: whisper.opts.InMemory, + MixAggregationSpecs: mixSpecs, + MixAvgCompressedPointSizes: mixSizes, + }, ) if err != nil { return fmt.Errorf("extend: %s", err) @@ -563,7 +691,7 @@ func (whisper *Whisper) extendIfNeeded() error { if err != nil { return fmt.Errorf("archives[%d].blocks[%d].read: %s", i, block.index, err) } - if err := nwhisper.archives[i].appendToBlockAndRotate(dst); err != nil { + if _, err := nwhisper.archives[i].appendToBlockAndRotate(dst); err != nil { return fmt.Errorf("archives[%d].blocks[%d].write: %s", i, block.index, err) } } @@ -594,6 +722,37 @@ func (whisper *Whisper) extendIfNeeded() error { return err } +func extractMixSpecs(orets Retentions, arcs []*archiveInfo) (Retentions, []MixAggregationSpec, map[int][]float32) { + var nrets Retentions + var specs []MixAggregationSpec + var sizes = make(map[int][]float32) + var specsCont bool + + for i, ret := range orets { + sizes[ret.secondsPerPoint] = append(sizes[ret.secondsPerPoint], ret.avgCompressedPointSize) + + if len(nrets) == 0 { + nrets = append(nrets, ret) + continue + } + + if ret.secondsPerPoint != nrets[len(nrets)-1].secondsPerPoint { + nrets = append(nrets, ret) + + if len(specs) == 0 { + specs = append(specs, *arcs[i].aggregationSpec) + specsCont = true + } else { + specsCont = false + } + } else if specsCont { + specs = append(specs, *arcs[i].aggregationSpec) + } + } + + return nrets, specs, sizes +} + func (arc *archiveInfo) avgPointsPerBlockReal() float32 { var totalPoints int var totalBlocks int @@ -969,6 +1128,7 @@ func (a *archiveInfo) ReadFromBlock(buf []byte, dst []dataPoint, start, end int) br.bitPos = 7 br.current = PointSize + // the first data point is not compressed p := unpackDataPoint(buf) if start <= p.interval && p.interval <= end { dst = append(dst, p) @@ -986,11 +1146,11 @@ readloop: var p dataPoint if debugCompress { - end := br.current + 8 - if end >= len(br.buf) { - end = len(br.buf) - 1 + endd := br.current + 8 + if endd >= len(br.buf) { + endd = len(br.buf) - 1 } - fmt.Printf("new point %d:\n br.index = %d/%d br.bitPos = %d byte = %08b peek(1) = %08b peek(2) = %08b peek(3) = %08b peek(4) = %08b buf[%d:%d] = %08b\n", len(dst), br.current, len(br.buf), br.bitPos, br.buf[br.current], br.Peek(1), br.Peek(2), br.Peek(3), br.Peek(4), br.current, end, br.buf[br.current:end]) + fmt.Printf("new point %d:\n br.index = %d/%d br.bitPos = %d byte = %08b peek(1) = %08b peek(2) = %08b peek(3) = %08b peek(4) = %08b buf[%d:%d] = %08b\n", len(dst), br.current, len(br.buf), br.bitPos, br.buf[br.current], br.Peek(1), br.Peek(2), br.Peek(3), br.Peek(4), br.current, endd, br.buf[br.current:endd]) } var skip, toRead int @@ -1014,8 +1174,8 @@ readloop: if br.current >= len(buf)-1 { break readloop } - start, end, data := br.trailingDebug() - return dst, br.current, fmt.Errorf("unknown timestamp prefix (archive[%d]): %04b at %d@%d, context[%d-%d] = %08b len(dst) = %d", a.secondsPerPoint, br.Peek(4), br.current, br.bitPos, start, end, data, len(dst)) + start, endd, data := br.trailingDebug() + return dst, br.current, fmt.Errorf("unknown timestamp prefix (archive[%d]): %04b at %d@%d, context[%d-%d] = %08b len(dst) = %d", a.secondsPerPoint, br.Peek(4), br.current, br.bitPos, start, endd, data, len(dst)) } br.Read(skip) @@ -1226,6 +1386,11 @@ func dumpBits(data ...uint64) string { // // CompressTo should stop compression/return errors when runs into any issues (if feasible). func (whisper *Whisper) CompressTo(dstPath string) error { + // Note: doesn't support mix-aggregation. + if whisper.aggregationMethod == Mix { + return errors.New("mix aggregation policy isn't supported.") + } + var rets []*Retention for _, arc := range whisper.archives { rets = append(rets, &Retention{secondsPerPoint: arc.secondsPerPoint, numberOfPoints: arc.numberOfPoints}) @@ -1271,7 +1436,7 @@ func (whisper *Whisper) CompressTo(dstPath string) error { // TODO: consider support moving the last data points to buffer for i := len(whisper.archives) - 1; i >= 0; i-- { points := pointsByArchives[i] - if err := dst.archives[i].appendToBlockAndRotate(points); err != nil { + if _, err := dst.archives[i].appendToBlockAndRotate(points); err != nil { return err } } @@ -1435,11 +1600,11 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error { until := int(Now().Unix()) from := until - srcArc.MaxRetention() - srcPoints, err := srcw.Fetch(from, until) + srcPoints, err := srcw.FetchByAggregation(from, until, srcArc.aggregationSpec) if err != nil { return err } - dstPoints, err := dstw.Fetch(from, until) + dstPoints, err := dstw.FetchByAggregation(from, until, srcArc.aggregationSpec) if err != nil { return err } @@ -1464,13 +1629,21 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error { rets[i].avgCompressedPointSize = estimatePointSize(points, rets[i], rets[i].calculateSuitablePointsPerBlock(dstw.pointsPerBlock)) } + var mixSpecs []MixAggregationSpec + var mixSizes = make(map[int][]float32) + if dstw.aggregationMethod == Mix && len(rets) > 1 { + rets, mixSpecs, mixSizes = extractMixSpecs(rets, srcw.archives) + } + newDst, err := CreateWithOptions( dstw.file.Name()+".fill", rets, dstw.aggregationMethod, dstw.xFilesFactor, &Options{ FLock: true, Compressed: true, - PointsPerBlock: DefaultPointsPerBlock, - InMemory: true, // need to close file if switch to non in-memory + PointsPerBlock: DefaultPointsPerBlock, + InMemory: true, // need to close file if switch to non in-memory + MixAggregationSpecs: mixSpecs, + MixAvgCompressedPointSizes: mixSizes, }, ) if err != nil { @@ -1480,7 +1653,7 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error { for i := len(dstw.archives) - 1; i >= 0; i-- { points := pointsByArchives[i] - if err := newDst.archives[i].appendToBlockAndRotate(points); err != nil { + if _, err := newDst.archives[i].appendToBlockAndRotate(points); err != nil { return err } copy(newDst.archives[i].buffer, dstw.archives[i].buffer) @@ -1503,3 +1676,184 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error { return nil } + +func (whisper *Whisper) propagateToMixedArchivesCompressed() error { + var largestSPP int + var lastArchive *archiveInfo + var spps []int + for _, arc := range whisper.archives[1:] { + if arc.secondsPerPoint > largestSPP { + largestSPP = arc.secondsPerPoint + lastArchive = arc + } + + var knownSPP bool + for _, spp := range spps { + knownSPP = knownSPP || (arc.secondsPerPoint == spp) + } + if !knownSPP { + spps = append(spps, arc.secondsPerPoint) + } + } + if largestSPP == 0 { + return nil + } + + var baseArchive = whisper.archives[0] + var sortedBaseArcBrs = baseArchive.getSortedBlockRanges() + var until = baseArchive.cblock.pn1.interval + if until == 0 { + for _, br := range sortedBaseArcBrs { + if br.end == 0 { + break + } + + until = br.end + } + + if until == 0 { + return nil + } + } + // always exclude the last data point to make sure it's not a pre-mature propagation. + until = lastArchive.Interval(until) - 1 + if until <= 0 { + return nil + } + + var from int + if lastArchive.cblock.pn1.interval == 0 { + if sortedBaseArcBrs[0].start == 0 { + return nil + } + for _, br := range lastArchive.getSortedBlockRanges() { + if br.end == 0 { + break + } + from = br.end + } + + if from == 0 { + from = sortedBaseArcBrs[0].start + } + } else { + from = lastArchive.cblock.pn1.interval + lastArchive.secondsPerPoint + } + + // only propagate when there are enough data points for all the lower + // archives, for perfomance reason (in theory). + if until-from < largestSPP { + return nil + } + + dps, err := whisper.fetchCompressed(int64(from), int64(until), baseArchive) + if err != nil { + return fmt.Errorf("mix: failed to baseArchive.fetchCompressed(%d, %d): %s", from, until, err) + } + if len(dps) == 0 { + return nil // TODO: should be an error? + } + + type groupedDataPoint struct { + interval int + values []float64 + } + var dpsBySPP = map[int][]*groupedDataPoint{} + for _, dp := range dps { + for _, spp := range spps { + interval := dp.interval - mod(dp.interval, spp) // same as archiveInfo.AggregateInterval + if len(dpsBySPP[spp]) == 0 { + dpsBySPP[spp] = append(dpsBySPP[spp], &groupedDataPoint{ + interval: interval, + values: []float64{dp.value}, + }) + continue + } + + if gdp := dpsBySPP[spp][len(dpsBySPP[spp])-1]; gdp.interval == interval { + gdp.values = append(gdp.values, dp.value) + continue + } + + gdp := &groupedDataPoint{ + interval: interval, + values: []float64{dp.value}, + } + dpsBySPP[spp] = append(dpsBySPP[spp], gdp) + + // check we have enough data points to propagate a value + knownPercent := float32(len(gdp.values)) / float32(spp/baseArchive.secondsPerPoint) + if knownPercent < whisper.xFilesFactor { + dpsBySPP[spp] = dpsBySPP[spp][:len(dpsBySPP[spp])-1] + continue + } + + // sorted for percentiles + sort.Float64s(gdp.values) + } + } + + if len(dpsBySPP[largestSPP]) == 0 { + return nil + } + + var skipInterval int + var maxBufferSPP = 3 // TODO: come out with a better value? + // Handle cases of retentions ratio smaller than 3 between base and the + // last archives. + if ratio := whisper.archives[0].MaxRetention() / largestSPP; ratio < maxBufferSPP { + maxBufferSPP = 0 + } + // Make sure that we don't propagate prematurely by checking there are + // enough data points for the last archives. + for i := len(dpsBySPP[largestSPP]) - 1; i >= 0 && i >= len(dpsBySPP[largestSPP])-maxBufferSPP; i-- { + if len(dpsBySPP[largestSPP][i].values) < largestSPP/whisper.archives[0].secondsPerPoint { + skipInterval = dpsBySPP[largestSPP][i].interval + } + } + + for _, arc := range whisper.archives[1:] { + gdps := dpsBySPP[arc.secondsPerPoint] + dps := make([]dataPoint, len(gdps)) + for i, gdp := range gdps { + if skipInterval > 0 && gdp.interval >= skipInterval { + dps = dps[:i] + break + } + + dps[i].interval = gdp.interval + + if arc.aggregationSpec.Method == Percentile { + dps[i].value = aggregatePercentile(arc.aggregationSpec.Percentile, gdp.values) + } else { + dps[i].value = aggregate(arc.aggregationSpec.Method, gdp.values) + } + } + if len(dps) == 0 { + continue + } + + if _, err := arc.appendToBlockAndRotate(dps); err != nil { + return fmt.Errorf("mix: failed to propagate archive %s: %s", arc.Retention, err) + } + } + + return nil +} + +// Same implementation copied from carbonapi, without using quickselect for +// keeping zero dependency. +// percentile values: 0 - 100 +func aggregatePercentile(p float32, vals []float64) float64 { + if len(vals) == 0 || p < 0 || p > 100 { + return math.NaN() + } + + k := (float64(len(vals)-1) * float64(p)) / 100 + index := int(math.Ceil(k)) + remainder := k - float64(int(k)) + if remainder == 0 { + return vals[index] + } + return (vals[index] * remainder) + (vals[index-1] * (1 - remainder)) +} diff --git a/vendor/github.com/go-graphite/go-whisper/debug.go b/vendor/github.com/go-graphite/go-whisper/debug.go index 072a326ac..77586b4ac 100644 --- a/vendor/github.com/go-graphite/go-whisper/debug.go +++ b/vendor/github.com/go-graphite/go-whisper/debug.go @@ -3,6 +3,7 @@ package whisper import ( "fmt" "os" + "time" ) func (whisper *Whisper) CheckIntegrity() { @@ -59,17 +60,30 @@ func (whisper *Whisper) Dump(all, showDecompressionInfo bool) { fmt.Printf("aggregation_method: %s\n", whisper.aggregationMethod) fmt.Printf("max_retention: %d\n", whisper.maxRetention) fmt.Printf("x_files_factor: %f\n", whisper.xFilesFactor) - if whisper.compressed { + whisper.compressed = false + ssize := whisper.Size() + whisper.compressed = true + csize := whisper.Size() + var ratio float64 + if ssize != 0 { + ratio = float64(csize) / float64(ssize) + } + fmt.Printf("comp_version: %d\n", whisper.compVersion) fmt.Printf("points_per_block: %d\n", whisper.pointsPerBlock) fmt.Printf("avg_compressed_point_size: %f\n", whisper.avgCompressedPointSize) fmt.Printf("crc32: %X\n", whisper.crc32) + fmt.Printf("compression_ratio: %f (compressed/standard: %d/%d)\n", ratio, csize, ssize) } fmt.Printf("archives: %d\n", len(whisper.archives)) for i, arc := range whisper.archives { - fmt.Printf("archives.%d.retention: %s\n", i, arc.Retention) + var agg string + if arc.aggregationSpec != nil { + agg = fmt.Sprintf(" (%s)", arc.aggregationSpec) + } + fmt.Printf("archives.%d.retention: %s%s\n", i, arc.Retention, agg) } for i, arc := range whisper.archives { @@ -106,11 +120,16 @@ func (archive *archiveInfo) dumpInfoCompressed() { fmt.Printf("block_count: %d\n", archive.blockCount) fmt.Printf("points_per_block: %d\n", archive.calculateSuitablePointsPerBlock(archive.whisper.pointsPerBlock)) fmt.Printf("compression_ratio: %f (%d/%d)\n", float64(archive.blockSize*archive.blockCount)/float64(archive.Size()), archive.blockSize*archive.blockCount, archive.Size()) + if archive.aggregationSpec != nil { + fmt.Printf("aggregation: %s\n", archive.aggregationSpec) + } + + toTime := func(t int) string { return time.Unix(int64(t), 0).Format("2006-01-02 15:04:05") } fmt.Printf("cblock\n") fmt.Printf(" index: %d\n", archive.cblock.index) - fmt.Printf(" p[0].interval: %d\n", archive.cblock.p0.interval) - fmt.Printf(" p[n-2].interval: %d\n", archive.cblock.pn2.interval) - fmt.Printf(" p[n-1].interval: %d\n", archive.cblock.pn1.interval) + fmt.Printf(" p[0].interval: %d %s\n", archive.cblock.p0.interval, toTime(archive.cblock.p0.interval)) + fmt.Printf(" p[n-2].interval: %d %s\n", archive.cblock.pn2.interval, toTime(archive.cblock.pn2.interval)) + fmt.Printf(" p[n-1].interval: %d %s\n", archive.cblock.pn1.interval, toTime(archive.cblock.pn1.interval)) fmt.Printf(" last_byte: %08b\n", archive.cblock.lastByte) fmt.Printf(" last_byte_offset: %d\n", archive.cblock.lastByteOffset) fmt.Printf(" last_byte_bit_pos: %d\n", archive.cblock.lastByteBitPos) @@ -125,10 +144,18 @@ func (archive *archiveInfo) dumpInfoCompressed() { lastByteOffset = archive.cblock.lastByteOffset } fmt.Printf( - "%02d: %10d - %10d count:%5d crc32:%08x start_offset:%d last_byte_offset: %d\n", - block.index, block.start, - block.end, block.count, block.crc32, + "%02d: %10d %s - %10d %s count:%5d crc32:%08x start:%d last_byte:%d end:%d\n", + block.index, + block.start, toTime(block.start), + block.end, toTime(block.end), + (func() int { + if block.count == 0 { + return 0 + } + return block.count + 1 + })(), block.crc32, archive.blockOffset(block.index), lastByteOffset, + archive.blockOffset(block.index)+archive.blockSize, ) } } @@ -142,6 +169,11 @@ func (arc *archiveInfo) dumpDataPointsCompressed() { } } + if arc.aggregationSpec != nil { + fmt.Printf("aggregation: %s\n", arc.aggregationSpec) + } + + toTime := func(t int) string { return time.Unix(int64(t), 0).Format("2006-01-02 15:04:05") } for _, block := range arc.blockRanges { fmt.Printf("archive %s block %d @%d\n", arc.Retention, block.index, arc.blockOffset(block.index)) if block.start == 0 { @@ -159,18 +191,18 @@ func (arc *archiveInfo) dumpDataPointsCompressed() { panic(err) } - endOffset := arc.blockSize + blockSize := arc.blockSize if block.index == arc.cblock.index { - endOffset = arc.cblock.lastByteOffset - arc.blockOffset(block.index) + blockSize = arc.cblock.lastByteOffset - arc.blockOffset(block.index) } - crc := crc32(buf[:endOffset], 0) + crc := crc32(buf[:blockSize], 0) startOffset := int(arc.blockOffset(block.index)) - fmt.Printf("crc32: %08x check: %08x startOffset: %d endOffset: %d length: %d\n", block.crc32, crc, startOffset, startOffset+endOffset, endOffset) + fmt.Printf("crc32: %08x check: %08x start: %d end: %d length: %d\n", block.crc32, crc, startOffset, startOffset+blockSize, blockSize) for i, p := range dps { // continue - fmt.Printf(" % 4d %d: %v\n", i, p.interval, p.value) + fmt.Printf(" % 4d %d %s: %f\n", i, p.interval, toTime(p.interval), p.value) } } } @@ -211,4 +243,5 @@ func GenTestArchive(buf []byte, ret Retention) *archiveInfo { return &na } + func GenDataPointSlice() []dataPoint { return []dataPoint{} } diff --git a/vendor/github.com/go-graphite/go-whisper/whisper.go b/vendor/github.com/go-graphite/go-whisper/whisper.go index a056cca68..2248443c9 100644 --- a/vendor/github.com/go-graphite/go-whisper/whisper.go +++ b/vendor/github.com/go-graphite/go-whisper/whisper.go @@ -20,6 +20,7 @@ import ( const ( // size constants + ByteSize = 1 IntSize = 4 FloatSize = 4 Float64Size = 8 @@ -37,6 +38,7 @@ const ( Years = 86400 * 365 ) +// Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header type AggregationMethod int const ( @@ -45,6 +47,10 @@ const ( Last Max Min + First + + Mix // only used in whisper header + Percentile // only used in archive header ) func (am AggregationMethod) String() string { @@ -53,16 +59,24 @@ func (am AggregationMethod) String() string { return "average" case Sum: return "sum" + case First: + return "first" case Last: return "last" case Max: return "max" case Min: return "min" + case Mix: + return "mix" + case Percentile: + return "percentile" } return fmt.Sprintf("%d", am) } +// func ParseAggregationMethods() {} + type Options struct { Sparse bool FLock bool @@ -72,6 +86,134 @@ type Options struct { PointSize float32 InMemory bool OpenFileFlag *int + + MixAggregationSpecs []MixAggregationSpec + MixAvgCompressedPointSizes map[int][]float32 +} + +type MixAggregationSpec struct { + Method AggregationMethod + Percentile float32 +} + +type file interface { + Seek(offset int64, whence int) (ret int64, err error) + Fd() uintptr + ReadAt(b []byte, off int64) (n int, err error) + WriteAt(b []byte, off int64) (n int, err error) + Read(b []byte) (n int, err error) + Name() string + Close() error + Write(b []byte) (n int, err error) + Truncate(size int64) error +} + +/* + Represents a Whisper database file. +*/ +type Whisper struct { + // file *os.File + file file + + // Metadata + aggregationMethod AggregationMethod + maxRetention int + xFilesFactor float32 + archives []*archiveInfo + + compressed bool + compVersion uint8 + pointsPerBlock int + avgCompressedPointSize float32 + + crc32 uint32 + + opts *Options + Extended bool +} + +/* + A retention level. + + Retention levels describe a given archive in the database. How detailed it is and how far back + it records. +*/ +type Retention struct { + secondsPerPoint int + numberOfPoints int + + // for compressed whisper (internal) + avgCompressedPointSize float32 + blockCount int +} + +/* + Describes a time series in a file. + + The only addition this type has over a Retention is the offset at which it exists within the + whisper file. +*/ +type archiveInfo struct { + Retention + offset int + + next *archiveInfo + whisper *Whisper + + // why having buffer: + // + // original reasons: + // 1. less file writes per point + // 2. less file reads & no decompressions on propagation + // + // necessary reasons: + // cwhisper doesn't expect data points coming in randomly, having a buffer + // allows it to tolerate data points with different timestamp coming in + // non-increasing order for whatever reasons. But only the first/base archive + // is necessary to have it, so it's possible to optimize away buffers in lower + // archives. + buffer []byte + bufferSize int // dynamically calculated in Whisper.initMetaInfo + + blockRanges []blockRange // TODO: remove: sorted by start + blockSize int + cblock blockInfo // mostly for quick block write + + aggregationSpec *MixAggregationSpec + + stats struct { + // interval and value stats are not saved on disk because they could be + // regenerated by scanning blocks + interval struct { + len1, len9, len12, len16, len36 uint32 + } + value struct { + same, sameLen, variedLen uint32 + } + + extended uint32 + + discard struct { + oldInterval uint32 + } + } +} + +type blockInfo struct { + index int + crc32 uint32 + p0, pn1, pn2 dataPoint // pn1/pn2: points at len(block_points) - 1/2 + lastByte byte + lastByteOffset int + lastByteBitPos int + count int +} + +type blockRange struct { + index int + start, end int // start and end timestamps + count int // should be named as index + crc32 uint32 } func unitMultiplier(s string) (int, error) { @@ -151,42 +293,6 @@ func ParseRetentionDefs(retentionDefs string) (Retentions, error) { return retentions, nil } -type file interface { - Seek(offset int64, whence int) (ret int64, err error) - Fd() uintptr - ReadAt(b []byte, off int64) (n int, err error) - WriteAt(b []byte, off int64) (n int, err error) - Read(b []byte) (n int, err error) - Name() string - Close() error - Write(b []byte) (n int, err error) - Truncate(size int64) error -} - -/* - Represents a Whisper database file. -*/ -type Whisper struct { - // file *os.File - file file - - // Metadata - aggregationMethod AggregationMethod - maxRetention int - xFilesFactor float32 - archives []*archiveInfo - - compressed bool - compVersion uint8 - pointsPerBlock int - avgCompressedPointSize float32 - - crc32 uint32 - - opts *Options - Extended bool -} - // Wrappers for whisper.file operations func (whisper *Whisper) fileWriteAt(b []byte, off int64) error { _, err := whisper.file.WriteAt(b, off) @@ -210,10 +316,16 @@ func Create(path string, retentions Retentions, aggregationMethod AggregationMet } // CreateWithOptions is more customizable create function +// +// avgCompressedPointSize specification order: +// Options.PointSize < Retention.avgCompressedPointSize < Options.MixAggregationSpecs.AvgCompressedPointSize func CreateWithOptions(path string, retentions Retentions, aggregationMethod AggregationMethod, xFilesFactor float32, options *Options) (whisper *Whisper, err error) { if options == nil { options = &Options{} } + if aggregationMethod == Mix && !options.Compressed { + return nil, errors.New("mix aggregation method is currently supported only for compressed format") + } sort.Sort(retentionsByPrecision{retentions}) if err = validateRetentions(retentions); err != nil { return nil, err @@ -265,7 +377,7 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg } // Set the archive info - for _, retention := range retentions { + for i, retention := range retentions { archive := &archiveInfo{Retention: *retention} if archive.avgCompressedPointSize == 0 { @@ -275,15 +387,37 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg archive.blockCount = whisper.blockCount(archive) } - whisper.archives = append(whisper.archives, archive) + if whisper.aggregationMethod == Mix && i > 0 { + for i, spec := range options.MixAggregationSpecs { + narchive := *archive + narchive.aggregationSpec = &MixAggregationSpec{Method: spec.Method, Percentile: spec.Percentile} + ssp := narchive.secondsPerPoint + sindex := i % len(options.MixAggregationSpecs) + if msizes := options.MixAvgCompressedPointSizes; msizes != nil && + msizes[ssp] != nil && + isGoodFloat32(msizes[ssp][sindex]) { + narchive.avgCompressedPointSize = msizes[ssp][sindex] + } + + whisper.archives = append(whisper.archives, &narchive) + } + } else { + whisper.archives = append(whisper.archives, archive) + } } offset := whisper.MetadataSize() for i, retention := range retentions { - archive := whisper.archives[i] - archive.offset = offset + if !whisper.compressed { + archive := whisper.archives[i] + archive.offset = offset + offset += retention.Size() - if whisper.compressed { + continue + } + + if whisper.aggregationMethod != Mix || i == 0 { + archive := whisper.archives[i] if math.IsNaN(float64(archive.avgCompressedPointSize)) || archive.avgCompressedPointSize <= 0 { archive.avgCompressedPointSize = avgCompressedPointSize } @@ -295,14 +429,26 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg ppb := archive.calculateSuitablePointsPerBlock(whisper.pointsPerBlock) archive.blockSize = int(math.Ceil(float64(ppb)*float64(archive.avgCompressedPointSize))) + endOfBlockSize archive.blockRanges = make([]blockRange, archive.blockCount) + + archive.offset = offset offset += archive.blockSize * archive.blockCount if i > 0 { size := archive.secondsPerPoint / whisper.archives[i-1].secondsPerPoint * PointSize * 2 whisper.archives[i-1].buffer = make([]byte, size) } - } else { - offset += retention.Size() + + continue + } + + for j := range options.MixAggregationSpecs { + archive := whisper.archives[1+(i-1)*len(options.MixAggregationSpecs)+j] + archive.cblock.lastByteBitPos = 7 + archive.blockSize = int(math.Ceil(float64(whisper.pointsPerBlock)*float64(archive.avgCompressedPointSize))) + endOfBlockSize + archive.blockRanges = make([]blockRange, archive.blockCount) + + archive.offset = offset + offset += archive.blockSize * archive.blockCount } } @@ -335,6 +481,10 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg return whisper, nil } +func isGoodFloat32(n float32) bool { + return !math.IsNaN(float64(n)) && n > 0.0 +} + func (whisper *Whisper) blockCount(archive *archiveInfo) int { return int(math.Ceil(float64(archive.numberOfPoints)/float64(archive.calculateSuitablePointsPerBlock(whisper.pointsPerBlock)))) + 1 } @@ -498,7 +648,10 @@ func (whisper *Whisper) initMetaInfo() { prevArc := whisper.archives[i-1] prevArc.next = arc - prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount + + if whisper.aggregationMethod != Mix { + prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount + } } } @@ -565,6 +718,10 @@ func (whisper *Whisper) blockRangesSize() int { } func (whisper *Whisper) bufferSize() int { + if whisper.aggregationMethod == Mix { + return 0 + } + if len(whisper.archives) == 0 { return 0 } @@ -583,6 +740,8 @@ func (whisper *Whisper) AggregationMethod() string { aggr = "Average" case Sum: aggr = "Sum" + case First: + aggr = "First" case Last: aggr = "Last" case Max: @@ -679,7 +838,7 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) (err error) { return whisper.UpdateManyForArchive(points, -1) } -func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, archiveSpecifier int) (err error) { +func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRetention int) (err error) { // recover panics and return as error defer func() { if e := recover(); e != nil { @@ -696,7 +855,7 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, archiveS var currentPoints []*TimeSeriesPoint for i := 0; i < len(whisper.archives); i++ { archive := whisper.archives[i] - if archiveSpecifier != -1 && archiveSpecifier != archive.MaxRetention() { + if targetRetention != -1 && targetRetention != archive.MaxRetention() { continue } @@ -708,6 +867,13 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, archiveS // reverse currentPoints reversePoints(currentPoints) if whisper.compressed { + // Backfilling lower archives is not allowed/supported for mix + // aggreation policy with the current api, a new api/parameter is + // needed to specify which aggregaton target to backfill. + if whisper.aggregationMethod == Mix && i > 0 { + break + } + err = whisper.archiveUpdateManyCompressed(archive, currentPoints) } else { err = whisper.archiveUpdateMany(archive, currentPoints) @@ -985,6 +1151,10 @@ func (whisper *Whisper) StartTime() int { Fetch a TimeSeries for a given time span from the file. */ func (whisper *Whisper) Fetch(fromTime, untilTime int) (timeSeries *TimeSeries, err error) { + return whisper.FetchByAggregation(fromTime, untilTime, nil) +} + +func (whisper *Whisper) FetchByAggregation(fromTime, untilTime int, spec *MixAggregationSpec) (timeSeries *TimeSeries, err error) { now := int(Now().Unix()) // TODO: danger of 2030 something overflow if fromTime > untilTime { return nil, fmt.Errorf("Invalid time interval: from time '%d' is after until time '%d'", fromTime, untilTime) @@ -1011,9 +1181,18 @@ func (whisper *Whisper) Fetch(fromTime, untilTime int) (timeSeries *TimeSeries, var archive *archiveInfo for _, archive = range whisper.archives { if archive.MaxRetention() >= diff { + // TODO: select a default aggregation policy? + if whisper.aggregationMethod == Mix && spec != nil && archive.aggregationSpec != nil && spec.String() != archive.aggregationSpec.String() { + continue + } + break } } + // NOTE: base (i.e. the first) archive should have a nil aggregationSpec + if whisper.aggregationMethod == Mix && spec != nil && archive.aggregationSpec != nil && spec.String() != archive.aggregationSpec.String() { + return nil, fmt.Errorf("target aggregation %s not found", spec) + } fromInterval := archive.Interval(fromTime) untilInterval := archive.Interval(untilTime) @@ -1149,21 +1328,6 @@ func (whisper *Whisper) readInt(offset int64) (int, error) { return unpackInt(b), nil } -/* - A retention level. - - Retention levels describe a given archive in the database. How detailed it is and how far back - it records. -*/ -type Retention struct { - secondsPerPoint int - numberOfPoints int - - // for compressed whisper (internal) - avgCompressedPointSize float32 - blockCount int -} - func (r *Retention) MaxRetention() int { return r.secondsPerPoint * r.numberOfPoints } func (r *Retention) Size() int { return r.numberOfPoints * PointSize } func (r *Retention) SecondsPerPoint() int { return r.secondsPerPoint } @@ -1224,47 +1388,6 @@ func (r retentionsByPrecision) Less(i, j int) bool { return r.Retentions[i].secondsPerPoint < r.Retentions[j].secondsPerPoint } -/* - Describes a time series in a file. - - The only addition this type has over a Retention is the offset at which it exists within the - whisper file. -*/ -type archiveInfo struct { - Retention - offset int - - next *archiveInfo - whisper *Whisper - - // reason: - // 1. less file writes per point - // 2. less file reads & no decompressions on propagation - buffer []byte - bufferSize int - - blockRanges []blockRange // TODO: remove: sorted by start - blockSize int - cblock blockInfo // mostly for quick block write - - stats struct { - // interval and value stats are not saved on disk because they could be - // regenerated by scanning blocks - interval struct { - len1, len9, len12, len16, len36 uint32 - } - value struct { - same, sameLen, variedLen uint32 - } - - extended uint32 - - discard struct { - oldInterval uint32 - } - } -} - func (archive *archiveInfo) Offset() int64 { return int64(archive.offset) } @@ -1382,6 +1505,8 @@ func aggregate(method AggregationMethod, knownValues []float64) float64 { return sum(knownValues) / float64(len(knownValues)) case Sum: return sum(knownValues) + case First: + return knownValues[0] case Last: return knownValues[len(knownValues)-1] case Max: @@ -1497,3 +1622,13 @@ func crc32(data []byte, prev uint32) uint32 { } func (whisper *Whisper) File() *os.File { return whisper.file.(*os.File) } + +func (mas *MixAggregationSpec) String() string { + if mas.Method == Percentile { + if float32(int(mas.Percentile)) == mas.Percentile { + return fmt.Sprintf("p%0.0f", mas.Percentile) + } + return fmt.Sprintf("p%f", mas.Percentile) + } + return mas.Method.String() +}