Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/client timeouts #248

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/infobloxopen/go-trees v0.0.0-20190313150506-2af4e13f9062
github.com/miekg/dns v1.1.15
Expand Down
44 changes: 15 additions & 29 deletions go.sum

Large diffs are not rendered by default.

40 changes: 25 additions & 15 deletions pep/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

ot "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)

var (
Expand Down Expand Up @@ -83,6 +84,14 @@ type Option func(*options)

const virtualServerAddress = "pdp"

// WithClientUnaryInterceptors returns an Options which appends to
// the client unary interceptors of the underlying connection to pdp
func WithClientUnaryInterceptors(interceptors ...grpc.UnaryClientInterceptor) Option {
return func(o *options) {
o.clientUnaryInterceptors = append([]grpc.UnaryClientInterceptor{}, interceptors...)
}
}

// WithRoundRobinBalancer returns an Option which sets round-robin balancer with given set of servers.
func WithRoundRobinBalancer(addresses ...string) Option {
return func(o *options) {
Expand Down Expand Up @@ -209,26 +218,27 @@ const (
)

type options struct {
addresses []string
balancer int
tracer ot.Tracer
maxStreams int
ctx context.Context
connTimeout time.Duration
connStateCb ConnectionStateNotificationCallback
autoRequestSize bool
maxRequestSize uint32
noPool bool
cache bool
cacheTTL time.Duration
cacheMaxSize int
onCacheHitHandler OnCacheHitHandler
addresses []string
balancer int
tracer ot.Tracer
maxStreams int
ctx context.Context
connStateCb ConnectionStateNotificationCallback
autoRequestSize bool
maxRequestSize uint32
noPool bool
cache bool
cacheTTL time.Duration
cacheMaxSize int
// ignored by Unary client
connTimeout time.Duration
onCacheHitHandler OnCacheHitHandler
clientUnaryInterceptors []grpc.UnaryClientInterceptor
}

// NewClient creates client instance using given options.
func NewClient(opts ...Option) Client {
o := options{
connTimeout: -1,
maxRequestSize: 10240,
}
for _, opt := range opts {
Expand Down
62 changes: 40 additions & 22 deletions pep/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,24 @@ import (
"time"

ot "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)

func TestNewClient(t *testing.T) {
c := NewClient()
if _, ok := c.(*unaryClient); !ok {
t.Errorf("Expected *unaryClient from NewClient got %#v", c)
if _, ok := c.(*UnaryClient); !ok {
t.Errorf("Expected *UnaryClient from NewClient got %#v", c)
}
}

func TestNewBalancedClient(t *testing.T) {
c := NewClient(WithRoundRobinBalancer("127.0.0.1:1000", "127.0.0.1:1001"))
if uc, ok := c.(*unaryClient); ok {
if uc, ok := c.(*UnaryClient); ok {
if len(uc.opts.addresses) <= 0 {
t.Errorf("Expected balancer to be set but got nothing")
}
} else {
t.Errorf("Expected *unaryClient from NewClient got %#v", c)
t.Errorf("Expected *UnaryClient from NewClient got %#v", c)
}

c = NewClient(WithHotSpotBalancer("127.0.0.1:1000", "127.0.0.1:1001"), WithStreams(5))
Expand All @@ -49,21 +50,38 @@ func TestNewStreamingClient(t *testing.T) {
func TestNewClientWithTracer(t *testing.T) {
tr := &ot.NoopTracer{}
c := NewClient(WithTracer(tr))
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if uc.opts.tracer != tr {
t.Errorf("Expected NoopTracer as client option but got %v", uc.opts.tracer)
}
}

var noOpClientInterceptor grpc.UnaryClientInterceptor = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return nil
}

func TestNewClientWithInterceptor(t *testing.T) {
ci := noOpClientInterceptor
c := NewClient(WithClientUnaryInterceptors(ci))
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if len(uc.opts.clientUnaryInterceptors) == 0 {
t.Errorf("Expected noOpClientInterceptor as client option but got %v", uc.opts.clientUnaryInterceptors)
}
}

func TestNewClientWithAutoRequestSize(t *testing.T) {
c := NewClient(WithAutoRequestSize(true))
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if !uc.opts.autoRequestSize {
Expand All @@ -73,9 +91,9 @@ func TestNewClientWithAutoRequestSize(t *testing.T) {

func TestNewClientWithMaxRequestSize(t *testing.T) {
c := NewClient(WithMaxRequestSize(1024))
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if uc.opts.maxRequestSize != 1024 {
Expand All @@ -85,9 +103,9 @@ func TestNewClientWithMaxRequestSize(t *testing.T) {

func TestNewClientWithNoRequestBufferPool(t *testing.T) {
c := NewClient(WithNoRequestBufferPool())
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if uc.pool.b != nil {
Expand All @@ -97,9 +115,9 @@ func TestNewClientWithNoRequestBufferPool(t *testing.T) {

func TestNewClientWithCacheTTL(t *testing.T) {
c := NewClient(WithCacheTTL(5 * time.Second))
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if !uc.opts.cache || uc.opts.cacheTTL != 5*time.Second {
Expand All @@ -109,9 +127,9 @@ func TestNewClientWithCacheTTL(t *testing.T) {

func TestNewClientWithCacheTTLAndMaxSize(t *testing.T) {
c := NewClient(WithCacheTTLAndMaxSize(5*time.Second, 1024))
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}

if !uc.opts.cache || uc.opts.cacheTTL != 5*time.Second || uc.opts.cacheMaxSize != 1024 {
Expand All @@ -122,18 +140,18 @@ func TestNewClientWithCacheTTLAndMaxSize(t *testing.T) {

func TestNewClientWithContext(t *testing.T) {
c := NewClient()
uc, ok := c.(*unaryClient)
uc, ok := c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}
if uc.opts.ctx != nil {
t.Errorf("Expected default client to have nil context")
}

c = NewClient(WithContext(nil))
uc, ok = c.(*unaryClient)
uc, ok = c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}
if uc.opts.ctx != nil {
t.Errorf("Expected nil context to default to nil context")
Expand All @@ -142,9 +160,9 @@ func TestNewClientWithContext(t *testing.T) {
toCtx, toCancelFn := context.WithTimeout(context.Background(), 1*time.Second)
defer toCancelFn()
c = NewClient(WithContext(toCtx))
uc, ok = c.(*unaryClient)
uc, ok = c.(*UnaryClient)
if !ok {
t.Fatalf("Expected *unaryClient from NewClient got %#v", c)
t.Fatalf("Expected *UnaryClient from NewClient got %#v", c)
}
if uc.opts.ctx != toCtx {
t.Errorf("Expected timeout context")
Expand Down
62 changes: 33 additions & 29 deletions pep/unary_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"sync"

"github.com/allegro/bigcache"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
ot "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

pb "github.com/infobloxopen/themis/pdp-service"
)

type unaryClient struct {
type UnaryClient struct {
lock *sync.RWMutex
conn *grpc.ClientConn
client *pb.PDPClient
Expand All @@ -25,8 +26,8 @@ type unaryClient struct {
opts options
}

func newUnaryClient(opts options) *unaryClient {
c := &unaryClient{
func newUnaryClient(opts options) *UnaryClient {
c := &UnaryClient{
lock: &sync.RWMutex{},
opts: opts,
}
Expand All @@ -38,7 +39,7 @@ func newUnaryClient(opts options) *unaryClient {
return c
}

func (c *unaryClient) Connect(addr string) error {
func (c *UnaryClient) Connect(addr string) error {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -64,20 +65,26 @@ func (c *unaryClient) Connect(addr string) error {
}
}

var interceptors []grpc.UnaryClientInterceptor

if c.opts.tracer != nil {
opts = append(opts,
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(
c.opts.tracer,
otgrpc.IncludingSpans(
func(parentSpanCtx ot.SpanContext, method string, req, resp interface{}) bool {
return parentSpanCtx != nil
},
),
interceptors = append(interceptors,
otgrpc.OpenTracingClientInterceptor(
c.opts.tracer,
otgrpc.IncludingSpans(
func(parentSpanCtx ot.SpanContext, method string, req, resp interface{}) bool {
return parentSpanCtx != nil
},
),
),
)
))
}

if c.opts.clientUnaryInterceptors != nil {
interceptors = append(interceptors, c.opts.clientUnaryInterceptors...)
}
opts = append(opts, grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(interceptors...),
))

cache, err := newCacheFromOptions(c.opts)
if err != nil {
Expand All @@ -89,13 +96,6 @@ func (c *unaryClient) Connect(addr string) error {
ctx = context.Background()
}

if c.opts.connTimeout > 0 {
var cancelFn context.CancelFunc
ctx, cancelFn = context.WithTimeout(ctx, c.opts.connTimeout)
defer cancelFn()
}

opts = append(opts, grpc.WithBlock())
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return err
Expand All @@ -110,7 +110,7 @@ func (c *unaryClient) Connect(addr string) error {
return nil
}

func (c *unaryClient) Close() {
func (c *UnaryClient) Close() {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -127,7 +127,16 @@ func (c *unaryClient) Close() {
c.client = nil
}

func (c *unaryClient) Validate(in, out interface{}) error {
func (c *UnaryClient) ValidateContext(ctx context.Context, in, out interface{}) error {
return c.validate(ctx, in, out)
}

// Validate is deprecated, use ValidateContext
func (c *UnaryClient) Validate(in, out interface{}) error {
return c.validate(context.Background(), in, out)
}

func (c *UnaryClient) validate(ctx context.Context, in, out interface{}) error {
c.lock.RLock()
uc := c.client
c.lock.RUnlock()
Expand Down Expand Up @@ -174,11 +183,6 @@ func (c *unaryClient) Validate(in, out interface{}) error {
}
}

ctx := c.opts.ctx
if ctx == nil {
ctx = context.Background()
}

if c.opts.connTimeout > 0 {
var cancelFn context.CancelFunc
ctx, cancelFn = context.WithTimeout(ctx, c.opts.connTimeout)
Expand Down
Loading