Add support for Google AlloyDB for PostgreSQL (#302)
This adds two new configuration options for specifying a particular AlloyDB instance:

* gcp_alloydb_cluster_id / GCP_ALLOYDB_CLUSTER_ID
* gcp_alloydb_instance_id / GCP_ALLOYDB_INSTANCE_ID

Additionally, gcp_project_id / GCP_PROJECT_ID has to be set.

Instead of going through the regular Cloud SQL log parsing, this utilizes
special logic that removes the source filename and line, which are
added by AlloyDB but not by standard Postgres.

In passing, accommodate the special autovacuum log syntax used by AlloyDB.
Note that there may be other log events with similar customizations that we don't handle yet.
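
For reference, a minimal sketch of a collector configuration file using the new options (the section name and the Pub/Sub subscription value are illustrative, not taken from this change):

[alloydb_instance]
gcp_project_id = my-project
gcp_alloydb_cluster_id = my-cluster
gcp_alloydb_instance_id = my-instance
gcp_pubsub_subscription = projects/my-project/subscriptions/my-subscription

The same settings can be provided through the GCP_PROJECT_ID, GCP_ALLOYDB_CLUSTER_ID, GCP_ALLOYDB_INSTANCE_ID and GCP_PUBSUB_SUBSCRIPTION environment variables, as wired up in config/read.go below.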
lfittl authored Jul 29, 2022
1 parent 8064871 commit 878802b
Showing 6 changed files with 135 additions and 31 deletions.
6 changes: 3 additions & 3 deletions config/config.go
@@ -91,13 +91,13 @@ type ServerConfig struct {
 	AzureADCertificatePath     string `ini:"azure_ad_certificate_path"`
 	AzureADCertificatePassword string `ini:"azure_ad_certificate_password"`
 
+	GcpProjectID          string `ini:"gcp_project_id"` // Optional for CloudSQL (you can pass the full "Connection name" as the instance ID)
 	GcpCloudSQLInstanceID string `ini:"gcp_cloudsql_instance_id"`
+	GcpAlloyDBClusterID   string `ini:"gcp_alloydb_cluster_id"`
+	GcpAlloyDBInstanceID  string `ini:"gcp_alloydb_instance_id"`
 	GcpPubsubSubscription string `ini:"gcp_pubsub_subscription"`
 	GcpCredentialsFile    string `ini:"gcp_credentials_file"`
 
-	// Optional, we recommend passing the full "Connection name" as GCP CloudSQL instance ID
-	GcpProjectID          string `ini:"gcp_project_id"`
-
 	CrunchyBridgeClusterID string `ini:"crunchy_bridge_cluster_id"`
 
 	AivenProjectID string `ini:"aiven_project_id"`
8 changes: 6 additions & 2 deletions config/identify_system.go
@@ -35,13 +35,17 @@ func identifySystem(config ServerConfig) (systemID string, systemType string, sy
 		if systemID == "" {
 			systemID = config.AzureDbServerName
 		}
-	} else if (config.GcpProjectID != "" && config.GcpCloudSQLInstanceID != "") || systemType == "google_cloudsql" {
+	} else if (config.GcpProjectID != "" && config.GcpCloudSQLInstanceID != "") || (config.GcpProjectID != "" && config.GcpAlloyDBClusterID != "" && config.GcpAlloyDBInstanceID != "") || systemType == "google_cloudsql" {
 		systemType = "google_cloudsql"
 		if systemScope == "" {
 			systemScope = config.GcpProjectID
 		}
 		if systemID == "" {
-			systemID = config.GcpCloudSQLInstanceID
+			if config.GcpCloudSQLInstanceID != "" {
+				systemID = config.GcpCloudSQLInstanceID
+			} else if config.GcpAlloyDBClusterID != "" && config.GcpAlloyDBInstanceID != "" {
+				systemID = config.GcpAlloyDBClusterID + ":" + config.GcpAlloyDBInstanceID
+			}
 		}
 	} else if (config.CrunchyBridgeClusterID != "") || systemType == "crunchy_bridge" {
 		systemType = "crunchy_bridge"
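
To illustrate the identifier scheme this introduces: an AlloyDB server keeps the google_cloudsql system type, uses the project ID as its scope, and joins the cluster and instance IDs into the system ID. A minimal sketch with hypothetical values (not collector code):

package main

import "fmt"

func main() {
	// Hypothetical configuration values
	projectID := "my-project"
	clusterID := "my-cluster"
	instanceID := "my-instance"

	// Mirrors the AlloyDB branch of identifySystem above
	systemType := "google_cloudsql"
	systemScope := projectID
	systemID := clusterID + ":" + instanceID

	fmt.Println(systemType, systemScope, systemID)
	// Output: google_cloudsql my-project my-cluster:my-instance
}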
6 changes: 6 additions & 0 deletions config/read.go
@@ -179,6 +179,12 @@ func getDefaultConfig() *ServerConfig {
 	if gcpCloudSQLInstanceID := os.Getenv("GCP_CLOUDSQL_INSTANCE_ID"); gcpCloudSQLInstanceID != "" {
 		config.GcpCloudSQLInstanceID = gcpCloudSQLInstanceID
 	}
+	if gcpAlloyDBClusterID := os.Getenv("GCP_ALLOYDB_CLUSTER_ID"); gcpAlloyDBClusterID != "" {
+		config.GcpAlloyDBClusterID = gcpAlloyDBClusterID
+	}
+	if gcpAlloyDBInstanceID := os.Getenv("GCP_ALLOYDB_INSTANCE_ID"); gcpAlloyDBInstanceID != "" {
+		config.GcpAlloyDBInstanceID = gcpAlloyDBInstanceID
+	}
 	if gcpPubsubSubscription := os.Getenv("GCP_PUBSUB_SUBSCRIPTION"); gcpPubsubSubscription != "" {
 		config.GcpPubsubSubscription = gcpPubsubSubscription
 	}
73 changes: 56 additions & 17 deletions input/system/google_cloudsql/logs.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -30,11 +31,14 @@ type googleLogMessage struct {
 	Severity    string `json:"severity"`
 	TextPayload string `json:"textPayload"`
 	Timestamp   string `json:"timestamp"`
+	Labels      map[string]string `json:"labels"`
 }
 
 type LogStreamItem struct {
 	GcpProjectID          string
 	GcpCloudSQLInstanceID string
+	GcpAlloyDBClusterID   string
+	GcpAlloyDBInstanceID  string
 	OccurredAt            time.Time
 	Content               string
 }
@@ -74,25 +78,52 @@ func setupPubSubSubscriber(ctx context.Context, wg *sync.WaitGroup, logger *util
 			return
 		}
 
-		if msg.Resource.ResourceType != "cloudsql_database" {
-			return
-		}
-		if !strings.HasSuffix(msg.LogName, "postgres.log") {
-			return
-		}
-		databaseID, ok := msg.Resource.Labels["database_id"]
-		if !ok || strings.Count(databaseID, ":") != 1 {
-			return
-		}
-
-		parts := strings.SplitN(databaseID, ":", 2) // project_id:instance_id
-		t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp)
-
-		gcpLogStream <- LogStreamItem{
-			GcpProjectID:          parts[0],
-			GcpCloudSQLInstanceID: parts[1],
-			Content:               msg.TextPayload,
-			OccurredAt:            t,
+		if msg.Resource.ResourceType == "cloudsql_database" {
+			if !strings.HasSuffix(msg.LogName, "postgres.log") {
+				return
+			}
+			databaseID, ok := msg.Resource.Labels["database_id"]
+			if !ok || strings.Count(databaseID, ":") != 1 {
+				return
+			}
+
+			parts := strings.SplitN(databaseID, ":", 2) // project_id:instance_id
+			t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp)
+
+			gcpLogStream <- LogStreamItem{
+				GcpProjectID:          parts[0],
+				GcpCloudSQLInstanceID: parts[1],
+				Content:               msg.TextPayload,
+				OccurredAt:            t,
+			}
+			return
+		} else if msg.Resource.ResourceType == "alloydb.googleapis.com/Instance" {
+			if !strings.HasSuffix(msg.LogName, "postgres.log") {
+				return
+			}
+			clusterID, ok := msg.Resource.Labels["cluster_id"]
+			if !ok {
+				return
+			}
+			instanceID, ok := msg.Resource.Labels["instance_id"]
+			if !ok {
+				return
+			}
+			projectID, ok := msg.Labels["CONSUMER_PROJECT"]
+			if !ok {
+				return
+			}
+
+			t, _ := time.Parse(time.RFC3339Nano, msg.Timestamp)
+
+			gcpLogStream <- LogStreamItem{
+				GcpProjectID:         projectID,
+				GcpAlloyDBClusterID:  clusterID,
+				GcpAlloyDBInstanceID: instanceID,
+				Content:              msg.TextPayload,
+				OccurredAt:           t,
+			}
+			return
 		}
 	})
 	if err == nil || err == context.Canceled {
@@ -170,7 +201,15 @@ func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, servers []*sta
 			}
 
 			for _, server := range servers {
-				if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID {
+				if in.GcpProjectID == server.Config.GcpProjectID && in.GcpCloudSQLInstanceID != "" && in.GcpCloudSQLInstanceID == server.Config.GcpCloudSQLInstanceID {
+					out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
+				}
+				if in.GcpProjectID == server.Config.GcpProjectID && in.GcpAlloyDBClusterID != "" && in.GcpAlloyDBClusterID == server.Config.GcpAlloyDBClusterID && in.GcpAlloyDBInstanceID != "" && in.GcpAlloyDBInstanceID == server.Config.GcpAlloyDBInstanceID {
+					// AlloyDB adds a special [filename:lineno] prefix to all log lines (not part of log_line_prefix)
+					parts := regexp.MustCompile(`^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(string(logLine.Content))
+					if len(parts) == 2 {
+						logLine.Content = parts[1]
+					}
 					out <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
 				}
 			}
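
Taken together, the two changes to this file can be illustrated with a small standalone sketch: it decodes a hypothetical AlloyDB Pub/Sub log entry (struct definitions trimmed to the fields used; the payload values are invented, not captured from a real instance), then strips the AlloyDB-specific [filename:lineno] prefix with the same regexp as above.

package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// Trimmed-down stand-ins for the collector's message structs
type resource struct {
	ResourceType string            `json:"type"`
	Labels       map[string]string `json:"labels"`
}

type logMessage struct {
	LogName     string            `json:"logName"`
	Resource    resource          `json:"resource"`
	TextPayload string            `json:"textPayload"`
	Labels      map[string]string `json:"labels"`
}

func main() {
	// Hypothetical log entry as it might arrive from the Pub/Sub subscription
	payload := `{
		"logName": "projects/my-project/logs/alloydb.googleapis.com%2Fpostgres.log",
		"resource": {
			"type": "alloydb.googleapis.com/Instance",
			"labels": {"cluster_id": "my-cluster", "instance_id": "my-instance"}
		},
		"labels": {"CONSUMER_PROJECT": "my-project"},
		"textPayload": "[postgres.c:1024] LOG:  connection received: host=10.0.0.1"
	}`

	var msg logMessage
	if err := json.Unmarshal([]byte(payload), &msg); err != nil {
		panic(err)
	}
	fmt.Println(msg.Resource.Labels["cluster_id"], msg.Resource.Labels["instance_id"],
		msg.Labels["CONSUMER_PROJECT"]) // my-cluster my-instance my-project

	// Strip the [filename:lineno] prefix AlloyDB prepends to each line
	content := msg.TextPayload
	if parts := regexp.MustCompile(`^\[[\w.-]+:\d+\] (.*)`).FindStringSubmatch(content); len(parts) == 2 {
		content = parts[1]
	}
	fmt.Println(content) // LOG:  connection received: host=10.0.0.1
}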
20 changes: 11 additions & 9 deletions logs/analyze.go
@@ -76,15 +76,17 @@ var autoVacuum = analyzeGroup{
 	classification: pganalyze_collector.LogLineInformation_AUTOVACUUM_COMPLETED,
 	primary: match{
 		prefixes: []string{"automatic vacuum of table", "automatic aggressive vacuum of table", "automatic aggressive vacuum to prevent wraparound of table"},
-		regexp: regexp.MustCompile(`^automatic (aggressive )?vacuum (to prevent wraparound )?of table "(.+?)": index scans: (\d+)\s*` +
-			`pages: (\d+) removed, (\d+) remain(?:, (\d+) skipped due to pins)?(?:, (\d+) skipped frozen)?\s*` +
-			`tuples: (\d+) removed, (\d+) remain, (\d+) are dead but not yet removable(?:, oldest xmin: (\d+))?\s*` +
-			`(?:index scan (not needed|needed|bypassed|bypassed by failsafe): (\d+) pages from table \(([\d.]+)% of total\) (?:have|had) (\d+) dead item identifiers(?: removed)?)?\s*` + // Postgres 14+
-			`(?:I/O timings: read: ([\d.]+) ms, write: ([\d.]+) ms)?\s*` + // Postgres 14+
-			`(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?\s*` + // Postgres 14+
-			`buffer usage: (\d+) hits, (\d+) misses, (\d+) dirtied\s*` +
-			`(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?\s*` + // Postgres 13 and older
-			`(?:WAL usage: (\d+) records, (\d+) full page images, (\d+) bytes)?\s*` + // Postgres 14+
+		regexp: regexp.MustCompile(`^automatic (aggressive )?vacuum (to prevent wraparound )?of table "(.+?)": index scans: (\d+),?\s*` +
+			`(?:elapsed time: \d+ \w+, index vacuum time: \d+ \w+,)?\s*` + // Google AlloyDB for PostgreSQL
+			`pages: (\d+) removed, (\d+) remain(?:, (\d+) skipped due to pins)?(?:, (\d+) skipped frozen)?,?\s*` +
+			`(?:\d+ skipped using mintxid)?,?\s*` + // Google AlloyDB for PostgreSQL
+			`tuples: (\d+) removed, (\d+) remain, (\d+) are dead but not yet removable(?:, oldest xmin: (\d+))?,?\s*` +
+			`(?:index scan (not needed|needed|bypassed|bypassed by failsafe): (\d+) pages from table \(([\d.]+)% of total\) (?:have|had) (\d+) dead item identifiers(?: removed)?)?,?\s*` + // Postgres 14+
+			`(?:I/O timings: read: ([\d.]+) ms, write: ([\d.]+) ms)?,?\s*` + // Postgres 14+
+			`(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?,?\s*` + // Postgres 14+
+			`buffer usage: (\d+) hits, (\d+) misses, (\d+) dirtied,?\s*` +
+			`(?:avg read rate: ([\d.]+) MB/s, avg write rate: ([\d.]+) MB/s)?,?\s*` + // Postgres 13 and older
+			`(?:WAL usage: (\d+) records, (\d+) full page images, (\d+) bytes)?,?\s*` + // Postgres 14+
 			`system usage: CPU(?:(?: ([\d.]+)s/([\d.]+)u sec elapsed ([\d.]+) sec)|(?:: user: ([\d.]+) s, system: ([\d.]+) s, elapsed: ([\d.]+) s))`),
 		secrets: []state.LogSecretKind{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
 	},
53 changes: 53 additions & 0 deletions logs/analyze_test.go
@@ -1568,6 +1568,59 @@ var tests = []testpair{
 		}},
 		nil,
 	},
+	{
+		[]state.LogLine{{
+			Content: "automatic vacuum of table \"alloydbadmin.public.heartbeat\": index scans: 0, elapsed time: 0 s, index vacuum time: 0 ms," +
+				" pages: 0 removed, 1 remain, 0 skipped due to pins, 0 skipped frozen 0 skipped using mintxid," +
+				" tuples: 60 removed, 1 remain, 0 are dead but not yet removable, oldest xmin: 1782," +
+				" index scan not needed: 0 pages from table (0.00% of total) had 0 dead item identifiers removed," +
+				" I/O timings: read: 0.000 ms, write: 0.000 ms," +
+				" avg read rate: 0.000 MB/s, avg write rate: 0.000 MB/s," +
+				" buffer usage: 42 hits, 0 misses, 0 dirtied," +
+				" WAL usage: 3 records, 0 full page images, 286 bytes," +
+				" system usage: CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.01 s",
+			LogLevel: pganalyze_collector.LogLineInformation_LOG,
+		}},
+		[]state.LogLine{{
+			Classification: pganalyze_collector.LogLineInformation_AUTOVACUUM_COMPLETED,
+			LogLevel:       pganalyze_collector.LogLineInformation_LOG,
+			Database:       "alloydbadmin",
+			SchemaName:     "public",
+			RelationName:   "heartbeat",
+			Details: map[string]interface{}{
+				"aggressive":               false,
+				"anti_wraparound":          false,
+				"num_index_scans":          0,
+				"pages_removed":            0,
+				"rel_pages":                1,
+				"pinskipped_pages":         0,
+				"frozenskipped_pages":      0,
+				"tuples_deleted":           60,
+				"new_rel_tuples":           1,
+				"new_dead_tuples":          0,
+				"oldest_xmin":              1782,
+				"lpdead_index_scan":        "not needed",
+				"lpdead_item_pages":        0,
+				"lpdead_item_page_percent": 0,
+				"lpdead_items":             0,
+				"blk_read_time":            0,
+				"blk_write_time":           0,
+				"read_rate_mb":             0,
+				"write_rate_mb":            0,
+				"vacuum_page_hit":          42,
+				"vacuum_page_miss":         0,
+				"vacuum_page_dirty":        0,
+				"wal_records":              3,
+				"wal_fpi":                  0,
+				"wal_bytes":                286,
+				"rusage_user":              0.00,
+				"rusage_kernel":            0.00,
+				"elapsed_secs":             0.01,
+			},
+			ReviewedForSecrets: true,
+		}},
+		nil,
+	},
 	{
 		[]state.LogLine{{
 			Content: "automatic aggressive vacuum of table \"demo_pgbench.public.pgbench_tellers\": index scans: 0" +
