Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate v1 clients #602

Merged
merged 4 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ examples:
$(MAKE) -C examples

deps-start:
docker-compose up -d
docker-compose up -d && sleep 10

deps-stop:
docker-compose down
Expand Down
180 changes: 74 additions & 106 deletions client/amqp/amqp.go
Original file line number Diff line number Diff line change
@@ -1,162 +1,121 @@
// Package amqp provides a client with included tracing capabilities.
//
// Deprecated: The AMQP client package is superseded by the `github.com/beatlabs/client/amqp/v2` package.
// Please refer to the documents and the examples for the usage.
//
// This package is frozen and no new functionality will be added.
package amqp

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"time"

"github.com/beatlabs/patron/correlation"
"github.com/beatlabs/patron/encoding/json"
"github.com/beatlabs/patron/encoding/protobuf"
patronErrors "github.com/beatlabs/patron/errors"
patronerrors "github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
"github.com/streadway/amqp"
)

const (
publisherComponent = "amqp-publisher"
)

// Message abstraction for publishing.
type Message struct {
contentType string
body []byte
var publishDurationMetrics *prometheus.HistogramVec

func init() {
publishDurationMetrics = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "client",
Subsystem: "amqp",
Name: "publish_duration_seconds",
Help: "AMQP publish completed by the client.",
},
[]string{"exchange", "success"},
)
prometheus.MustRegister(publishDurationMetrics)
}

// NewMessage creates a new message.
func NewMessage(ct string, body []byte) *Message {
return &Message{contentType: ct, body: body}
// Publisher defines a RabbitMQ publisher with tracing instrumentation.
type Publisher struct {
cfg *amqp.Config
connection *amqp.Connection
channel *amqp.Channel
}

// NewJSONMessage creates a new message with a JSON encoded body.
func NewJSONMessage(d interface{}) (*Message, error) {
body, err := json.Encode(d)
if err != nil {
return nil, fmt.Errorf("failed to marshal to JSON: %w", err)
}
return &Message{contentType: json.Type, body: body}, nil
}

// NewProtobufMessage creates a new message with a protobuf encoded body.
func NewProtobufMessage(d interface{}) (*Message, error) {
body, err := protobuf.Encode(d)
if err != nil {
return nil, fmt.Errorf("failed to marshal to protobuf: %w", err)
}
return &Message{contentType: protobuf.Type, body: body}, nil
}

// Publisher interface of a RabbitMQ publisher.
type Publisher interface {
Publish(ctx context.Context, msg *Message) error
Close(ctx context.Context) error
}

var defaultCfg = amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 30*time.Second)
},
}

// TracedPublisher defines a RabbitMQ publisher with tracing instrumentation.
type TracedPublisher struct {
cfg amqp.Config
cn *amqp.Connection
ch *amqp.Channel
exc string
tag opentracing.Tag
}

// NewPublisher creates a new publisher with the following defaults
// - exchange type: fanout
// - notifications are not handled at this point TBD.
//
// Deprecated: The AMQP client package is superseded by the `github.com/beatlabs/client/amqp/v2` package.
// Please refer to the documents and the examples for the usage.
//
// This package is frozen and no new functionality will be added.
func NewPublisher(url, exc string, oo ...OptionFunc) (*TracedPublisher, error) {
// New constructor.
func New(url string, oo ...OptionFunc) (*Publisher, error) {
if url == "" {
return nil, errors.New("url is required")
}

if exc == "" {
return nil, errors.New("exchange is required")
}

p := TracedPublisher{
cfg: defaultCfg,
exc: exc,
tag: opentracing.Tag{Key: "exchange", Value: exc},
}
var err error
pub := &Publisher{}

for _, o := range oo {
err := o(&p)
for _, option := range oo {
err = option(pub)
if err != nil {
return nil, err
}
}

conn, err := amqp.DialConfig(url, p.cfg)
if err != nil {
return nil, fmt.Errorf("failed to open RabbitMq connection: %w", err)
}
p.cn = conn
var conn *amqp.Connection

ch, err := conn.Channel()
if pub.cfg == nil {
conn, err = amqp.Dial(url)
} else {
conn, err = amqp.DialConfig(url, *pub.cfg)
}
if err != nil {
return nil, fmt.Errorf("failed to open RabbitMq channel: %w", err)
return nil, fmt.Errorf("failed to open connection: %w", err)
}
p.ch = ch

err = ch.ExchangeDeclare(exc, amqp.ExchangeFanout, true, false, false, false, nil)
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to declare exchange: %w", err)
return nil, patronerrors.Aggregate(fmt.Errorf("failed to open channel: %w", err), conn.Close())
}

return &p, nil
pub.connection = conn
pub.channel = ch
return pub, nil
}

// Publish a message to an exchange.
func (tc *TracedPublisher) Publish(ctx context.Context, msg *Message) error {
sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, tc.exc),
publisherComponent, ext.SpanKindProducer, tc.tag)

p := amqp.Publishing{
Headers: amqp.Table{},
ContentType: msg.contentType,
Body: msg.body,
}
func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
sp := injectTraceHeaders(ctx, exchange, &msg)

c := amqpHeadersCarrier(p.Headers)
err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c)
if err != nil {
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
p.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx)
start := time.Now()
err := tc.channel.Publish(exchange, key, mandatory, immediate, msg)

err = tc.ch.Publish(tc.exc, "", false, false, p)
trace.SpanComplete(sp, err)
observePublish(ctx, sp, start, exchange, err)
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

// Close the connection and channel of the publisher.
func (tc *TracedPublisher) Close(_ context.Context) error {
return patronErrors.Aggregate(tc.ch.Close(), tc.cn.Close())
func injectTraceHeaders(ctx context.Context, exchange string, msg *amqp.Publishing) opentracing.Span {
sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, exchange),
publisherComponent, ext.SpanKindProducer, opentracing.Tag{Key: "exchange", Value: exchange})

if msg.Headers == nil {
msg.Headers = amqp.Table{}
}

c := amqpHeadersCarrier(msg.Headers)

if err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c); err != nil {
log.FromContext(ctx).Errorf("failed to inject tracing headers: %v", err)
}
msg.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx)
return sp
}

// Close the channel and connection.
func (tc *Publisher) Close() error {
return patronerrors.Aggregate(tc.channel.Close(), tc.connection.Close())
}

type amqpHeadersCarrier map[string]interface{}
Expand All @@ -165,3 +124,12 @@ type amqpHeadersCarrier map[string]interface{}
func (c amqpHeadersCarrier) Set(key, val string) {
c[key] = val
}

func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) {
trace.SpanComplete(span, err)

durationHistogram := trace.Histogram{
Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)),
}
durationHistogram.Observe(ctx, time.Since(start).Seconds())
}
70 changes: 28 additions & 42 deletions client/amqp/amqp_test.go
Original file line number Diff line number Diff line change
@@ -1,58 +1,34 @@
package amqp

import (
"context"
"testing"

"github.com/beatlabs/patron/examples"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
)

func TestNewMessage(t *testing.T) {
m := NewMessage("xxx", []byte("test"))
assert.Equal(t, "xxx", m.contentType)
assert.Equal(t, []byte("test"), m.body)
}

func TestNewJSONMessage(t *testing.T) {
m, err := NewJSONMessage("xxx")
assert.NoError(t, err)
assert.Equal(t, "application/json", m.contentType)
assert.Equal(t, []byte(`"xxx"`), m.body)
_, err = NewJSONMessage(make(chan bool))
assert.Error(t, err)
}

func TestNewProtobufMessage(t *testing.T) {
u := examples.User{
Firstname: "John",
Lastname: "Doe",
}
m, err := NewProtobufMessage(&u)
assert.NoError(t, err)
assert.Equal(t, "application/x-protobuf", m.contentType)
assert.Len(t, m.body, 11)
}

func TestNewPublisher(t *testing.T) {
func TestNew(t *testing.T) {
t.Parallel()
type args struct {
url string
exc string
opt OptionFunc
}
tests := []struct {
name string
args args
wantErr bool
tests := map[string]struct {
args args
expectedErr string
}{
{name: "fail, missing url", args: args{}, wantErr: true},
{name: "fail, missing exchange", args: args{url: "url"}, wantErr: true},
{name: "fail, missing exchange", args: args{url: "url", exc: "exc", opt: WithTimeout(0)}, wantErr: true},
"fail, missing url": {args: args{}, expectedErr: "url is required"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewPublisher(tt.args.url, tt.args.exc, tt.args.opt)
if tt.wantErr {
assert.Error(t, err)
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
t.Parallel()

got, err := New(tt.args.url)
if tt.expectedErr != "" {
assert.EqualError(t, err, tt.expectedErr)
assert.Nil(t, got)
} else {
assert.NoError(t, err)
Expand All @@ -61,3 +37,13 @@ func TestNewPublisher(t *testing.T) {
})
}
}

func Test_injectTraceHeaders(t *testing.T) {
mtr := mocktracer.New()
opentracing.SetGlobalTracer(mtr)
t.Cleanup(func() { mtr.Reset() })
msg := amqp.Publishing{}
sp := injectTraceHeaders(context.Background(), "123", &msg)
assert.NotNil(t, sp)
assert.NotEmpty(t, msg.Headers)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:build integration
// +build integration

package v2
package amqp

import (
"context"
Expand Down
21 changes: 5 additions & 16 deletions client/amqp/option.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
package amqp

import (
"errors"
"net"
"time"

"github.com/streadway/amqp"
)

// OptionFunc definition for configuring the publisher in a functional way.
type OptionFunc func(*TracedPublisher) error
type OptionFunc func(*Publisher) error

// WithTimeout option for adjusting the timeout of the connection.
func WithTimeout(timeout time.Duration) OptionFunc {
return func(tp *TracedPublisher) error {
if timeout <= 0 {
return errors.New("timeout must be positive")
}
tp.cfg = amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, timeout)
},
}
// WithConfig option for providing dial configuration.
func WithConfig(cfg amqp.Config) OptionFunc {
return func(p *Publisher) error {
p.cfg = &cfg
return nil
}
}
Loading