Skip to content

Commit

Permalink
Check and report problematic log collection settings (#95)
Browse files Browse the repository at this point in the history
Some Postgres settings almost always cause a drastic increase in log
volume for little actual benefit. They tend to cause operational problems
for the collector (due to the load of additional log parsing) and the
pganalyze service itself (or indeed, likely for any service that would
process collector snapshots), and do not add any meaningful insights.
Furthermore, we found that these settings are often turned on
accidentally.

To avoid these issues, add some client-side checks in the collector to
disable log processing if any of the problematic settings are on.

The settings in question are:

 * log_min_duration_statement [1] less than 100ms
 * log_statement [2] set to 'all'
 * log_duration [3] set to 'on'

If any of these are set to these unsupported values, log collection will be
disabled. The settings are re-checked every full snapshot, and can be
reset with a collector reload.

[1]: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-LOG-MIN-DURATION-STATEMENT
[2]: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-LOG-STATEMENT
[3]: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-LOG-DURATION
  • Loading branch information
msakrejda authored Oct 20, 2020
1 parent 9d2d78b commit 30e17bf
Show file tree
Hide file tree
Showing 47 changed files with 1,127 additions and 1,028 deletions.
2 changes: 1 addition & 1 deletion grant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/pganalyze/collector/util"
)

func GetDefaultGrant(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.Grant, error) {
func GetDefaultGrant(server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.Grant, error) {
req, err := http.NewRequest("GET", server.Config.APIBaseURL+"/v2/snapshots/grant", nil)
if err != nil {
return state.Grant{}, err
Expand Down
2 changes: 1 addition & 1 deletion grant/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/pganalyze/collector/util"
)

func GetLogsGrant(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.GrantLogs, error) {
func GetLogsGrant(server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.GrantLogs, error) {
req, err := http.NewRequest("GET", server.Config.APIBaseURL+"/v2/snapshots/grant_logs", nil)
if err != nil {
return state.GrantLogs{}, err
Expand Down
61 changes: 61 additions & 0 deletions input/collector_config.go → input/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,72 @@ package input

import (
"os"
"runtime"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
"github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/process"
)

// getMemoryRssBytes reports the RSS value from the memory info of the
// collector's own process. Any failure to look up the process or read its
// memory info is treated as "unknown" and reported as 0.
func getMemoryRssBytes() uint64 {
	self, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		return 0
	}

	info, err := self.MemoryInfo()
	if err != nil {
		return 0
	}

	return info.RSS
}

// getCollectorStats gathers runtime statistics about the collector process
// itself: the Go version, goroutine and cgo call counts, heap and system
// memory figures from runtime.MemStats, and the RSS value obtained via
// getMemoryRssBytes.
func getCollectorStats() state.CollectorStats {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	var stats state.CollectorStats
	stats.GoVersion = runtime.Version()
	stats.ActiveGoroutines = int32(runtime.NumGoroutine())
	stats.CgoCalls = runtime.NumCgoCall()
	stats.MemoryHeapAllocatedBytes = m.HeapAlloc
	stats.MemoryHeapObjects = m.HeapObjects
	stats.MemorySystemBytes = m.Sys
	stats.MemoryRssBytes = getMemoryRssBytes()
	return stats
}

// getCollectorPlatform describes the host the collector is running on, based
// on host.Info() plus the collector start time and the build architecture.
//
// If the host information cannot be retrieved, an empty CollectorPlatform is
// returned; the error is only logged (verbosely) during a test run.
func getCollectorPlatform(globalCollectionOpts state.CollectionOpts, logger *util.Logger) state.CollectorPlatform {
	info, err := host.Info()
	if err != nil {
		if globalCollectionOpts.TestRun {
			logger.PrintVerbose("Could not get collector host information: %s", err)
		}
		return state.CollectorPlatform{}
	}

	platform := state.CollectorPlatform{
		StartedAt:       globalCollectionOpts.StartedAt,
		Architecture:    runtime.GOARCH,
		Hostname:        info.Hostname,
		OperatingSystem: info.OS,
		Platform:        info.Platform,
		PlatformFamily:  info.PlatformFamily,
		PlatformVersion: info.PlatformVersion,
		KernelVersion:   info.KernelVersion,
	}

	// The virtualization system is only reported when this host is a guest;
	// otherwise the field keeps its empty default.
	if info.VirtualizationRole == "guest" {
		platform.VirtualizationSystem = info.VirtualizationSystem
	}

	return platform
}

func getCollectorConfig(c config.ServerConfig) state.CollectorConfig {
return state.CollectorConfig{
SectionName: c.SectionName,
Expand Down
35 changes: 0 additions & 35 deletions input/collector_platform.go

This file was deleted.

40 changes: 0 additions & 40 deletions input/collector_stats.go

This file was deleted.

2 changes: 1 addition & 1 deletion input/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// CollectFull - Collects a "full" snapshot of all data we need on a regular interval
func CollectFull(server state.Server, connection *sql.DB, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (ps state.PersistedState, ts state.TransientState, err error) {
func CollectFull(server *state.Server, connection *sql.DB, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (ps state.PersistedState, ts state.TransientState, err error) {
systemType := server.Config.SystemType

ps.CollectedAt = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion input/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// DownloadLogs - Downloads a "logs" snapshot of log data we need on a regular interval
func DownloadLogs(server state.Server, prevLogState state.PersistedLogState, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) {
func DownloadLogs(server *state.Server, prevLogState state.PersistedLogState, collectionOpts state.CollectionOpts, logger *util.Logger) (tls state.TransientLogState, pls state.PersistedLogState, err error) {
var querySamples []state.PostgresQuerySample

tls.CollectedAt = time.Now()
Expand Down
6 changes: 3 additions & 3 deletions input/postgres/establish_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/pganalyze/collector/util"
)

func EstablishConnection(server state.Server, logger *util.Logger, globalCollectionOpts state.CollectionOpts, databaseName string) (connection *sql.DB, err error) {
func EstablishConnection(server *state.Server, logger *util.Logger, globalCollectionOpts state.CollectionOpts, databaseName string) (connection *sql.DB, err error) {
connection, err = connectToDb(server.Config, logger, globalCollectionOpts, databaseName)
if err != nil {
if err.Error() == "pq: SSL is not enabled on the server" && (server.Config.DbSslMode == "prefer" || server.Config.DbSslMode == "") {
Expand Down Expand Up @@ -75,7 +75,7 @@ func SetStatementTimeout(connection *sql.DB, statementTimeoutMs int32) {
return
}

func SetDefaultStatementTimeout(connection *sql.DB, logger *util.Logger, server state.Server) {
func SetDefaultStatementTimeout(connection *sql.DB, logger *util.Logger, server *state.Server) {
statementTimeoutMs := server.Grant.Config.Features.StatementTimeoutMs
if statementTimeoutMs == 0 { // Default value
statementTimeoutMs = 30000
Expand All @@ -92,7 +92,7 @@ func SetDefaultStatementTimeout(connection *sql.DB, logger *util.Logger, server
return
}

func SetQueryTextStatementTimeout(connection *sql.DB, logger *util.Logger, server state.Server) {
func SetQueryTextStatementTimeout(connection *sql.DB, logger *util.Logger, server *state.Server) {
queryTextStatementTimeoutMs := server.Grant.Config.Features.StatementTimeoutMsQueryText
if queryTextStatementTimeoutMs == 0 { // Default value
queryTextStatementTimeoutMs = 120000
Expand Down
2 changes: 1 addition & 1 deletion input/postgres/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/pganalyze/collector/util"
)

func RunExplain(server state.Server, inputs []state.PostgresQuerySample, collectionOpts state.CollectionOpts, logger *util.Logger) (outputs []state.PostgresQuerySample) {
func RunExplain(server *state.Server, inputs []state.PostgresQuerySample, collectionOpts state.CollectionOpts, logger *util.Logger) (outputs []state.PostgresQuerySample) {
var samplesByDb = make(map[string]([]state.PostgresQuerySample))

skip := func(sample state.PostgresQuerySample) bool {
Expand Down
2 changes: 1 addition & 1 deletion input/postgres/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ SELECT setting
FROM pg_settings
WHERE name = '%s'`

func GetPostgresSetting(settingName string, server state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
func GetPostgresSetting(settingName string, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
var value string

db, err := EstablishConnection(server, prefixedLogger, globalCollectionOpts, "")
Expand Down
2 changes: 1 addition & 1 deletion input/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/pganalyze/collector/util"
)

func CollectAllSchemas(server state.Server, collectionOpts state.CollectionOpts, logger *util.Logger, ps state.PersistedState, ts state.TransientState, systemType string) (state.PersistedState, state.TransientState) {
func CollectAllSchemas(server *state.Server, collectionOpts state.CollectionOpts, logger *util.Logger, ps state.PersistedState, ts state.TransientState, systemType string) (state.PersistedState, state.TransientState) {
schemaDbNames := []string{}

if server.Config.DbAllNames {
Expand Down
2 changes: 1 addition & 1 deletion input/postgres/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func ResetStatements(logger *util.Logger, db *sql.DB, systemType string) error {
return nil
}

func GetStatements(server state.Server, logger *util.Logger, db *sql.DB, globalCollectionOpts state.CollectionOpts, postgresVersion state.PostgresVersion, showtext bool, systemType string) (state.PostgresStatementMap, state.PostgresStatementTextMap, state.PostgresStatementStatsMap, error) {
func GetStatements(server *state.Server, logger *util.Logger, db *sql.DB, globalCollectionOpts state.CollectionOpts, postgresVersion state.PostgresVersion, showtext bool, systemType string) (state.PostgresStatementMap, state.PostgresStatementTextMap, state.PostgresStatementStatsMap, error) {
var err error
var totalTimeField string
var optionalFields string
Expand Down
6 changes: 3 additions & 3 deletions input/system/azure/log_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type AzureEventHubData struct {
Records []AzurePostgresLogRecord `json:"records"`
}

func SetupLogReceiver(ctx context.Context, servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, azureLogStream <-chan AzurePostgresLogRecord) {
func SetupLogReceiver(ctx context.Context, servers []*state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, azureLogStream <-chan AzurePostgresLogRecord) {
logReceiver(ctx, servers, azureLogStream, globalCollectionOpts, logger, nil)
}

func logReceiver(ctx context.Context, servers []state.Server, in <-chan AzurePostgresLogRecord, globalCollectionOpts state.CollectionOpts, logger *util.Logger, logTestSucceeded chan<- bool) {
func logReceiver(ctx context.Context, servers []*state.Server, in <-chan AzurePostgresLogRecord, globalCollectionOpts state.CollectionOpts, logger *util.Logger, logTestSucceeded chan<- bool) {
go func() {
logLinesByServer := make(map[config.ServerIdentifier][]state.LogLine)

Expand Down Expand Up @@ -94,7 +94,7 @@ func logReceiver(ctx context.Context, servers []state.Server, in <-chan AzurePos
case <-timeout:
for identifier := range logLinesByServer {
if len(logLinesByServer[identifier]) > 0 {
server := state.Server{}
server := &state.Server{}
for _, s := range servers {
if s.Config.Identifier == identifier {
server = s
Expand Down
2 changes: 1 addition & 1 deletion input/system/azure/log_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func setupEventHubReceiver(ctx context.Context, wg *sync.WaitGroup, logger *util
return nil
}

func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, servers []state.Server, azureLogStream chan AzurePostgresLogRecord) error {
func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, servers []*state.Server, azureLogStream chan AzurePostgresLogRecord) error {
// This map is used to avoid duplicate receivers to the same Azure Event Hub
eventHubReceivers := make(map[string]bool)

Expand Down
4 changes: 2 additions & 2 deletions input/system/azure/test_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/pganalyze/collector/util"
)

func LogTestRun(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) error {
func LogTestRun(server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) error {
cctx, cancel := context.WithCancel(context.Background())

// We're testing one server at a time during the test run for now
servers := []state.Server{server}
servers := []*state.Server{server}

logTestSucceeded := make(chan bool, 1)
azureLogStream := make(chan AzurePostgresLogRecord, 500)
Expand Down
6 changes: 3 additions & 3 deletions input/system/google_cloudsql/log_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ type LogStreamItem struct {
Content string
}

func SetupLogReceiver(ctx context.Context, servers []state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, gcpLogStream <-chan LogStreamItem) {
func SetupLogReceiver(ctx context.Context, servers []*state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger, gcpLogStream <-chan LogStreamItem) {
logReceiver(ctx, servers, gcpLogStream, globalCollectionOpts, logger, nil)
}

func logReceiver(ctx context.Context, servers []state.Server, in <-chan LogStreamItem, globalCollectionOpts state.CollectionOpts, logger *util.Logger, logTestSucceeded chan<- bool) {
func logReceiver(ctx context.Context, servers []*state.Server, in <-chan LogStreamItem, globalCollectionOpts state.CollectionOpts, logger *util.Logger, logTestSucceeded chan<- bool) {
go func() {
logLinesByServer := make(map[config.ServerIdentifier][]state.LogLine)

Expand Down Expand Up @@ -70,7 +70,7 @@ func logReceiver(ctx context.Context, servers []state.Server, in <-chan LogStrea
case <-timeout:
for identifier := range logLinesByServer {
if len(logLinesByServer[identifier]) > 0 {
server := state.Server{}
server := &state.Server{}
for _, s := range servers {
if s.Config.Identifier == identifier {
server = s
Expand Down
2 changes: 1 addition & 1 deletion input/system/google_cloudsql/log_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func setupPubSubSubscriber(ctx context.Context, wg *sync.WaitGroup, logger *util
return nil
}

func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, servers []state.Server, gcpLogStream chan LogStreamItem) error {
func SetupLogSubscriber(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, servers []*state.Server, gcpLogStream chan LogStreamItem) error {
// This map is used to avoid duplicate receivers to the same subscriber
gcpPubSubHandlers := make(map[string]bool)

Expand Down
4 changes: 2 additions & 2 deletions input/system/google_cloudsql/test_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"github.com/pganalyze/collector/util"
)

func LogTestRun(server state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) error {
func LogTestRun(server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) error {
cctx, cancel := context.WithCancel(context.Background())

// We're testing one server at a time during the test run for now
servers := []state.Server{server}
servers := []*state.Server{server}

logTestSucceeded := make(chan bool, 1)
gcpLogStream := make(chan LogStreamItem, 500)
Expand Down
Loading

0 comments on commit 30e17bf

Please sign in to comment.