Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLAT-108920] Do not skip samples during dedup #38

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ type dedupSeriesIterator struct {
lastIter chunkenc.Iterator

penA, penB int64
useA bool
useA, useB bool
}

func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator {
Expand All @@ -288,6 +288,7 @@ func newDedupSeriesIterator(a, b adjustableSeriesIterator) *dedupSeriesIterator
lastIter: a,
aval: a.Next(),
bval: b.Next(),
useB: true,
}
}

Expand All @@ -304,16 +305,17 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}()

// Advance both iterators to at least the next highest timestamp plus the potential penalty.
if it.aval != chunkenc.ValNone {
if it.aval != chunkenc.ValNone && it.useA {
it.aval = it.a.Seek(it.lastT + 1 + it.penA)
}
if it.bval != chunkenc.ValNone {
if it.bval != chunkenc.ValNone && it.useB {
it.bval = it.b.Seek(it.lastT + 1 + it.penB)
}

// Handle basic cases where one iterator is exhausted before the other.
if it.aval == chunkenc.ValNone {
it.useA = false
it.useB = true
if it.bval != chunkenc.ValNone {
it.lastT = it.b.AtT()
it.lastIter = it.b
Expand All @@ -323,6 +325,7 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
}
if it.bval == chunkenc.ValNone {
it.useA = true
it.useB = false
it.lastT = it.a.AtT()
it.lastIter = it.a
it.penA = 0
Expand All @@ -336,6 +339,7 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
tb := it.b.AtT()

it.useA = ta <= tb
it.useB = ta >= tb

// For the series we didn't pick, add a penalty twice as high as the delta of the last two
// samples to the next seek against it.
Expand Down
24 changes: 24 additions & 0 deletions pkg/dedup/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (s *mockedSeriesIterator) Seek(t int64) chunkenc.ValueType {
}

// At returns the timestamp and value of the sample at the iterator's current
// cursor position. When the cursor is at or past the end of the sample slice
// it returns (0, 0) instead of indexing out of range.
func (s *mockedSeriesIterator) At() (t int64, v float64) {
	if s.cur < len(s.samples) {
		cur := s.samples[s.cur]
		return cur.t, cur.f
	}
	return 0, 0
}
Expand Down Expand Up @@ -374,6 +377,27 @@ func TestDedupSeriesSet(t *testing.T) {
},
},
},
{
name: "Multi dedup labels - data points absent",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {30000, 3}, {40000, 4}},
}, {
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
}, {
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {80000, 10}},
},
},
exp: []series{
{
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {40000, 4}, {50000, 5}, {80000, 10}},
},
},
},
{
name: "Multi dedup label - some series don't have all dedup labels",
input: []series{
Expand Down
4 changes: 0 additions & 4 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ func newQuerierInternal(
if logger == nil {
logger = log.NewNopLogger()
}
rl := make(map[string]struct{})
for _, replicaLabel := range replicaLabels {
rl[replicaLabel] = struct{}{}
}

partialResponseStrategy := storepb.PartialResponseStrategy_ABORT
if groupReplicaPartialResponseStrategy {
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func TestQuerier_Select(t *testing.T) {
expectedAfterDedup: []series{{
lset: nil,
// We don't expect correctness here; it's just random non-replica data.
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {5, 5}, {6, 6}, {7, 7}},
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}},
}},
expectedWarning: "partial error",
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
opts.EnableOverlappingCompaction = false
s, err := tsdb.Open(
dataDir,
logger,
level.NewFilter(logger, level.AllowError()), // warning is too verbose
reg,
&opts,
nil,
Expand Down
Loading