Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic reconnect to avoid stale UDP connections #51

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion bufferedclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type StatsdBuffer struct {
eventChannel chan event.Event
events map[string]event.Event
closeChannel chan closeRequest
Logger Logger
Logger *log.Logger
Verbose bool
}

Expand Down
24 changes: 12 additions & 12 deletions bufferedclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,15 @@ func TestBufferedInt64(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)
buffered := NewStatsdBuffer(time.Millisecond*20, client)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = buffered.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestBufferedInt64(t *testing.T) {
t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -300,15 +300,15 @@ func TestBufferedFloat64(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)
buffered := NewStatsdBuffer(time.Millisecond*20, client)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = buffered.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestBufferedFloat64(t *testing.T) {
t.Errorf("did not receive all metrics: Expected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -413,15 +413,15 @@ func TestBufferedAbsolute(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)
buffered := NewStatsdBuffer(time.Millisecond*20, client)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = buffered.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestBufferedAbsolute(t *testing.T) {
t.Errorf("did not receive all metrics: \nExpected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -520,15 +520,15 @@ func TestBufferedFAbsolute(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)
buffered := NewStatsdBuffer(time.Millisecond*20, client)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = buffered.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestBufferedFAbsolute(t *testing.T) {
t.Errorf("did not receive all metrics: \nExpected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
60 changes: 44 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"github.com/quipo/statsd/event"
)

// Logger interface compatible with log.Logger
type Logger interface {
Println(v ...interface{})
}

// UDPPayloadSize is the number of bytes to send at one go through the udp socket.
// SendEvents will try to pack as many events into one udp packet.
// Change this value as per network capabilities
Expand Down Expand Up @@ -54,29 +49,62 @@ const (

// StatsdClient is a client library to send events to StatsD
type StatsdClient struct {
conn net.Conn
addr string
prefix string
sockType socketType
Logger Logger
conn net.Conn
addr string
prefix string
sockType socketType
Logger *log.Logger
reconnect bool
reconnect_ticker *time.Ticker
}

// NewStatsdClient - Factory
func NewStatsdClient(addr string, prefix string) *StatsdClient {
// allow %HOST% in the prefix string
prefix = strings.Replace(prefix, "%HOST%", Hostname, 1)
return &StatsdClient{
addr: addr,
prefix: prefix,
Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime),
}
client := &StatsdClient{
addr: addr,
prefix: prefix,
Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime),
reconnect: false,
reconnect_ticker: time.NewTicker(30 * time.Second),
}

go func() {
for range client.reconnect_ticker.C {
if client.reconnect {
err := client.Reconnect()
if err != nil {
client.Logger.Println(err)
}
}
}
}()
return client
}

// String returns the StatsD server address
func (c *StatsdClient) String() string {
return c.addr
}

func (c *StatsdClient) Reconnect() error {
var err error
if c.sockType == "udp" {
c.Logger.Printf("Reconnecing to udp server at %s", c.String())
err = c.CreateSocket()
} else if c.sockType == "tcp" {
c.Logger.Printf("Reconnecing to tcp server at %s", c.String())
err = c.CreateTCPSocket()
} else if c.sockType == "" {
return fmt.Errorf("No socket created, cannot identify connection type")
}
if err != nil {
return err
}
return nil
}

// CreateSocket creates a UDP connection to a StatsD server
func (c *StatsdClient) CreateSocket() error {
conn, err := net.DialTimeout(string(udpSocket), c.addr, 5*time.Second)
Expand Down Expand Up @@ -294,7 +322,7 @@ func (c *StatsdClient) SendEvent(e event.Event) error {
return errNotConnected
}
for _, stat := range e.Stats() {
//fmt.Printf("SENDING EVENT %s%s\n", c.prefix, strings.Replace(stat, "%HOST%", Hostname, 1))
//c.Logger.Printf("SENDING EVENT %s%s\n", c.prefix, strings.Replace(stat, "%HOST%", Hostname, 1))
_, err := fmt.Fprintf(c.conn, "%s%s", c.prefix, strings.Replace(stat, "%HOST%", Hostname, 1))
if nil != err {
return err
Expand Down
75 changes: 60 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ func TestClientInt64(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = client.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestClientInt64(t *testing.T) {
t.Errorf("did not receive all metrics: \nExpected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -316,14 +316,14 @@ func TestClientFloat64(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = client.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestClientFloat64(t *testing.T) {
t.Errorf("did not receive all metrics: Expected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -429,14 +429,14 @@ func TestClientAbsolute(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = client.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestClientAbsolute(t *testing.T) {
t.Errorf("did not receive all metrics: \nExpected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -535,14 +535,14 @@ func TestClientFAbsolute(t *testing.T) {
defer ln.Close()

t.Log("Starting new UDP listener at", udpAddr.String())
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

client := NewStatsdClient(udpAddr.String(), prefix)

ch := make(chan string)

go doListenUDP(t, ln, ch, len(tc.expected))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

err = client.CreateSocket()
if nil != err {
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestClientFAbsolute(t *testing.T) {
t.Errorf("did not receive all metrics: \nExpected: \n%T %v, \nActual: \n%T %v ", tc.expected, tc.expected, actual, actual)
}

time.Sleep(500 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
})
}
}
Expand All @@ -615,6 +615,51 @@ func newLocalListenerUDP(t *testing.T) (*net.UDPConn, *net.UDPAddr) {
return ln, udpAddr
}

func Teardown(c *StatsdClient) {
c.reconnect = false
}

func TestReconnecting(t *testing.T) {
ln, udpAddr := newLocalListenerUDP(t)
defer ln.Close()

prefix := "test."

client := NewStatsdClient(udpAddr.String(), prefix)
client.reconnect = true
client.reconnect_ticker = time.NewTicker(10 * time.Millisecond)

defer Teardown(client)

ch := make(chan string, 0)

s := map[string]int64{
"a:b:c": 5,
"d:e:f": 2,
}

go doListenUDP(t, ln, ch, len(s))

client.CreateSocket()
client.Close()

time.Sleep(15 * time.Millisecond)
timeout := time.After(5 * time.Millisecond)

for k, v := range s {
t.Log("sent", k, v)
client.Total(k, v)
}

for i := len(s); i > 0; i-- {
select {
case <-ch:
case <-timeout:
t.Fatal("Timed out")
}
}
}

func doListenUDP(t *testing.T, conn *net.UDPConn, ch chan string, n int) {
var wg sync.WaitGroup
wg.Add(n)
Expand Down Expand Up @@ -687,7 +732,7 @@ func TestTCP(t *testing.T) {
defer ln.Close()

t.Log("Starting new TCP listener at", addr)
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

prefix := "myproject."
client := NewStatsdClient(addr, prefix)
Expand Down Expand Up @@ -727,10 +772,10 @@ func TestTCP(t *testing.T) {
t.Error(err)
}
}
time.Sleep(60 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

go doListenTCP(t, ln, ch, len(s))
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

actual := make(map[string]int64)

Expand Down
2 changes: 1 addition & 1 deletion stdoutclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type StdoutClient struct {
FD *os.File
prefix string
Logger Logger
Logger *log.Logger
}

// NewStdoutClient - Factory
Expand Down