Skip to content

Commit

Permalink
planner: support loading queries from statement_summary to run Index …
Browse files Browse the repository at this point in the history
…Advisor (pingcap#56160)

ref pingcap#12303
  • Loading branch information
qw4990 authored and winoros committed Sep 23, 2024
1 parent 7d5f24b commit 076fe76
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 15 deletions.
10 changes: 7 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,11 +2748,15 @@ func (e *RecommendIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
return fmt.Errorf("unsupported action: %s", e.Action)
}

results, err := indexadvisor.AdviseIndexes(ctx, e.Ctx(), &indexadvisor.Option{
opt := &indexadvisor.Option{
MaxNumIndexes: 3,
MaxIndexWidth: 3,
SpecifiedSQLs: []string{e.SQL},
})
}
if e.SQL != "" {
opt.SpecifiedSQLs = []string{e.SQL}
}

results, err := indexadvisor.AdviseIndexes(ctx, e.Ctx(), opt)

for _, r := range results {
req.AppendString(0, r.Database)
Expand Down
3 changes: 0 additions & 3 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5731,9 +5731,6 @@ func (*PlanBuilder) buildRecommendIndex(v *ast.RecommendIndexStmt) (base.Plan, e

switch v.Action {
case "run":
if v.SQL == "" {
return nil, errors.New("recommend index SQL is empty")
}
schema := newColumnsWithNames(7)
schema.Append(buildColumnWithName("", "database", mysql.TypeVarchar, 64))
schema.Append(buildColumnWithName("", "table", mysql.TypeVarchar, 64))
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/indexadvisor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
deps = [
"//pkg/domain",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/parser",
"//pkg/parser/ast",
Expand All @@ -24,10 +25,12 @@ go_library(
"//pkg/sessionctx",
"//pkg/types",
"//pkg/types/parser_driver",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/parser",
"//pkg/util/set",
"//pkg/util/sqlexec",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
],
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/indexadvisor/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ func (aa *autoAdmin) heuristicCoveredIndexes(
var bestCoverIndex Index
var bestCoverIndexCost IndexSetCost
for i, coverIndex := range coverIndexSet.ToList() {
if candidateIndexes.Contains(coverIndex) {
continue // the new generated cover-index is duplicated
}
candidateIndexes.Add(coverIndex)
cost, err := evaluateIndexSetCost(querySet, aa.optimizer, candidateIndexes)
if err != nil {
Expand Down
49 changes: 41 additions & 8 deletions pkg/planner/indexadvisor/indexadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"strings"

Expand Down Expand Up @@ -92,8 +93,6 @@ func AdviseIndexes(ctx context.Context, sctx sessionctx.Context,
func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
defaultDB string, opt Optimizer, specifiedSQLs []string) (s.Set[Query], error) {
advisorLogger().Info("prepare target query set")
defer advisorLogger().Info("prepare target query set finished")

querySet := s.NewSet[Query]()
if len(specifiedSQLs) > 0 { // if target queries are specified
for _, sql := range specifiedSQLs {
Expand All @@ -104,9 +103,12 @@ func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
querySet = ctx.Value(TestKey("query_set")).(s.Set[Query])
} else {
var err error
if querySet, err = loadQuerySetFromStmtSummary(sctx, defaultDB); err != nil {
if querySet, err = loadQuerySetFromStmtSummary(sctx); err != nil {
return nil, err
}
if querySet.Size() == 0 {
return nil, errors.New("can't get any queries from statements_summary")
}
}
}

Expand All @@ -124,12 +126,39 @@ func prepareQuerySet(ctx context.Context, sctx sessionctx.Context,
if err != nil {
return nil, err
}
advisorLogger().Info("finish query preparation", zap.Int("num_query", querySet.Size()))
return querySet, nil
}

func loadQuerySetFromStmtSummary(sessionctx.Context, string) (s.Set[Query], error) {
// TODO: load target queries from statement_summary automatically
return nil, errors.New("not implemented yet")
// loadQuerySetFromStmtSummary builds the advisor's target query set from
// information_schema.statements_summary_history.
//
// The statement below keeps non-prepared SELECT statements whose summary
// window began within the last day, drops the listed system schemas, groups
// the rows by digest, and returns at most 5000 groups ordered by summed
// execution count. Each returned row becomes one Query whose Frequency is
// the summed exec_count truncated to int.
func loadQuerySetFromStmtSummary(sctx sessionctx.Context) (s.Set[Query], error) {
	const stmtSummarySQL = `SELECT any_value(schema_name) as schema_name,
any_value(query_sample_text) as query_sample_text,
sum(cast(exec_count as double)) as exec_count
FROM information_schema.statements_summary_history
WHERE stmt_type = "Select" AND
summary_begin_time >= date_sub(now(), interval 1 day) AND
prepared = 0 AND
upper(schema_name) not in ("MYSQL", "INFORMATION_SCHEMA", "METRICS_SCHEMA", "PERFORMANCE_SCHEMA")
GROUP BY digest
ORDER BY sum(exec_count) DESC
LIMIT 5000`

	records, err := exec(sctx, stmtSummarySQL)
	if err != nil {
		return nil, err
	}

	queries := s.NewSet[Query]()
	for _, rec := range records {
		// Column positions follow the SELECT list:
		// 0 = schema_name, 1 = query_sample_text, 2 = exec_count.
		queries.Add(Query{
			SchemaName: rec.GetString(0),
			Text:       rec.GetString(1),
			Frequency:  int(rec.GetFloat64(2)),
		})
	}
	return queries, nil
}

func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer Optimizer) ([]*Recommendation, error) {
Expand Down Expand Up @@ -181,7 +210,7 @@ func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer
workloadCostBefore += costBefore * float64(query.Frequency)
workloadCostAfter += costAfter * float64(query.Frequency)

queryImprovement := (costBefore - costAfter) / costBefore
queryImprovement := round((costBefore-costAfter)/costBefore, 6)
if queryImprovement < 0.0001 {
continue // this query has no benefit
}
Expand All @@ -204,7 +233,7 @@ func prepareRecommendation(indexes s.Set[Index], queries s.Set[Query], optimizer
workloadCostBefore += 0.1
workloadCostAfter += 0.1
}
workloadImpact.WorkloadImprovement = (workloadCostBefore - workloadCostAfter) / workloadCostBefore
workloadImpact.WorkloadImprovement = round((workloadCostBefore-workloadCostAfter)/workloadCostBefore, 6)

if workloadImpact.WorkloadImprovement < 0.000001 || len(indexResult.TopImpactedQueries) == 0 {
continue // this index has no benefit
Expand All @@ -219,6 +248,10 @@ Range Predicate clause(s) in query '%v'`, cols, normText)
return results, nil
}

// round returns v rounded to n decimal places. Halfway cases are rounded
// away from zero, as done by math.Round.
func round(v float64, n int) float64 {
	scale := math.Pow(10, float64(n))
	return math.Round(v*scale) / scale
}

func gracefulIndexName(opt Optimizer, schema, tableName string, cols []string) string {
indexName := fmt.Sprintf("idx_%v", strings.Join(cols, "_"))
if len(indexName) > 64 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/indexadvisor/indexadvisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestIndexAdvisorBasic1(t *testing.T) {
check(nil, t, tk, "test.t.a", option("select * from t where a=1"))
check(nil, t, tk, "test.t.a,test.t.b",
option("select * from t where a=1; select * from t where b=1"))
check(nil, t, tk, "test.t.a,test.t.b",
option("select a from t where a=1; select b from t where b=1"))
}

func TestIndexAdvisorBasic2(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/planner/indexadvisor/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
package indexadvisor

import (
"context"
"fmt"
"sort"
"strings"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/opcode"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
parser2 "github.com/pingcap/tidb/pkg/util/parser"
s "github.com/pingcap/tidb/pkg/util/set"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -524,3 +529,22 @@ func evaluateIndexSetCost(

return IndexSetCost{workloadCost, totCols, strings.Join(keys, ",")}, nil
}

// exec runs sql as an internal statement on sctx and returns every row of
// its result.
//
// sctx must also implement sqlexec.SQLExecutor; an error is returned (rather
// than panicking) when it does not. A nil record set yields (nil, nil). The
// record set is always closed before returning, and a Close failure is
// reported only when no earlier error occurred.
func exec(sctx sessionctx.Context, sql string) (ret []chunk.Row, err error) {
	executor, ok := sctx.(sqlexec.SQLExecutor)
	if !ok {
		return nil, fmt.Errorf("session context %T does not implement sqlexec.SQLExecutor", sctx)
	}

	// One context, tagged with the kv.InternalTxnStats internal source type,
	// is used for both executing the statement and draining its rows.
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	result, err := executor.ExecuteInternal(ctx, sql)
	if err != nil {
		// %w keeps the underlying error reachable via errors.Is / errors.As.
		return nil, fmt.Errorf("execute %v failed: %w", sql, err)
	}
	if result == nil {
		return nil, nil
	}
	defer func() {
		// Surface the Close error through the named result, but never mask
		// an error produced while draining.
		if closeErr := result.Close(); err == nil {
			err = closeErr
		}
	}()
	return sqlexec.DrainRecordSet(ctx, result, 64)
}
2 changes: 1 addition & 1 deletion pkg/util/stmtsummary/v2/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"table_test.go",
],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/config",
"//pkg/kv",
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/stmtsummary/v2/tests/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ import (
"github.com/stretchr/testify/require"
)

// TestStmtSummaryIndexAdvisor checks `recommend index run` when no SQL is
// given, so the advisor has to take its workload from the statement summary.
// The index name is read from column 2 of each (sorted) result row.
func TestStmtSummaryIndexAdvisor(t *testing.T) {
	setupStmtSummary()
	defer closeStmtSummary()
	store := testkit.CreateMockStore(t)
	tk := newTestKitWithRoot(t, store)
	tk.MustExec(`use test`)
	tk.MustExec(`create table t (a int, b int, c int)`)

	// No SELECT has been executed yet, so the statement is expected to fail.
	tk.MustQueryToErr(`recommend index run`)

	tk.MustQuery(`select a from t where a=1`)
	rs := tk.MustQuery(`recommend index run`).Sort().Rows()
	// Guard the index access so a missing recommendation fails the test
	// cleanly instead of panicking.
	require.NotEmpty(t, rs)
	require.Equal(t, "idx_a", rs[0][2])

	tk.MustQuery(`select b from t where b=1`)
	rs = tk.MustQuery(`recommend index run`).Sort().Rows()
	require.GreaterOrEqual(t, len(rs), 2)
	require.Equal(t, "idx_a", rs[0][2])
	require.Equal(t, "idx_b", rs[1][2])
}

func TestStmtSummaryTable(t *testing.T) {
setupStmtSummary()
defer closeStmtSummary()
Expand Down

0 comments on commit 076fe76

Please sign in to comment.