Skip to content

Commit

Permalink
RDS logs: Remember marker from previous runs, to avoid duplicate log lines
Browse files Browse the repository at this point in the history
  • Loading branch information
lfittl committed Jun 1, 2020
1 parent 6498d45 commit ea8a897
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 52 deletions.
11 changes: 5 additions & 6 deletions input/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ import (
)

// DownloadLogs - Downloads a "logs" snapshot of log data we need on a regular interval
func DownloadLogs(server state.Server, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (ls state.LogState, err error) {
func DownloadLogs(server state.Server, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) {
var querySamples []state.PostgresQuerySample

ls.CollectedAt = time.Now()
ls.LogFiles, querySamples = system.DownloadLogFiles(server.Config, logger)
tls.CollectedAt = time.Now()
pls, tls.LogFiles, querySamples = system.DownloadLogFiles(server.PrevState.Log, server.Config, logger)

// TODO: Correctly pass connection for the logs runner case (on an interval)
if server.Config.EnableLogExplain && connection != nil {
ls.QuerySamples = postgres.RunExplain(connection, server.Config.GetDbName(), querySamples)
tls.QuerySamples = postgres.RunExplain(connection, server.Config.GetDbName(), querySamples)
} else {
ls.QuerySamples = querySamples
tls.QuerySamples = querySamples
}
return
}
18 changes: 16 additions & 2 deletions input/system/rds/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

// DownloadLogFiles - Gets log files for an Amazon RDS instance
func DownloadLogFiles(config config.ServerConfig, logger *util.Logger) (result []state.LogFile, samples []state.PostgresQuerySample) {
func DownloadLogFiles(prevState state.PersistedLogState, config config.ServerConfig, logger *util.Logger) (psl state.PersistedLogState, result []state.LogFile, samples []state.PostgresQuerySample) {
psl = prevState

sess, err := awsutil.GetAwsSession(config)
if err != nil {
logger.PrintError("Rds/Logs: Encountered error getting session: %v\n", err)
Expand Down Expand Up @@ -46,9 +48,15 @@ func DownloadLogFiles(config config.ServerConfig, logger *util.Logger) (result [
return
}

var newestLogFileTime int64

for _, rdsLogFile := range resp.DescribeDBLogFiles {
var lastMarker *string

if *rdsLogFile.LogFileName == prevState.AwsFilename {
lastMarker = &prevState.AwsMarker
}

var logFile state.LogFile
logFile.UUID = uuid.NewV4()
logFile.TmpFile, err = ioutil.TempFile("", "")
Expand All @@ -67,7 +75,7 @@ func DownloadLogFiles(config config.ServerConfig, logger *util.Logger) (result [
resp, err := rdsSvc.DownloadDBLogFilePortion(&rds.DownloadDBLogFilePortionInput{
DBInstanceIdentifier: instance.DBInstanceIdentifier,
LogFileName: rdsLogFile.LogFileName,
Marker: lastMarker, // This is not set for the initial call, so we only get the most recent lines
Marker: lastMarker, // This is not set for the initial call, so we only get the most recent lines
NumberOfLines: aws.Int64(2000), // This is the effective maximum lines retrieved per run
})

Expand Down Expand Up @@ -107,6 +115,12 @@ func DownloadLogFiles(config config.ServerConfig, logger *util.Logger) (result [
}
}

if *rdsLogFile.LastWritten > newestLogFileTime {
newestLogFileTime = *rdsLogFile.LastWritten
psl.AwsFilename = *rdsLogFile.LogFileName
psl.AwsMarker = *lastMarker
}

result = append(result, logFile)
}

Expand Down
6 changes: 4 additions & 2 deletions input/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

// DownloadLogFiles - Downloads all new log files for the remote system and returns them
func DownloadLogFiles(config config.ServerConfig, logger *util.Logger) (files []state.LogFile, querySamples []state.PostgresQuerySample) {
func DownloadLogFiles(prevState state.PersistedLogState, config config.ServerConfig, logger *util.Logger) (psl state.PersistedLogState, files []state.LogFile, querySamples []state.PostgresQuerySample) {
if config.SystemType == "amazon_rds" {
files, querySamples = rds.DownloadLogFiles(config, logger)
psl, files, querySamples = rds.DownloadLogFiles(prevState, config, logger)
} else {
psl = prevState
}

return
Expand Down
8 changes: 4 additions & 4 deletions logs/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func stitchLogLines(readyLogLines []state.LogLine) (analyzableLogLines []state.L
//
// The caller is expected to keep a repository of "tooFreshLogLines" that they
// can send back in again in the next call, combined with new lines received
func AnalyzeStreamInGroups(logLines []state.LogLine) (state.LogState, state.LogFile, []state.LogLine, error) {
func AnalyzeStreamInGroups(logLines []state.LogLine) (state.TransientLogState, state.LogFile, []state.LogLine, error) {
// Pre-Sort by PID, log line number and occurred at timestamp
//
// Its important we do this early, to support out-of-order receipt of log lines,
Expand All @@ -166,12 +166,12 @@ func AnalyzeStreamInGroups(logLines []state.LogLine) (state.LogState, state.LogF

readyLogLines, tooFreshLogLines := findReadyLogLines(logLines, 3*time.Second)
if len(readyLogLines) == 0 {
return state.LogState{}, state.LogFile{}, tooFreshLogLines, nil
return state.TransientLogState{}, state.LogFile{}, tooFreshLogLines, nil
}

logFile, err := writeTmpLogFile(readyLogLines)
if err != nil {
return state.LogState{}, state.LogFile{}, logLines, err
return state.TransientLogState{}, state.LogFile{}, logLines, err
}

// Ensure that log lines that span multiple lines are already concated together before passing them to analyze
Expand All @@ -180,7 +180,7 @@ func AnalyzeStreamInGroups(logLines []state.LogLine) (state.LogState, state.LogF
// this is required for cases where unknown log lines don't have PIDs associated
analyzableLogLines := stitchLogLines(readyLogLines)

logState := state.LogState{CollectedAt: time.Now()}
logState := state.TransientLogState{CollectedAt: time.Now()}
logFile.LogLines, logState.QuerySamples = handleLogAnalysis(analyzableLogLines)

return logState, logFile, tooFreshLogLines, nil
Expand Down
36 changes: 18 additions & 18 deletions logs/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
)

// testpair describes a single AnalyzeStreamInGroups test case: the input log
// lines plus the expected outputs (transient log state, resulting log file and
// its on-disk content, leftover too-fresh lines, and any error).
type testpair struct {
	logLines          []state.LogLine
	TransientLogState state.TransientLogState
	logFile           state.LogFile
	logFileContent    string
	tooFreshLogLines  []state.LogLine
	err               error
}

var now = time.Now()
Expand All @@ -31,7 +31,7 @@ var tests = []testpair{
LogLevel: pganalyze_collector.LogLineInformation_LOG,
Content: "duration: 10003.847 ms statement: SELECT pg_sleep(10);\n",
}},
state.LogState{
state.TransientLogState{
QuerySamples: []state.PostgresQuerySample{{
Query: "SELECT pg_sleep(10);",
RuntimeMs: 10003.847,
Expand Down Expand Up @@ -66,7 +66,7 @@ var tests = []testpair{
LogLevel: pganalyze_collector.LogLineInformation_LOG,
Content: "duration: 10003.847 ms statement: SELECT pg_sleep(10);\n",
}},
state.LogState{},
state.TransientLogState{},
state.LogFile{},
"",
[]state.LogLine{{
Expand All @@ -88,7 +88,7 @@ var tests = []testpair{
LogLevel: pganalyze_collector.LogLineInformation_STATEMENT,
Content: "SELECT pg_reload_conf();\n",
}},
state.LogState{},
state.TransientLogState{},
state.LogFile{
LogLines: []state.LogLine{{
CollectedAt: now.Add(-5 * time.Second),
Expand Down Expand Up @@ -121,7 +121,7 @@ var tests = []testpair{
CollectedAt: now,
Content: " );\n",
}},
state.LogState{},
state.TransientLogState{},
state.LogFile{
LogLines: []state.LogLine{{
CollectedAt: now.Add(-5 * time.Second),
Expand All @@ -146,7 +146,7 @@ var tests = []testpair{
CollectedAt: now,
Content: " );\n",
}},
state.LogState{},
state.TransientLogState{},
state.LogFile{
LogLines: []state.LogLine{{
CollectedAt: now.Add(-5 * time.Second),
Expand Down Expand Up @@ -178,7 +178,7 @@ var tests = []testpair{
BackendPid: 42,
Content: "first\n",
}},
state.LogState{},
state.TransientLogState{},
state.LogFile{
LogLines: []state.LogLine{{
CollectedAt: now.Add(-5 * time.Second),
Expand All @@ -204,7 +204,7 @@ var tests = []testpair{

func TestAnalyzeStreamInGroups(t *testing.T) {
for _, pair := range tests {
logState, logFile, tooFreshLogLines, err := stream.AnalyzeStreamInGroups(pair.logLines)
TransientLogState, logFile, tooFreshLogLines, err := stream.AnalyzeStreamInGroups(pair.logLines)
logFileContent := ""
if logFile.TmpFile != nil {
dat, err := ioutil.ReadFile(logFile.TmpFile.Name())
Expand All @@ -214,15 +214,15 @@ func TestAnalyzeStreamInGroups(t *testing.T) {
logFileContent = string(dat)
}

logState.CollectedAt = time.Time{} // Avoid comparing against time.Now()
logFile.TmpFile = nil // Avoid comparing against tempfile
logFile.UUID = uuid.UUID{} // Avoid comparing against a generated UUID
TransientLogState.CollectedAt = time.Time{} // Avoid comparing against time.Now()
logFile.TmpFile = nil // Avoid comparing against tempfile
logFile.UUID = uuid.UUID{} // Avoid comparing against a generated UUID

cfg := pretty.CompareConfig
cfg.SkipZeroFields = true

if diff := cfg.Compare(pair.logState, logState); diff != "" {
t.Errorf("For %v: log state diff: (-want +got)\n%s", pair.logState, diff)
if diff := cfg.Compare(pair.TransientLogState, TransientLogState); diff != "" {
t.Errorf("For %v: log state diff: (-want +got)\n%s", pair.TransientLogState, diff)
}
if diff := cfg.Compare(pair.logFile, logFile); diff != "" {
t.Errorf("For %v: log file diff: (-want +got)\n%s", pair.logFile, diff)
Expand Down
2 changes: 1 addition & 1 deletion output/compact_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// UploadAndSendLogs - Filters the log file, then uploads it to the storage and sends the metadata to the API
func UploadAndSendLogs(server state.Server, grant state.GrantLogs, collectionOpts state.CollectionOpts, logger *util.Logger, logState state.LogState) error {
func UploadAndSendLogs(server state.Server, grant state.GrantLogs, collectionOpts state.CollectionOpts, logger *util.Logger, logState state.TransientLogState) error {
for idx := range logState.LogFiles {
logState.LogFiles[idx].FilterLogSecret = state.ParseFilterLogSecret(server.Config.FilterLogSecret)
}
Expand Down
6 changes: 3 additions & 3 deletions output/transform/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
uuid "github.com/satori/go.uuid"
)

func LogStateToLogSnapshot(logState state.LogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
func LogStateToLogSnapshot(logState state.TransientLogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
var s snapshot.CompactLogSnapshot
var r snapshot.CompactSnapshot_BaseRefs
s, r = transformPostgresQuerySamples(s, r, logState)
Expand Down Expand Up @@ -62,7 +62,7 @@ func upsertRelationReference(refs []*snapshot.RelationReference, databaseIdx int
return idx, refs
}

func transformPostgresQuerySamples(s snapshot.CompactLogSnapshot, r snapshot.CompactSnapshot_BaseRefs, logState state.LogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
func transformPostgresQuerySamples(s snapshot.CompactLogSnapshot, r snapshot.CompactSnapshot_BaseRefs, logState state.TransientLogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
for _, sampleIn := range logState.QuerySamples {
occurredAt, _ := ptypes.TimestampProto(sampleIn.OccurredAt)

Expand Down Expand Up @@ -103,7 +103,7 @@ func transformPostgresQuerySamples(s snapshot.CompactLogSnapshot, r snapshot.Com
return s, r
}

func transformSystemLogs(s snapshot.CompactLogSnapshot, r snapshot.CompactSnapshot_BaseRefs, logState state.LogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
func transformSystemLogs(s snapshot.CompactLogSnapshot, r snapshot.CompactSnapshot_BaseRefs, logState state.TransientLogState) (snapshot.CompactLogSnapshot, snapshot.CompactSnapshot_BaseRefs) {
for _, logFileIn := range logState.LogFiles {
fileIdx := int32(len(s.LogFileReferences))
logFileReference := &snapshot.LogFileReference{
Expand Down
47 changes: 33 additions & 14 deletions runner/logs.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package runner

import (
"database/sql"
"sync"

"github.com/pganalyze/collector/grant"
"github.com/pganalyze/collector/input"
"github.com/pganalyze/collector/input/postgres"
"github.com/pganalyze/collector/input/system/google_cloudsql"
"github.com/pganalyze/collector/input/system/selfhosted"
"github.com/pganalyze/collector/output"
Expand All @@ -13,10 +15,12 @@ import (
"github.com/pkg/errors"
)

func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (bool, error) {
func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, bool, error) {
newState := server.PrevState

grant, err := grant.GetLogsGrant(server, globalCollectionOpts, logger)
if err != nil {
return false, errors.Wrap(err, "could not get log grant")
return newState, false, errors.Wrap(err, "could not get log grant")
}

if !grant.Valid {
Expand All @@ -25,24 +29,35 @@ func downloadLogsForServer(server state.Server, globalCollectionOpts state.Colle
} else {
logger.PrintVerbose("Log collection disabled by pganalyze, skipping")
}
return false, nil
return newState, false, nil
}

var connection *sql.DB
if server.Config.EnableLogExplain {
connection, err = postgres.EstablishConnection(server, logger, globalCollectionOpts, "")
if err != nil {
return newState, false, errors.Wrap(err, "failed to connect to database")
}

defer connection.Close()
}

// TODO: We'll need to pass a connection here for EXPLAINs to run (or hand them over to the next full snapshot run)
logState, err := input.DownloadLogs(server, nil, globalCollectionOpts, logger)
transientLogState, persistedLogState, err := input.DownloadLogs(server, connection, globalCollectionOpts, logger)
if err != nil {
logState.Cleanup()
return false, errors.Wrap(err, "could not collect logs")
transientLogState.Cleanup()
return newState, false, errors.Wrap(err, "could not collect logs")
}

err = output.UploadAndSendLogs(server, grant, globalCollectionOpts, logger, logState)
newState.Log = persistedLogState

err = output.UploadAndSendLogs(server, grant, globalCollectionOpts, logger, transientLogState)
if err != nil {
logState.Cleanup()
return false, errors.Wrap(err, "failed to upload/send logs")
transientLogState.Cleanup()
return newState, false, errors.Wrap(err, "failed to upload/send logs")
}

logState.Cleanup()
return true, nil
transientLogState.Cleanup()
return newState, true, nil
}

// TestLogsForAllServers - Test log download/tailing
Expand Down Expand Up @@ -79,7 +94,7 @@ func TestLogsForAllServers(servers []state.Server, globalCollectionOpts state.Co
}
} else if server.Config.AwsDbInstanceID != "" {
prefixedLogger.PrintInfo("Testing log collection (RDS)...")
_, err := downloadLogsForServer(server, globalCollectionOpts, prefixedLogger)
_, _, err := downloadLogsForServer(server, globalCollectionOpts, prefixedLogger)
if err != nil {
hasFailedServers = true
prefixedLogger.PrintError("Could not download logs for server: %s", err)
Expand Down Expand Up @@ -111,13 +126,17 @@ func DownloadLogsFromAllServers(servers []state.Server, globalCollectionOpts sta
go func(server *state.Server) {
prefixedLogger := logger.WithPrefixAndRememberErrors(server.Config.SectionName)

success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger)
server.StateMutex.Lock()
newState, success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger)
if err != nil {
server.StateMutex.Unlock()
prefixedLogger.PrintError("Could not collect logs for server: %s", err)
if server.Config.ErrorCallback != "" {
go runCompletionCallback("error", server.Config.ErrorCallback, server.Config.SectionName, "logs", err, prefixedLogger)
}
} else if success {
server.PrevState = newState
server.StateMutex.Unlock()
if server.Config.SuccessCallback != "" {
go runCompletionCallback("success", server.Config.SuccessCallback, server.Config.SectionName, "logs", nil, prefixedLogger)
}
Expand Down
9 changes: 7 additions & 2 deletions state/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ type GrantLogsEncryptionKey struct {
Plaintext string `json:"plaintext"`
}

type LogState struct {
type TransientLogState struct {
CollectedAt time.Time

LogFiles []LogFile
QuerySamples []PostgresQuerySample
}

// PersistedLogState - Log collection state remembered across collector runs,
// so the next run can resume where the previous one left off.
type PersistedLogState struct {
	// Last Amazon RDS log file name and the DownloadDBLogFilePortion marker
	// reached within it; passing the marker back on the next run avoids
	// downloading duplicate log lines.
	AwsFilename string
	AwsMarker   string
}

// LogFile - Log file that we are uploading for reference in log line metadata
type LogFile struct {
LogLines []LogLine
Expand Down Expand Up @@ -161,7 +166,7 @@ func (logFile LogFile) Cleanup() {
os.Remove(logFile.TmpFile.Name())
}

func (ls LogState) Cleanup() {
func (ls TransientLogState) Cleanup() {
for _, logFile := range ls.LogFiles {
logFile.Cleanup()
}
Expand Down
1 change: 1 addition & 0 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type PersistedState struct {

System SystemState
CollectorStats CollectorStats
Log PersistedLogState

// Incremented every run, indicates whether we should run a pg_stat_statements_reset()
// on behalf of the user. Only activates once it reaches GrantFeatures.StatementReset,
Expand Down

0 comments on commit ea8a897

Please sign in to comment.