Skip to content

Commit

Permalink
Split out activity and log snapshot states and mutexes
Browse files Browse the repository at this point in the history
This improves consistency of when these snapshots run, because they
no longer conflict with each other, or the full snapshot run.

Note that for now, the state file does not contain the activity and log
states. This could be implemented in the future, as long as its ensured
that there is a write mutex when writing the state file, as this could
happen concurrently now.
  • Loading branch information
lfittl committed Jun 2, 2020
1 parent ea8a897 commit 3601f62
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 91 deletions.
4 changes: 2 additions & 2 deletions input/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

// DownloadLogs - Downloads a "logs" snapshot of log data we need on a regular interval
func DownloadLogs(server state.Server, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) {
func DownloadLogs(server state.Server, prevLogState state.PersistedLogState, connection *sql.DB, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) {
var querySamples []state.PostgresQuerySample

tls.CollectedAt = time.Now()
pls, tls.LogFiles, querySamples = system.DownloadLogFiles(server.PrevState.Log, server.Config, logger)
pls, tls.LogFiles, querySamples = system.DownloadLogFiles(prevLogState, server.Config, logger)

if server.Config.EnableLogExplain && connection != nil {
tls.QuerySamples = postgres.RunExplain(connection, server.Config.GetDbName(), querySamples)
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col

serverConfigs := conf.Servers
for _, config := range serverConfigs {
servers = append(servers, state.Server{Config: config, StateMutex: &sync.Mutex{}})
servers = append(servers, state.Server{Config: config, StateMutex: &sync.Mutex{}, LogStateMutex: &sync.Mutex{}, ActivityStateMutex: &sync.Mutex{}})
if config.EnableReports {
hasAnyReportsEnabled = true
}
Expand All @@ -87,7 +87,7 @@ func run(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.Col
}
}

runner.ReadStateFile(servers, globalCollectionOpts, logger)
state.ReadStateFile(servers, globalCollectionOpts, logger)

// We intentionally don't do a test-run in the normal mode, since we're fine with
// a later SIGHUP that fixes the config (or a temporarily unreachable server at start)
Expand Down
2 changes: 1 addition & 1 deletion output/compact_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pganalyze/collector/util"
)

func SubmitCompactActivitySnapshot(server state.Server, grant state.Grant, collectionOpts state.CollectionOpts, logger *util.Logger, activityState state.ActivityState) error {
func SubmitCompactActivitySnapshot(server state.Server, grant state.Grant, collectionOpts state.CollectionOpts, logger *util.Logger, activityState state.TransientActivityState) error {
as, r := transform.ActivityStateToCompactActivitySnapshot(server, activityState)

if server.Config.FilterQuerySample == "all" {
Expand Down
6 changes: 3 additions & 3 deletions output/transform/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"github.com/pganalyze/collector/state"
)

func ActivityStateToCompactActivitySnapshot(server state.Server, activityState state.ActivityState) (snapshot.CompactActivitySnapshot, snapshot.CompactSnapshot_BaseRefs) {
func ActivityStateToCompactActivitySnapshot(server state.Server, activityState state.TransientActivityState) (snapshot.CompactActivitySnapshot, snapshot.CompactSnapshot_BaseRefs) {
var s snapshot.CompactActivitySnapshot
var r snapshot.CompactSnapshot_BaseRefs

if !server.PrevState.ActivitySnapshotAt.IsZero() {
s.PrevActivitySnapshotAt, _ = ptypes.TimestampProto(server.PrevState.ActivitySnapshotAt)
if !server.ActivityPrevState.ActivitySnapshotAt.IsZero() {
s.PrevActivitySnapshotAt, _ = ptypes.TimestampProto(server.ActivityPrevState.ActivitySnapshotAt)
}

for _, backend := range activityState.Backends {
Expand Down
14 changes: 7 additions & 7 deletions runner/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"github.com/pkg/errors"
)

func processActivityForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, bool, error) {
func processActivityForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedActivityState, bool, error) {
var newGrant state.Grant
var err error
var connection *sql.DB
var activity state.ActivityState
var activity state.TransientActivityState

newState := server.PrevState
newState := server.ActivityPrevState

if !globalCollectionOpts.ForceEmptyGrant {
newGrant, err = grant.GetDefaultGrant(server, globalCollectionOpts, logger)
Expand Down Expand Up @@ -94,18 +94,18 @@ func CollectActivityFromAllServers(servers []state.Server, globalCollectionOpts
prefixedLogger.PrintInfo("Testing activity snapshots...")
}

server.StateMutex.Lock()
server.ActivityStateMutex.Lock()
newState, success, err := processActivityForServer(*server, globalCollectionOpts, prefixedLogger)
if err != nil {
server.StateMutex.Unlock()
server.ActivityStateMutex.Unlock()
allSuccessful = false
prefixedLogger.PrintError("Could not collect activity for server: %s", err)
if server.Config.ErrorCallback != "" {
go runCompletionCallback("error", server.Config.ErrorCallback, server.Config.SectionName, "activity", err, prefixedLogger)
}
} else {
server.PrevState = newState
server.StateMutex.Unlock()
server.ActivityPrevState = newState
server.ActivityStateMutex.Unlock()
if success && server.Config.SuccessCallback != "" {
go runCompletionCallback("success", server.Config.SuccessCallback, server.Config.SectionName, "activity", nil, prefixedLogger)
}
Expand Down
55 changes: 1 addition & 54 deletions runner/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package runner

import (
"database/sql"
"encoding/gob"
"fmt"
"os"
"os/exec"
"runtime/debug"
"sync"
"time"

raven "github.com/getsentry/raven-go"
"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/grant"
"github.com/pganalyze/collector/input"
"github.com/pganalyze/collector/input/postgres"
Expand Down Expand Up @@ -124,56 +121,6 @@ func processDatabase(server state.Server, globalCollectionOpts state.CollectionO
return newState, newGrant, err
}

func writeStateFile(servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) {
stateOnDisk := state.StateOnDisk{PrevStateByServer: make(map[config.ServerIdentifier]state.PersistedState), FormatVersion: state.StateOnDiskFormatVersion}

for _, server := range servers {
stateOnDisk.PrevStateByServer[server.Config.Identifier] = server.PrevState
}

file, err := os.Create(globalCollectionOpts.StateFilename)
if err != nil {
logger.PrintWarning("Could not write out state file to %s because of error: %s", globalCollectionOpts.StateFilename, err)
return
}
defer file.Close()

encoder := gob.NewEncoder(file)
encoder.Encode(stateOnDisk)
}

// ReadStateFile - This reads in the prevState structs from the state file - only run this on initial bootup and SIGHUP!
func ReadStateFile(servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) {
var stateOnDisk state.StateOnDisk

file, err := os.Open(globalCollectionOpts.StateFilename)
if err != nil {
logger.PrintVerbose("Did not open state file: %s", err)
return
}
decoder := gob.NewDecoder(file)
err = decoder.Decode(&stateOnDisk)
if err != nil {
logger.PrintVerbose("Could not decode state file: %s", err)
return
}
defer file.Close()

if stateOnDisk.FormatVersion < state.StateOnDiskFormatVersion {
logger.PrintVerbose("Ignoring state file since the on-disk format has changed")
return
}

for idx, server := range servers {
prevState, exist := stateOnDisk.PrevStateByServer[server.Config.Identifier]
if exist {
prefixedLogger := logger.WithPrefix(server.Config.SectionName)
prefixedLogger.PrintVerbose("Successfully recovered state from on-disk file")
servers[idx].PrevState = prevState
}
}
}

func runCompletionCallback(callbackType string, callbackCmd string, sectionName string, snapshotType string, errIn error, logger *util.Logger) {
cmd := exec.Command("bash", "-c", callbackCmd)
cmd.Env = append(cmd.Env, "PGA_CALLBACK_TYPE="+callbackType)
Expand Down Expand Up @@ -236,7 +183,7 @@ func CollectAllServers(servers []state.Server, globalCollectionOpts state.Collec
wg.Wait()

if globalCollectionOpts.WriteStateUpdate {
writeStateFile(servers, globalCollectionOpts, logger)
state.WriteStateFile(servers, globalCollectionOpts, logger)
}

return
Expand Down
30 changes: 15 additions & 15 deletions runner/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"github.com/pkg/errors"
)

func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, bool, error) {
newState := server.PrevState
func downloadLogsForServer(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedLogState, bool, error) {
newLogState := server.LogPrevState

grant, err := grant.GetLogsGrant(server, globalCollectionOpts, logger)
if err != nil {
return newState, false, errors.Wrap(err, "could not get log grant")
return newLogState, false, errors.Wrap(err, "could not get log grant")
}

if !grant.Valid {
Expand All @@ -29,35 +29,35 @@ func downloadLogsForServer(server state.Server, globalCollectionOpts state.Colle
} else {
logger.PrintVerbose("Log collection disabled by pganalyze, skipping")
}
return newState, false, nil
return newLogState, false, nil
}

var connection *sql.DB
if server.Config.EnableLogExplain {
connection, err = postgres.EstablishConnection(server, logger, globalCollectionOpts, "")
if err != nil {
return newState, false, errors.Wrap(err, "failed to connect to database")
return newLogState, false, errors.Wrap(err, "failed to connect to database")
}

defer connection.Close()
}

transientLogState, persistedLogState, err := input.DownloadLogs(server, connection, globalCollectionOpts, logger)
transientLogState, persistedLogState, err := input.DownloadLogs(server, server.LogPrevState, connection, globalCollectionOpts, logger)
if err != nil {
transientLogState.Cleanup()
return newState, false, errors.Wrap(err, "could not collect logs")
return newLogState, false, errors.Wrap(err, "could not collect logs")
}

newState.Log = persistedLogState
newLogState = persistedLogState

err = output.UploadAndSendLogs(server, grant, globalCollectionOpts, logger, transientLogState)
if err != nil {
transientLogState.Cleanup()
return newState, false, errors.Wrap(err, "failed to upload/send logs")
return newLogState, false, errors.Wrap(err, "failed to upload/send logs")
}

transientLogState.Cleanup()
return newState, true, nil
return newLogState, true, nil
}

// TestLogsForAllServers - Test log download/tailing
Expand Down Expand Up @@ -126,17 +126,17 @@ func DownloadLogsFromAllServers(servers []state.Server, globalCollectionOpts sta
go func(server *state.Server) {
prefixedLogger := logger.WithPrefixAndRememberErrors(server.Config.SectionName)

server.StateMutex.Lock()
newState, success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger)
server.LogStateMutex.Lock()
newLogState, success, err := downloadLogsForServer(*server, globalCollectionOpts, prefixedLogger)
if err != nil {
server.StateMutex.Unlock()
server.LogStateMutex.Unlock()
prefixedLogger.PrintError("Could not collect logs for server: %s", err)
if server.Config.ErrorCallback != "" {
go runCompletionCallback("error", server.Config.ErrorCallback, server.Config.SectionName, "logs", err, prefixedLogger)
}
} else if success {
server.PrevState = newState
server.StateMutex.Unlock()
server.LogPrevState = newLogState
server.LogStateMutex.Unlock()
if server.Config.SuccessCallback != "" {
go runCompletionCallback("success", server.Config.SuccessCallback, server.Config.SectionName, "logs", nil, prefixedLogger)
}
Expand Down
6 changes: 5 additions & 1 deletion state/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package state

import "time"

type ActivityState struct {
type TransientActivityState struct {
CollectedAt time.Time

Version PostgresVersion
Backends []PostgresBackend

Vacuums []PostgresVacuumProgress
}

type PersistedActivityState struct {
ActivitySnapshotAt time.Time
}
16 changes: 10 additions & 6 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
type PersistedState struct {
CollectedAt time.Time

ActivitySnapshotAt time.Time

StatementStats PostgresStatementStatsMap
RelationStats PostgresRelationStatsMap
IndexStats PostgresIndexStatsMap
Expand All @@ -24,7 +22,6 @@ type PersistedState struct {

System SystemState
CollectorStats CollectorStats
Log PersistedLogState

// Incremented every run, indicates whether we should run a pg_stat_statements_reset()
// on behalf of the user. Only activates once it reaches GrantFeatures.StatementReset,
Expand Down Expand Up @@ -78,7 +75,7 @@ type DiffState struct {
}

// StateOnDiskFormatVersion - Increment this when an old state preserved to disk should be ignored
const StateOnDiskFormatVersion = 2
const StateOnDiskFormatVersion = 3

type StateOnDisk struct {
FormatVersion uint
Expand Down Expand Up @@ -151,8 +148,15 @@ type GrantS3 struct {

type Server struct {
Config config.ServerConfig
PrevState PersistedState
StateMutex *sync.Mutex
RequestedSslMode string
Grant Grant

PrevState PersistedState
StateMutex *sync.Mutex

LogPrevState PersistedLogState
LogStateMutex *sync.Mutex

ActivityPrevState PersistedActivityState
ActivityStateMutex *sync.Mutex
}
59 changes: 59 additions & 0 deletions state/state_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package state

import (
"encoding/gob"
"os"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/util"
)

func WriteStateFile(servers []Server, globalCollectionOpts CollectionOpts, logger *util.Logger) {
stateOnDisk := StateOnDisk{PrevStateByServer: make(map[config.ServerIdentifier]PersistedState), FormatVersion: StateOnDiskFormatVersion}

for _, server := range servers {
stateOnDisk.PrevStateByServer[server.Config.Identifier] = server.PrevState
}

file, err := os.Create(globalCollectionOpts.StateFilename)
if err != nil {
logger.PrintWarning("Could not write out state file to %s because of error: %s", globalCollectionOpts.StateFilename, err)
return
}
defer file.Close()

encoder := gob.NewEncoder(file)
encoder.Encode(stateOnDisk)
}

// ReadStateFile - This reads in the prevState structs from the state file - only run this on initial bootup and SIGHUP!
func ReadStateFile(servers []Server, globalCollectionOpts CollectionOpts, logger *util.Logger) {
var stateOnDisk StateOnDisk

file, err := os.Open(globalCollectionOpts.StateFilename)
if err != nil {
logger.PrintVerbose("Did not open state file: %s", err)
return
}
decoder := gob.NewDecoder(file)
err = decoder.Decode(&stateOnDisk)
if err != nil {
logger.PrintVerbose("Could not decode state file: %s", err)
return
}
defer file.Close()

if stateOnDisk.FormatVersion < StateOnDiskFormatVersion {
logger.PrintVerbose("Ignoring state file since the on-disk format has changed")
return
}

for idx, server := range servers {
prevState, exist := stateOnDisk.PrevStateByServer[server.Config.Identifier]
if exist {
prefixedLogger := logger.WithPrefix(server.Config.SectionName)
prefixedLogger.PrintVerbose("Successfully recovered state from on-disk file")
servers[idx].PrevState = prevState
}
}
}

0 comments on commit 3601f62

Please sign in to comment.