From 0cc98271ce036554490ba9aaaf77d97afc020dba Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Tue, 14 Jan 2025 20:58:25 +0530 Subject: [PATCH] Use cached timestamps in metrics aggregation query --- runtime/metricsview/executor.go | 7 +- runtime/metricsview/executor_rewrite_time.go | 8 +- runtime/queries/metricsview_aggregation.go | 52 +++ .../queries/metricsview_aggregation_test.go | 3 + runtime/queries/metricsview_time_range.go | 300 ------------------ runtime/resolvers/metricsview_time_range.go | 22 +- 6 files changed, 74 insertions(+), 318 deletions(-) delete mode 100644 runtime/queries/metricsview_time_range.go diff --git a/runtime/metricsview/executor.go b/runtime/metricsview/executor.go index 9134a265ffe..03cdb108117 100644 --- a/runtime/metricsview/executor.go +++ b/runtime/metricsview/executor.go @@ -155,9 +155,10 @@ func (e *Executor) Timestamps(ctx context.Context, executionTime *time.Time) (ti return e.min, e.max, e.watermark, nil } -func (e *Executor) BindQuery(ctx context.Context, qry *Query, min, max, watermark time.Time) error { - e.min = min - e.max = max +// BindQuery allows setting min, max and watermark from a cache. +func (e *Executor) BindQuery(ctx context.Context, qry *Query, minTime, maxTime, watermark time.Time) error { + e.min = minTime + e.max = maxTime e.watermark = watermark return e.rewriteQueryTimeRanges(ctx, qry, nil) } diff --git a/runtime/metricsview/executor_rewrite_time.go b/runtime/metricsview/executor_rewrite_time.go index 2c92c543b48..720d11205c5 100644 --- a/runtime/metricsview/executor_rewrite_time.go +++ b/runtime/metricsview/executor_rewrite_time.go @@ -12,7 +12,7 @@ import ( // rewriteQueryTimeRanges rewrites the time ranges in the query to fixed start/end timestamps.
func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, executionTime *time.Time) error { - minTime, maxTime, watermark, err := e.Timestamps(ctx, executionTime) + _, _, watermark, err := e.Timestamps(ctx, executionTime) if err != nil { return fmt.Errorf("failed to fetch time stamps: %w", err) } @@ -26,12 +26,12 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu } } - err = e.resolveTimeRange(ctx, qry.TimeRange, tz, minTime, maxTime, watermark) + err = e.resolveTimeRange(qry.TimeRange, tz, watermark) if err != nil { return fmt.Errorf("failed to resolve time range: %w", err) } - err = e.resolveTimeRange(ctx, qry.ComparisonTimeRange, tz, minTime, maxTime, watermark) + err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, watermark) if err != nil { return fmt.Errorf("failed to resolve comparison time range: %w", err) } @@ -40,7 +40,7 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu } // resolveTimeRange resolves the given time range, ensuring only its Start and End properties are populated. 
-func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time.Location, minTime, maxTime, watermark time.Time) error { +func (e *Executor) resolveTimeRange(tr *TimeRange, tz *time.Location, watermark time.Time) error { if tr == nil || tr.IsZero() { return nil } diff --git a/runtime/queries/metricsview_aggregation.go b/runtime/queries/metricsview_aggregation.go index 6b5ff7a81c7..e6ec236bf60 100644 --- a/runtime/queries/metricsview_aggregation.go +++ b/runtime/queries/metricsview_aggregation.go @@ -3,10 +3,12 @@ package queries import ( "context" "encoding/json" + "errors" "fmt" "io" "os" "strings" + "time" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" @@ -130,6 +132,48 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime } defer e.Close() + res, err := rt.Resolve(ctx, &runtime.ResolveOptions{ + InstanceID: instanceID, + Resolver: "metrics_time_range", + ResolverProperties: map[string]any{ + "metrics_view": q.MetricsViewName, + }, + Args: map[string]any{ + "priority": q.Priority, + }, + Claims: q.SecurityClaims, + }) + if err != nil { + return err + } + defer res.Close() + + row, err := res.Next() + if err != nil { + if errors.Is(err, io.EOF) { + return errors.New("time range query returned no results") + } + return err + } + + minTime, err := anyToTime(row["min"].(string)) + if err != nil { + return err + } + maxTime, err := anyToTime(row["max"].(string)) + if err != nil { + return err + } + watermark, err := anyToTime(row["watermark"].(string)) + if err != nil { + return err + } + + err = e.BindQuery(ctx, qry, minTime, maxTime, watermark) + if err != nil { + return err + } + var format drivers.FileFormat switch opts.Format { case runtimev1.ExportFormat_EXPORT_FORMAT_CSV: @@ -355,3 +399,11 @@ func metricViewExpression(expr *runtimev1.Expression, sql string) (*metricsview. 
} return nil, nil } + +func anyToTime(tm any) (time.Time, error) { + tmStr, ok := tm.(string) + if !ok { + return time.Time{}, errors.New("invalid type") + } + return time.Parse(time.RFC3339, tmStr) +} diff --git a/runtime/queries/metricsview_aggregation_test.go b/runtime/queries/metricsview_aggregation_test.go index 3c0404e0628..b81f9920730 100644 --- a/runtime/queries/metricsview_aggregation_test.go +++ b/runtime/queries/metricsview_aggregation_test.go @@ -63,6 +63,9 @@ func TestMetricViewAggregationAgainstClickHouse(t *testing.T) { t.Run("testMetricsViewsAggregation_comparison_with_offset_and_limit_and_delta", func(t *testing.T) { testMetricsViewsAggregation_comparison_with_offset_and_limit_and_delta(t, rt, instanceID) }) + t.Run("testMetricsViewsAggregation_comparison_iso_timerange", func(t *testing.T) { + testMetricsViewsAggregation_comparison_iso_timerange(t, rt, instanceID) + }) } func TestMetricViewAggregationAgainstDuckDB(t *testing.T) { diff --git a/runtime/queries/metricsview_time_range.go b/runtime/queries/metricsview_time_range.go deleted file mode 100644 index 584b11f1527..00000000000 --- a/runtime/queries/metricsview_time_range.go +++ /dev/null @@ -1,300 +0,0 @@ -package queries - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io" - "time" - - runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" - "github.com/rilldata/rill/runtime" - "github.com/rilldata/rill/runtime/drivers" - "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/types/known/timestamppb" -) - -type MetricsViewTimeRange struct { - MetricsViewName string `json:"name"` - MetricsView *runtimev1.MetricsViewSpec `json:"-"` - ResolvedMVSecurity *runtime.ResolvedSecurity `json:"security"` - - Result *runtimev1.MetricsViewTimeRangeResponse `json:"_"` -} - -var _ runtime.Query = &MetricsViewTimeRange{} - -func (q *MetricsViewTimeRange) Key() string { - r, err := json.Marshal(q) - if err != nil { - panic(err) - } - return fmt.Sprintf("MetricsViewTimeRange:%s", r) 
-} - -func (q *MetricsViewTimeRange) Deps() []*runtimev1.ResourceName { - return []*runtimev1.ResourceName{ - {Kind: runtime.ResourceKindMetricsView, Name: q.MetricsViewName}, - } -} - -func (q *MetricsViewTimeRange) MarshalResult() *runtime.QueryResult { - return &runtime.QueryResult{ - Value: q.Result, - Bytes: sizeProtoMessage(q.Result), - } -} - -func (q *MetricsViewTimeRange) UnmarshalResult(v any) error { - res, ok := v.(*runtimev1.MetricsViewTimeRangeResponse) - if !ok { - return fmt.Errorf("MetricsViewTimeRange: mismatched unmarshal input") - } - q.Result = res - return nil -} - -func (q *MetricsViewTimeRange) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error { - policyFilter := "" - if q.ResolvedMVSecurity != nil { - policyFilter = q.ResolvedMVSecurity.RowFilter() - } - - if q.MetricsView.TimeDimension == "" { - return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsViewName) - } - - olap, release, err := rt.OLAP(ctx, instanceID, q.MetricsView.Connector) - if err != nil { - return err - } - defer release() - - switch olap.Dialect() { - case drivers.DialectDuckDB: - return q.resolveDuckDB(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectDuckDB, q.MetricsView), policyFilter, priority) - case drivers.DialectDruid: - return q.resolveDruid(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectDruid, q.MetricsView), policyFilter, priority) - case drivers.DialectClickHouse: - return q.resolveClickHouseAndPinot(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectClickHouse, q.MetricsView), policyFilter, priority) - case drivers.DialectPinot: - return q.resolveClickHouseAndPinot(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectPinot, q.MetricsView), policyFilter, priority) - default: - return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) - } -} - -func (q *MetricsViewTimeRange) 
resolveDuckDB(ctx context.Context, olap drivers.OLAPStore, timeDim, escapedTableName, filter string, priority int) error { - if filter != "" { - filter = fmt.Sprintf(" WHERE %s", filter) - } - - rangeSQL := fmt.Sprintf( - "SELECT min(%[1]s) as \"min\", max(%[1]s) as \"max\", max(%[1]s) - min(%[1]s) as \"interval\" FROM %[2]s %[3]s", - safeName(timeDim), - escapedTableName, - filter, - ) - - rows, err := olap.Execute(ctx, &drivers.Statement{ - Query: rangeSQL, - Priority: priority, - ExecutionTimeout: defaultExecutionTimeout, - }) - if err != nil { - return err - } - defer rows.Close() - - if rows.Next() { - summary := &runtimev1.TimeRangeSummary{} - rowMap := make(map[string]any) - err = rows.MapScan(rowMap) - if err != nil { - return err - } - if v := rowMap["min"]; v != nil { - minTime, ok := v.(time.Time) - if !ok { - return fmt.Errorf("not a timestamp column") - } - summary.Min = timestamppb.New(minTime) - summary.Max = timestamppb.New(rowMap["max"].(time.Time)) - summary.Interval, err = handleDuckDBInterval(rowMap["interval"]) - if err != nil { - return err - } - } - q.Result = &runtimev1.MetricsViewTimeRangeResponse{ - TimeRangeSummary: summary, - } - return nil - } - - err = rows.Err() - if err != nil { - return err - } - - return errors.New("no rows returned") -} - -func (q *MetricsViewTimeRange) resolveDruid(ctx context.Context, olap drivers.OLAPStore, timeDim, escapedTableName, filter string, priority int) error { - if filter != "" { - filter = fmt.Sprintf(" WHERE %s", filter) - } - - var minTime, maxTime time.Time - group, ctx := errgroup.WithContext(ctx) - - group.Go(func() error { - minSQL := fmt.Sprintf( - "SELECT min(%[1]s) as \"min\" FROM %[2]s %[3]s", - safeName(timeDim), - escapedTableName, - filter, - ) - - rows, err := olap.Execute(ctx, &drivers.Statement{ - Query: minSQL, - Priority: priority, - ExecutionTimeout: defaultExecutionTimeout, - }) - if err != nil { - return err - } - defer rows.Close() - - if rows.Next() { - err = 
rows.Scan(&minTime) - if err != nil { - return err - } - } else { - err = rows.Err() - if err != nil { - return err - } - return errors.New("no rows returned for min time") - } - - return nil - }) - - group.Go(func() error { - maxSQL := fmt.Sprintf( - "SELECT max(%[1]s) as \"max\" FROM %[2]s %[3]s", - safeName(timeDim), - escapedTableName, - filter, - ) - - rows, err := olap.Execute(ctx, &drivers.Statement{ - Query: maxSQL, - Priority: priority, - ExecutionTimeout: defaultExecutionTimeout, - }) - if err != nil { - return err - } - defer rows.Close() - - if rows.Next() { - err = rows.Scan(&maxTime) - if err != nil { - return err - } - } else { - err = rows.Err() - if err != nil { - return err - } - return errors.New("no rows returned for max time") - } - - return nil - }) - - err := group.Wait() - if err != nil { - return err - } - - summary := &runtimev1.TimeRangeSummary{} - summary.Min = timestamppb.New(minTime) - summary.Max = timestamppb.New(maxTime) - summary.Interval = &runtimev1.TimeRangeSummary_Interval{ - Micros: maxTime.Sub(minTime).Microseconds(), - } - q.Result = &runtimev1.MetricsViewTimeRangeResponse{ - TimeRangeSummary: summary, - } - - return nil -} - -func (q *MetricsViewTimeRange) resolveClickHouseAndPinot(ctx context.Context, olap drivers.OLAPStore, timeDim, escapedTableName, filter string, priority int) error { - if filter != "" { - filter = fmt.Sprintf(" WHERE %s", filter) - } - - rangeSQL := fmt.Sprintf( - "SELECT min(%[1]s) AS \"min\", max(%[1]s) AS \"max\" FROM %[2]s %[3]s", - safeName(timeDim), - escapedTableName, - filter, - ) - - rows, err := olap.Execute(ctx, &drivers.Statement{ - Query: rangeSQL, - Priority: priority, - ExecutionTimeout: defaultExecutionTimeout, - }) - if err != nil { - return err - } - defer rows.Close() - - if rows.Next() { - summary := &runtimev1.TimeRangeSummary{} - var minVal, maxVal *time.Time - err = rows.Scan(&minVal, &maxVal) - if err != nil { - return err - } - - if minVal != nil { - summary.Min = 
timestamppb.New(*minVal) - } - if maxVal != nil { - summary.Max = timestamppb.New(*maxVal) - } - if minVal != nil && maxVal != nil { - // ignoring months for now since its hard to compute and anyways not being used - summary.Interval = &runtimev1.TimeRangeSummary_Interval{} - duration := maxVal.Sub(*minVal) - hours := duration.Hours() - if hours >= hourInDay { - summary.Interval.Days = int32(hours / hourInDay) - } - summary.Interval.Micros = duration.Microseconds() - microsInDay*int64(summary.Interval.Days) - } - - q.Result = &runtimev1.MetricsViewTimeRangeResponse{ - TimeRangeSummary: summary, - } - return nil - } - - err = rows.Err() - if err != nil { - return err - } - - return errors.New("no rows returned") -} - -func (q *MetricsViewTimeRange) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error { - return ErrExportNotSupported -} diff --git a/runtime/resolvers/metricsview_time_range.go b/runtime/resolvers/metricsview_time_range.go index c4133c9105d..8acf592a71c 100644 --- a/runtime/resolvers/metricsview_time_range.go +++ b/runtime/resolvers/metricsview_time_range.go @@ -55,33 +55,33 @@ func newMetricsViewTimeRangeResolver(ctx context.Context, opts *runtime.Resolver return nil, err } - var spec *runtimev1.MetricsViewSpec - var security *runtime.ResolvedSecurity - res, err := ctrl.Get(ctx, &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: tr.MetricsView}, false) if err != nil { return nil, err } - spec = res.GetMetricsView().State.ValidSpec - if spec == nil { + mv := res.GetMetricsView().State.ValidSpec + if mv == nil { return nil, fmt.Errorf("metrics view %q is invalid", res.Meta.Name.Name) } - security, err = opts.Runtime.ResolveSecurity(opts.InstanceID, opts.Claims, res) - if err != nil { - return nil, err + if mv.TimeDimension == "" { + return nil, fmt.Errorf("metrics view '%s' does not have a time dimension", tr.MetricsView) } - if spec.TimeDimension == "" { - return nil, 
fmt.Errorf("metrics view '%s' does not have a time dimension", tr.MetricsView) + security, err := opts.Runtime.ResolveSecurity(opts.InstanceID, opts.Claims, res) + if err != nil { + return nil, err } if !security.CanAccess() { return nil, runtime.ErrForbidden } - ex, err := metricsview.NewExecutor(ctx, opts.Runtime, opts.InstanceID, spec, false, security, args.Priority) + ex, err := metricsview.NewExecutor(ctx, opts.Runtime, opts.InstanceID, mv, false, security, args.Priority) + if err != nil { + return nil, err + } return &metricsViewTimeRangeResolver{ runtime: opts.Runtime,