Skip to content

Commit

Permalink
Use cached timestamps in metrics aggreagtion query
Browse files Browse the repository at this point in the history
  • Loading branch information
AdityaHegde committed Jan 14, 2025
1 parent c70143f commit 0cc9827
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 318 deletions.
7 changes: 4 additions & 3 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ func (e *Executor) Timestamps(ctx context.Context, executionTime *time.Time) (ti
return e.min, e.max, e.watermark, nil
}

func (e *Executor) BindQuery(ctx context.Context, qry *Query, min, max, watermark time.Time) error {
e.min = min
e.max = max
// BindQuery allows to set min, max and watermark from a cache.
func (e *Executor) BindQuery(ctx context.Context, qry *Query, minTime, maxTime, watermark time.Time) error {
e.min = minTime
e.max = maxTime
e.watermark = watermark
return e.rewriteQueryTimeRanges(ctx, qry, nil)
}
Expand Down
8 changes: 4 additions & 4 deletions runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// rewriteQueryTimeRanges rewrites the time ranges in the query to fixed start/end timestamps.
func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, executionTime *time.Time) error {
minTime, maxTime, watermark, err := e.Timestamps(ctx, executionTime)
_, _, watermark, err := e.Timestamps(ctx, executionTime)
if err != nil {
return fmt.Errorf("failed to fetch time stamps: %w", err)
}
Expand All @@ -26,12 +26,12 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}
}

err = e.resolveTimeRange(ctx, qry.TimeRange, tz, minTime, maxTime, watermark)
err = e.resolveTimeRange(qry.TimeRange, tz, watermark)
if err != nil {
return fmt.Errorf("failed to resolve time range: %w", err)
}

err = e.resolveTimeRange(ctx, qry.ComparisonTimeRange, tz, minTime, maxTime, watermark)
err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, watermark)
if err != nil {
return fmt.Errorf("failed to resolve comparison time range: %w", err)
}
Expand All @@ -40,7 +40,7 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}

// resolveTimeRange resolves the given time range, ensuring only its Start and End properties are populated.
func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time.Location, minTime, maxTime, watermark time.Time) error {
func (e *Executor) resolveTimeRange(tr *TimeRange, tz *time.Location, watermark time.Time) error {
if tr == nil || tr.IsZero() {
return nil
}
Expand Down
52 changes: 52 additions & 0 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package queries
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
"time"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
Expand Down Expand Up @@ -130,6 +132,48 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime
}
defer e.Close()

res, err := rt.Resolve(ctx, &runtime.ResolveOptions{
InstanceID: instanceID,
Resolver: "metrics_time_range",
ResolverProperties: map[string]any{
"metrics_view": q.MetricsViewName,
},
Args: map[string]any{
"priority": q.Priority,
},
Claims: q.SecurityClaims,
})
if err != nil {
return err
}
defer res.Close()

row, err := res.Next()
if err != nil {
if errors.Is(err, io.EOF) {
return errors.New("time range query returned no results")
}
return err
}

minTime, err := anyToTime(row["min"].(string))
if err != nil {
return err
}
maxTime, err := anyToTime(row["max"].(string))
if err != nil {
return err
}
watermark, err := anyToTime(row["watermark"].(string))
if err != nil {
return err
}

err = e.BindQuery(ctx, qry, minTime, maxTime, watermark)
if err != nil {
return err
}

var format drivers.FileFormat
switch opts.Format {
case runtimev1.ExportFormat_EXPORT_FORMAT_CSV:
Expand Down Expand Up @@ -355,3 +399,11 @@ func metricViewExpression(expr *runtimev1.Expression, sql string) (*metricsview.
}
return nil, nil
}

func anyToTime(tm any) (time.Time, error) {
tmStr, ok := tm.(string)
if !ok {
return time.Time{}, errors.New("invalid type")
}
return time.Parse(time.RFC3339, tmStr)
}
3 changes: 3 additions & 0 deletions runtime/queries/metricsview_aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func TestMetricViewAggregationAgainstClickHouse(t *testing.T) {
t.Run("testMetricsViewsAggregation_comparison_with_offset_and_limit_and_delta", func(t *testing.T) {
testMetricsViewsAggregation_comparison_with_offset_and_limit_and_delta(t, rt, instanceID)
})
t.Run("testMetricsViewsAggregation_comparison_iso_timerange", func(t *testing.T) {
testMetricsViewsAggregation_comparison_iso_timerange(t, rt, instanceID)
})
}

func TestMetricViewAggregationAgainstDuckDB(t *testing.T) {
Expand Down
Loading

0 comments on commit 0cc9827

Please sign in to comment.