From 3601f62ebaaa4e4cf8192fd509d16c9468da9ecd Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Tue, 2 Jun 2020 01:07:06 -0700 Subject: [PATCH] Split out activity and log snapshot states and mutexes This improves consistency of when these snapshots run, because they no longer conflict with each other, or the full snapshot run. Note that for now, the state file does not contain the activity and log states. This could be implemented in the future, as long as its ensured that there is a write mutex when writing the state file, as this could happen concurrently now. --- input/logs.go | 4 +-- main.go | 4 +-- output/compact_activity.go | 2 +- output/transform/activity.go | 6 ++-- runner/activity.go | 14 ++++----- runner/full.go | 55 +-------------------------------- runner/logs.go | 30 +++++++++--------- state/activity.go | 6 +++- state/state.go | 16 ++++++---- state/state_file.go | 59 ++++++++++++++++++++++++++++++++++++ 10 files changed, 105 insertions(+), 91 deletions(-) create mode 100644 state/state_file.go diff --git a/input/logs.go b/input/logs.go index c2693bddf..1f6b79508 100644 --- a/input/logs.go +++ b/input/logs.go @@ -11,11 +11,11 @@ import ( ) // DownloadLogs - Downloads a "logs" snapshot of log data we need on a regular interval -func DownloadLogs(server state.Server, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) { +func DownloadLogs(server state.Server, prevLogState state.PersistedLogState, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) { var querySamples []state.PostgresQuerySample tls.CollectedAt = time.Now() - pls, tls.LogFiles, querySamples = system.DownloadLogFiles(server.PrevState.Log, server.Config, logger) + pls, tls.LogFiles, querySamples = system.DownloadLogFiles(prevLogState, server.Config, logger) if server.Config.EnableLogExplain && connection != nil { tls.QuerySamples = postgres.RunExplain(connection, server.Config.GetDbName(), querySamples) diff --git a/main.go b/main.go index 15b5e8f3c..907317ddb 100644 --- a/main.go +++ b/main.go @@ -69,7 +69,7 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col serverConfigs := conf.Servers for _, config := range serverConfigs { - servers = append(servers, state.Server{Config: config, StateMutex: &sync.Mutex{}}) + servers = append(servers, state.Server{Config: config, StateMutex: &sync.Mutex{}, LogStateMutex: &sync.Mutex{}, ActivityStateMutex: &sync.Mutex{}}) if config.EnableReports { hasAnyReportsEnabled = true } @@ -87,7 +87,7 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col } } - runner.ReadStateFile(servers, globalCollectionOpts, logger) + state.ReadStateFile(servers, globalCollectionOpts, logger) // We intentionally don't do a test-run in the normal mode, since we're fine with // a later SIGHUP that fixes the config (or a temporarily unreachable server at start) diff --git a/output/compact_activity.go b/output/compact_activity.go index b789d278d..8c322524a 100644 --- a/output/compact_activity.go +++ b/output/compact_activity.go @@ -8,7 +8,7 @@ import ( "github.com/pganalyze/collector/util" ) -func SubmitCompactActivitySnapshot(server state.Server, grant state.Grant, collectionOpts state.CollectionOpts, logger *util.Logger, activityState state.ActivityState) error { +func SubmitCompactActivitySnapshot(server state.Server, grant state.Grant, collectionOpts state.CollectionOpts, logger *util.Logger, activityState state.TransientActivityState) error { as, r := transform.ActivityStateToCompactActivitySnapshot(server, activityState) if server.Config.FilterQuerySample == "all" { diff --git a/output/transform/activity.go b/output/transform/activity.go index 0f5153588..42ed672f3 100644 --- a/output/transform/activity.go +++ b/output/transform/activity.go @@ -6,12 +6,12 @@ import ( "github.com/pganalyze/collector/state" ) -func ActivityStateToCompactActivitySnapshot(server state.Server, activityState state.ActivityState) (snapshot.CompactActivitySnapshot, snapshot.CompactSnapshot_BaseRefs) { +func ActivityStateToCompactActivitySnapshot(server state.Server, activityState state.TransientActivityState) (snapshot.CompactActivitySnapshot, snapshot.CompactSnapshot_BaseRefs) { var s snapshot.CompactActivitySnapshot var r snapshot.CompactSnapshot_BaseRefs - if !server.PrevState.ActivitySnapshotAt.IsZero() { - s.PrevActivitySnapshotAt, _ = ptypes.TimestampProto(server.PrevState.ActivitySnapshotAt) + if !server.ActivityPrevState.ActivitySnapshotAt.IsZero() { + s.PrevActivitySnapshotAt, _ = ptypes.TimestampProto(server.ActivityPrevState.ActivitySnapshotAt) } for _, backend := range activityState.Backends { diff --git a/runner/activity.go b/runner/activity.go index 50051fb94..c3cb8baca 100644 --- a/runner/activity.go +++ b/runner/activity.go @@ -14,13 +14,13 @@ import ( "github.com/pkg/errors" ) -func processActivityForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, bool, error) { +func processActivityForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedActivityState, bool, error) { var newGrant state.Grant var err error var connection *sql.DB - var activity state.ActivityState + var activity state.TransientActivityState - newState := server.PrevState + newState := server.ActivityPrevState if !globalCollectionOpts.ForceEmptyGrant { newGrant, err = grant.GetDefaultGrant(server, globalCollectionOpts, logger) @@ -94,18 +94,18 @@ func CollectActivityFromAllServers(servers []state.Server, globalCollectionOpts prefixedLogger.PrintInfo("Testing activity snapshots...") } - server.StateMutex.Lock() + server.ActivityStateMutex.Lock() newState, success, err := processActivityForServer(*server, globalCollectionOpts, prefixedLogger) if err != nil { - server.StateMutex.Unlock() + server.ActivityStateMutex.Unlock() allSuccessful = false prefixedLogger.PrintError("Could not collect activity for server: %s", err) if server.Config.ErrorCallback != "" { go runCompletionCallback("error", server.Config.ErrorCallback, server.Config.SectionName, "activity", err, prefixedLogger) } } else { - server.PrevState = newState - server.StateMutex.Unlock() + server.ActivityPrevState = newState + server.ActivityStateMutex.Unlock() if success && server.Config.SuccessCallback != "" { go runCompletionCallback("success", server.Config.SuccessCallback, server.Config.SectionName, "activity", nil, prefixedLogger) } diff --git a/runner/full.go b/runner/full.go index 40ffbcf40..0567c8ab8 100644 --- a/runner/full.go +++ b/runner/full.go @@ -2,16 +2,13 @@ package runner import ( "database/sql" - "encoding/gob" "fmt" - "os" "os/exec" "runtime/debug" "sync" "time" raven "github.com/getsentry/raven-go" - "github.com/pganalyze/collector/config" "github.com/pganalyze/collector/grant" "github.com/pganalyze/collector/input" "github.com/pganalyze/collector/input/postgres" @@ -124,56 +121,6 @@ func processDatabase(server state.Server, globalCollectionOpts state.CollectionO return newState, newGrant, err } -func writeStateFile(servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) { - stateOnDisk := state.StateOnDisk{PrevStateByServer: make(map[config.ServerIdentifier]state.PersistedState), FormatVersion: state.StateOnDiskFormatVersion} - - for _, server := range servers { - stateOnDisk.PrevStateByServer[server.Config.Identifier] = server.PrevState - } - - file, err := os.Create(globalCollectionOpts.StateFilename) - if err != nil { - logger.PrintWarning("Could not write out state file to %s because of error: %s", globalCollectionOpts.StateFilename, err) - return - } - defer file.Close() - - encoder := gob.NewEncoder(file) - encoder.Encode(stateOnDisk) -} - -// ReadStateFile - This reads in the prevState structs from the state file - only run this on initial bootup and SIGHUP! -func ReadStateFile(servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) { - var stateOnDisk state.StateOnDisk - - file, err := os.Open(globalCollectionOpts.StateFilename) - if err != nil { - logger.PrintVerbose("Did not open state file: %s", err) - return - } - decoder := gob.NewDecoder(file) - err = decoder.Decode(&stateOnDisk) - if err != nil { - logger.PrintVerbose("Could not decode state file: %s", err) - return - } - defer file.Close() - - if stateOnDisk.FormatVersion < state.StateOnDiskFormatVersion { - logger.PrintVerbose("Ignoring state file since the on-disk format has changed") - return - } - - for idx, server := range servers { - prevState, exist := stateOnDisk.PrevStateByServer[server.Config.Identifier] - if exist { - prefixedLogger := logger.WithPrefix(server.Config.SectionName) - prefixedLogger.PrintVerbose("Successfully recovered state from on-disk file") - servers[idx].PrevState = prevState - } - } -} - func runCompletionCallback(callbackType string, callbackCmd string, sectionName string, snapshotType string, errIn error, logger *util.Logger) { cmd := exec.Command("bash", "-c", callbackCmd) cmd.Env = append(cmd.Env, "PGA_CALLBACK_TYPE="+callbackType) @@ -236,7 +183,7 @@ func CollectAllServers(servers []state.Server, globalCollectionOpts state.Collec wg.Wait() if globalCollectionOpts.WriteStateUpdate { - writeStateFile(servers, globalCollectionOpts, logger) + state.WriteStateFile(servers, globalCollectionOpts, logger) } return diff --git a/runner/logs.go b/runner/logs.go index 28882a311..47247f769 100644 --- a/runner/logs.go +++ b/runner/logs.go @@ -15,12 +15,12 @@ import ( "github.com/pkg/errors" ) -func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, bool, error) { - newState := server.PrevState +func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedLogState, bool, error) { + newLogState := server.LogPrevState grant, err := grant.GetLogsGrant(server, globalCollectionOpts, logger) if err != nil { - return newState, false, errors.Wrap(err, "could not get log grant") + return newLogState, false, errors.Wrap(err, "could not get log grant") } if !grant.Valid { @@ -29,35 +29,35 @@ func downloadLogsForServer(server state.Server, globalCollectionOpts state.Colle } else { logger.PrintVerbose("Log collection disabled by pganalyze, skipping") } - return newState, false, nil + return newLogState, false, nil } var connection *sql.DB if server.Config.EnableLogExplain { connection, err = postgres.EstablishConnection(server, logger, globalCollectionOpts, "") if err != nil { - return newState, false, errors.Wrap(err, "failed to connect to database") + return newLogState, false, errors.Wrap(err, "failed to connect to database") } defer connection.Close() } - transientLogState, persistedLogState, err := input.DownloadLogs(server, connection, globalCollectionOpts, logger) + transientLogState, persistedLogState, err := input.DownloadLogs(server, server.LogPrevState, connection, globalCollectionOpts, logger) if err != nil { transientLogState.Cleanup() - return newState, false, errors.Wrap(err, "could not collect logs") + return newLogState, false, errors.Wrap(err, "could not collect logs") } - newState.Log = persistedLogState + newLogState = persistedLogState err = output.UploadAndSendLogs(server, grant, globalCollectionOpts, logger, transientLogState) if err != nil { transientLogState.Cleanup() - return newState, false, errors.Wrap(err, "failed to upload/send logs") + return newLogState, false, errors.Wrap(err, "failed to upload/send logs") } transientLogState.Cleanup() - return newState, true, nil + return newLogState, true, nil } // TestLogsForAllServers - Test log download/tailing @@ -126,17 +126,17 @@ func DownloadLogsFromAllServers(servers []state.Server, globalCollectionOpts sta go func(server *state.Server) { prefixedLogger := logger.WithPrefixAndRememberErrors(server.Config.SectionName) - server.StateMutex.Lock() - newState, success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger) + server.LogStateMutex.Lock() + newLogState, success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger) if err != nil { - server.StateMutex.Unlock() + server.LogStateMutex.Unlock() prefixedLogger.PrintError("Could not collect logs for server: %s", err) if server.Config.ErrorCallback != "" { go runCompletionCallback("error", server.Config.ErrorCallback, server.Config.SectionName, "logs", err, prefixedLogger) } } else if success { - server.PrevState = newState - server.StateMutex.Unlock() + server.LogPrevState = newLogState + server.LogStateMutex.Unlock() if server.Config.SuccessCallback != "" { go runCompletionCallback("success", server.Config.SuccessCallback, server.Config.SectionName, "logs", nil, prefixedLogger) } diff --git a/state/activity.go b/state/activity.go index 33a2ac446..f7ca42cd3 100644 --- a/state/activity.go +++ b/state/activity.go @@ -2,7 +2,7 @@ package state import "time" -type ActivityState struct { +type TransientActivityState struct { CollectedAt time.Time Version PostgresVersion @@ -10,3 +10,7 @@ type ActivityState struct { Vacuums []PostgresVacuumProgress } + +type PersistedActivityState struct { + ActivitySnapshotAt time.Time +} \ No newline at end of file diff --git a/state/state.go b/state/state.go index fdd89d3b2..8db082e22 100644 --- a/state/state.go +++ b/state/state.go @@ -12,8 +12,6 @@ import ( type PersistedState struct { CollectedAt time.Time - ActivitySnapshotAt time.Time - StatementStats PostgresStatementStatsMap RelationStats PostgresRelationStatsMap IndexStats PostgresIndexStatsMap @@ -24,7 +22,6 @@ type PersistedState struct { System SystemState CollectorStats CollectorStats - Log PersistedLogState // Incremented every run, indicates whether we should run a pg_stat_statements_reset() // on behalf of the user. Only activates once it reaches GrantFeatures.StatementReset, @@ -78,7 +75,7 @@ type DiffState struct { } // StateOnDiskFormatVersion - Increment this when an old state preserved to disk should be ignored -const StateOnDiskFormatVersion = 2 +const StateOnDiskFormatVersion = 3 type StateOnDisk struct { FormatVersion uint @@ -151,8 +148,15 @@ type GrantS3 struct { type Server struct { Config config.ServerConfig - PrevState PersistedState - StateMutex *sync.Mutex RequestedSslMode string Grant Grant + + PrevState PersistedState + StateMutex *sync.Mutex + + LogPrevState PersistedLogState + LogStateMutex *sync.Mutex + + ActivityPrevState PersistedActivityState + ActivityStateMutex *sync.Mutex } diff --git a/state/state_file.go b/state/state_file.go new file mode 100644 index 000000000..6bb1192be --- /dev/null +++ b/state/state_file.go @@ -0,0 +1,59 @@ +package state + +import ( + "encoding/gob" + "os" + + "github.com/pganalyze/collector/config" + "github.com/pganalyze/collector/util" +) + +func WriteStateFile(servers []Server, globalCollectionOpts CollectionOpts, logger *util.Logger) { + stateOnDisk := StateOnDisk{PrevStateByServer: make(map[config.ServerIdentifier]PersistedState), FormatVersion: StateOnDiskFormatVersion} + + for _, server := range servers { + stateOnDisk.PrevStateByServer[server.Config.Identifier] = server.PrevState + } + + file, err := os.Create(globalCollectionOpts.StateFilename) + if err != nil { + logger.PrintWarning("Could not write out state file to %s because of error: %s", globalCollectionOpts.StateFilename, err) + return + } + defer file.Close() + + encoder := gob.NewEncoder(file) + encoder.Encode(stateOnDisk) +} + +// ReadStateFile - This reads in the prevState structs from the state file - only run this on initial bootup and SIGHUP! +func ReadStateFile(servers []Server, globalCollectionOpts CollectionOpts, logger *util.Logger) { + var stateOnDisk StateOnDisk + + file, err := os.Open(globalCollectionOpts.StateFilename) + if err != nil { + logger.PrintVerbose("Did not open state file: %s", err) + return + } + decoder := gob.NewDecoder(file) + err = decoder.Decode(&stateOnDisk) + if err != nil { + logger.PrintVerbose("Could not decode state file: %s", err) + return + } + defer file.Close() + + if stateOnDisk.FormatVersion < StateOnDiskFormatVersion { + logger.PrintVerbose("Ignoring state file since the on-disk format has changed") + return + } + + for idx, server := range servers { + prevState, exist := stateOnDisk.PrevStateByServer[server.Config.Identifier] + if exist { + prefixedLogger := logger.WithPrefix(server.Config.SectionName) + prefixedLogger.PrintVerbose("Successfully recovered state from on-disk file") + servers[idx].PrevState = prevState + } + } +}