diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ae3ff919b..e61be9b97 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,15 @@ jobs: build: # Use new enough OS that has CGroupsv2 enabled (required by the integration tests) runs-on: ubuntu-22.04 + env: + TEST_DATABASE_URL: postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable + services: + postgres: + image: postgres:14 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 steps: diff --git a/input/postgres/establish_connection.go b/input/postgres/establish_connection.go index 8b216b316..c19b77d3b 100644 --- a/input/postgres/establish_connection.go +++ b/input/postgres/establish_connection.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/aws/aws-sdk-go/service/rds/rdsutils" "github.com/pganalyze/collector/config" @@ -78,7 +77,6 @@ func connectToDb(ctx context.Context, config config.ServerConfig, logger *util.L } db.SetMaxOpenConns(1) - db.SetConnMaxLifetime(30 * time.Second) err = db.PingContext(ctx) if err != nil { diff --git a/input/postgres/explain_analyze.go b/input/postgres/explain_analyze.go new file mode 100644 index 000000000..a4ba85692 --- /dev/null +++ b/input/postgres/explain_analyze.go @@ -0,0 +1,68 @@ +package postgres + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/guregu/null" + "github.com/lib/pq" + "github.com/pganalyze/collector/util" +) + +func RunExplainAnalyzeForQueryRun(ctx context.Context, db *sql.DB, query string, parameters []null.String, parameterTypes []string, marker string) (result string, err error) { + err = validateQuery(query) + if err != nil { + return + } + + // Warm up caches without collecting timing info (slightly faster) + _, err = runExplainAnalyze(ctx, db, query, parameters, parameterTypes, []string{"ANALYZE", "TIMING OFF"}, marker) + if err != nil { + if !strings.Contains(err.Error(), "statement timeout") { + return + } + + // Run again if 
it was a timeout error, to make sure we got the caches warmed up all the way + _, err = runExplainAnalyze(ctx, db, query, parameters, parameterTypes, []string{"ANALYZE", "TIMING OFF"}, marker) + if err != nil { + if !strings.Contains(err.Error(), "statement timeout") { + return + } + + // If it timed out again, capture a non-ANALYZE EXPLAIN instead + return runExplainAnalyze(ctx, db, query, parameters, parameterTypes, []string{}, marker) + } + } + + // Run EXPLAIN ANALYZE once more to get a warm cache result (this is the one we return) + return runExplainAnalyze(ctx, db, query, parameters, parameterTypes, []string{"ANALYZE", "BUFFERS"}, marker) +} + +func runExplainAnalyze(ctx context.Context, db *sql.DB, query string, parameters []null.String, parameterTypes []string, analyzeFlags []string, marker string) (explainOutput string, err error) { + tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + if err != nil { + return "", err + } + defer tx.Rollback() + + err = tx.QueryRowContext(ctx, marker+"SELECT pganalyze.explain_analyze($1, $2, $3, $4)", marker+query, pq.Array(parameters), pq.Array(parameterTypes), pq.Array(analyzeFlags)).Scan(&explainOutput) + + return +} + +func validateQuery(query string) error { + var isUtil []bool + // To be on the safe side never EXPLAIN a statement that can't be parsed, + // or multiple statements in one (leading to accidental execution) + isUtil, err := util.IsUtilityStmt(query) + if err != nil || len(isUtil) != 1 || isUtil[0] { + err = fmt.Errorf("query is not permitted to run (multi-statement or utility command?)") + return err + } + + // TODO: Consider adding additional checks here (e.g. 
blocking known bad function calls) + + return nil +} diff --git a/input/postgres/explain_analyze_test.go b/input/postgres/explain_analyze_test.go new file mode 100644 index 000000000..f0b729b6f --- /dev/null +++ b/input/postgres/explain_analyze_test.go @@ -0,0 +1,445 @@ +package postgres_test + +import ( + "context" + "database/sql" + "fmt" + "os" + "regexp" + "testing" + "time" + + "github.com/guregu/null" + "github.com/lib/pq" + "github.com/pganalyze/collector/input/postgres" + "github.com/pganalyze/collector/util" +) + +type helperTestpair struct { + query string + params []null.String + paramTypes []string + analyzeFlags []string + expectedOutput string + expectedError string +} + +var helperTests = []helperTestpair{ + { + "SELECT 1", + []null.String{}, + []string{}, + []string{"VERBOSE OFF", "COSTS OFF", "ANALYZE", "TIMING OFF"}, + `[ + { + "Plan": { + "Node Type": "Result", + "Parallel Aware": false, + "Async Capable": false, + "Actual Rows": 1, + "Actual Loops": 1 + }, + "Planning Time": XXXXX, + "Triggers": [ + ], + "Execution Time": XXXXX + } +]`, + "", + }, + { + "SELECT pg_reload_conf()", + []null.String{}, + []string{}, + []string{"VERBOSE OFF", "COSTS OFF", "ANALYZE", "TIMING OFF"}, + "", + "pq: permission denied for function pg_reload_conf", + }, + { + "UPDATE test SET id = 123", + []null.String{}, + []string{}, + []string{"VERBOSE OFF", "COSTS OFF", "ANALYZE", "TIMING OFF"}, + "", + "pq: cannot execute UPDATE in a read-only transaction", + }, + { + "SELECT 1; UPDATE test SET id = 123", + []null.String{}, + []string{}, + []string{"VERBOSE OFF", "COSTS OFF", "ANALYZE", "TIMING OFF"}, + "", + "pq: cannot run pganalyze.explain_analyze helper with a multi-statement query", + }, + { + "SELECT $1", + /* EXECUTE pganalyze_explain_analyze (*/ []null.String{null.StringFrom("1); SELECT ('data'")}, /* ) */ + []string{}, + []string{"COSTS OFF"}, + `[ + { + "Plan": { + "Node Type": "Result", + "Parallel Aware": false, + "Async Capable": false, + "Output": ["'1); 
SELECT (''data'''::text"] + } + } +]`, + "", + }, + { + "SELECT $1", + []null.String{null.StringFrom("dummy")}, + /* PREPARE pganalyze_explain_analyze (*/ []string{"text) AS SELECT 'data'; PREPARE dummy (text"}, /*) AS [query] */ + []string{"COSTS OFF"}, + "", + `pq: type "text) AS SELECT 'data'; PREPARE dummy (text" does not exist`, + }, + { + "SELECT 'data'", + []null.String{}, + []string{}, + []string{"FORMAT JSON) SELECT 1; UPDATE test SET id = 123; EXPLAIN (COSTS OFF"}, + "", + "pq: cannot run pganalyze.explain_analyze helper with invalid flag", + }, + // Cases that are worth documenting by test (but they are not bugs, just things worth noting) + { + // DML statements for EXPLAIN (without ANALYZE) are permitted, if access is granted (they don't violate the rules of a read only transaction) + "UPDATE test SET id = 123", + []null.String{}, + []string{}, + []string{"VERBOSE OFF", "COSTS OFF"}, + `[ + { + "Plan": { + "Node Type": "ModifyTable", + "Operation": "Update", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test", + "Alias": "test", + "Plans": [ + { + "Node Type": "Seq Scan", + "Parent Relationship": "Outer", + "Parallel Aware": false, + "Async Capable": false, + "Relation Name": "test", + "Alias": "test" + } + ] + } + } +]`, + "", + }, +} + +func TestExplainAnalyzeHelper(t *testing.T) { + db := setupTest(t) + defer db.Close() + + for _, pair := range helperTests { + var output string + var errStr string + err := db.QueryRow("SELECT pganalyze.explain_analyze($1, $2, $3, $4)", pair.query, pq.Array(pair.params), pq.Array(pair.paramTypes), pq.Array(pair.analyzeFlags)).Scan(&output) + if err != nil { + errStr = fmt.Sprintf("%s", err) + } + + // Avoid differences in test runs by masking total planning/execution time stats + re := regexp.MustCompile(`("(Planning|Execution) Time":) [\d.]+`) + output = re.ReplaceAllString(output, "$1 XXXXX") + + if output != pair.expectedOutput { + t.Errorf("Incorrect output for query '%s' (direct):\n 
got: %s\n expected: %s", pair.query, output, pair.expectedOutput) + } + + if errStr != pair.expectedError { + t.Errorf("Incorrect error for query '%s' (direct):\n got: %s\n expected: %s", pair.query, errStr, pair.expectedError) + } + } +} + +type queryRunTestpair struct { + query string + params []null.String + paramTypes []string + expectedOutput string + expectedError string +} + +var queryRunTests = []queryRunTestpair{ + { + "SELECT 1", + []null.String{}, + []string{}, + `[ + { + "Plan": { + "Node Type": "Result", + "Parallel Aware": false, + "Async Capable": false, + "Startup Cost": XXXXX, + "Total Cost": XXXXX, + "Plan Rows": 1, + "Plan Width": 4, + "Actual Startup Time": XXXXX, + "Actual Total Time": XXXXX, + "Actual Rows": 1, + "Actual Loops": 1, + "Output": ["1"], + "Shared Hit Blocks": 0, + "Shared Read Blocks": 0, + "Shared Dirtied Blocks": 0, + "Shared Written Blocks": 0, + "Local Hit Blocks": 0, + "Local Read Blocks": 0, + "Local Dirtied Blocks": 0, + "Local Written Blocks": 0, + "Temp Read Blocks": 0, + "Temp Written Blocks": 0 + }, + "Planning": { + "Shared Hit Blocks": 0, + "Shared Read Blocks": 0, + "Shared Dirtied Blocks": 0, + "Shared Written Blocks": 0, + "Local Hit Blocks": 0, + "Local Read Blocks": 0, + "Local Dirtied Blocks": 0, + "Local Written Blocks": 0, + "Temp Read Blocks": 0, + "Temp Written Blocks": 0 + }, + "Planning Time": XXXXX, + "Triggers": [ + ], + "Execution Time": XXXXX + } +]`, + "", + }, + { + "SELECT pg_reload_conf()", + []null.String{}, + []string{}, + "", + "pq: permission denied for function pg_reload_conf", + }, + { + "UPDATE test SET id = 123", + []null.String{}, + []string{}, + "", + "pq: cannot execute UPDATE in a read-only transaction", + }, + { + "SELECT 1; UPDATE test SET id = 123", + []null.String{}, + []string{}, + "", + "query is not permitted to run (multi-statement or utility command?)", + }, + { + "SELECT $1", + /* EXECUTE pganalyze_explain_analyze (*/ []null.String{null.StringFrom("1); SELECT ('data'")}, /* ) 
*/ + []string{}, + `[ + { + "Plan": { + "Node Type": "Result", + "Parallel Aware": false, + "Async Capable": false, + "Startup Cost": XXXXX, + "Total Cost": XXXXX, + "Plan Rows": 1, + "Plan Width": 32, + "Actual Startup Time": XXXXX, + "Actual Total Time": XXXXX, + "Actual Rows": 1, + "Actual Loops": 1, + "Output": ["'1); SELECT (''data'''::text"], + "Shared Hit Blocks": 0, + "Shared Read Blocks": 0, + "Shared Dirtied Blocks": 0, + "Shared Written Blocks": 0, + "Local Hit Blocks": 0, + "Local Read Blocks": 0, + "Local Dirtied Blocks": 0, + "Local Written Blocks": 0, + "Temp Read Blocks": 0, + "Temp Written Blocks": 0 + }, + "Planning": { + "Shared Hit Blocks": 0, + "Shared Read Blocks": 0, + "Shared Dirtied Blocks": 0, + "Shared Written Blocks": 0, + "Local Hit Blocks": 0, + "Local Read Blocks": 0, + "Local Dirtied Blocks": 0, + "Local Written Blocks": 0, + "Temp Read Blocks": 0, + "Temp Written Blocks": 0 + }, + "Planning Time": XXXXX, + "Triggers": [ + ], + "Execution Time": XXXXX + } +]`, + "", + }, + { + "SELECT $1", + []null.String{null.StringFrom("dummy")}, + /* PREPARE pganalyze_explain_analyze (*/ []string{"text) AS SELECT 'data'; PREPARE dummy (text"}, /*) AS [query] */ + "", + `pq: type "text) AS SELECT 'data'; PREPARE dummy (text" does not exist`, + }, +} + +func TestExplainAnalyzeForQueryRun(t *testing.T) { + db := setupTest(t) + defer db.Close() + + for _, pair := range queryRunTests { + var errStr string + output, err := postgres.RunExplainAnalyzeForQueryRun(context.Background(), db, pair.query, pair.params, pair.paramTypes, "") + if err != nil { + errStr = fmt.Sprintf("%s", err) + } + + // Avoid differences in test runs by masking timing stats + re := regexp.MustCompile(`("(?:Planning Time|Execution Time|Startup Cost|Total Cost|Actual Startup Time|Actual Total Time)":) [\d.]+`) + output = re.ReplaceAllString(output, "$1 XXXXX") + + if output != pair.expectedOutput { + t.Errorf("Incorrect output for query '%s' (via collector code):\n got: %s\n 
expected: %s", pair.query, output, pair.expectedOutput) + } + + if errStr != pair.expectedError { + t.Errorf("Incorrect error for query '%s' (via collector code):\n got: %s\n expected: %s", pair.query, errStr, pair.expectedError) + } + } +} + +func setupTest(t *testing.T) *sql.DB { + testDatabaseUrl := os.Getenv("TEST_DATABASE_URL") + if testDatabaseUrl == "" { + t.Skipf("Skipping test requiring database connection since TEST_DATABASE_URL is not set") + } + db, err := makeConnection(testDatabaseUrl) + if err != nil { + t.Fatalf("Could not connect to test database: %s", err) + } + + err = setupHelperAndRole(db) + if err != nil { + t.Fatalf("Could not set up helper: %s", err) + } + + _, err = db.Exec("GRANT pg_read_all_data TO pganalyze_explain") + if err != nil { + t.Fatalf("Could not grant permissions: %s", err) + } + + _, err = db.Exec("CREATE TABLE IF NOT EXISTS test (id int)") + if err != nil { + t.Fatalf("Could not create test table: %s", err) + } + + // We're granting the write permissions here to verify the function runs as a read-only transaction + _, err = db.Exec("GRANT ALL ON test TO pganalyze_explain") + if err != nil { + t.Fatalf("Could not GRANT on test table: %s", err) + } + + return db +} + +func makeConnection(testDatabaseUrl string) (*sql.DB, error) { + db, err := sql.Open("postgres", testDatabaseUrl) + if err != nil { + return nil, err + } + + db.SetMaxOpenConns(1) + db.SetConnMaxLifetime(30 * time.Second) + + err = db.Ping() + if err != nil { + db.Close() + return nil, err + } + + // Emit notices to logs to help with function debugging + _, err = db.Exec("SET log_min_messages = NOTICE") + if err != nil { + db.Close() + return nil, err + } + + // Don't generate queryid, to avoid making output different across clusters + _, err = db.Exec("SET compute_query_id = off") + if err != nil { + db.Close() + return nil, err + } + + return db, nil +} + +func setupHelperAndRole(db *sql.DB) (err error) { + // Clean up previous helper and role if it exists + _, 
err = db.Exec("DROP FUNCTION IF EXISTS pganalyze.explain_analyze(text, text[], text[], text[])") + if err != nil { + return + } + + db.Exec("DROP OWNED BY pganalyze_explain") + _, err = db.Exec("DROP ROLE IF EXISTS pganalyze_explain") + if err != nil { + return + } + + _, err = db.Exec("CREATE ROLE pganalyze_explain") + if err != nil { + return + } + + _, err = db.Exec("CREATE SCHEMA IF NOT EXISTS pganalyze") + if err != nil { + return + } + + _, err = db.Exec("GRANT CREATE ON SCHEMA pganalyze TO pganalyze_explain") + if err != nil { + return + } + + _, err = db.Exec("SET ROLE pganalyze_explain") + if err != nil { + return + } + + _, err = db.Exec(util.ExplainAnalyzeHelper) + if err != nil { + return + } + + _, err = db.Exec("RESET ROLE") + if err != nil { + return + } + + _, err = db.Exec("REVOKE CREATE ON SCHEMA pganalyze FROM pganalyze_explain") + if err != nil { + return + } + + return +} diff --git a/main.go b/main.go index 7d9137f77..1472ef4ac 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,8 @@ func main() { var testExplain bool var testSection string var generateStatsHelperSql string + var generateHelperExplainAnalyzeSql string + var generateHelperExplainAnalyzeRole string var forceStateUpdate bool var configFilename string var stateFilename string @@ -74,6 +76,8 @@ func main() { flag.BoolVar(&testExplain, "test-explain", false, "Tests whether EXPLAIN collection works by issuing a dummy query (ensure log collection works first)") flag.StringVar(&testSection, "test-section", "", "Tests a particular section of the config file, i.e. 
a specific server, and ignores all other config sections") flag.StringVar(&generateStatsHelperSql, "generate-stats-helper-sql", "", "Generates a SQL script for the given server (name of section in the config file, or \"default\" for env variables), that can be run with \"psql -f\" for installing the collector stats helpers on all configured databases") + flag.StringVar(&generateHelperExplainAnalyzeSql, "generate-explain-analyze-helper-sql", "", "Generates a SQL script for the given server (name of section in the config file, or \"default\" for env variables), that can be run with \"psql -f\" for installing the collector pganalyze.explain_analyze helper on all configured databases") + flag.StringVar(&generateHelperExplainAnalyzeRole, "generate-explain-analyze-helper-role", "pganalyze_explain", "Sets owner role of the pganalyze.explain_analyze helper function, defaults to \"pganalyze_explain\"") flag.BoolVar(&reloadRun, "reload", false, "Reloads the collector daemon that's running on the host") flag.BoolVar(&noReload, "no-reload", false, "Disables automatic config reloading during a test run") flag.BoolVarP(&logger.Verbose, "verbose", "v", false, "Outputs additional debugging information, use this if you're encountering errors or other problems") @@ -145,33 +149,35 @@ func main() { } } - if testRunLogs || testRunAndTrace || testExplain || generateStatsHelperSql != "" { + if testRunLogs || testRunAndTrace || testExplain || generateStatsHelperSql != "" || generateHelperExplainAnalyzeSql != "" { testRun = true } globalCollectionOpts := state.CollectionOpts{ - StartedAt: time.Now(), - SubmitCollectedData: !benchmark && true, - TestRun: testRun, - TestRunLogs: testRunLogs || dryRunLogs, - TestExplain: testExplain, - TestSection: testSection, - GenerateStatsHelperSql: generateStatsHelperSql, - DebugLogs: debugLogs, - DiscoverLogLocation: discoverLogLocation, - CollectPostgresRelations: !noPostgresRelations, - CollectPostgresSettings: !noPostgresSettings, - 
CollectPostgresLocks: !noPostgresLocks, - CollectPostgresFunctions: !noPostgresFunctions, - CollectPostgresBloat: !noPostgresBloat, - CollectPostgresViews: !noPostgresViews, - CollectLogs: !noLogs, - CollectExplain: !noExplain, - CollectSystemInformation: !noSystemInformation, - StateFilename: stateFilename, - WriteStateUpdate: (!dryRun && !dryRunLogs && !testRun) || forceStateUpdate, - ForceEmptyGrant: dryRun || dryRunLogs || testRunLogs || benchmark, - OutputAsJson: !benchmark, + StartedAt: time.Now(), + SubmitCollectedData: !benchmark && true, + TestRun: testRun, + TestRunLogs: testRunLogs || dryRunLogs, + TestExplain: testExplain, + TestSection: testSection, + GenerateStatsHelperSql: generateStatsHelperSql, + GenerateExplainAnalyzeHelperSql: generateHelperExplainAnalyzeSql, + GenerateExplainAnalyzeHelperRole: generateHelperExplainAnalyzeRole, + DebugLogs: debugLogs, + DiscoverLogLocation: discoverLogLocation, + CollectPostgresRelations: !noPostgresRelations, + CollectPostgresSettings: !noPostgresSettings, + CollectPostgresLocks: !noPostgresLocks, + CollectPostgresFunctions: !noPostgresFunctions, + CollectPostgresBloat: !noPostgresBloat, + CollectPostgresViews: !noPostgresViews, + CollectLogs: !noLogs, + CollectExplain: !noExplain, + CollectSystemInformation: !noSystemInformation, + StateFilename: stateFilename, + WriteStateUpdate: (!dryRun && !dryRunLogs && !testRun) || forceStateUpdate, + ForceEmptyGrant: dryRun || dryRunLogs || testRunLogs || benchmark, + OutputAsJson: !benchmark, } if reloadRun && !testRun { diff --git a/output/pganalyze_collector/server_message.pb.go b/output/pganalyze_collector/server_message.pb.go index 972001fe8..ca917bd30 100644 --- a/output/pganalyze_collector/server_message.pb.go +++ b/output/pganalyze_collector/server_message.pb.go @@ -332,10 +332,13 @@ type ServerMessage_QueryRun struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - Type 
QueryRunType `protobuf:"varint,2,opt,name=type,proto3,enum=pganalyze.collector.QueryRunType" json:"type,omitempty"` - DatabaseName string `protobuf:"bytes,3,opt,name=database_name,json=databaseName,proto3" json:"database_name,omitempty"` - QueryText string `protobuf:"bytes,4,opt,name=query_text,json=queryText,proto3" json:"query_text,omitempty"` + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Type QueryRunType `protobuf:"varint,2,opt,name=type,proto3,enum=pganalyze.collector.QueryRunType" json:"type,omitempty"` + DatabaseName string `protobuf:"bytes,3,opt,name=database_name,json=databaseName,proto3" json:"database_name,omitempty"` + QueryText string `protobuf:"bytes,4,opt,name=query_text,json=queryText,proto3" json:"query_text,omitempty"` + QueryParameters []*NullString `protobuf:"bytes,5,rep,name=query_parameters,json=queryParameters,proto3" json:"query_parameters,omitempty"` + QueryParameterTypes []string `protobuf:"bytes,6,rep,name=query_parameter_types,json=queryParameterTypes,proto3" json:"query_parameter_types,omitempty"` + PostgresSettings map[string]string `protobuf:"bytes,7,rep,name=postgres_settings,json=postgresSettings,proto3" json:"postgres_settings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *ServerMessage_QueryRun) Reset() { @@ -398,13 +401,34 @@ func (x *ServerMessage_QueryRun) GetQueryText() string { return "" } +func (x *ServerMessage_QueryRun) GetQueryParameters() []*NullString { + if x != nil { + return x.QueryParameters + } + return nil +} + +func (x *ServerMessage_QueryRun) GetQueryParameterTypes() []string { + if x != nil { + return x.QueryParameterTypes + } + return nil +} + +func (x *ServerMessage_QueryRun) GetPostgresSettings() map[string]string { + if x != nil { + return x.PostgresSettings + } + return nil +} + var File_server_message_proto protoreflect.FileDescriptor var file_server_message_proto_rawDesc = []byte{ 0x0a, 0x14, 0x73, 0x65, 0x72, 
0x76, 0x65, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x1a, 0x0c, 0x73, 0x68, 0x61, - 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8c, 0x07, 0x0a, 0x0d, 0x53, 0x65, + 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc1, 0x09, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x43, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, @@ -451,7 +475,7 @@ var file_server_message_proto_rawDesc = []byte{ 0x6d, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4d, 0x73, 0x51, 0x75, 0x65, 0x72, 0x79, 0x54, 0x65, 0x78, 0x74, 0x1a, 0x1d, 0x0a, 0x05, 0x50, 0x61, 0x75, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x61, 0x75, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, - 0x70, 0x61, 0x75, 0x73, 0x65, 0x1a, 0x95, 0x01, 0x0a, 0x08, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, + 0x70, 0x61, 0x75, 0x73, 0x65, 0x1a, 0xca, 0x03, 0x0a, 0x08, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, @@ -460,12 +484,32 @@ var file_server_message_proto_rawDesc = []byte{ 0x61, 0x62, 0x61, 0x73, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x74, 0x65, 0x78, 0x74, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x71, 0x75, 0x65, 0x72, 0x79, 
0x54, 0x65, 0x78, 0x74, 0x42, 0x09, 0x0a, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, - 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x6f, 0x75, 0x74, 0x70, 0x75, - 0x74, 0x2f, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x6c, - 0x65, 0x63, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x09, 0x71, 0x75, 0x65, 0x72, 0x79, 0x54, 0x65, 0x78, 0x74, 0x12, 0x4a, 0x0a, + 0x10, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, + 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, + 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x75, + 0x6c, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x52, 0x0f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x50, + 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x12, 0x32, 0x0a, 0x15, 0x71, 0x75, 0x65, + 0x72, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x5f, 0x74, 0x79, 0x70, + 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x13, 0x71, 0x75, 0x65, 0x72, 0x79, 0x50, + 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x6e, 0x0a, + 0x11, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x5f, 0x73, 0x65, 0x74, 0x74, 0x69, 0x6e, + 0x67, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x70, 0x67, 0x61, 0x6e, 0x61, + 0x6c, 0x79, 0x7a, 0x65, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x52, 0x75, 0x6e, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x53, 0x65, + 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x10, 0x70, 0x6f, 0x73, + 0x74, 0x67, 
0x72, 0x65, 0x73, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x1a, 0x43, 0x0a, + 0x15, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x3b, 0x5a, + 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x67, 0x61, 0x6e, + 0x61, 0x6c, 0x79, 0x7a, 0x65, 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, + 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x2f, 0x70, 0x67, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x7a, 0x65, + 0x5f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -480,26 +524,30 @@ func file_server_message_proto_rawDescGZIP() []byte { return file_server_message_proto_rawDescData } -var file_server_message_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_server_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_server_message_proto_goTypes = []interface{}{ (*ServerMessage)(nil), // 0: pganalyze.collector.ServerMessage (*ServerMessage_Config)(nil), // 1: pganalyze.collector.ServerMessage.Config (*ServerMessage_Features)(nil), // 2: pganalyze.collector.ServerMessage.Features (*ServerMessage_Pause)(nil), // 3: pganalyze.collector.ServerMessage.Pause (*ServerMessage_QueryRun)(nil), // 4: pganalyze.collector.ServerMessage.QueryRun - (QueryRunType)(0), // 5: pganalyze.collector.QueryRunType + nil, // 5: pganalyze.collector.ServerMessage.QueryRun.PostgresSettingsEntry + (QueryRunType)(0), // 6: pganalyze.collector.QueryRunType + (*NullString)(nil), // 7: pganalyze.collector.NullString } var file_server_message_proto_depIdxs = 
[]int32{ 1, // 0: pganalyze.collector.ServerMessage.config:type_name -> pganalyze.collector.ServerMessage.Config 3, // 1: pganalyze.collector.ServerMessage.pause:type_name -> pganalyze.collector.ServerMessage.Pause 4, // 2: pganalyze.collector.ServerMessage.query_run:type_name -> pganalyze.collector.ServerMessage.QueryRun 2, // 3: pganalyze.collector.ServerMessage.Config.features:type_name -> pganalyze.collector.ServerMessage.Features - 5, // 4: pganalyze.collector.ServerMessage.QueryRun.type:type_name -> pganalyze.collector.QueryRunType - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 6, // 4: pganalyze.collector.ServerMessage.QueryRun.type:type_name -> pganalyze.collector.QueryRunType + 7, // 5: pganalyze.collector.ServerMessage.QueryRun.query_parameters:type_name -> pganalyze.collector.NullString + 5, // 6: pganalyze.collector.ServerMessage.QueryRun.postgres_settings:type_name -> pganalyze.collector.ServerMessage.QueryRun.PostgresSettingsEntry + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_server_message_proto_init() } @@ -581,7 +629,7 @@ func file_server_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_server_message_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/protobuf/server_message.proto b/protobuf/server_message.proto index 3c5fd2ece..e7451e01f 100644 --- a/protobuf/server_message.proto +++ b/protobuf/server_message.proto @@ -44,5 +44,8 @@ message ServerMessage { QueryRunType type = 2; string database_name = 
3; string query_text = 4; + repeated NullString query_parameters = 5; + repeated string query_parameter_types = 6; + map<string, string> postgres_settings = 7; } } diff --git a/runner/generate_helper_sql.go b/runner/generate_helper_sql.go index 13ecedad0..e33fc16fc 100644 --- a/runner/generate_helper_sql.go +++ b/runner/generate_helper_sql.go @@ -68,3 +68,36 @@ func GenerateStatsHelperSql(ctx context.Context, server *state.Server, globalCol return output.String(), nil } + +func GenerateExplainAnalyzeHelperSql(ctx context.Context, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (string, error) { + db, err := postgres.EstablishConnection(ctx, server, logger, globalCollectionOpts, "") + if err != nil { + return "", err + } + defer db.Close() + + version, err := postgres.GetPostgresVersion(ctx, logger, db) + if err != nil { + return "", fmt.Errorf("error collecting Postgres version: %s", err) + } + + databases, _, err := postgres.GetDatabases(ctx, logger, db, version) + if err != nil { + return "", fmt.Errorf("error collecting pg_databases: %s", err) + } + + output := strings.Builder{} + for _, dbName := range postgres.GetDatabasesToCollect(server, databases) { + output.WriteString(fmt.Sprintf("\\c %s\n", pq.QuoteIdentifier(dbName))) + output.WriteString("CREATE SCHEMA IF NOT EXISTS pganalyze;\n") + output.WriteString(fmt.Sprintf("GRANT USAGE ON SCHEMA pganalyze TO %s;\n", pq.QuoteIdentifier(server.Config.GetDbUsername()))) + output.WriteString(fmt.Sprintf("GRANT CREATE ON SCHEMA pganalyze TO %s;\n", pq.QuoteIdentifier(globalCollectionOpts.GenerateExplainAnalyzeHelperRole))) + output.WriteString(fmt.Sprintf("SET ROLE %s;\n", pq.QuoteIdentifier(globalCollectionOpts.GenerateExplainAnalyzeHelperRole))) + output.WriteString(util.ExplainAnalyzeHelper + "\n") + output.WriteString("RESET ROLE;\n") + output.WriteString(fmt.Sprintf("REVOKE CREATE ON SCHEMA pganalyze FROM %s;\n", pq.QuoteIdentifier(globalCollectionOpts.GenerateExplainAnalyzeHelperRole))) + 
output.WriteString("\n") + } + + return output.String(), nil +} diff --git a/runner/query_run.go b/runner/query_run.go index 5fd8c7c6f..8cd8ed1da 100644 --- a/runner/query_run.go +++ b/runner/query_run.go @@ -4,9 +4,9 @@ import ( "context" "errors" "fmt" - "strings" "time" + "github.com/lib/pq" "github.com/pganalyze/collector/input/postgres" "github.com/pganalyze/collector/output" "github.com/pganalyze/collector/output/pganalyze_collector" @@ -41,26 +41,16 @@ func SetupQueryRunnerForAllServers(ctx context.Context, servers []*state.Server, func run(ctx context.Context, server *state.Server, collectionOpts state.CollectionOpts, logger *util.Logger) { for id, query := range server.QueryRuns { - var firstErr error if !query.FinishedAt.IsZero() { continue } + server.QueryRunsMutex.Lock() server.QueryRuns[id].StartedAt = time.Now() server.QueryRunsMutex.Unlock() logger.PrintVerbose("Query run %d starting: %s", query.Id, query.QueryText) - db, err := postgres.EstablishConnection(ctx, server, logger, collectionOpts, query.DatabaseName) - if err != nil { - server.QueryRunsMutex.Lock() - server.QueryRuns[id].FinishedAt = time.Now() - server.QueryRuns[id].Error = err.Error() - server.QueryRunsMutex.Unlock() - continue - } - defer db.Close() - - err = postgres.SetStatementTimeout(ctx, db, 60*1000) + result, err := runQueryOnDatabase(ctx, server, collectionOpts, logger, id, query) if err != nil { server.QueryRunsMutex.Lock() server.QueryRuns[id].FinishedAt = time.Now() @@ -69,55 +59,9 @@ func run(ctx context.Context, server *state.Server, collectionOpts state.Collect continue } - pid := 0 - err = db.QueryRow(postgres.QueryMarkerSQL + "SELECT pg_backend_pid()").Scan(&pid) - if err == nil { - server.QueryRunsMutex.Lock() - server.QueryRuns[id].BackendPid = pid - server.QueryRunsMutex.Unlock() - } else { - server.QueryRunsMutex.Lock() - server.QueryRuns[id].FinishedAt = time.Now() - server.QueryRuns[id].Error = err.Error() - server.QueryRunsMutex.Unlock() - continue - } - - // We 
don't include QueryMarkerSQL so query runs are reported separately in pganalyze - comment := fmt.Sprintf("/* pganalyze:no-alert,pganalyze-query-run:%d */ ", query.Id) - result := "" - - if query.Type == pganalyze_collector.QueryRunType_EXPLAIN { - sql := "BEGIN; EXPLAIN (ANALYZE, VERBOSE, BUFFERS, FORMAT JSON) " + comment + query.QueryText + "; ROLLBACK" - err = db.QueryRowContext(ctx, sql).Scan(&result) - firstErr = err - - // Run EXPLAIN ANALYZE a second time to get a warm cache result - err = db.QueryRowContext(ctx, sql).Scan(&result) - - // If the first run failed and the second run succeeded, run once more to get a warm cache result - if err == nil && firstErr != nil { - err = db.QueryRowContext(ctx, sql).Scan(&result) - } - - // If it timed out, capture a non-ANALYZE EXPLAIN instead - if err != nil && strings.Contains(err.Error(), "statement timeout") { - sql = "BEGIN; EXPLAIN (VERBOSE, FORMAT JSON) " + comment + query.QueryText + "; ROLLBACK" - err = db.QueryRowContext(ctx, sql).Scan(&result) - } - } else { - err = errors.New("Unhandled query run type") - logger.PrintVerbose("Unhandled query run type %d for %d", query.Type, query.Id) - } - server.QueryRunsMutex.Lock() server.QueryRuns[id].FinishedAt = time.Now() server.QueryRuns[id].Result = result - if firstErr != nil { - server.QueryRuns[id].Error = firstErr.Error() - } else if err != nil { - server.QueryRuns[id].Error = err.Error() - } server.QueryRunsMutex.Unlock() // Activity snapshots will eventually send the query run result, but to reduce latency @@ -126,6 +70,51 @@ func run(ctx context.Context, server *state.Server, collectionOpts state.Collect } } +func runQueryOnDatabase(ctx context.Context, server *state.Server, collectionOpts state.CollectionOpts, logger *util.Logger, id int64, query *state.QueryRun) (string, error) { + if query.Type != pganalyze_collector.QueryRunType_EXPLAIN { + logger.PrintVerbose("Unhandled query run type %d for %d", query.Type, query.Id) + return "", errors.New("Unhandled 
query run type") + } + + db, err := postgres.EstablishConnection(ctx, server, logger, collectionOpts, query.DatabaseName) + if err != nil { + return "", err + } + defer db.Close() + + if postgres.StatsHelperExists(ctx, db, "explain_analyze") { + logger.PrintVerbose("Found pganalyze.explain_analyze helper function in database \"%s\"", query.DatabaseName) + } else { + return "", fmt.Errorf("Required helper function pganalyze.explain_analyze is not set up") + } + + pid := 0 + err = db.QueryRow(postgres.QueryMarkerSQL + "SELECT pg_backend_pid()").Scan(&pid) + if err != nil { + return "", err + } + server.QueryRunsMutex.Lock() + server.QueryRuns[id].BackendPid = pid + server.QueryRunsMutex.Unlock() + + for name, value := range query.PostgresSettings { + _, err = db.ExecContext(ctx, postgres.QueryMarkerSQL+fmt.Sprintf("SET %s = %s", pq.QuoteIdentifier(name), pq.QuoteLiteral(value))) + if err != nil { + return "", err + } + } + + err = postgres.SetStatementTimeout(ctx, db, 60*1000) + if err != nil { + return "", err + } + + // We don't include QueryMarkerSQL so query runs are reported separately in pganalyze + marker := fmt.Sprintf("/* pganalyze:no-alert,pganalyze-query-run:%d */ ", query.Id) + + return postgres.RunExplainAnalyzeForQueryRun(ctx, db, query.QueryText, query.QueryParameters, query.QueryParameterTypes, marker) +} + // Removes old query runs that have finished func cleanup(server *state.Server) { server.QueryRunsMutex.Lock() diff --git a/runner/run.go b/runner/run.go index a087e64a9..d8e86ea26 100644 --- a/runner/run.go +++ b/runner/run.go @@ -130,6 +130,34 @@ func Run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col return } + if globalCollectionOpts.GenerateExplainAnalyzeHelperSql != "" { + wg.Add(1) + testRunSuccess = make(chan bool) + go func() { + var matchingServer *state.Server + for _, server := range servers { + if globalCollectionOpts.GenerateExplainAnalyzeHelperSql == server.Config.SectionName { + matchingServer = server + } + 
} + if matchingServer == nil { + fmt.Fprintf(os.Stderr, "ERROR - Specified configuration section name '%s' not known\n", globalCollectionOpts.GenerateExplainAnalyzeHelperSql) + testRunSuccess <- false + } else { + output, err := GenerateExplainAnalyzeHelperSql(ctx, matchingServer, globalCollectionOpts, logger.WithPrefix(matchingServer.Config.SectionName)) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR - %s\n", err) + testRunSuccess <- false + } else { + fmt.Print(output) + testRunSuccess <- true + } + } + wg.Done() + }() + return + } + state.ReadStateFile(servers, globalCollectionOpts, logger) writeStateFile = func() { diff --git a/runner/websocket.go b/runner/websocket.go index 09e8ae3cf..da1c3a66a 100644 --- a/runner/websocket.go +++ b/runner/websocket.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/guregu/null" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" @@ -142,11 +143,18 @@ func connect(ctx context.Context, server *state.Server, globalCollectionOpts sta logger.PrintVerbose("Query run %d received: %s", q.Id, q.QueryText) server.QueryRunsMutex.Lock() if _, exists := server.QueryRuns[q.Id]; !exists { + parameters := []null.String{} + for _, p := range q.QueryParameters { + parameters = append(parameters, null.NewString(p.Value, p.Valid)) + } server.QueryRuns[q.Id] = &state.QueryRun{ - Id: q.Id, - Type: q.Type, - DatabaseName: q.DatabaseName, - QueryText: q.QueryText, + Id: q.Id, + Type: q.Type, + DatabaseName: q.DatabaseName, + QueryText: q.QueryText, + QueryParameters: parameters, + QueryParameterTypes: q.QueryParameterTypes, + PostgresSettings: q.PostgresSettings, } } server.QueryRunsMutex.Unlock() diff --git a/state/state.go b/state/state.go index 79a07cf57..ef76c364d 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( raven "github.com/getsentry/raven-go" "github.com/gorilla/websocket" + 
"github.com/guregu/null" "github.com/pganalyze/collector/config" "github.com/pganalyze/collector/output/pganalyze_collector" ) @@ -212,14 +213,16 @@ type CollectionOpts struct { DiffStatements bool - SubmitCollectedData bool - TestRun bool - TestRunLogs bool - TestExplain bool - TestSection string - GenerateStatsHelperSql string - DebugLogs bool - DiscoverLogLocation bool + SubmitCollectedData bool + TestRun bool + TestRunLogs bool + TestExplain bool + TestSection string + GenerateStatsHelperSql string + GenerateExplainAnalyzeHelperSql string + GenerateExplainAnalyzeHelperRole string + DebugLogs bool + DiscoverLogLocation bool StateFilename string WriteStateUpdate bool @@ -253,15 +256,18 @@ type CollectionStatus struct { } type QueryRun struct { - Id int64 - Type pganalyze_collector.QueryRunType - DatabaseName string - QueryText string - Result string - Error string - StartedAt time.Time - FinishedAt time.Time - BackendPid int + Id int64 + Type pganalyze_collector.QueryRunType + DatabaseName string + QueryText string + QueryParameters []null.String + QueryParameterTypes []string + PostgresSettings map[string]string + Result string + Error string + StartedAt time.Time + FinishedAt time.Time + BackendPid int } type Server struct { diff --git a/util/explain_analyze_helper.sql b/util/explain_analyze_helper.sql new file mode 100644 index 000000000..d2837aae1 --- /dev/null +++ b/util/explain_analyze_helper.sql @@ -0,0 +1,46 @@ +CREATE OR REPLACE FUNCTION pganalyze.explain_analyze(query text, params text[], param_types text[], analyze_flags text[]) RETURNS text AS $$ +DECLARE + prepared_query text; + params_str text; + param_types_str text; + explain_prefix text; + explain_flag text; + result text; +BEGIN + SET TRANSACTION READ ONLY; + + PERFORM 1 FROM pg_roles WHERE (rolname = current_user AND rolsuper) OR (pg_has_role(oid, 'MEMBER') AND rolname IN ('rds_superuser', 'azure_pg_admin', 'cloudsqlsuperuser')); + IF FOUND THEN + RAISE EXCEPTION 'cannot run: 
pganalyze.explain_analyze helper is owned by superuser - recreate function with lesser privileged user';
+  END IF;
+
+  SELECT pg_catalog.regexp_replace(query, ';+\s*\Z', '') INTO prepared_query;
+  IF prepared_query LIKE '%;%' THEN
+    RAISE EXCEPTION 'cannot run pganalyze.explain_analyze helper with a multi-statement query';
+  END IF;
+
+  explain_prefix := 'EXPLAIN (VERBOSE, FORMAT JSON';
+  FOR explain_flag IN SELECT * FROM unnest(analyze_flags)
+  LOOP
+    IF explain_flag NOT SIMILAR TO '[A-Za-z_ ]+' THEN
+      RAISE EXCEPTION 'cannot run pganalyze.explain_analyze helper with invalid flag';
+    END IF;
+    explain_prefix := explain_prefix || ', ' || explain_flag;
+  END LOOP;
+  explain_prefix := explain_prefix || ') ';
+
+  SELECT COALESCE('(' || pg_catalog.string_agg(pg_catalog.quote_literal(p), ',') || ')', '') FROM pg_catalog.unnest(params) _(p) INTO params_str;
+  SELECT COALESCE('(' || pg_catalog.string_agg(pg_catalog.quote_ident(p), ',') || ')', '') FROM pg_catalog.unnest(param_types) _(p) INTO param_types_str;
+
+  EXECUTE 'PREPARE pganalyze_explain_analyze ' || param_types_str || ' AS ' || prepared_query;
+  BEGIN
+    EXECUTE explain_prefix || 'EXECUTE pganalyze_explain_analyze' || params_str INTO STRICT result;
+  EXCEPTION WHEN QUERY_CANCELED OR OTHERS THEN
+    DEALLOCATE pganalyze_explain_analyze;
+    RAISE;
+  END;
+  DEALLOCATE pganalyze_explain_analyze;
+
+  RETURN result;
+END
+$$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
diff --git a/util/sql_helpers.go b/util/sql_helpers.go
new file mode 100644
index 000000000..710354526
--- /dev/null
+++ b/util/sql_helpers.go
@@ -0,0 +1,8 @@
+package util
+
+import (
+	_ "embed"
+)
+
+//go:embed explain_analyze_helper.sql
+var ExplainAnalyzeHelper string