Skip to content

Commit

Permalink
added support backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
ifireice committed May 14, 2023
1 parent 252ec74 commit 40fae87
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 40 deletions.
77 changes: 67 additions & 10 deletions prometheus/graphite/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,22 @@ const (
millisecondsPerSecond = 1000
)

// ErrorHandler is a function that handles errors
type ErrorHandler func(err error)
// HandlerErrorHandling defines how a Handler serving metrics will handle
// errors.
type HandlerErrorHandling int

// DefaultErrorHandler skips received errors
var DefaultErrorHandler = func(err error) {}
// These constants cause handlers serving metrics to behave as described if
// errors are encountered.
const (
// Ignore errors and try to push as many metrics to Graphite as possible.
ContinueOnError HandlerErrorHandling = iota

// Abort the push to Graphite upon the first error encountered.
AbortOnError

// Execute callback function on error.
CallbackOnError
)

// Config defines the Graphite bridge config.
type Config struct {
Expand All @@ -64,8 +75,16 @@ type Config struct {
// The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
Gatherer prometheus.Gatherer

// ErrorHandler defines how errors are handled.
ErrorHandler ErrorHandler
// The logger that messages are written to. Defaults to no logging.
Logger Logger

// ErrorHandling defines how errors are handled. Note that errors are
// logged regardless of the configured ErrorHandling provided Logger
// is not nil.
ErrorHandling HandlerErrorHandling

// ErrorCallbackFunc is a callback function that can be executed when error is occurred
ErrorCallbackFunc CallbackFunc
}

// Bridge pushes metrics to the configured Graphite server.
Expand All @@ -76,11 +95,23 @@ type Bridge struct {
interval time.Duration
timeout time.Duration

errorHandler ErrorHandler
errorHandling HandlerErrorHandling
errorCallbackFunc CallbackFunc
logger Logger

g prometheus.Gatherer
}

// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
Println(v ...interface{})
}

// CallbackFunc is a special type for callback functions
type CallbackFunc func(error)

// NewBridge returns a pointer to a new Bridge struct.
func NewBridge(c *Config) (*Bridge, error) {
b := &Bridge{}
Expand All @@ -98,6 +129,10 @@ func NewBridge(c *Config) (*Bridge, error) {
b.g = c.Gatherer
}

if c.Logger != nil {
b.logger = c.Logger
}

if c.Prefix != "" {
b.prefix = c.Prefix
}
Expand All @@ -115,7 +150,11 @@ func NewBridge(c *Config) (*Bridge, error) {
b.timeout = c.Timeout
}

b.errorHandler = c.ErrorHandler
b.errorHandling = c.ErrorHandling

if c.ErrorCallbackFunc != nil {
b.errorCallbackFunc = c.ErrorCallbackFunc
}

return b, nil
}
Expand All @@ -128,7 +167,9 @@ func (b *Bridge) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
b.errorHandler(b.Push())
if err := b.Push(); err != nil && b.logger != nil {
b.logger.Println("error pushing to Graphite:", err)
}
case <-ctx.Done():
return
}
Expand All @@ -137,11 +178,27 @@ func (b *Bridge) Run(ctx context.Context) {

// Push pushes Prometheus metrics to the configured Graphite server.
func (b *Bridge) Push() error {
err := b.push()
switch b.errorHandling {
case AbortOnError:
return err
case ContinueOnError:
if b.logger != nil {
b.logger.Println("continue on error:", err)
}
case CallbackOnError:
if b.errorCallbackFunc != nil {
b.errorCallbackFunc(err)
}
}
return nil
}

func (b *Bridge) push() error {
mfs, err := b.g.Gather()
if err != nil {
return err
}

if len(mfs) == 0 {
return nil
}
Expand Down
78 changes: 48 additions & 30 deletions prometheus/graphite/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"context"
"fmt"
"io"
"log"
"net"
"os"
"reflect"
"regexp"
"sort"
Expand Down Expand Up @@ -436,12 +438,13 @@ type mockGraphite struct {

func ExampleBridge() {
b, err := NewBridge(&Config{
URL: "graphite.example.org:3099",
Gatherer: prometheus.DefaultGatherer,
Prefix: "prefix",
Interval: 15 * time.Second,
Timeout: 10 * time.Second,
ErrorHandler: func(err error) {},
URL: "graphite.example.org:3099",
Gatherer: prometheus.DefaultGatherer,
Prefix: "prefix",
Interval: 15 * time.Second,
Timeout: 10 * time.Second,
ErrorHandling: AbortOnError,
Logger: log.New(os.Stdout, "graphite bridge: ", log.Lshortfile),
})
if err != nil {
panic(err)
Expand All @@ -465,32 +468,47 @@ func ExampleBridge() {
b.Run(ctx)
}

func TestErrorHandler(t *testing.T) {
var internalError error
c := &Config{
URL: "localhost",
Gatherer: prometheus.DefaultGatherer,
Prefix: "prefix",
Interval: 5 * time.Second,
Timeout: 2 * time.Second,
ErrorHandler: func(err error) { internalError = err },
}
b, err := NewBridge(c)
if err != nil {
panic(err)
func TestErrorHandling(t *testing.T) {
var testCases = []struct {
errorHandling HandlerErrorHandling
receivedError error
interceptedError error
}{
{
errorHandling: ContinueOnError,
receivedError: nil,
interceptedError: nil,
},
{
errorHandling: AbortOnError,
receivedError: &net.OpError{},
interceptedError: nil,
},
{
errorHandling: CallbackOnError,
receivedError: nil,
interceptedError: &net.OpError{},
},
}

// Create a Context to control stopping the Run() loop that pushes
// metrics to Graphite. Multiplied by 2, because we need Run to be executed at least one time.
ctx, cancel := context.WithTimeout(context.Background(), c.Interval*2)
defer cancel()

// Start pushing metrics to Graphite in the Run() loop.
b.Run(ctx)
for _, testCase := range testCases {
var interceptedError error
c := &Config{
URL: "localhost",
ErrorHandling: testCase.errorHandling,
ErrorCallbackFunc: func(err error) { interceptedError = err },
}
b, err := NewBridge(c)
if err != nil {
t.Fatal(err)
}

// We haven't specified port
expError := fmt.Errorf("dial tcp: address localhost: missing port in address")
if internalError.Error() != expError.Error() {
t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError)
receivedError := b.Push()
if reflect.TypeOf(receivedError) != reflect.TypeOf(testCase.receivedError) {
t.Errorf("expected to receive: %T, received: %T", testCase.receivedError, receivedError)
}
if reflect.TypeOf(interceptedError) != reflect.TypeOf(testCase.interceptedError) {
t.Errorf("expected to intercept: %T, intercepted: %T", testCase.interceptedError, interceptedError)
}
}
}

0 comments on commit 40fae87

Please sign in to comment.