Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with go-leak library and fix leaks #117

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions hystrix/circuit.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hystrix

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -15,9 +16,10 @@ type CircuitBreaker struct {
forceOpen bool
mutex *sync.RWMutex
openedOrLastTestedTime int64

executorPool *executorPool
metrics *metricExchange
ctx context.Context
ctxCancelFunc context.CancelFunc
executorPool *executorPool
metrics *metricExchange
}

var (
Expand Down Expand Up @@ -60,16 +62,18 @@ func Flush() {
for name, cb := range circuitBreakers {
cb.metrics.Reset()
cb.executorPool.Metrics.Reset()
cb.ctxCancelFunc()
delete(circuitBreakers, name)
}
}

// newCircuitBreaker creates a CircuitBreaker with associated Health
func newCircuitBreaker(name string) *CircuitBreaker {
c := &CircuitBreaker{}
c.ctx, c.ctxCancelFunc = context.WithCancel(context.TODO())
c.Name = name
c.metrics = newMetricExchange(name)
c.executorPool = newExecutorPool(name)
c.metrics = newMetricExchange(c.ctx, name)
c.executorPool = newExecutorPool(c.ctx, name)
c.mutex = &sync.RWMutex{}

return c
Expand Down
3 changes: 1 addition & 2 deletions hystrix/circuit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func TestMultithreadedGetCircuit(t *testing.T) {
func TestReportEventOpenThenClose(t *testing.T) {
Convey("when a circuit is closed", t, func() {
defer Flush()

ConfigureCommand("", CommandConfig{ErrorPercentThreshold: 50})

cb, _, err := GetCircuit("")
Expand All @@ -83,7 +82,7 @@ func TestReportEventOpenThenClose(t *testing.T) {
openedTime := cb.openedOrLastTestedTime

Convey("but the metrics are unhealthy", func() {
cb.metrics = metricFailingPercent(100)
cb.metrics = metricFailingPercentWithContext(cb.ctx, 100)
So(cb.metrics.IsHealthy(time.Now()), ShouldBeFalse)

Convey("and a success is reported", func() {
Expand Down
2 changes: 1 addition & 1 deletion hystrix/hystrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.circuit.executorPool.Return(cmd.circuit.ctx, cmd.ticket)
cmd.Unlock()
}
// Shared by the following two goroutines. It ensures only the faster
Expand Down
39 changes: 31 additions & 8 deletions hystrix/hystrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ func TestTimeout(t *testing.T) {
ConfigureCommand("", CommandConfig{Timeout: 100})

resultChan := make(chan int)
errChan := GoC(context.Background(), "", func(ctx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errChan := GoC(ctx, "", func(ctx context.Context) error {
time.Sleep(1 * time.Second)
resultChan <- 1
return nil
Expand All @@ -99,7 +102,10 @@ func TestTimeoutEmptyFallback(t *testing.T) {
ConfigureCommand("", CommandConfig{Timeout: 100})

resultChan := make(chan int)
errChan := GoC(context.Background(), "", func(ctx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errChan := GoC(ctx, "", func(ctx context.Context) error {
time.Sleep(1 * time.Second)
resultChan <- 1
return nil
Expand Down Expand Up @@ -131,12 +137,14 @@ func TestMaxConcurrent(t *testing.T) {
resultChan <- 1
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Convey("and 3 of those commands try to execute at the same time", func() {
var good, bad int

for i := 0; i < 3; i++ {
errChan := GoC(context.Background(), "", run, nil)
errChan := GoC(ctx, "", run, nil)
time.Sleep(10 * time.Millisecond)

select {
Expand Down Expand Up @@ -346,11 +354,20 @@ func TestReturnTicket_QuickCheck(t *testing.T) {
compareTicket := func() bool {
defer Flush()
ConfigureCommand("", CommandConfig{Timeout: 2})
errChan := GoC(context.Background(), "", func(ctx context.Context) error {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errChan := GoC(ctx, "", func(ctx context.Context) error {
//there are multiple ways to block here, the following sequence of steps
//will block:: c := make(chan struct{}); <-c; return nil // should block
//however, this would leak the internal GoC.func goroutine
//another non-leaking way to do this would be to simply: return ErrTimeout
c := make(chan struct{})
<-c // should block
<-c // should block (hence we add an exception in go-leak)
return nil
}, nil)

err := <-errChan
So(err, ShouldResemble, ErrTimeout)
cb, _, err := GetCircuit("")
Expand All @@ -371,10 +388,16 @@ func TestReturnTicket(t *testing.T) {
defer Flush()

ConfigureCommand("", CommandConfig{Timeout: 10})

errChan := GoC(context.Background(), "", func(ctx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errChan := GoC(ctx, "", func(ctx context.Context) error {
//there are multiple ways to block here, the following sequence of steps
//will block:: c := make(chan struct{}); <-c; return nil // should block
//however, this would leak the internal GoC.func goroutine
//another non-leaking way to do this would be to simply: return ErrTimeout
c := make(chan struct{})
<-c // should block
<-c // should block (hence we add an exception in go-leak)
return nil
}, nil)

Expand Down
42 changes: 25 additions & 17 deletions hystrix/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package hystrix

import (
"context"
"sync"
"time"

"github.com/afex/hystrix-go/hystrix/metric_collector"
metricCollector "github.com/afex/hystrix-go/hystrix/metric_collector"
"github.com/afex/hystrix-go/hystrix/rolling"
)

Expand All @@ -23,7 +24,7 @@ type metricExchange struct {
metricCollectors []metricCollector.MetricCollector
}

func newMetricExchange(name string) *metricExchange {
func newMetricExchange(ctx context.Context, name string) *metricExchange {
m := &metricExchange{}
m.Name = name

Expand All @@ -32,12 +33,12 @@ func newMetricExchange(name string) *metricExchange {
m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
m.Reset()

go m.Monitor()
go m.Monitor(ctx)

return m
}

// The Default Collector function will panic if collectors are not setup to specification.
// DefaultCollector will panic if collectors are not setup to specification.
func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollector {
if len(m.metricCollectors) < 1 {
panic("No Metric Collectors Registered.")
Expand All @@ -49,20 +50,27 @@ func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollec
return collection
}

func (m *metricExchange) Monitor() {
for update := range m.Updates {
// we only grab a read lock to make sure Reset() isn't changing the numbers.
m.Mutex.RLock()

totalDuration := time.Since(update.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, update, totalDuration)
func (m *metricExchange) Monitor(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case u, ok := <-m.Updates:
if !ok {
return
}
// we only grab a read lock to make sure Reset() isn't changing the numbers.
m.Mutex.RLock()

totalDuration := time.Since(u.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, u, totalDuration)
}
wg.Wait()
m.Mutex.RUnlock()
}
wg.Wait()

m.Mutex.RUnlock()
}
}

Expand Down
24 changes: 21 additions & 3 deletions hystrix/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package hystrix

import (
"context"
"testing"
"time"

"go.uber.org/goleak"

. "github.com/smartystreets/goconvey/convey"
)

func metricFailingPercent(p int) *metricExchange {
m := newMetricExchange("")
return metricFailingPercentWithContext(context.Background(), p)
}

func metricFailingPercentWithContext(ctx context.Context, p int) *metricExchange {
m := newMetricExchange(ctx, "")
for i := 0; i < 100; i++ {
t := "success"
if i < p {
Expand All @@ -17,15 +24,18 @@ func metricFailingPercent(p int) *metricExchange {
m.Updates <- &commandExecution{Types: []string{t}}
}

// Updates needs to be flushed
// updates need to be flushed
time.Sleep(100 * time.Millisecond)

return m
}

func TestErrorPercent(t *testing.T) {
Convey("with a metric failing 40 percent of the time", t, func() {
m := metricFailingPercent(40)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m := metricFailingPercentWithContext(ctx, 40)
now := time.Now()

Convey("ErrorPercent() should return 40", func() {
Expand All @@ -43,3 +53,11 @@ func TestErrorPercent(t *testing.T) {
})
})
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("time.Sleep"), //tests that sleep in goroutines explicitly
goleak.IgnoreTopFunction("github.com/afex/hystrix-go/hystrix.TestReturnTicket.func1.1"), //explicit leak
goleak.IgnoreTopFunction("github.com/afex/hystrix-go/hystrix.TestReturnTicket_QuickCheck.func1.1"), //explicit leak
)
}
22 changes: 16 additions & 6 deletions hystrix/pool.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package hystrix

import (
"context"
)

type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{}
}

func newExecutorPool(name string) *executorPool {
func newExecutorPool(ctx context.Context, name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Metrics = newPoolMetrics(ctx, name)
p.Max = getSettings(name).MaxConcurrentRequests

p.Tickets = make(chan *struct{}, p.Max)
Expand All @@ -21,15 +25,21 @@ func newExecutorPool(name string) *executorPool {
return p
}

func (p *executorPool) Return(ticket *struct{}) {
func (p *executorPool) Return(ctx context.Context, ticket *struct{}) {
if ticket == nil {
return
}

p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
for {
select {
case <-ctx.Done():
return
default:
p.Metrics.Updates <- poolMetricsUpdate{activeCount: p.ActiveCount()}
p.Tickets <- ticket
return
}
}
p.Tickets <- ticket
}

func (p *executorPool) ActiveCount() int {
Expand Down
27 changes: 17 additions & 10 deletions hystrix/pool_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hystrix

import (
"context"
"sync"

"github.com/afex/hystrix-go/hystrix/rolling"
Expand All @@ -19,15 +20,15 @@ type poolMetricsUpdate struct {
activeCount int
}

func newPoolMetrics(name string) *poolMetrics {
func newPoolMetrics(ctx context.Context, name string) *poolMetrics {
m := &poolMetrics{}
m.Name = name
m.Updates = make(chan poolMetricsUpdate)
m.Mutex = &sync.RWMutex{}

m.Reset()

go m.Monitor()
go m.Monitor(ctx)

return m
}
Expand All @@ -40,13 +41,19 @@ func (m *poolMetrics) Reset() {
m.Executed = rolling.NewNumber()
}

func (m *poolMetrics) Monitor() {
for u := range m.Updates {
m.Mutex.RLock()

m.Executed.Increment(1)
m.MaxActiveRequests.UpdateMax(float64(u.activeCount))

m.Mutex.RUnlock()
func (m *poolMetrics) Monitor(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case u, ok := <-m.Updates:
if !ok {
return
}
m.Mutex.RLock()
m.Executed.Increment(1)
m.MaxActiveRequests.UpdateMax(float64(u.activeCount))
m.Mutex.RUnlock()
}
}
}
Loading