Skip to content

Commit

Permalink
Store query texts in temp file (#620)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanlinsley authored Oct 23, 2024
1 parent 8b9e229 commit 6b769dc
Showing 1 changed file with 48 additions and 13 deletions.
61 changes: 48 additions & 13 deletions input/postgres/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"database/sql"
"errors"
"fmt"
"io"
"os"
"strings"

"github.com/guregu/null"
Expand Down Expand Up @@ -106,6 +108,7 @@ func GetStatements(ctx context.Context, server *state.Server, logger *util.Logge
var extSchema string
var extMinorVersion int16
var foundExtMinorVersion int16
var tmpFile *os.File

if postgresVersion.Numeric >= state.PostgresVersion17 {
extMinorVersion = 11
Expand Down Expand Up @@ -230,6 +233,18 @@ func GetStatements(ctx context.Context, server *state.Server, logger *util.Logge
statementTextsByFp := make(state.PostgresStatementTextMap)
statementStats := make(state.PostgresStatementStatsMap)

queryKeys := make([]state.PostgresStatementKey, 0)
queryStats := make([]state.PostgresStatementStats, 0)
queryLengths := make([]int, 0)
if showtext {
tmpFile, err = os.CreateTemp("", "")
if err != nil {
return nil, nil, nil, err
}
defer tmpFile.Close()
defer os.Remove(tmpFile.Name())
}

for rows.Next() {
var key state.PostgresStatementKey
var queryID null.Int
Expand All @@ -253,38 +268,58 @@ func GetStatements(ctx context.Context, server *state.Server, logger *util.Logge
}

if showtext {
queryKeys = append(queryKeys, key)
queryStats = append(queryStats, stats)
queryLengths = append(queryLengths, len(receivedQuery.String))
tmpFile.WriteString(receivedQuery.String)
} else {
statementStats[key] = stats
}
}

if err = rows.Err(); err != nil {
return nil, nil, nil, err
}

if showtext {
tmpFile.Seek(0, io.SeekStart)
for idx, length := range queryLengths {
bytes := make([]byte, length)
_, err = io.ReadFull(tmpFile, bytes)
if err != nil {
return nil, nil, nil, err
}
query := string(bytes)
key := queryKeys[idx]
select {
// Since normalizing can take time, explicitly check for cancellations
case <-ctx.Done():
return nil, nil, nil, ctx.Err()
default:
fingerprintAndNormalize(key, receivedQuery.String, server, statements, statementTextsByFp)
fingerprintAndNormalize(key, query, server, statements, statementTextsByFp)
}
stats := queryStats[idx]
if ignoreIOTiming(postgresVersion, query) {
stats.BlkReadTime = 0
stats.BlkWriteTime = 0
}
statementStats[key] = stats
}
if ignoreIOTiming(postgresVersion, receivedQuery) {
stats.BlkReadTime = 0
stats.BlkWriteTime = 0
}
statementStats[key] = stats
}

if err = rows.Err(); err != nil {
return nil, nil, nil, err
}

server.SelfTest.MarkCollectionAspectOk(state.CollectionAspectPgStatStatements)

return statements, statementTextsByFp, statementStats, nil
}

func ignoreIOTiming(postgresVersion state.PostgresVersion, receivedQuery null.String) bool {
func ignoreIOTiming(postgresVersion state.PostgresVersion, receivedQuery string) bool {
// Currently, Aurora gives wildly incorrect blk_read_time and blk_write_time values
// for utility statements; ignore I/O timing in this situation.
if !postgresVersion.IsAwsAurora || !receivedQuery.Valid {
if !postgresVersion.IsAwsAurora || receivedQuery == "" {
return false
}

isUtil, err := util.IsUtilityStmt(receivedQuery.String)
isUtil, err := util.IsUtilityStmt(receivedQuery)
if err != nil {
return false
}
Expand Down

0 comments on commit 6b769dc

Please sign in to comment.