From 878802b60ed1c8d4d596e0e14a3495cc1d7f0c90 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Fri, 29 Jul 2022 15:12:34 -0700 Subject: [PATCH] Add support for Google AlloyDB for PostgreSQL (#302) This adds two new configuration options for configuring a particular AlloyDB instance: * gcp_alloydb_cluster_id / GCP_ALLOYDB_CLUSTER_ID * gcp_alloydb_instance_id / GCP_ALLOYDB_INSTANCE_ID Additionally, the gcp_project_id / GCP_PROJECT_ID has to be set as well. Instead of going through the regular Cloud SQL log parsing, this utilizes special logic that removes the source filename and line, which are added by AlloyDB but not by standard Postgres. In passing, accomodate special autovacuum log syntax used by AlloyDB. Note there may be other log events with similar customizations we don't handle yet. --- config/config.go | 6 +-- config/identify_system.go | 8 ++- config/read.go | 6 +++ input/system/google_cloudsql/logs.go | 73 +++++++++++++++++++++------- logs/analyze.go | 20 ++++---- logs/analyze_test.go | 53 ++++++++++++++++++++ 6 files changed, 135 insertions(+), 31 deletions(-) diff --git a/config/config.go b/config/config.go index 80fde9ae3..45f684776 100644 --- a/config/config.go +++ b/config/config.go @@ -91,13 +91,13 @@ type ServerConfig struct { AzureADCertificatePath string `ini:"azure_ad_certificate_path"` AzureADCertificatePassword string `ini:"azure_ad_certificate_password"` + GcpProjectID string `ini:"gcp_project_id"` // Optional for CloudSQL (you can pass the full "Connection name" as the instance ID) GcpCloudSQLInstanceID string `ini:"gcp_cloudsql_instance_id"` + GcpAlloyDBClusterID string `ini:"gcp_alloydb_cluster_id"` + GcpAlloyDBInstanceID string `ini:"gcp_alloydb_instance_id"` GcpPubsubSubscription string `ini:"gcp_pubsub_subscription"` GcpCredentialsFile string `ini:"gcp_credentials_file"` - // Optional, we recommend passing the full "Connection name" as GCP CloudSQL instance ID - GcpProjectID string `ini:"gcp_project_id"` - CrunchyBridgeClusterID string `ini:"crunchy_bridge_cluster_id"` AivenProjectID string `ini:"aiven_project_id"` diff --git a/config/identify_system.go b/config/identify_system.go index 131620e16..b5be3e56d 100644 --- a/config/identify_system.go +++ b/config/identify_system.go @@ -35,13 +35,17 @@ func identifySystem(config ServerConfig) (systemID string, systemType string, sy if systemID == "" { systemID = config.AzureDbServerName } - } else if (config.GcpProjectID != "" && config.GcpCloudSQLInstanceID != "") || systemType == "google_cloudsql" { + } else if (config.GcpProjectID != "" && config.GcpCloudSQLInstanceID != "") || (config.GcpProjectID != "" && config.GcpAlloyDBClusterID != "" && config.GcpAlloyDBInstanceID != "") || systemType == "google_cloudsql" { systemType = "google_cloudsql" if systemScope == "" { systemScope = config.GcpProjectID } if systemID == "" { - systemID = config.GcpCloudSQLInstanceID + if config.GcpCloudSQLInstanceID != "" { + systemID = config.GcpCloudSQLInstanceID + } else if config.GcpAlloyDBClusterID != "" && config.GcpAlloyDBInstanceID != "" { + systemID = config.GcpAlloyDBClusterID + ":" + config.GcpAlloyDBInstanceID + } } } else if (config.CrunchyBridgeClusterID != "") || systemType == "crunchy_bridge" { systemType = "crunchy_bridge" diff --git a/config/read.go b/config/read.go index 828235a19..b33597594 100644 --- a/config/read.go +++ b/config/read.go @@ -179,6 +179,12 @@ func getDefaultConfig() *ServerConfig { if gcpCloudSQLInstanceID := os.Getenv("GCP_CLOUDSQL_INSTANCE_ID"); gcpCloudSQLInstanceID != "" { config.GcpCloudSQLInstanceID = gcpCloudSQLInstanceID } + if gcpAlloyDBClusterID := os.Getenv("GCP_ALLOYDB_CLUSTER_ID"); gcpAlloyDBClusterID != "" { + config.GcpAlloyDBClusterID = gcpAlloyDBClusterID + } + if gcpAlloyDBInstanceID := os.Getenv("GCP_ALLOYDB_INSTANCE_ID"); gcpAlloyDBInstanceID != "" { + config.GcpAlloyDBInstanceID = gcpAlloyDBInstanceID + } if gcpPubsubSubscription := os.Getenv("GCP_PUBSUB_SUBSCRIPTION"); gcpPubsubSubscription != "" { config.GcpPubsubSubscription = gcpPubsubSubscription } diff --git a/input/system/google_cloudsql/logs.go b/input/system/google_cloudsql/logs.go index 918adb0e0..4f43f8394 100644 --- a/input/system/google_cloudsql/logs.go +++ b/input/system/google_cloudsql/logs.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strings" "sync" "time" @@ -30,11 +31,14 @@ type googleLogMessage struct { Severity string `json:"severity"` TextPayload string `json:"textPayload"` Timestamp string `json:"timestamp"` + Labels map[string]string `json:"labels"` } type LogStreamItem struct { GcpProjectID string GcpCloudSQLInstanceID string + GcpAlloyDBClusterID string + GcpAlloyDBInstanceID string OccurredAt time.Time Content string } @@ -74,25 +78,52 @@ func setupPubSubSubscriber(ctx context.Context, wg *sync.WaitGroup, logger *util return } - if msg.Resource.ResourceType != "cloudsql_database" { - return - } - if !strings.HasSuffix(msg.LogName, "postgres.log") { - return - } - databaseID, ok := msg.Resource.Labels["database_id"] - if !ok || strings.Count(databaseID, ":") != 1 { + if msg.Resource.ResourceType == "cloudsql_database" { + if !strings.HasSuffix(msg.LogName, "postgres.log") { + return + } + databaseID, ok := msg.Resource.Labels["database_id"] + if !ok || strings.Count(databaseID, ":") != 1 { + return + } + + parts := strings.SplitN(databaseID, ":", 2) // project_id:instance_id + t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp) + + gcpLogStream <- LogStreamItem{ + GcpProjectID: parts[0], + GcpCloudSQLInstanceID: parts[1], + Content: msg.TextPayload, + OccurredAt: t, + } return - } + } else if msg.Resource.ResourceType == "alloydb.googleapis.com/Instance" { + if !strings.HasSuffix(msg.LogName, "postgres.log") { + return + } + clusterID, ok := msg.Resource.Labels["cluster_id"] + if !ok { + return + } + instanceID, ok := msg.Resource.Labels["instance_id"] + if !ok { + return + } + projectID, ok := msg.Labels["CONSUMER_PROJECT"] + if !ok { + return + } - parts := strings.SplitN(databaseID, ":", 2) // project_id:instance_id - t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp) + t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp) - gcpLogStream <- LogStreamItem{ - GcpProjectID: parts[0], - GcpCloudSQLInstanceID: parts[1], - Content: msg.TextPayload, - OccurredAt: t, + gcpLogStream <- LogStreamItem{ + GcpProjectID: projectID, + GcpAlloyDBClusterID: clusterID, + GcpAlloyDBInstanceID: instanceID, + Content: msg.TextPayload, + OccurredAt: t, + } + return } }) if err == nil || err == context.Canceled { @@ -170,7 +201,15 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta } for _, server := range servers { - if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID { + if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID { + out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} + } + if in.GcpProjectID == server.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == server.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == server.Config.GcpAlloyDBInstanceID { + // AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix) + parts := regexp.MustCompile(`^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content)) + if len(parts) == 2 { + logLine.Content = parts[1] + } out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} } } diff --git a/logs/analyze.go b/logs/analyze.go index 7b25aa573..756098544 100644 --- a/logs/analyze.go +++ b/logs/analyze.go @@ -76,15 +76,17 @@ var autoVacuum = analyzeGroup{ classification: pganalyze_collector.LogLineInformation_AUTOVACUUM_COMPLETED, primary: match{ prefixes: []string{"automatic vacuum of table", "automatic aggressive vacuum of table", "automatic aggressive vacuum to prevent wraparound of table"}, - regexp: regexp.MustCompile(`^automatic (aggressive )?vacuum (to prevent wraparound )?of table "(.+?)": index scans: (\d+)\s*` + - `pages: (\d+) removed, (\d+) remain(?:, (\d+) skipped due to pins)?(?:, (\d+) skipped frozen)?\s*` + - `tuples: (\d+) removed, (\d+) remain, (\d+) are dead but not yet removable(?:, oldest xmin: (\d+))?\s*` + - `(?:index scan (not needed|needed|bypassed|bypassed by failsafe): (\d+) pages from table \(([\d.]+)% of total\) (?:have|had) (\d+) dead item identifiers(?: removed)?)?\s*` + // Postgres 14+ - `(?:I/O timings: read: ([\d.]+) ms, write: ([\d.]+) ms)?\s*` + // Postgres 14+ - `(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?\s*` + // Postgres 14+ - `buffer usage: (\d+) hits, (\d+) misses, (\d+) dirtied\s*` + - `(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?\s*` + // Postgres 13 and older - `(?:WAL usage: (\d+) records, (\d+) full page images, (\d+) bytes)?\s*` + // Postgres 14+ + regexp: regexp.MustCompile(`^automatic (aggressive )?vacuum (to prevent wraparound )?of table "(.+?)": index scans: (\d+),?\s*` + + `(?:elapsed time: \d+ \w+, index vacuum time: \d+ \w+,)?\s*` + // Google AlloyDB for PostgreSQL + `pages: (\d+) removed, (\d+) remain(?:, (\d+) skipped due to pins)?(?:, (\d+) skipped frozen)?,?\s*` + + `(?:\d+ skipped using mintxid)?,?\s*` + // Google AlloyDB for PostgreSQL + `tuples: (\d+) removed, (\d+) remain, (\d+) are dead but not yet removable(?:, oldest xmin: (\d+))?,?\s*` + + `(?:index scan (not needed|needed|bypassed|bypassed by failsafe): (\d+) pages from table \(([\d.]+)% of total\) (?:have|had) (\d+) dead item identifiers(?: removed)?)?,?\s*` + // Postgres 14+ + `(?:I/O timings: read: ([\d.]+) ms, write: ([\d.]+) ms)?,?\s*` + // Postgres 14+ + `(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?,?\s*` + // Postgres 14+ + `buffer usage: (\d+) hits, (\d+) misses, (\d+) dirtied,?\s*` + + `(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?,?\s*` + // Postgres 13 and older + `(?:WAL usage: (\d+) records, (\d+) full page images, (\d+) bytes)?,?\s*` + // Postgres 14+ `system usage: CPU(?:(?: ([\d.]+)s/([\d.]+)u sec elapsed ([\d.]+) sec)|(?:: user: ([\d.]+) s, system: ([\d.]+) s, elapsed: ([\d.]+) s))`), secrets: []state.LogSecretKind{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, }, diff --git a/logs/analyze_test.go b/logs/analyze_test.go index 86e24a468..0e5e7a0d5 100644 --- a/logs/analyze_test.go +++ b/logs/analyze_test.go @@ -1568,6 +1568,59 @@ var tests = []testpair{ }}, nil, }, + { + []state.LogLine{{ + Content: "automatic vacuum of table \"alloydbadmin.public.heartbeat\": index scans: 0, elapsed time: 0 s, index vacuum time: 0 ms," + + " pages: 0 removed, 1 remain, 0 skipped due to pins, 0 skipped frozen 0 skipped using mintxid," + + " tuples: 60 removed, 1 remain, 0 are dead but not yet removable, oldest xmin: 1782," + + " index scan not needed: 0 pages from table (0.00% of total) had 0 dead item identifiers removed," + + " I/O timings: read: 0.000 ms, write: 0.000 ms," + + " avg read rate: 0.000 MB/s, avg write rate: 0.000 MB/s," + + " buffer usage: 42 hits, 0 misses, 0 dirtied," + + " WAL usage: 3 records, 0 full page images, 286 bytes," + + " system usage: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.01 s", + LogLevel: pganalyze_collector.LogLineInformation_LOG, + }}, + []state.LogLine{{ + Classification: pganalyze_collector.LogLineInformation_AUTOVACUUM_COMPLETED, + LogLevel: pganalyze_collector.LogLineInformation_LOG, + Database: "alloydbadmin", + SchemaName: "public", + RelationName: "heartbeat", + Details: map[string]interface{}{ + "aggressive": false, + "anti_wraparound": false, + "num_index_scans": 0, + "pages_removed": 0, + "rel_pages": 1, + "pinskipped_pages": 0, + "frozenskipped_pages": 0, + "tuples_deleted": 60, + "new_rel_tuples": 1, + "new_dead_tuples": 0, + "oldest_xmin": 1782, + "lpdead_index_scan": "not needed", + "lpdead_item_pages": 0, + "lpdead_item_page_percent": 0, + "lpdead_items": 0, + "blk_read_time": 0, + "blk_write_time": 0, + "read_rate_mb": 0, + "write_rate_mb": 0, + "vacuum_page_hit": 42, + "vacuum_page_miss": 0, + "vacuum_page_dirty": 0, + "wal_records": 3, + "wal_fpi": 0, + "wal_bytes": 286, + "rusage_user": 0.00, + "rusage_kernel": 0.00, + "elapsed_secs": 0.01, + }, + ReviewedForSecrets: true, + }}, + nil, + }, { []state.LogLine{{ Content: "automatic aggressive vacuum of table \"demo_pgbench.public.pgbench_tellers\": index scans: 0" +