diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml index 7463c9ba091..d1ab71acc6d 100644 --- a/.github/workflows/go-test.yml +++ b/.github/workflows/go-test.yml @@ -24,4 +24,7 @@ jobs: - name: Go fmt run: test -z $(gofmt -l .) - name: Go test + env: + METRICS_CREDS: ${{ secrets.METRICS_CREDS }} run: go test -short -v ./... + diff --git a/runtime/drivers/druid/druidsqldriver/druid_api_sql_driver.go b/runtime/drivers/druid/druidsqldriver/druid_api_sql_driver.go index e0afed3d032..16e7455d8a0 100644 --- a/runtime/drivers/druid/druidsqldriver/druid_api_sql_driver.go +++ b/runtime/drivers/druid/druidsqldriver/druid_api_sql_driver.go @@ -3,17 +3,28 @@ package druidsqldriver import ( "bytes" "context" + "crypto/x509" "database/sql" "database/sql/driver" "encoding/json" "fmt" "io" "net/http" + "net/url" "reflect" + "regexp" "strconv" "time" "github.com/google/uuid" + "github.com/rilldata/rill/runtime/drivers/druid/retrier" +) + +// non-retryable HTTP errors +var ( + redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`) + schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`) + notTrustedErrorRe = regexp.MustCompile(`certificate is not trusted`) ) type druidSQLDriver struct{} @@ -40,187 +51,275 @@ type sqlConnection struct { var _ driver.QueryerContext = &sqlConnection{} -func (c *sqlConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { - dr := newDruidRequest(query, args) +type coordinatorHTTPCheck struct { + c *sqlConnection +} + +var _ retrier.AdditionalTest = &coordinatorHTTPCheck{} + +func (chc *coordinatorHTTPCheck) EnsureHardFailure(ctx context.Context) (bool, error) { + dr := newDruidRequest("SELECT * FROM sys.segments LIMIT 1", nil) b, err := json.Marshal(dr) if err != nil { - return nil, err + return false, err } bodyReader := bytes.NewReader(b) - context.AfterFunc(ctx, func() { - tctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - r, err := http.NewRequestWithContext(tctx, http.MethodDelete, c.dsn+"/"+dr.Context.SQLQueryID, http.NoBody) - if err != nil { - return - } - - resp, err := c.client.Do(r) - if err != nil { - return - } - resp.Body.Close() - }) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.dsn, bodyReader) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, chc.c.dsn, bodyReader) if err != nil { - return nil, err + return false, err } req.Header.Add("Content-Type", "application/json") - resp, err := c.client.Do(req) + resp, err := chc.c.client.Do(req) if err != nil { - return nil, err + return false, err } dec := json.NewDecoder(resp.Body) var obj any err = dec.Decode(&obj) + resp.Body.Close() if err != nil { - resp.Body.Close() - return nil, err + return false, err } switch v := obj.(type) { case map[string]any: - resp.Body.Close() - return nil, fmt.Errorf("%v", obj) + if v["errorCode"] != "invalidInput" { + return false, fmt.Errorf("%v", obj) + } + return true, nil case []any: - columns := toStringArray(v) + return true, nil + default: + return false, fmt.Errorf("unexpected response: %v", obj) + } +} + +func (c *sqlConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { + // total sum is 126 seconds (sum(2*2^x) from 0 to 5) + re := retrier.NewRetrier(6, 2*time.Second, &coordinatorHTTPCheck{ + c: c, + }) + return re.RunCtx(ctx, func(ctx2 context.Context) (driver.Rows, retrier.Action, error) { + dr := newDruidRequest(query, args) + b, err := json.Marshal(dr) + if err != nil { + return nil, retrier.Fail, err + } + + bodyReader := bytes.NewReader(b) + + context.AfterFunc(ctx, func() { + tctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + r, err := http.NewRequestWithContext(tctx, http.MethodDelete, c.dsn+"/"+dr.Context.SQLQueryID, http.NoBody) + if err != nil { + return + } + + resp, err := c.client.Do(r) + if err != nil { + return + } + resp.Body.Close() + }) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.dsn, bodyReader) + if err != nil { + return nil, retrier.Fail, err + } + + req.Header.Add("Content-Type", "application/json") + resp, err := c.client.Do(req) + if err != nil { + // nolint:errorlint // there's no wrapping + if v, ok := err.(*url.Error); ok { + // Don't retry if the error was due to too many redirects. + if redirectsErrorRe.MatchString(v.Error()) { + return nil, retrier.Fail, v + } + + // Don't retry if the error was due to an invalid protocol scheme. + if schemeErrorRe.MatchString(v.Error()) { + return nil, retrier.Fail, v + } + + // Don't retry if the error was due to TLS cert verification failure. + if notTrustedErrorRe.MatchString(v.Error()) { + return nil, retrier.Fail, v + } + + // nolint:errorlint // there's no wrapping + if _, ok := v.Err.(x509.UnknownAuthorityError); ok { + return nil, retrier.Fail, v + } + } + + return nil, retrier.Retry, err + } + + switch resp.StatusCode { + case http.StatusTooManyRequests: + return nil, retrier.Retry, fmt.Errorf("Too many requests") + case http.StatusUnauthorized, http.StatusForbidden: + return nil, retrier.Fail, fmt.Errorf("Unauthorized request") + } + + dec := json.NewDecoder(resp.Body) + + var obj any err = dec.Decode(&obj) if err != nil { resp.Body.Close() - return nil, err + return nil, retrier.Fail, err } + switch v := obj.(type) { + case map[string]any: + resp.Body.Close() + a := retrier.Retry + if v["errorCode"] == "invalidInput" { // hard fail all invalid-syntax errors + a = retrier.AdditionalCheck + } + return nil, a, fmt.Errorf("%v", obj) + case []any: + columns := toStringArray(v) + err = dec.Decode(&obj) + if err != nil { + resp.Body.Close() + return nil, retrier.Fail, err + } - types := toStringArray(obj.([]any)) - - transformers := make([]func(any) (any, error), len(columns)) - for i, c := range types { - transformers[i] = identityTransformer - switch c { - case "TINYINT": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case float64: - return int8(v), nil - default: - return v, nil + types := toStringArray(obj.([]any)) + + transformers := make([]func(any) (any, error), len(columns)) + for i, c := range types { + transformers[i] = identityTransformer + switch c { + case "TINYINT": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case float64: + return int8(v), nil + default: + return v, nil + } } - } - case "SMALLINT": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case float64: - return int16(v), nil - default: - return v, nil + case "SMALLINT": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case float64: + return int16(v), nil + default: + return v, nil + } } - } - case "INTEGER": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case float64: - return int32(v), nil - default: - return v, nil + case "INTEGER": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case float64: + return int32(v), nil + default: + return v, nil + } } - } - case "BIGINT": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case float64: - return int64(v), nil - default: - return v, nil + case "BIGINT": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case float64: + return int64(v), nil + default: + return v, nil + } } - } - case "FLOAT": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case float64: - return float32(v), nil - case string: - return strconv.ParseFloat(v, 32) - default: - return v, nil + case "FLOAT": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case float64: + return float32(v), nil + case string: + return strconv.ParseFloat(v, 32) + default: + return v, nil + } } - } - case "DOUBLE": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case string: - return strconv.ParseFloat(v, 64) - default: - return v, nil + case "DOUBLE": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case string: + return strconv.ParseFloat(v, 64) + default: + return v, nil + } } - } - case "REAL": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case string: - return strconv.ParseFloat(v, 64) - default: - return v, nil + case "REAL": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case string: + return strconv.ParseFloat(v, 64) + default: + return v, nil + } } - } - case "DECIMAL": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case string: - return strconv.ParseFloat(v, 64) - default: - return v, nil + case "DECIMAL": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case string: + return strconv.ParseFloat(v, 64) + default: + return v, nil + } } - } - case "TIMESTAMP": - transformers[i] = func(v any) (any, error) { - switch v := v.(type) { - case string: - t, err := time.Parse(time.RFC3339, v) + case "TIMESTAMP": + transformers[i] = func(v any) (any, error) { + switch v := v.(type) { + case string: + t, err := time.Parse(time.RFC3339, v) + if err != nil { + return nil, err + } + return t, nil + default: + return v, nil + } + } + case "ARRAY": + transformers[i] = func(v any) (any, error) { + var l []any + err := json.Unmarshal([]byte(v.(string)), &l) if err != nil { return nil, err } - return t, nil - default: - return v, nil - } - } - case "ARRAY": - transformers[i] = func(v any) (any, error) { - var l []any - err := json.Unmarshal([]byte(v.(string)), &l) - if err != nil { - return nil, err + return l, nil } - return l, nil - } - case "OTHER": - transformers[i] = func(v any) (any, error) { - var l map[string]any - err := json.Unmarshal([]byte(v.(string)), &l) - if err != nil { - return nil, err + case "OTHER": + transformers[i] = func(v any) (any, error) { + var l map[string]any + err := json.Unmarshal([]byte(v.(string)), &l) + if err != nil { + return nil, err + } + return l, nil } - return l, nil } } - } - druidRows := &druidRows{ - closer: resp.Body, - dec: dec, - columns: columns, - types: types, - transformers: transformers, - currentValues: make([]any, len(columns)), + druidRows := &druidRows{ + closer: resp.Body, + dec: dec, + columns: columns, + types: types, + transformers: transformers, + currentValues: make([]any, len(columns)), + } + return druidRows, retrier.Succeed, nil + default: + resp.Body.Close() + return nil, retrier.Fail, fmt.Errorf("unexpected response: %v", obj) } - return druidRows, nil - default: - resp.Body.Close() - return nil, fmt.Errorf("unexpected response: %v", obj) - } + }) } type druidRows struct { diff --git a/runtime/drivers/druid/olap.go b/runtime/drivers/druid/olap.go index d2947725d04..fe7251b332d 100644 --- a/runtime/drivers/druid/olap.go +++ b/runtime/drivers/druid/olap.go @@ -196,7 +196,15 @@ func (i informationSchema) All(ctx context.Context) ([]*drivers.Table, error) { } func (i informationSchema) Lookup(ctx context.Context, db, schema, name string) (*drivers.Table, error) { - q := ` + // ensure Coordinator is ready + q := "SELECT * FROM sys.segments LIMIT 1" + rows, err := i.c.db.QueryxContext(ctx, q, name) + if err != nil { + return nil, err + } + rows.Close() + + q = ` SELECT T.TABLE_SCHEMA AS SCHEMA, T.TABLE_NAME AS NAME, @@ -210,7 +218,7 @@ func (i informationSchema) Lookup(ctx context.Context, db, schema, name string) ORDER BY SCHEMA, NAME, TABLE_TYPE, C.ORDINAL_POSITION ` - rows, err := i.c.db.QueryxContext(ctx, q, name) + rows, err = i.c.db.QueryxContext(ctx, q, name) if err != nil { return nil, err } diff --git a/runtime/drivers/olap.go b/runtime/drivers/olap.go index 65d67f8876d..adaeed68db7 100644 --- a/runtime/drivers/olap.go +++ b/runtime/drivers/olap.go @@ -256,6 +256,42 @@ func (d Dialect) EscapeTable(db, schema, table string) string { return sb.String() } +type UnnestConstruct struct { + UnnestClause string + ColClause string + UnnestColName string + UnnestTableName string +} + +func (d Dialect) DimensionSelectConstruct(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) *UnnestConstruct { + colName := d.EscapeIdentifier(dim.Name) + if !dim.Unnest || d == DialectDruid { + return &UnnestConstruct{ + ColClause: fmt.Sprintf(`(%s) as %s`, d.MetricsViewDimensionExpression(dim), colName), + } + } + + unnestColName := d.EscapeIdentifier(tempName(fmt.Sprintf("%s_%s_", "unnested", dim.Name))) + unnestTableName := tempName("tbl") + sel := fmt.Sprintf(`%s as %s`, unnestColName, colName) + if dim.Expression == "" { + // select "unnested_colName" as "colName" ... FROM "mv_table", LATERAL UNNEST("mv_table"."colName") tbl_name("unnested_colName") ... + return &UnnestConstruct{ + ColClause: sel, + UnnestClause: fmt.Sprintf(`, LATERAL UNNEST(%s.%s) %s(%s)`, d.EscapeTable(db, dbSchema, table), colName, unnestTableName, unnestColName), + UnnestTableName: unnestTableName, + UnnestColName: unnestColName, + } + } + + return &UnnestConstruct{ + ColClause: sel, + UnnestClause: fmt.Sprintf(`, LATERAL UNNEST(%s) %s(%s)`, dim.Expression, unnestTableName, unnestColName), + UnnestTableName: unnestTableName, + UnnestColName: unnestColName, + } +} + func (d Dialect) DimensionSelect(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_DimensionV2) (dimSelect, unnestClause string) { colName := d.EscapeIdentifier(dim.Name) if !dim.Unnest || d == DialectDruid { diff --git a/runtime/queries/metricsview.go b/runtime/queries/metricsview.go index 1f475e22a3f..c65c34c9348 100644 --- a/runtime/queries/metricsview.go +++ b/runtime/queries/metricsview.go @@ -76,7 +76,7 @@ func lookupMetricsView(ctx context.Context, rt *runtime.Runtime, instanceID, nam res, err := ctrl.Get(ctx, &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: name}, false) if err != nil { - return nil, nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, nil, status.Error(codes.InvalidArgument, fmt.Sprintf("error getting metrics view %q: %s", name, err.Error())) } mv := res.GetMetricsView() @@ -234,11 +234,18 @@ type ExpressionBuilder struct { dialect drivers.Dialect measures []*runtimev1.MetricsViewAggregationMeasure having bool + unnests map[string]*drivers.UnnestConstruct } func (builder *ExpressionBuilder) columnIdentifierExpression(name string) (string, bool) { // check if identifier is a dimension for _, dim := range builder.mv.Dimensions { + if dim.Unnest { + unnest := builder.unnests[dim.Name] + if unnest != nil { + return fmt.Sprintf("%s.%s", unnest.UnnestTableName, unnest.UnnestColName), true + } + } if dim.Name == name { return builder.dialect.MetricsViewDimensionExpression(dim), true } diff --git a/runtime/queries/metricsview_aggregation.go b/runtime/queries/metricsview_aggregation.go index 447fa346b2e..329375b0973 100644 --- a/runtime/queries/metricsview_aggregation.go +++ b/runtime/queries/metricsview_aggregation.go @@ -1914,6 +1914,20 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex if len(q.Dimensions) == 0 && len(q.Measures) == 0 { return "", nil, errors.New("no dimensions or measures specified") } + + if len(q.Measures) == 1 { + m := &runtimev1.MetricsViewAggregationMeasure{} + switch t := q.Measures[0].Compute.(type) { + case *runtimev1.MetricsViewAggregationMeasure_ComparisonValue: + m.Name = t.ComparisonValue.Measure + case *runtimev1.MetricsViewAggregationMeasure_ComparisonDelta: + m.Name = t.ComparisonDelta.Measure + case *runtimev1.MetricsViewAggregationMeasure_ComparisonRatio: + m.Name = t.ComparisonRatio.Measure + } + q.Measures = append(q.Measures, m) + } + dimByName := make(map[string]*runtimev1.MetricsViewAggregationDimension, len(mv.Dimensions)) measuresByFinalName := make(map[string]*runtimev1.MetricsViewAggregationMeasure, len(q.Measures)) for _, d := range q.Dimensions { @@ -1969,6 +1983,7 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex // these go last to decrease complexity of indexing columns var finalComparisonTimeDims []string mvDimsByName := make(map[string]*runtimev1.MetricsViewSpec_DimensionV2, len(mv.Dimensions)) + unnests := make(map[string]*drivers.UnnestConstruct) for _, d := range q.Dimensions { // Handle regular dimensions if d.TimeGrain == runtimev1.TimeGrain_TIME_GRAIN_UNSPECIFIED { @@ -1977,7 +1992,10 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex return "", nil, err } mvDimsByName[d.Name] = dim - dimSel, unnestClause := dialect.DimensionSelect(mv.Database, mv.DatabaseSchema, mv.Table, dim) + unnest := dialect.DimensionSelectConstruct(mv.Database, mv.DatabaseSchema, mv.Table, dim) + unnests[d.Name] = unnest + dimSel := unnest.ColClause + unnestClause := unnest.UnnestClause selectCols = append(selectCols, dimSel) comparisonSelectCols = append(comparisonSelectCols, dimSel) finalDims = append(finalDims, fmt.Sprintf("COALESCE(base.%[1]s,comparison.%[1]s) as %[1]s", safeName(dim.Name))) @@ -2166,6 +2184,7 @@ func (q *MetricsViewAggregation) buildMetricsComparisonAggregationSQL(ctx contex mv: mv, dialect: dialect, measures: q.Measures, + unnests: unnests, } whereClause, whereClauseArgs, err := whereBuilder.buildExpression(q.Where) if err != nil { diff --git a/runtime/queries/metricsview_aggregation_test.go b/runtime/queries/metricsview_aggregation_test.go index 28b6a7373f9..469bb95182b 100644 --- a/runtime/queries/metricsview_aggregation_test.go +++ b/runtime/queries/metricsview_aggregation_test.go @@ -2423,11 +2423,12 @@ func TestMetricsViewsAggregation_comparison_no_time_dim(t *testing.T) { } func TestMetricsViewsAggregation_comparison_Druid_no_dims(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -2467,7 +2468,7 @@ func TestMetricsViewsAggregation_comparison_Druid_no_dims(t *testing.T) { }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -2561,10 +2562,11 @@ func TestMetricsViewsAggregation_comparison_no_dims(t *testing.T) { } func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_totals(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -2635,7 +2637,7 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_totals(t * }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -2769,10 +2771,11 @@ func TestMetricsViewsAggregation_comparison_measure_filter_with_a_single_derivat } func TestMetricsViewsAggregation_Druid_comparison_measure_filter_no_duplicates(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -2837,7 +2840,7 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter_no_duplicates(t }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -3064,10 +3067,11 @@ func TestMetricsViewsAggregation_comparison_measure_filter_with_totals(t *testin } func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_limit(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(3) q := &queries.MetricsViewAggregation{ @@ -3138,7 +3142,7 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_limit(t *t Limit: &limit, Offset: 1, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -3504,6 +3508,450 @@ func TestMetricsViewsAggregation_comparison_measure_filter_with_having(t *testin require.Equal(t, "Google,news.google.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,3.55,3.74,2022-01-02T00:00:00Z,2022-01-01T00:00:00Z", fieldsToString2digits(rows[i], "pub", "dom", "timestamp", "timestamp_year", "m1", "m1_p", "timestamp__previous", "timestamp_year__previous")) } +func TestMetricsViewsAggregation_Druid_comparison_tags_with_filter(t *testing.T) { + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") + } + + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + { + Name: "tags", + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "m1", + }, + { + Name: "m1_p", + Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{ + ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{ + Measure: "m1", + }, + }, + }, + }, + Where: expressionpb.AndAll( + expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), + expressionpb.Eq("tags", "a"), + ), + Having: expressionpb.Gt("m1", 0.0), + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "m1", + }, + }, + + TimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)), + }, + Limit: &limit, + } + err = q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + fields := q.Result.Schema.Fields + require.Equal(t, "pub,tags,m1,m1_p", columnNames(fields)) + i := 0 + + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", sf.Name) + } + fmt.Printf("\n") + + for i, row := range q.Result.Data { + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", row.Fields[sf.Name].AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + require.Equal(t, 2, len(rows)) + + i = 0 + require.Equal(t, "Google,a,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,a,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) +} + +func TestMetricsViewsAggregation_Druid_comparison_tags_with_time(t *testing.T) { + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") + } + + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "__time", + TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_DAY, + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "m1_p", + Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{ + ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{ + Measure: "m1", + }, + }, + }, + }, + Where: expressionpb.AndAll( + expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), + expressionpb.Eq("tags", "a"), + ), + Having: expressionpb.Gt("m1", 0.0), + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "__time", + }, + { + Name: "m1", + }, + }, + + TimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)), + }, + Limit: &limit, + } + err = q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + fields := q.Result.Schema.Fields + require.Equal(t, "pub,tags,__time,m1_p,m1,__time__previous", columnNames(fields)) + i := 0 + + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", sf.Name) + } + fmt.Printf("\n") + + for i, row := range q.Result.Data { + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", row.Fields[sf.Name].AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + require.Equal(t, 2, len(rows)) + + i = 0 + require.Equal(t, "Google,a,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,a,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) +} + +func TestMetricsViewsAggregation_comparison_tags_with_time(t *testing.T) { + rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_tags_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "timestamp", + TimeGrain: runtimev1.TimeGrain_TIME_GRAIN_DAY, + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "m1_p", + Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{ + ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{ + Measure: "m1", + }, + }, + }, + }, + Where: expressionpb.AndAll( + expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), + expressionpb.Eq("tags", "a"), + ), + Having: expressionpb.Gt("m1", 0.0), + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "timestamp", + }, + { + Name: "m1", + }, + }, + + TimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)), + }, + Limit: &limit, + } + err := q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + fields := q.Result.Schema.Fields + require.Equal(t, "pub,tags,timestamp,m1_p,m1,timestamp__previous", columnNames(fields)) + i := 0 + + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", sf.Name) + } + fmt.Printf("\n") + + for i, row := range q.Result.Data { + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", row.Fields[sf.Name].AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + require.Equal(t, 2, len(rows)) + + i = 0 + require.Equal(t, "Google,a,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,a,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) +} + +func TestMetricsViewsAggregation_comparison_tags_with_filter(t *testing.T) { + rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_tags_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + { + Name: "tags", + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "m1", + }, + { + Name: "m1_p", + Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{ + ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{ + Measure: "m1", + }, + }, + }, + }, + Where: expressionpb.AndAll( + expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), + expressionpb.Eq("tags", "a"), + ), + Having: expressionpb.Gt("m1", 0.0), + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "m1", + }, + }, + + TimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)), + }, + Limit: &limit, + } + err := q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + fields := q.Result.Schema.Fields + require.Equal(t, "pub,tags,m1,m1_p", columnNames(fields)) + i := 0 + + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", sf.Name) + } + fmt.Printf("\n") + + for i, row := range q.Result.Data { + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", row.Fields[sf.Name].AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + require.Equal(t, 2, len(rows)) + + i = 0 + require.Equal(t, "Google,a,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,a,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) +} + +func TestMetricsViewsAggregation_comparison_tags(t *testing.T) { + rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + + limit := int64(10) + q := &queries.MetricsViewAggregation{ + MetricsViewName: "ad_bids_tags_metrics", + Dimensions: []*runtimev1.MetricsViewAggregationDimension{ + { + Name: "pub", + }, + { + Name: "tags", + }, + }, + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "m1", + }, + { + Name: "m1_p", + Compute: &runtimev1.MetricsViewAggregationMeasure_ComparisonValue{ + ComparisonValue: &runtimev1.MetricsViewAggregationMeasureComputeComparisonValue{ + Measure: "m1", + }, + }, + }, + }, + Where: expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), + Having: expressionpb.Gt("m1", 0.0), + Sort: []*runtimev1.MetricsViewAggregationSort{ + { + Name: "pub", + }, + { + Name: "tags", + }, + { + Name: "m1", + }, + }, + + TimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(time.Date(2022, 1, 2, 0, 0, 0, 0, time.UTC)), + End: timestamppb.New(time.Date(2022, 1, 3, 0, 0, 0, 0, time.UTC)), + }, + Limit: &limit, + } + err := q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + fields := q.Result.Schema.Fields + require.Equal(t, "pub,tags,m1,m1_p", columnNames(fields)) + i := 0 + + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", sf.Name) + } + fmt.Printf("\n") + + for i, row := range q.Result.Data { + for _, sf := range q.Result.Schema.Fields { + fmt.Printf("%v ", row.Fields[sf.Name].AsInterface()) + } + fmt.Printf(" %d \n", i) + + } + rows := q.Result.Data + require.Equal(t, 4, len(rows)) + + i = 0 + require.Equal(t, "Google,a,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Google,b,3.17", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,a,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) + i++ + require.Equal(t, "Yahoo,b,3.23", fieldsToString2digits(rows[i], "pub", "tags", "m1")) +} + func TestMetricsViewsAggregation_comparison(t *testing.T) { rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") @@ -3679,10 +4127,11 @@ func TestMetricsViewsAggregation_comparison_pivot(t *testing.T) { // // metrics-in cluster requires proper authentication credentials in the DSN. func TestMetricsViewsAggregation_comparison_Druid_one_dim_base_order(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -3748,7 +4197,7 @@ func TestMetricsViewsAggregation_comparison_Druid_one_dim_base_order(t *testing. }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -3776,10 +4225,11 @@ func TestMetricsViewsAggregation_comparison_Druid_one_dim_base_order(t *testing. } func TestMetricsViewsAggregation_comparison_Druid_one_dim_comparison_order(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -3845,7 +4295,7 @@ func TestMetricsViewsAggregation_comparison_Druid_one_dim_comparison_order(t *te }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -3874,10 +4324,11 @@ func TestMetricsViewsAggregation_comparison_Druid_one_dim_comparison_order(t *te } func TestMetricsViewsAggregation_Druid_comparison_empty_set_previous_sorted(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -3929,7 +4380,7 @@ func TestMetricsViewsAggregation_Druid_comparison_empty_set_previous_sorted(t *t }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -3952,10 +4403,11 @@ func TestMetricsViewsAggregation_Druid_comparison_empty_set_previous_sorted(t *t } func TestMetricsViewsAggregation_Druid_comparison_empty_set(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -4007,7 +4459,7 @@ func TestMetricsViewsAggregation_Druid_comparison_empty_set(t *testing.T) { }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -4030,10 +4482,11 @@ func TestMetricsViewsAggregation_Druid_comparison_empty_set(t *testing.T) { } func TestMetricsViewsAggregation_Druid_comparison(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -4107,7 +4560,7 @@ func TestMetricsViewsAggregation_Druid_comparison(t *testing.T) { }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -4133,10 +4586,11 @@ func TestMetricsViewsAggregation_Druid_comparison(t *testing.T) { } func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_having(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -4224,7 +4678,7 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_having(t * }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -4251,10 +4705,11 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter_with_having(t * } func TestMetricsViewsAggregation_Druid_comparison_measure_filter(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(10) q := &queries.MetricsViewAggregation{ @@ -4341,7 +4796,7 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter(t *testing.T) { }, Limit: &limit, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -4364,20 +4819,21 @@ func TestMetricsViewsAggregation_Druid_comparison_measure_filter(t *testing.T) { require.Equal(t, 4, len(rows)) i = 0 - require.Equal(t, "Google,google.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,null,null", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) + require.Equal(t, "Google,google.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,2022-01-02T00:00:00Z,2022-01-01T00:00:00Z", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) i++ require.Equal(t, "Google,news.google.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,3.55,3.74,2022-01-02T00:00:00Z,2022-01-01T00:00:00Z", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) i++ - require.Equal(t, "Yahoo,news.yahoo.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,null,null", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) + require.Equal(t, "Yahoo,news.yahoo.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,2022-01-02T00:00:00Z,2022-01-01T00:00:00Z", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) i++ - require.Equal(t, "Yahoo,sports.yahoo.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,null,null", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) + require.Equal(t, "Yahoo,sports.yahoo.com,2022-01-01T00:00:00Z,2022-01-01T00:00:00Z,null,null,2022-01-02T00:00:00Z,2022-01-01T00:00:00Z", fieldsToString2digits(rows[i], "pub", "dom", "__time", "timestamp_year", "m1", "m1_p", "__time__previous", "timestamp_year__previous")) } func TestMetricsViewsAggregation_Druid_comparison_with_offset(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) limit := int64(2) q := &queries.MetricsViewAggregation{ @@ -4400,7 +4856,10 @@ func TestMetricsViewsAggregation_Druid_comparison_with_offset(t *testing.T) { }, }, }, - // Having: expressionpb.Gt("measure_1", 0.0), + Where: expressionpb.OrAll( + expressionpb.Eq("pub", "Yahoo"), + expressionpb.Eq("pub", "Google"), + ), Sort: []*runtimev1.MetricsViewAggregationSort{ { Name: "dom", @@ -4419,7 +4878,7 @@ func TestMetricsViewsAggregation_Druid_comparison_with_offset(t *testing.T) { Limit: &limit, Offset: 1, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.NotEmpty(t, q.Result) fields := q.Result.Schema.Fields @@ -4442,9 +4901,9 @@ func TestMetricsViewsAggregation_Druid_comparison_with_offset(t *testing.T) { require.Equal(t, 2, len(rows)) i = 0 - require.Equal(t, "news.yahoo.com,1.50,1.53", fieldsToString2digits(rows[i], "dom", "m1", "m1__p")) + require.Equal(t, "news.yahoo.com,1.49,1.51", fieldsToString2digits(rows[i], "dom", "m1", "m1__p")) i++ - require.Equal(t, "news.google.com,3.59,3.69", fieldsToString2digits(rows[i], "dom", "m1", "m1__p")) + require.Equal(t, "news.google.com,3.55,3.74", fieldsToString2digits(rows[i], "dom", "m1", "m1__p")) } func TestMetricsViewsAggregation_comparison_with_offset(t *testing.T) { diff --git a/runtime/queries/metricsview_comparison_toplist_test.go b/runtime/queries/metricsview_comparison_toplist_test.go index e56835328da..35837f9e182 100644 --- a/runtime/queries/metricsview_comparison_toplist_test.go +++ b/runtime/queries/metricsview_comparison_toplist_test.go @@ -644,11 +644,12 @@ func TestServer_MetricsViewTimeseries_export_csv(t *testing.T) { } func TestMetricsViewsComparison_Druid_comparsion_no_dim_values(t *testing.T) { - if os.Getenv("LOCALDRUID") == "" { - t.Skip("skipping the test in non-local Druid environment") + if os.Getenv("METRICS_CREDS") == "" { + t.Skip("skipping the test without metrics-in") } - rt, instanceID := testruntime.NewInstanceForDruidProject(t) + rt, instanceID, err := testruntime.NewInstanceForDruidProject(t) + require.NoError(t, err) q := &queries.MetricsViewComparison{ MetricsViewName: "ad_bids_metrics", @@ -681,7 +682,7 @@ func TestMetricsViewsComparison_Druid_comparsion_no_dim_values(t *testing.T) { Limit: 250, } - err := q.Resolve(context.Background(), rt, instanceID, 0) + err = q.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) require.Empty(t, q.Result) } diff --git a/runtime/reconcilers/metrics_view.go b/runtime/reconcilers/metrics_view.go index a5bed6c61fb..b76a21195d0 100644 --- a/runtime/reconcilers/metrics_view.go +++ b/runtime/reconcilers/metrics_view.go @@ -76,6 +76,7 @@ func (r *MetricsViewReconciler) Reconcile(ctx context.Context, n *runtimev1.Reso if validateErr == nil { validateErr = validateResult.Error() } + if ctx.Err() != nil { return runtime.ReconcileResult{Err: errors.Join(validateErr, ctx.Err())} } diff --git a/runtime/testruntime/testdata/ad_bids/dashboards/ad_bids_tags_metrics.yaml b/runtime/testruntime/testdata/ad_bids/dashboards/ad_bids_tags_metrics.yaml new file mode 100644 index 00000000000..56bbe1dd0e3 --- /dev/null +++ b/runtime/testruntime/testdata/ad_bids/dashboards/ad_bids_tags_metrics.yaml @@ -0,0 +1,38 @@ +model: ad_bids_tags +display_name: Ad bids tags +description: + +timeseries: timestamp +smallest_time_grain: "" + +dimensions: + - label: Publisher + name: pub + column: publisher + - label: Domain + name: dom + column: domain + - label: Tags + name: tags + column: tags + unnest: true + +measures: + - label: "Number of bids" + expression: count(*) + description: "" + format_preset: "" + - label: "Average bid price" + expression: avg(bid_price) + description: "" + format_preset: "" + - name: m1 + expression: avg(bid_price) + description: "" + format_preset: "" + - name: bid_price + expression: avg(bid_price) + description: "" + format_preset: "" + + diff --git a/runtime/testruntime/testdata/ad_bids/models/ad_bids_tags.sql b/runtime/testruntime/testdata/ad_bids/models/ad_bids_tags.sql new file mode 100644 index 00000000000..27746d9fb58 --- /dev/null +++ b/runtime/testruntime/testdata/ad_bids/models/ad_bids_tags.sql @@ -0,0 +1,8 @@ +select + id, + timestamp, + publisher, + domain, + bid_price, + ['a','b'] tags +from ad_bids_source diff --git a/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml b/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml index fcef652173a..17e28585b3e 100644 --- a/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml +++ b/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml @@ -1,4 +1,4 @@ -model: ad_bids +model: AdBids display_name: Ad bids description: @@ -18,12 +18,16 @@ dimensions: property: publisher - label: Space Label name: space_label - expression: "publisher" + expression: publisher + - label: tags + name: tags + column: "tags" - label: id name: id expression: "id" + measures: - label: "Number of bids" expression: count(*) diff --git a/runtime/testruntime/testruntime.go b/runtime/testruntime/testruntime.go index 89fcc22e244..bd14c17ad27 100644 --- a/runtime/testruntime/testruntime.go +++ b/runtime/testruntime/testruntime.go @@ -2,12 +2,15 @@ package testruntime import ( "context" + "encoding/base64" "fmt" "maps" + "net/url" "os" "path/filepath" goruntime "runtime" "strconv" + "strings" "github.com/c2h5oh/datasize" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" @@ -64,9 +67,11 @@ func New(t TestingT) *runtime.Runtime { } logger := zap.NewNop() - // nolint - // logger, err := zap.NewDevelopment() - // require.NoError(t, err) + var err error + if os.Getenv("DEBUG") == "1" { + logger, err = zap.NewDevelopment() + require.NoError(t, err) + } rt, err := runtime.New(context.Background(), opts, logger, activity.NewNoopClient(), email.New(email.NewTestSender())) require.NoError(t, err) @@ -223,11 +228,25 @@ func NewInstanceForProject(t TestingT, name string) (*runtime.Runtime, string) { return rt, inst.ID } -func NewInstanceForDruidProject(t TestingT) (*runtime.Runtime, string) { +func NewInstanceForDruidProject(t TestingT) (*runtime.Runtime, string, error) { rt := New(t) _, currentFile, _, _ := goruntime.Caller(0) projectPath := filepath.Join(currentFile, "..", "testdata", "ad_bids_druid") + var err error + creds := os.Getenv("METRICS_CREDS") + data, err := base64.StdEncoding.DecodeString(creds) + if err != nil { + return nil, "", err + } + + splits := strings.Split(string(data), ":") + if len(splits) < 2 { + return nil, "", fmt.Errorf("incorrect credentials") + } + for i := range splits { + splits[i] = url.QueryEscape(splits[i]) + } inst := &drivers.Instance{ Environment: "test", @@ -244,7 +263,7 @@ func NewInstanceForDruidProject(t TestingT) (*runtime.Runtime, string) { { Type: "druid", Name: "druid", - Config: map[string]string{"dsn": "http://localhost:8888/druid/v2/sql"}, + Config: map[string]string{"dsn": fmt.Sprintf("https://%s:%s@druid.master.in.rilldata.io/druid/v2/sql", splits[0], splits[1])}, }, { Type: "sqlite", @@ -257,7 +276,7 @@ func NewInstanceForDruidProject(t TestingT) (*runtime.Runtime, string) { // EmbedCatalog: true, } - err := rt.CreateInstance(context.Background(), inst) + err = rt.CreateInstance(context.Background(), inst) require.NoError(t, err) require.NotEmpty(t, inst.ID) @@ -270,5 +289,5 @@ func NewInstanceForDruidProject(t TestingT) (*runtime.Runtime, string) { err = ctrl.WaitUntilIdle(context.Background(), false) require.NoError(t, err) - return rt, inst.ID + return rt, inst.ID, nil }