forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.go
193 lines (168 loc) · 4.78 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package graphite
import (
"crypto/tls"
"errors"
"io"
"log"
"math/rand"
"net"
"time"
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
Template string
Timeout int
conns []net.Conn
tlsint.ClientConfig
}
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
# graphite_tag_support = false
## timeout in seconds for the write connection to graphite
timeout = 2
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (g *Graphite) Connect() error {
// Set default values
if g.Timeout <= 0 {
g.Timeout = 2
}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:2003")
}
// Set tls config
tlsConfig, err := g.ClientConfig.TLSConfig()
if err != nil {
return err
}
// Get Connections
var conns []net.Conn
for _, server := range g.Servers {
// Dialer with timeout
d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second}
// Get secure connection if tls config is set
var conn net.Conn
if tlsConfig != nil {
conn, err = tls.DialWithDialer(&d, "tcp", server, tlsConfig)
} else {
conn, err = d.Dial("tcp", server)
}
if err == nil {
conns = append(conns, conn)
}
}
g.conns = conns
return nil
}
func (g *Graphite) Close() error {
// Closing all connections
for _, conn := range g.conns {
conn.Close()
}
return nil
}
func (g *Graphite) SampleConfig() string {
return sampleConfig
}
func (g *Graphite) Description() string {
return "Configuration for Graphite server to send metrics to"
}
// We need check eof as we can write to nothing without noticing anything is wrong
// the connection stays in a close_wait
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng` for this trick.
func checkEOF(conn net.Conn) {
b := make([]byte, 1024)
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
num, err := conn.Read(b)
if err == io.EOF {
log.Printf("E! Conn %s is closed. closing conn explicitly", conn)
conn.Close()
return
}
// just in case i misunderstand something or the remote behaves badly
if num != 0 {
log.Printf("I! conn %s .conn.Read data? did not expect that. data: %s\n", conn, b[:num])
}
// Log non-timeout errors or close.
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
log.Printf("E! conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s\n", conn, err)
conn.Close()
}
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
if err != nil {
return err
}
for _, metric := range metrics {
buf, err := s.Serialize(metric)
if err != nil {
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
}
batch = append(batch, buf...)
}
err = g.send(batch)
// try to reconnect and retry to send
if err != nil {
log.Println("E! Graphite: Reconnecting and retrying: ")
g.Connect()
err = g.send(batch)
}
return err
}
func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs
err := errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil {
// Error
log.Println("E! Graphite Error: " + e.Error())
// Close explicitly
g.conns[n].Close()
// Let's try the next one
} else {
// Success
err = nil
break
}
}
return err
}
func init() {
outputs.Add("graphite", func() telegraf.Output {
return &Graphite{}
})
}