Skip to content

Commit

Permalink
Merge pull request #20544 from lesam/series-iteration-optimization
Browse files Browse the repository at this point in the history
feat(tsi): optimize series iteration
  • Loading branch information
lesam authored Jan 25, 2021
2 parents fe3af66 + 98a76a1 commit d28bcb8
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52
github.com/spf13/cast v1.3.0
github.com/stretchr/testify v1.5.1
github.com/tinylib/msgp v1.1.0
github.com/willf/bitset v1.1.9 // indirect
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6
Expand Down
56 changes: 56 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2498,6 +2498,40 @@ type indexTagSets interface {
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
}

// createSeriesIterator creates an optimized series iterator if possible.
// We exclude less-common cases for now as not worth implementing.
func (e *Engine) createSeriesIterator(measurement string, ref *influxql.VarRef, is tsdb.IndexSet, opt query.IteratorOptions) (query.Iterator, error) {
// Main check to see if we are trying to create a seriesKey iterator
if ref == nil || ref.Val != "_seriesKey" || len(opt.Aux) != 0 {
return nil, nil
}
// Check some other cases that we could maybe handle, but don't
if len(opt.Dimensions) > 0 {
return nil, nil
}
if opt.SLimit != 0 || opt.SOffset != 0 {
return nil, nil
}
if opt.StripName {
return nil, nil
}
if opt.Ordered {
return nil, nil
}
// Actual creation of the iterator
seriesCursor, err := is.MeasurementSeriesKeyByExprIterator([]byte(measurement), opt.Condition, opt.Authorizer)
if err != nil {
seriesCursor.Close()
return nil, err
}
var seriesIterator query.Iterator
seriesIterator = newSeriesIterator(measurement, seriesCursor)
if opt.InterruptCh != nil {
seriesIterator = query.NewInterruptIterator(seriesIterator, opt.InterruptCh)
}
return seriesIterator, nil
}

func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
ref, _ := call.Args[0].(*influxql.VarRef)

Expand All @@ -2507,6 +2541,28 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
return nil, nil
}

// check for optimized series iteration for tsi index
if e.index.Type() == tsdb.TSI1IndexName {
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
seriesOpt := opt
if len(opt.Dimensions) == 0 && call.Name == "count" {
// no point ordering the series if we are just counting all of them
seriesOpt.Ordered = false
}
seriesIterator, err := e.createSeriesIterator(measurement, ref, indexSet, seriesOpt)
if err != nil {
return nil, err
}
if seriesIterator != nil {
callIterator, err := query.NewCallIterator(seriesIterator, opt)
if err != nil {
seriesIterator.Close()
return nil, err
}
return []query.Iterator{callIterator}, nil
}
}

// Determine tagsets for this measurement based on dimensions and filters.
var (
tagSets []*query.TagSet
Expand Down
75 changes: 75 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
tassert "github.com/stretchr/testify/assert"
)

// Ensure that deletes only sent to the WAL will clear out the data from the cache on restart
Expand Down Expand Up @@ -2028,6 +2029,80 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}

// Ensure engine can create an descending iterator for cached values.
func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
t.Parallel()

for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
assert := tassert.New(t)
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=west"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "west"}))

if err := e.WritePointsString(
`cpu,host=A,region=east value=1.1 1000000001`,
`cpu,host=B,region=east value=1.2 1000000002`,
`cpu,host=A,region=east value=1.3 1000000003`,
`cpu,host=C,region=east value=1.4 1000000004`,
`cpu,host=A,region=west value=1.5 1000000005`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

opts := query.IteratorOptions{
Expr: influxql.MustParseExpr(`_seriesKey`),
Dimensions: []string{},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Condition: influxql.MustParseExpr(`host = 'A'`),
}

itr, err := e.CreateIterator(context.Background(), "cpu", opts)
if err != nil {
t.Fatal(err)
}

stringItr, ok := itr.(query.StringIterator)
assert.True(ok, "series iterator must be of type string")
expectedSeries := map[string]struct{}{
"cpu,host=A,region=west": struct{}{},
"cpu,host=A,region=east": struct{}{},
}
var str *query.StringPoint
for str, err = stringItr.Next(); err == nil && str != (*query.StringPoint)(nil); str, err = stringItr.Next() {
_, ok := expectedSeries[str.Value]
assert.True(ok, "Saw bad key "+str.Value)
delete(expectedSeries, str.Value)
}
assert.NoError(err)
assert.NoError(itr.Close())

countOpts := opts
countOpts.Expr = influxql.MustParseExpr(`count(_seriesKey)`)
itr, err = e.CreateIterator(context.Background(), "cpu", countOpts)
if err != nil {
t.Fatal(err)
}

integerIter, ok := itr.(query.IntegerIterator)
assert.True(ok, "series count iterator must be of type integer")
i, err := integerIter.Next()
assert.NoError(err)
assert.Equal(int64(2), i.Value, "must count 2 series with host=A")
i, err = integerIter.Next()
assert.NoError(err)
assert.Equal((*query.IntegerPoint)(nil), i, "count iterator has only one output")
assert.NoError(itr.Close())
})
}
}

func makeBlockTypeSlice(n int) []byte {
r := make([]byte, n)
b := tsm1.BlockFloat64
Expand Down
71 changes: 71 additions & 0 deletions tsdb/engine/tsm1/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"context"
"fmt"
"sync"

"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/pkg/tracing"
Expand Down Expand Up @@ -216,3 +217,73 @@ func newInstrumentedIterator(ctx context.Context, itr query.Iterator) query.Iter
panic(fmt.Sprintf("unsupported instrumented iterator type: %T", itr))
}
}

type seriesIterator struct {
cur tsdb.SeriesKeyIterator
point query.StringPoint // reusable buffer

statsLock sync.Mutex
stats query.IteratorStats
statsBuf query.IteratorStats
}

func newSeriesIterator(name string, cur tsdb.SeriesKeyIterator) *seriesIterator {
itr := &seriesIterator{
cur: cur,
point: query.StringPoint{
Name: name,
Tags: query.NewTags(nil),
},
}
itr.stats = itr.statsBuf
return itr
}

// Next returns the next point from the iterator.
func (itr *seriesIterator) Next() (*query.StringPoint, error) {
for {
// Read from the main cursor
b, err := itr.cur.Next()
if err != nil {
itr.copyStats()
return nil, err
}
itr.point.Value = string(b)

// Exit if we have no more points or we are outside our time range.
if b == nil {
itr.copyStats()
return nil, nil
}
// Track points returned.
itr.statsBuf.PointN++
itr.statsBuf.SeriesN++

// Copy buffer to stats periodically.
if itr.statsBuf.PointN%statsBufferCopyIntervalN == 0 {
itr.copyStats()
}

return &itr.point, nil
}
}

// copyStats copies from the itr stats buffer to the stats under lock.
func (itr *seriesIterator) copyStats() {
itr.statsLock.Lock()
itr.stats = itr.statsBuf
itr.statsLock.Unlock()
}

// Stats returns stats on the points processed.
func (itr *seriesIterator) Stats() query.IteratorStats {
itr.statsLock.Lock()
stats := itr.stats
itr.statsLock.Unlock()
return stats
}

// Close closes the iterator.
func (itr *seriesIterator) Close() error {
return itr.cur.Close()
}
93 changes: 93 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ type SeriesIDIterator interface {
Close() error
}

// SeriesKeyIterator represents an iterator over a list of SeriesKeys
type SeriesKeyIterator interface {
Next() ([]byte, error)
Close() error
}

// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
type SeriesIDSetIterator interface {
SeriesIDIterator
Expand Down Expand Up @@ -1918,6 +1924,93 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}

type measurementSeriesKeyByExprIterator struct {
ids SeriesIDIterator
is IndexSet
auth query.Authorizer
once sync.Once
releaser func()
}

func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
if itr == nil {
return nil, nil
}
for {
e, err := itr.ids.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}

seriesKey := itr.is.SeriesFile.SeriesKey(e.SeriesID)
if len(seriesKey) == 0 {
continue
}

name, tags := ParseSeriesKey(seriesKey)

// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
if v, ok := e.Expr.(*influxql.BooleanLiteral); ok {
if !v.Val {
continue
}
} else {
values := make(map[string]interface{}, len(tags))
for _, t := range tags {
values[string(t.Key)] = string(t.Value)
}
if !influxql.EvalBool(e.Expr, values) {
continue
}
}
}

if itr.auth != nil && !itr.auth.AuthorizeSeriesRead(itr.is.Database(), name, tags) {
continue
}

out := models.MakeKey(name, tags)
// ensure nil is only returned when we are done (or for errors)
if out == nil {
out = []byte{}
}
return out, nil
}
}

func (itr *measurementSeriesKeyByExprIterator) Close() error {
if itr == nil {
return nil
}
itr.once.Do(itr.releaser)
return itr.ids.Close()
}

// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
// Any non-tag expressions will be filtered as if the field had the zero value.
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
release := is.SeriesFile.Retain()
// Create iterator for all matching series.
ids, err := is.measurementSeriesByExprIterator(name, expr)
if err != nil {
release()
return nil, err
}
if ids == nil {
release()
return nil, nil
}
return &measurementSeriesKeyByExprIterator{
ids: ids,
releaser: release,
auth: auth,
is: is,
}, nil
}

// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
Expand Down
2 changes: 1 addition & 1 deletion tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (f *SeriesFile) SeriesCount() uint64 {
return n
}

// SeriesIterator returns an iterator over all the series.
// SeriesIDIterator returns an iterator over all the series.
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
var ids []uint64
for _, p := range f.partitions {
Expand Down

0 comments on commit d28bcb8

Please sign in to comment.