Skip to content

Commit

Permalink
Merge branch 'iwanbk-add-get-timeout', fixes #22
Browse files Browse the repository at this point in the history
  • Loading branch information
mna committed Feb 1, 2019
2 parents 703f85d + 9953fa3 commit ceb9ca8
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Package redisc implements a redis cluster client built on top of the [redigo pac

## Releases

* **v1.1.5** : Add `Cluster.PoolWaitTime` to configure the time to wait on a connection from a pool with `MaxActive` > 0 and `Wait` set to true (thanks to [@iwanbk][iwanbk]).

* **v1.1.4** : Add `Conn.DoWithTimeout` and `Conn.ReceiveWithTimeout` to match redigo's `ConnWithTimeout` interface (thanks to [@letsfire][letsfire]).

* **v1.1.3** : Fix handling of `ASK` replies in `RetryConn`.
Expand Down Expand Up @@ -72,3 +74,4 @@ The [BSD 3-Clause license][bsd].
[radix1]: https://github.com/fzzy/radix
[radix2]: https://github.com/mediocregopher/radix.v2
[letsfire]: https://github.com/letsfire
[iwanbk]: https://github.com/iwanbk
11 changes: 9 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type Cluster struct {
// pool is used to manage the connections returned by Get.
CreatePool func(address string, options ...redis.DialOption) (*redis.Pool, error)

// PoolWaitTime is the time to wait when getting a connection from
// a pool configured with MaxActive > 0 and Wait set to true, and
// MaxActive connections are already in use.
//
// If <= 0 (or with Go < 1.7), there is no wait timeout, it will wait
// indefinitely if Pool.Wait is true.
PoolWaitTime time.Duration

mu sync.RWMutex // protects following fields
err error // broken connection error
pools map[string]*redis.Pool // created pools per node
Expand Down Expand Up @@ -235,8 +243,7 @@ func (c *Cluster) getConnForAddr(addr string, forceDial bool) (redis.Conn, error
}
c.mu.Unlock()

conn := p.Get()
return conn, conn.Err()
return c.getFromPool(p)
}

var errNoNodeForSlot = errors.New("redisc: no node for slot")
Expand Down
15 changes: 15 additions & 0 deletions cluster_go16.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// +build !go1.7

package redisc

import (
"github.com/gomodule/redigo/redis"
)

// get connection from the pool
// pre go1.7, Pool has no GetContext method, so it always
// calls Get.
func (c *Cluster) getFromPool(p *redis.Pool) (redis.Conn, error) {
conn := p.Get()
return conn, conn.Err()
}
37 changes: 37 additions & 0 deletions cluster_go16_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// +build !go1.7

package redisc

import (
"testing"

"github.com/gomodule/redigo/redis"
"github.com/mna/redisc/redistest"
"github.com/stretchr/testify/assert"
)

// TestGetPool
func TestGetPool(t *testing.T) {
s := redistest.StartMockServer(t, func(cmd string, args ...string) interface{} {
return nil
})
defer s.Close()

p := &redis.Pool{
MaxActive: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", s.Addr)
},
}
c := Cluster{}

// fist connection is OK
conn, err := c.getFromPool(p)
if assert.NoError(t, err) {
defer conn.Close()
}

// second connection should be failed because we only have 1 MaxActive
_, err = c.getFromPool(p)
assert.Error(t, err)
}
23 changes: 23 additions & 0 deletions cluster_go17.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// +build go1.7

package redisc

import (
"context"

"github.com/gomodule/redigo/redis"
)

// get connection from the pool.
// use GetContext if PoolWaitTime > 0
func (c *Cluster) getFromPool(p *redis.Pool) (redis.Conn, error) {
if c.PoolWaitTime <= 0 {
conn := p.Get()
return conn, conn.Err()
}

ctx, cancel := context.WithTimeout(context.Background(), c.PoolWaitTime)
defer cancel()

return p.GetContext(ctx)
}
94 changes: 94 additions & 0 deletions cluster_go17_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// +build go1.7

package redisc

import (
"context"
"testing"
"time"

"github.com/gomodule/redigo/redis"
"github.com/mna/redisc/redistest"
"github.com/stretchr/testify/assert"
)

// TestGetPoolTimedOut test case where we can't get the connection because the pool
// is full
func TestGetPoolTimedOut(t *testing.T) {
s := redistest.StartMockServer(t, func(cmd string, args ...string) interface{} {
return nil
})
defer s.Close()

p := &redis.Pool{
MaxActive: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", s.Addr)
},
Wait: true,
}
c := Cluster{
PoolWaitTime: 100 * time.Millisecond,
}
conn, err := c.getFromPool(p)
if assert.NoError(t, err) {
defer conn.Close()
}

// second connection should be failed because we only have 1 MaxActive
start := time.Now()
_, err = c.getFromPool(p)
if assert.Error(t, err) {
assert.Equal(t, context.DeadlineExceeded, err)
assert.True(t, time.Since(start) >= 100*time.Millisecond)
}
}

// TestGetPoolWaitOnFull test that we could get the connection when the pool
// is full and we can wait for it
func TestGetPoolWaitOnFull(t *testing.T) {
s := redistest.StartMockServer(t, func(cmd string, args ...string) interface{} {
return nil
})
defer s.Close()

var (
usageTime = 100 * time.Millisecond // how long the connection will be used
waitTime = 3 * usageTime // how long we want to wait
)

p := &redis.Pool{
MaxActive: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", s.Addr)
},
Wait: true,
}
c := Cluster{
PoolWaitTime: waitTime,
}

// first connection OK
conn, err := c.getFromPool(p)
assert.NoError(t, err)

// second connection should be failed because we only have 1 MaxActive
start := time.Now()
_, err = c.getFromPool(p)
if assert.Error(t, err) {
assert.Equal(t, context.DeadlineExceeded, err)
assert.True(t, time.Since(start) >= waitTime)
}

go func() {
time.Sleep(usageTime) // sleep before close, to simulate waiting for connection
conn.Close()
}()

start = time.Now()
conn2, err := c.getFromPool(p)
if assert.NoError(t, err) {
assert.True(t, time.Since(start) >= usageTime)
}
conn2.Close()
}

0 comments on commit ceb9ca8

Please sign in to comment.