From 82fa9933614a41237dbe8800b1bd76a121b6cf11 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Sun, 21 Jun 2020 01:07:59 +0200 Subject: [PATCH 1/2] Splits the log source handling with a pluggable interface. Provides a cleaner split between log sources, specifically for not compiling with systemd libraries. This is in preparation for a new log source to read from Docker. --- logsource.go | 63 ++++++++++++++++ logsource_file.go | 78 ++++++++++++++++++++ logsource_file_test.go | 87 ++++++++++++++++++++++ logsource_systemd.go | 143 ++++++++++++++++++++++++++++++++++++ logsource_systemd_test.go | 150 ++++++++++++++++++++++++++++++++++++++ main.go | 33 +++------ nosystemd.go | 25 ------- postfix_exporter.go | 106 ++++++++------------------- postfix_exporter_test.go | 10 +-- systemd.go | 119 ------------------------------ 10 files changed, 568 insertions(+), 246 deletions(-) create mode 100644 logsource.go create mode 100644 logsource_file.go create mode 100644 logsource_file_test.go create mode 100644 logsource_systemd.go create mode 100644 logsource_systemd_test.go delete mode 100644 nosystemd.go delete mode 100644 systemd.go diff --git a/logsource.go b/logsource.go new file mode 100644 index 0000000..3f8318b --- /dev/null +++ b/logsource.go @@ -0,0 +1,63 @@ +package main + +import ( + "fmt" + "io" + + "github.com/alecthomas/kingpin" +) + +// A LogSourceFactory provides a repository of log sources that can be +// instantiated from command line flags. +type LogSourceFactory interface { + // Init adds the factory's struct fields as flags in the + // application. + Init(*kingpin.Application) + + // New attempts to create a new log source. This is called after + // flags have been parsed. Returning `nil, nil`, means the user + // didn't want this log source. + New() (LogSourceCloser, error) +} + +type LogSourceCloser interface { + io.Closer + LogSource +} + +var logSourceFactories []LogSourceFactory + +// RegisterLogSourceFactory can be called from module `init` functions +// to register factories. +func RegisterLogSourceFactory(lsf LogSourceFactory) { + logSourceFactories = append(logSourceFactories, lsf) +} + +// InitLogSourceFactories runs Init on all factories. The +// initialization order is arbitrary, except `fileLogSourceFactory` is +// always last (the fallback). The file log source must be last since +// it's enabled by default. +func InitLogSourceFactories(app *kingpin.Application) { + RegisterLogSourceFactory(&fileLogSourceFactory{}) + + for _, f := range logSourceFactories { + f.Init(app) + } +} + +// NewLogSourceFromFactories iterates through the factories and +// attempts to instantiate a log source. The first factory to return +// success wins. +func NewLogSourceFromFactories() (LogSourceCloser, error) { + for _, f := range logSourceFactories { + src, err := f.New() + if err != nil { + return nil, err + } + if src != nil { + return src, nil + } + } + + return nil, fmt.Errorf("no log source configured") +} diff --git a/logsource_file.go b/logsource_file.go new file mode 100644 index 0000000..dfe85e8 --- /dev/null +++ b/logsource_file.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "io" + "log" + + "github.com/alecthomas/kingpin" + "github.com/hpcloud/tail" +) + +// A FileLogSource can read lines from a file. +type FileLogSource struct { + tailer *tail.Tail +} + +// NewFileLogSource creates a new log source, tailing the given file. +func NewFileLogSource(path string) (*FileLogSource, error) { + tailer, err := tail.TailFile(path, tail.Config{ + ReOpen: true, // reopen the file if it's rotated + MustExist: true, // fail immediately if the file is missing or has incorrect permissions + Follow: true, // run in follow mode + Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file + Logger: tail.DiscardingLogger, + }) + if err != nil { + return nil, err + } + return &FileLogSource{tailer}, nil +} + +func (s *FileLogSource) Close() error { + defer s.tailer.Cleanup() + go func() { + // Stop() waits for the tailer goroutine to shut down, but it + // can be blocking on sending on the Lines channel... + for range s.tailer.Lines { + } + }() + return s.tailer.Stop() +} + +func (s *FileLogSource) Path() string { + return s.tailer.Filename +} + +func (s *FileLogSource) Read(ctx context.Context) (string, error) { + select { + case line, ok := <-s.tailer.Lines: + if !ok { + return "", io.EOF + } + return line.Text, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} + +// A fileLogSourceFactory is a factory than can create log sources +// from command line flags. +// +// Because this factory is enabled by default, it must always be +// registered last. +type fileLogSourceFactory struct { + path string +} + +func (f *fileLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path) +} + +func (f *fileLogSourceFactory) New() (LogSourceCloser, error) { + if f.path == "" { + return nil, nil + } + log.Printf("Reading log events from %s", f.path) + return NewFileLogSource(f.path) +} diff --git a/logsource_file_test.go b/logsource_file_test.go new file mode 100644 index 0000000..673be04 --- /dev/null +++ b/logsource_file_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFileLogSource_Path(t *testing.T) { + path, close, err := setupFakeLogFile() + if err != nil { + t.Fatalf("setupFakeTailer failed: %v", err) + } + defer close() + + src, err := NewFileLogSource(path) + if err != nil { + t.Fatalf("NewFileLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, path, src.Path(), "Path should be set by New.") +} + +func TestFileLogSource_Read(t *testing.T) { + ctx := context.Background() + + path, close, err := setupFakeLogFile() + if err != nil { + t.Fatalf("setupFakeTailer failed: %v", err) + } + defer close() + + src, err := NewFileLogSource(path) + if err != nil { + t.Fatalf("NewFileLogSource failed: %v", err) + } + defer src.Close() + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") +} + +func setupFakeLogFile() (string, func(), error) { + f, err := ioutil.TempFile("", "filelogsource") + if err != nil { + return "", nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + defer os.Remove(f.Name()) + defer f.Close() + + for { + // The tailer seeks to the end and then does a + // follow. Keep writing lines so we know it wakes up and + // returns lines. + fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline") + + select { + case <-time.After(10 * time.Millisecond): + // continue + case <-ctx.Done(): + return + } + } + }() + + return f.Name(), func() { + cancel() + wg.Wait() + }, nil +} diff --git a/logsource_systemd.go b/logsource_systemd.go new file mode 100644 index 0000000..76d3be8 --- /dev/null +++ b/logsource_systemd.go @@ -0,0 +1,143 @@ +// +build !nosystemd,linux + +package main + +import ( + "context" + "fmt" + "io" + "log" + "time" + + "github.com/alecthomas/kingpin" + "github.com/coreos/go-systemd/v22/sdjournal" +) + +// timeNow is a test fake injection point. +var timeNow = time.Now + +// A SystemdLogSource reads log records from the given Systemd +// journal. +type SystemdLogSource struct { + journal SystemdJournal + path string +} + +// A SystemdJournal is the journal interface that sdjournal.Journal +// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc +type SystemdJournal interface { + io.Closer + AddMatch(match string) error + GetEntry() (*sdjournal.JournalEntry, error) + Next() (uint64, error) + SeekRealtimeUsec(usec uint64) error + Wait(timeout time.Duration) int +} + +// NewSystemdLogSource returns a log source for reading Systemd +// journal entries. `unit` and `slice` provide filtering if non-empty +// (with `slice` taking precedence). +func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) { + logSrc := &SystemdLogSource{journal: j, path: path} + + var err error + if slice != "" { + err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice) + } else if unit != "" { + err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit) + } + if err != nil { + logSrc.journal.Close() + return nil, err + } + + // Start at end of journal + if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil { + logSrc.journal.Close() + return nil, err + } + + if r := logSrc.journal.Wait(1 * time.Second); r < 0 { + logSrc.journal.Close() + return nil, err + } + + return logSrc, nil +} + +func (s *SystemdLogSource) Close() error { + return s.journal.Close() +} + +func (s *SystemdLogSource) Path() string { + return s.path +} + +func (s *SystemdLogSource) Read(ctx context.Context) (string, error) { + c, err := s.journal.Next() + if err != nil { + return "", err + } + if c == 0 { + return "", io.EOF + } + + e, err := s.journal.GetEntry() + if err != nil { + return "", err + } + ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) + + return fmt.Sprintf( + "%s %s %s[%s]: %s", + ts.Format(time.Stamp), + e.Fields["_HOSTNAME"], + e.Fields["SYSLOG_IDENTIFIER"], + e.Fields["_PID"], + e.Fields["MESSAGE"], + ), nil +} + +// A systemdLogSourceFactory is a factory that can create +// SystemdLogSources from command line flags. +type systemdLogSourceFactory struct { + enable bool + unit, slice, path string +} + +func (f *systemdLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable) + app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit) + app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice) + app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path) +} + +func (f *systemdLogSourceFactory) New() (LogSourceCloser, error) { + if !f.enable { + return nil, nil + } + + log.Println("Reading log events from systemd") + j, path, err := newSystemdJournal(f.path) + if err != nil { + return nil, err + } + return NewSystemdLogSource(j, path, f.unit, f.slice) +} + +// newSystemdJournal creates a journal handle. It returns the handle +// and a string representation of it. If `path` is empty, it connects +// to the local journald. +func newSystemdJournal(path string) (*sdjournal.Journal, string, error) { + if path != "" { + j, err := sdjournal.NewJournalFromDir(path) + return j, path, err + } + + j, err := sdjournal.NewJournal() + return j, "journald", err +} + +func init() { + RegisterLogSourceFactory(&systemdLogSourceFactory{}) +} diff --git a/logsource_systemd_test.go b/logsource_systemd_test.go new file mode 100644 index 0000000..a1f9c4a --- /dev/null +++ b/logsource_systemd_test.go @@ -0,0 +1,150 @@ +// +build !nosystemd,linux + +package main + +import ( + "context" + "io" + "os" + "testing" + "time" + + "github.com/coreos/go-systemd/v22/sdjournal" + "github.com/stretchr/testify/assert" +) + +func TestNewSystemdLogSource(t *testing.T) { + j := &fakeSystemdJournal{} + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + + assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.") + assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.") + assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.") + + if err := src.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + assert.Equal(t, 1, j.closeCalls, "A call to Close should be made.") +} + +func TestSystemdLogSource_Path(t *testing.T) { + j := &fakeSystemdJournal{} + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, "apath", src.Path(), "Path should be set by New.") +} + +func TestSystemdLogSource_Read(t *testing.T) { + ctx := context.Background() + + j := &fakeSystemdJournal{ + getEntryValues: []sdjournal.JournalEntry{ + { + Fields: map[string]string{ + "_HOSTNAME": "ahost", + "SYSLOG_IDENTIFIER": "anid", + "_PID": "123", + "MESSAGE": "aline", + }, + RealtimeTimestamp: 1234567890000000, + }, + }, + nextValues: []uint64{1}, + } + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") +} + +func TestSystemdLogSource_ReadEOF(t *testing.T) { + ctx := context.Background() + + j := &fakeSystemdJournal{ + nextValues: []uint64{0}, + } + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + _, err = src.Read(ctx) + assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.") +} + +func TestMain(m *testing.M) { + // We compare Unix timestamps to date strings, so make it deterministic. + os.Setenv("TZ", "UTC") + timeNow = func() time.Time { return time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) } + defer func() { + timeNow = time.Now + }() + + os.Exit(m.Run()) +} + +type fakeSystemdJournal struct { + getEntryValues []sdjournal.JournalEntry + getEntryError error + nextValues []uint64 + nextError error + + addMatchCalls []string + closeCalls int + seekRealtimeUsecCalls []uint64 + waitCalls []time.Duration +} + +func (j *fakeSystemdJournal) AddMatch(match string) error { + j.addMatchCalls = append(j.addMatchCalls, match) + return nil +} + +func (j *fakeSystemdJournal) Close() error { + j.closeCalls++ + return nil +} + +func (j *fakeSystemdJournal) GetEntry() (*sdjournal.JournalEntry, error) { + if len(j.getEntryValues) == 0 { + return nil, j.getEntryError + } + e := j.getEntryValues[0] + j.getEntryValues = j.getEntryValues[1:] + return &e, nil +} + +func (j *fakeSystemdJournal) Next() (uint64, error) { + if len(j.nextValues) == 0 { + return 0, j.nextError + } + v := j.nextValues[0] + j.nextValues = j.nextValues[1:] + return v, nil +} + +func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error { + j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec) + return nil +} + +func (j *fakeSystemdJournal) Wait(timeout time.Duration) int { + j.waitCalls = append(j.waitCalls, timeout) + return 0 +} diff --git a/main.go b/main.go index 2eaa9fc..2e5e8cb 100644 --- a/main.go +++ b/main.go @@ -13,36 +13,25 @@ import ( func main() { var ( - app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") - listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() - metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() - postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() - postfixLogfilePath = app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").String() - logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() - systemdEnable bool - systemdUnit, systemdSlice, systemdJournalPath string + app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") + listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() + metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() + postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() + logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() ) - systemdFlags(&systemdEnable, &systemdUnit, &systemdSlice, &systemdJournalPath, app) + InitLogSourceFactories(app) kingpin.MustParse(app.Parse(os.Args[1:])) - var journal *Journal - if systemdEnable { - var err error - journal, err = NewJournal(systemdUnit, systemdSlice, systemdJournalPath) - if err != nil { - log.Fatalf("Error opening systemd journal: %s", err) - } - defer journal.Close() - log.Println("Reading log events from systemd") - } else { - log.Printf("Reading log events from %v", *postfixLogfilePath) + logSrc, err := NewLogSourceFromFactories() + if err != nil { + log.Fatalf("Error opening log source: %s", err) } + defer logSrc.Close() exporter, err := NewPostfixExporter( *postfixShowqPath, - *postfixLogfilePath, - journal, + logSrc, *logUnsupportedLines, ) if err != nil { diff --git a/nosystemd.go b/nosystemd.go deleted file mode 100644 index 901d270..0000000 --- a/nosystemd.go +++ /dev/null @@ -1,25 +0,0 @@ -// +build nosystemd !linux -// This file contains stubs to support non-systemd use - -package main - -import ( - "io" - - "github.com/alecthomas/kingpin" -) - -type Journal struct { - io.Closer - Path string -} - -func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {} - -func NewJournal(unit, slice, path string) (*Journal, error) { - return nil, nil -} - -func (e *PostfixExporter) CollectLogfileFromJournal() error { - return nil -} diff --git a/postfix_exporter.go b/postfix_exporter.go index b9683b1..226542d 100644 --- a/postfix_exporter.go +++ b/postfix_exporter.go @@ -27,7 +27,6 @@ import ( "strings" "time" - "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" ) @@ -42,8 +41,7 @@ var ( // Postfix Prometheus metrics exporter across scrapes. type PostfixExporter struct { showqPath string - journal *Journal - tailer *tail.Tail + logSrc LogSource logUnsupportedLines bool // Metrics that should persist after refreshes, based on logs. @@ -72,6 +70,16 @@ type PostfixExporter struct { opendkimSignatureAdded *prometheus.CounterVec } +// A LogSource is an interface to read log lines. +type LogSource interface { + // Path returns a representation of the log location. + Path() string + + // Read returns the next log line. Returns `io.EOF` at the end of + // the log. + Read(context.Context) (string, error) +} + // CollectShowqFromReader parses the output of Postfix's 'showq' command // and turns it into metrics. // @@ -420,50 +428,13 @@ func addToHistogramVec(h *prometheus.HistogramVec, value, fieldName string, labe h.WithLabelValues(labels...).Observe(float) } -// CollectLogfileFromFile tails a Postfix log file and collects entries from it. -func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) { - gaugeVec := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "postfix", - Subsystem: "", - Name: "up", - Help: "Whether scraping Postfix's metrics was successful.", - }, - []string{"path"}) - gauge := gaugeVec.WithLabelValues(e.tailer.Filename) - for { - select { - case line := <-e.tailer.Lines: - e.CollectFromLogLine(line.Text) - case <-ctx.Done(): - gauge.Set(0) - return - } - gauge.Set(1) - } -} - // NewPostfixExporter creates a new Postfix exporter instance. -func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, logUnsupportedLines bool) (*PostfixExporter, error) { - var tailer *tail.Tail - if logfilePath != "" { - var err error - tailer, err = tail.TailFile(logfilePath, tail.Config{ - ReOpen: true, // reopen the file if it's rotated - MustExist: true, // fail immediately if the file is missing or has incorrect permissions - Follow: true, // run in follow mode - Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file - }) - if err != nil { - return nil, err - } - } +func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines bool) (*PostfixExporter, error) { timeBuckets := []float64{1e-3, 1e-2, 1e-1, 1.0, 10, 1 * 60, 1 * 60 * 60, 24 * 60 * 60, 2 * 24 * 60 * 60} return &PostfixExporter{ logUnsupportedLines: logUnsupportedLines, showqPath: showqPath, - tailer: tailer, - journal: journal, + logSrc: logSrc, cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "postfix", @@ -613,7 +584,7 @@ func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { ch <- postfixUpDesc - if e.tailer == nil && e.journal == nil { + if e.logSrc == nil { return } ch <- e.cleanupProcesses.Desc() @@ -641,38 +612,12 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { e.opendkimSignatureAdded.Describe(ch) } -func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) { - gauge := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "postfix", - Subsystem: "", - Name: "up", - Help: "Whether scraping Postfix's metrics was successful.", - }, - []string{"path"}).WithLabelValues(e.journal.Path) - select { - case <-ctx.Done(): - gauge.Set(0) - return - default: - err := e.CollectLogfileFromJournal() - if err != nil { - log.Printf("Couldn't read journal: %v", err) - gauge.Set(0) - } else { - gauge.Set(1) - } - } -} - func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { - if e.journal != nil { - e.foreverCollectFromJournal(ctx) - } else if e.tailer != nil { - e.CollectLogfileFromFile(ctx) + if e.logSrc == nil { + return } - prometheus.NewGaugeVec( + gaugeVec := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "postfix", Subsystem: "", @@ -680,7 +625,20 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { Help: "Whether scraping Postfix's metrics was successful.", }, []string{"path"}) - return + gauge := gaugeVec.WithLabelValues(e.logSrc.Path()) + defer gauge.Set(0) + + for { + line, err := e.logSrc.Read(ctx) + if err != nil { + if err != io.EOF { + log.Printf("Couldn't read journal: %v", err) + } + return + } + e.CollectFromLogLine(line) + gauge.Set(1) + } } // Collect metrics from Postfix's showq socket and its log file. @@ -701,7 +659,7 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) { e.showqPath) } - if e.tailer == nil && e.journal == nil { + if e.logSrc == nil { return } ch <- e.cleanupProcesses diff --git a/postfix_exporter_test.go b/postfix_exporter_test.go index d0d06f2..a3426a4 100644 --- a/postfix_exporter_test.go +++ b/postfix_exporter_test.go @@ -1,18 +1,17 @@ package main import ( - "github.com/hpcloud/tail" + "testing" + "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" - "testing" ) func TestPostfixExporter_CollectFromLogline(t *testing.T) { type fields struct { showqPath string - journal *Journal - tailer *tail.Tail + logSrc LogSource cleanupProcesses prometheus.Counter cleanupRejects prometheus.Counter cleanupNotAccepted prometheus.Counter @@ -173,8 +172,7 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) { t.Run(tt.name, func(t *testing.T) { e := &PostfixExporter{ showqPath: tt.fields.showqPath, - journal: tt.fields.journal, - tailer: tt.fields.tailer, + logSrc: tt.fields.logSrc, cleanupProcesses: tt.fields.cleanupProcesses, cleanupRejects: tt.fields.cleanupRejects, cleanupNotAccepted: tt.fields.cleanupNotAccepted, diff --git a/systemd.go b/systemd.go deleted file mode 100644 index e459c8b..0000000 --- a/systemd.go +++ /dev/null @@ -1,119 +0,0 @@ -// +build !nosystemd,linux - -package main - -import ( - "fmt" - "log" - "sync" - "time" - - "github.com/alecthomas/kingpin" - "github.com/coreos/go-systemd/v22/sdjournal" -) - -// Journal represents a lockable systemd journal. -type Journal struct { - *sdjournal.Journal - sync.Mutex - Path string -} - -// NewJournal returns a Journal for reading journal entries. -func NewJournal(unit, slice, path string) (j *Journal, err error) { - j = new(Journal) - if path != "" { - j.Journal, err = sdjournal.NewJournalFromDir(path) - j.Path = path - } else { - j.Journal, err = sdjournal.NewJournal() - j.Path = "journald" - } - if err != nil { - return - } - - if slice != "" { - err = j.AddMatch("_SYSTEMD_SLICE=" + slice) - if err != nil { - return - } - } else if unit != "" { - err = j.AddMatch("_SYSTEMD_UNIT=" + unit) - if err != nil { - return - } - } - - // Start at end of journal - err = j.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000)) - if err != nil { - log.Printf("%v", err) - } - return -} - -// NextMessage reads the next message from the journal. -func (j *Journal) NextMessage() (s string, c uint64, err error) { - var e *sdjournal.JournalEntry - - // Read to next - c, err = j.Next() - if err != nil { - return - } - // Return when on the end of journal - if c == 0 { - return - } - - // Get entry - e, err = j.GetEntry() - if err != nil { - return - } - ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) - - // Format entry - s = fmt.Sprintf( - "%s %s %s[%s]: %s", - ts.Format(time.Stamp), - e.Fields["_HOSTNAME"], - e.Fields["SYSLOG_IDENTIFIER"], - e.Fields["_PID"], - e.Fields["MESSAGE"], - ) - - return -} - -// systemdFlags sets the flags for use with systemd -func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) { - app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(enable) - app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(unit) - app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(slice) - app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(path) -} - -// CollectLogfileFromJournal Collects entries from the systemd journal. -func (e *PostfixExporter) CollectLogfileFromJournal() error { - e.journal.Lock() - defer e.journal.Unlock() - - r := e.journal.Wait(time.Duration(1) * time.Second) - if r < 0 { - log.Print("error while waiting for journal!") - } - for { - m, c, err := e.journal.NextMessage() - if err != nil { - return err - } - if c == 0 { - break - } - e.CollectFromLogLine(m) - } - - return nil -} From f1d5d0ad4d47b8e38bb591944c2c6e93736ffa79 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Sun, 21 Jun 2020 08:24:36 +0200 Subject: [PATCH 2/2] Adds a Docker log source. When Postfix is running in a Docker container, it's most useful to use the built-in Docker logging (as with Systemd). Setting `--docker.enable` allows that. The log source is using `client.NewEnvClient`, which reads environment variables to determine which Docker to connect to, and how TLS is handled. --- README.md | 18 +++++++- go.mod | 7 +++ go.sum | 12 +++++ logsource.go | 7 +-- logsource_docker.go | 96 ++++++++++++++++++++++++++++++++++++++++ logsource_docker_test.go | 79 +++++++++++++++++++++++++++++++++ logsource_file.go | 2 +- logsource_systemd.go | 2 +- main.go | 5 ++- 9 files changed, 219 insertions(+), 9 deletions(-) create mode 100644 logsource_docker.go create mode 100644 logsource_docker_test.go diff --git a/README.md b/README.md index e59f4f9..1be73b8 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This exporter provides histogram metrics for the size and age of messages stored the mail queue. It extracts these metrics from Postfix by connecting to a UNIX socket under `/var/spool`. It also counts events by parsing Postfix's log entries, using regular expression matching. The log entries are retrieved from -the systemd journal or from a log file. +the systemd journal, the Docker logs, or from a log file. ## Options @@ -18,11 +18,25 @@ These options can be used when starting the `postfix_exporter` | `--postfix.showq_path` | Path at which Postfix places its showq socket | `/var/spool/postfix/public/showq` | | `--postfix.logfile_path` | Path where Postfix writes log entries | `/var/log/maillog` | | `--log.unsupported` | Log all unsupported lines | `false` | -| `--systemd.enable` | Read from the systemd journal instead of log | `false` | +| `--docker.enable` | Read from the Docker logs instead of a file | `false` | +| `--docker.container.id` | The container to read Docker logs from | `postfix` | +| `--systemd.enable` | Read from the systemd journal instead of file | `false` | | `--systemd.unit` | Name of the Postfix systemd unit | `postfix.service` | | `--systemd.slice` | Name of the Postfix systemd slice. | `""` | | `--systemd.journal_path` | Path to the systemd journal | `""` | +## Events from Docker + +Postfix servers running in a [Docker](https://www.docker.com/) +container can be monitored using the `--docker.enable` flag. The +default container ID is `postfix`, but can be customized with the +`--docker.container.id` flag. + +The default is to connect to the local Docker, but this can be +customized using [the `DOCKER_HOST` and +similar](https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient) +environment variables. + ## Events from log file The log file is tailed when processed. Rotating the log files while the exporter diff --git a/go.mod b/go.mod index 3d5e663..f35270f 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,15 @@ go 1.13 require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/coreos/go-systemd/v22 v22.0.0 + github.com/docker/distribution v2.7.1+incompatible // indirect + github.com/docker/docker v1.13.1 + github.com/docker/go-connections v0.4.0 // indirect + github.com/docker/go-units v0.4.0 // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/hpcloud/tail v1.0.0 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index bea5d93..fedcb99 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,14 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= +github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo= +github.com/docker/docker v1.13.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -53,7 +61,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -83,6 +94,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/logsource.go b/logsource.go index 3f8318b..492d2e2 100644 --- a/logsource.go +++ b/logsource.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "io" @@ -17,7 +18,7 @@ type LogSourceFactory interface { // New attempts to create a new log source. This is called after // flags have been parsed. Returning `nil, nil`, means the user // didn't want this log source. - New() (LogSourceCloser, error) + New(context.Context) (LogSourceCloser, error) } type LogSourceCloser interface { @@ -48,9 +49,9 @@ func InitLogSourceFactories(app *kingpin.Application) { // NewLogSourceFromFactories iterates through the factories and // attempts to instantiate a log source. The first factory to return // success wins. -func NewLogSourceFromFactories() (LogSourceCloser, error) { +func NewLogSourceFromFactories(ctx context.Context) (LogSourceCloser, error) { for _, f := range logSourceFactories { - src, err := f.New() + src, err := f.New(ctx) if err != nil { return nil, err } diff --git a/logsource_docker.go b/logsource_docker.go new file mode 100644 index 0000000..fa182a7 --- /dev/null +++ b/logsource_docker.go @@ -0,0 +1,96 @@ +// +build !nodocker + +package main + +import ( + "bufio" + "context" + "io" + "log" + "strings" + + "github.com/alecthomas/kingpin" + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" +) + +// A DockerLogSource reads log records from the given Docker +// journal. +type DockerLogSource struct { + client DockerClient + containerID string + reader *bufio.Reader +} + +// A DockerClient is the client interface that client.Client +// provides. See https://pkg.go.dev/github.com/docker/docker/client +type DockerClient interface { + io.Closer + ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error) +} + +// NewDockerLogSource returns a log source for reading Docker logs. +func NewDockerLogSource(ctx context.Context, c DockerClient, containerID string) (*DockerLogSource, error) { + r, err := c.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Tail: "0", + }) + if err != nil { + return nil, err + } + + logSrc := &DockerLogSource{ + client: c, + containerID: containerID, + reader: bufio.NewReader(r), + } + + return logSrc, nil +} + +func (s *DockerLogSource) Close() error { + return s.client.Close() +} + +func (s *DockerLogSource) Path() string { + return "docker:" + s.containerID +} + +func (s *DockerLogSource) Read(ctx context.Context) (string, error) { + line, err := s.reader.ReadString('\n') + if err != nil { + return "", err + } + return strings.TrimSpace(line), nil +} + +// A dockerLogSourceFactory is a factory that can create +// DockerLogSources from command line flags. +type dockerLogSourceFactory struct { + enable bool + containerID string +} + +func (f *dockerLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("docker.enable", "Read from Docker logs. Environment variable DOCKER_HOST can be used to change the address. See https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient for more information.").Default("false").BoolVar(&f.enable) + app.Flag("docker.container.id", "ID/name of the Postfix Docker container.").Default("postfix").StringVar(&f.containerID) +} + +func (f *dockerLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { + if !f.enable { + return nil, nil + } + + log.Println("Reading log events from Docker") + c, err := client.NewEnvClient() + if err != nil { + return nil, err + } + return NewDockerLogSource(ctx, c, f.containerID) +} + +func init() { + RegisterLogSourceFactory(&dockerLogSourceFactory{}) +} diff --git a/logsource_docker_test.go b/logsource_docker_test.go new file mode 100644 index 0000000..74231c2 --- /dev/null +++ b/logsource_docker_test.go @@ -0,0 +1,79 @@ +// +build !nodocker + +package main + +import ( + "context" + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/docker/docker/api/types" + "github.com/stretchr/testify/assert" +) + +func TestNewDockerLogSource(t *testing.T) { + ctx := context.Background() + c := &fakeDockerClient{} + src, err := NewDockerLogSource(ctx, c, "acontainer") + if err != nil { + t.Fatalf("NewDockerLogSource failed: %v", err) + } + + assert.Equal(t, []string{"acontainer"}, c.containerLogsCalls, "A call to ContainerLogs should be made.") + + if err := src.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + assert.Equal(t, 1, c.closeCalls, "A call to Close should be made.") +} + +func TestDockerLogSource_Path(t *testing.T) { + ctx := context.Background() + c := &fakeDockerClient{} + src, err := NewDockerLogSource(ctx, c, "acontainer") + if err != nil { + t.Fatalf("NewDockerLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, "docker:acontainer", src.Path(), "Path should be set by New.") +} + +func TestDockerLogSource_Read(t *testing.T) { + ctx := context.Background() + + c := &fakeDockerClient{ + logsReader: ioutil.NopCloser(strings.NewReader("Feb 13 23:31:30 ahost anid[123]: aline\n")), + } + src, err := NewDockerLogSource(ctx, c, "acontainer") + if err != nil { + t.Fatalf("NewDockerLogSource failed: %v", err) + } + defer src.Close() + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") +} + +type fakeDockerClient struct { + logsReader io.ReadCloser + + containerLogsCalls []string + closeCalls int +} + +func (c *fakeDockerClient) ContainerLogs(ctx context.Context, containerID string, opts types.ContainerLogsOptions) (io.ReadCloser, error) { + c.containerLogsCalls = append(c.containerLogsCalls, containerID) + return c.logsReader, nil +} + +func (c *fakeDockerClient) Close() error { + c.closeCalls++ + return nil +} diff --git a/logsource_file.go b/logsource_file.go index dfe85e8..b906a84 100644 --- a/logsource_file.go +++ b/logsource_file.go @@ -69,7 +69,7 @@ func (f *fileLogSourceFactory) Init(app *kingpin.Application) { app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path) } -func (f *fileLogSourceFactory) New() (LogSourceCloser, error) { +func (f *fileLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { if f.path == "" { return nil, nil } diff --git a/logsource_systemd.go b/logsource_systemd.go index 76d3be8..60f5cb6 100644 --- a/logsource_systemd.go +++ b/logsource_systemd.go @@ -112,7 +112,7 @@ func (f *systemdLogSourceFactory) Init(app *kingpin.Application) { app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path) } -func (f *systemdLogSourceFactory) New() (LogSourceCloser, error) { +func (f *systemdLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { if !f.enable { return nil, nil } diff --git a/main.go b/main.go index 2e5e8cb..c92d7d5 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( func main() { var ( + ctx = context.Background() app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() @@ -23,7 +24,7 @@ func main() { InitLogSourceFactories(app) kingpin.MustParse(app.Parse(os.Args[1:])) - logSrc, err := NewLogSourceFromFactories() + logSrc, err := NewLogSourceFromFactories(ctx) if err != nil { log.Fatalf("Error opening log source: %s", err) } @@ -53,7 +54,7 @@ func main() { panic(err) } }) - ctx, cancelFunc := context.WithCancel(context.Background()) + ctx, cancelFunc := context.WithCancel(ctx) defer cancelFunc() go exporter.StartMetricCollection(ctx) log.Print("Listening on ", *listenAddress)