From 7882e304dd11c0e5d4409944618bbba7a13f4ae1 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Tue, 3 Jan 2023 16:20:24 +0200 Subject: [PATCH 1/4] Deprecate v1 clients --- client/amqp/amqp.go | 180 ++++++++++------------- client/amqp/amqp_test.go | 70 ++++----- client/amqp/{v2 => }/integration_test.go | 2 +- client/amqp/option.go | 21 +-- client/amqp/option_test.go | 29 +--- client/amqp/v2/amqp.go | 135 ----------------- client/amqp/v2/amqp_test.go | 49 ------ client/amqp/v2/option.go | 16 -- client/amqp/v2/option_test.go | 18 --- component/amqp/integration_test.go | 4 +- examples/kafka-legacy/main.go | 2 +- examples/kafka/main.go | 2 +- 12 files changed, 119 insertions(+), 409 deletions(-) rename client/amqp/{v2 => }/integration_test.go (99%) delete mode 100644 client/amqp/v2/amqp.go delete mode 100644 client/amqp/v2/amqp_test.go delete mode 100644 client/amqp/v2/option.go delete mode 100644 client/amqp/v2/option_test.go diff --git a/client/amqp/amqp.go b/client/amqp/amqp.go index a18c97aef..c06688052 100644 --- a/client/amqp/amqp.go +++ b/client/amqp/amqp.go @@ -1,25 +1,20 @@ // Package amqp provides a client with included tracing capabilities. -// -// Deprecated: The AMQP client package is superseded by the `github.com/beatlabs/client/amqp/v2` package. -// Please refer to the documents and the examples for the usage. -// -// This package is frozen and no new functionality will be added. package amqp import ( "context" "errors" "fmt" - "net" + "strconv" "time" "github.com/beatlabs/patron/correlation" - "github.com/beatlabs/patron/encoding/json" - "github.com/beatlabs/patron/encoding/protobuf" - patronErrors "github.com/beatlabs/patron/errors" + patronerrors "github.com/beatlabs/patron/errors" + "github.com/beatlabs/patron/log" "github.com/beatlabs/patron/trace" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" + "github.com/prometheus/client_golang/prometheus" "github.com/streadway/amqp" ) @@ -27,126 +22,73 @@ const ( publisherComponent = "amqp-publisher" ) -// Message abstraction for publishing. -type Message struct { - contentType string - body []byte +var publishDurationMetrics *prometheus.HistogramVec + +func init() { + publishDurationMetrics = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "client", + Subsystem: "amqp", + Name: "publish_duration_seconds", + Help: "AMQP publish completed by the client.", + }, + []string{"exchange", "success"}, + ) + prometheus.MustRegister(publishDurationMetrics) } -// NewMessage creates a new message. -func NewMessage(ct string, body []byte) *Message { - return &Message{contentType: ct, body: body} +// Publisher defines a RabbitMQ publisher with tracing instrumentation. +type Publisher struct { + cfg *amqp.Config + connection *amqp.Connection + channel *amqp.Channel } -// NewJSONMessage creates a new message with a JSON encoded body. -func NewJSONMessage(d interface{}) (*Message, error) { - body, err := json.Encode(d) - if err != nil { - return nil, fmt.Errorf("failed to marshal to JSON: %w", err) - } - return &Message{contentType: json.Type, body: body}, nil -} - -// NewProtobufMessage creates a new message with a protobuf encoded body. -func NewProtobufMessage(d interface{}) (*Message, error) { - body, err := protobuf.Encode(d) - if err != nil { - return nil, fmt.Errorf("failed to marshal to protobuf: %w", err) - } - return &Message{contentType: protobuf.Type, body: body}, nil -} - -// Publisher interface of a RabbitMQ publisher. -type Publisher interface { - Publish(ctx context.Context, msg *Message) error - Close(ctx context.Context) error -} - -var defaultCfg = amqp.Config{ - Dial: func(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, 30*time.Second) - }, -} - -// TracedPublisher defines a RabbitMQ publisher with tracing instrumentation. -type TracedPublisher struct { - cfg amqp.Config - cn *amqp.Connection - ch *amqp.Channel - exc string - tag opentracing.Tag -} - -// NewPublisher creates a new publisher with the following defaults -// - exchange type: fanout -// - notifications are not handled at this point TBD. -// -// Deprecated: The AMQP client package is superseded by the `github.com/beatlabs/client/amqp/v2` package. -// Please refer to the documents and the examples for the usage. -// -// This package is frozen and no new functionality will be added. -func NewPublisher(url, exc string, oo ...OptionFunc) (*TracedPublisher, error) { +// New constructor. +func New(url string, oo ...OptionFunc) (*Publisher, error) { if url == "" { return nil, errors.New("url is required") } - if exc == "" { - return nil, errors.New("exchange is required") - } - - p := TracedPublisher{ - cfg: defaultCfg, - exc: exc, - tag: opentracing.Tag{Key: "exchange", Value: exc}, - } + var err error + pub := &Publisher{} - for _, o := range oo { - err := o(&p) + for _, option := range oo { + err = option(pub) if err != nil { return nil, err } } - conn, err := amqp.DialConfig(url, p.cfg) - if err != nil { - return nil, fmt.Errorf("failed to open RabbitMq connection: %w", err) - } - p.cn = conn + var conn *amqp.Connection - ch, err := conn.Channel() + if pub.cfg == nil { + conn, err = amqp.Dial(url) + } else { + conn, err = amqp.DialConfig(url, *pub.cfg) + } if err != nil { - return nil, fmt.Errorf("failed to open RabbitMq channel: %w", err) + return nil, fmt.Errorf("failed to open connection: %w", err) } - p.ch = ch - err = ch.ExchangeDeclare(exc, amqp.ExchangeFanout, true, false, false, false, nil) + ch, err := conn.Channel() if err != nil { - return nil, fmt.Errorf("failed to declare exchange: %w", err) + return nil, patronerrors.Aggregate(fmt.Errorf("failed to open channel: %w", err), conn.Close()) } - return &p, nil + pub.connection = conn + pub.channel = ch + return pub, nil } // Publish a message to an exchange. -func (tc *TracedPublisher) Publish(ctx context.Context, msg *Message) error { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, tc.exc), - publisherComponent, ext.SpanKindProducer, tc.tag) - - p := amqp.Publishing{ - Headers: amqp.Table{}, - ContentType: msg.contentType, - Body: msg.body, - } +func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + sp := injectTraceHeaders(ctx, exchange, &msg) - c := amqpHeadersCarrier(p.Headers) - err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c) - if err != nil { - return fmt.Errorf("failed to inject tracing headers: %w", err) - } - p.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx) + start := time.Now() + err := tc.channel.Publish(exchange, key, mandatory, immediate, msg) - err = tc.ch.Publish(tc.exc, "", false, false, p) - trace.SpanComplete(sp, err) + observePublish(ctx, sp, start, exchange, err) if err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -154,9 +96,26 @@ func (tc *TracedPublisher) Publish(ctx context.Context, msg *Message) error { return nil } -// Close the connection and channel of the publisher. -func (tc *TracedPublisher) Close(_ context.Context) error { - return patronErrors.Aggregate(tc.ch.Close(), tc.cn.Close()) +func injectTraceHeaders(ctx context.Context, exchange string, msg *amqp.Publishing) opentracing.Span { + sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, exchange), + publisherComponent, ext.SpanKindProducer, opentracing.Tag{Key: "exchange", Value: exchange}) + + if msg.Headers == nil { + msg.Headers = amqp.Table{} + } + + c := amqpHeadersCarrier(msg.Headers) + + if err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c); err != nil { + log.FromContext(ctx).Errorf("failed to inject tracing headers: %v", err) + } + msg.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx) + return sp +} + +// Close the channel and connection. +func (tc *Publisher) Close() error { + return patronerrors.Aggregate(tc.channel.Close(), tc.connection.Close()) } type amqpHeadersCarrier map[string]interface{} @@ -165,3 +124,12 @@ type amqpHeadersCarrier map[string]interface{} func (c amqpHeadersCarrier) Set(key, val string) { c[key] = val } + +func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) { + trace.SpanComplete(span, err) + + durationHistogram := trace.Histogram{ + Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)), + } + durationHistogram.Observe(ctx, time.Since(start).Seconds()) +} diff --git a/client/amqp/amqp_test.go b/client/amqp/amqp_test.go index 3b3ea5a90..3a9dfb598 100644 --- a/client/amqp/amqp_test.go +++ b/client/amqp/amqp_test.go @@ -1,58 +1,34 @@ package amqp import ( + "context" "testing" - "github.com/beatlabs/patron/examples" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" ) -func TestNewMessage(t *testing.T) { - m := NewMessage("xxx", []byte("test")) - assert.Equal(t, "xxx", m.contentType) - assert.Equal(t, []byte("test"), m.body) -} - -func TestNewJSONMessage(t *testing.T) { - m, err := NewJSONMessage("xxx") - assert.NoError(t, err) - assert.Equal(t, "application/json", m.contentType) - assert.Equal(t, []byte(`"xxx"`), m.body) - _, err = NewJSONMessage(make(chan bool)) - assert.Error(t, err) -} - -func TestNewProtobufMessage(t *testing.T) { - u := examples.User{ - Firstname: "John", - Lastname: "Doe", - } - m, err := NewProtobufMessage(&u) - assert.NoError(t, err) - assert.Equal(t, "application/x-protobuf", m.contentType) - assert.Len(t, m.body, 11) -} - -func TestNewPublisher(t *testing.T) { +func TestNew(t *testing.T) { + t.Parallel() type args struct { url string - exc string - opt OptionFunc } - tests := []struct { - name string - args args - wantErr bool + tests := map[string]struct { + args args + expectedErr string }{ - {name: "fail, missing url", args: args{}, wantErr: true}, - {name: "fail, missing exchange", args: args{url: "url"}, wantErr: true}, - {name: "fail, missing exchange", args: args{url: "url", exc: "exc", opt: WithTimeout(0)}, wantErr: true}, + "fail, missing url": {args: args{}, expectedErr: "url is required"}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewPublisher(tt.args.url, tt.args.exc, tt.args.opt) - if tt.wantErr { - assert.Error(t, err) + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + + got, err := New(tt.args.url) + if tt.expectedErr != "" { + assert.EqualError(t, err, tt.expectedErr) assert.Nil(t, got) } else { assert.NoError(t, err) @@ -61,3 +37,13 @@ func TestNewPublisher(t *testing.T) { }) } } + +func Test_injectTraceHeaders(t *testing.T) { + mtr := mocktracer.New() + opentracing.SetGlobalTracer(mtr) + t.Cleanup(func() { mtr.Reset() }) + msg := amqp.Publishing{} + sp := injectTraceHeaders(context.Background(), "123", &msg) + assert.NotNil(t, sp) + assert.NotEmpty(t, msg.Headers) +} diff --git a/client/amqp/v2/integration_test.go b/client/amqp/integration_test.go similarity index 99% rename from client/amqp/v2/integration_test.go rename to client/amqp/integration_test.go index efc309faf..9957b233f 100644 --- a/client/amqp/v2/integration_test.go +++ b/client/amqp/integration_test.go @@ -1,7 +1,7 @@ //go:build integration // +build integration -package v2 +package amqp import ( "context" diff --git a/client/amqp/option.go b/client/amqp/option.go index 5c29a92d2..878f907b3 100644 --- a/client/amqp/option.go +++ b/client/amqp/option.go @@ -1,27 +1,16 @@ package amqp import ( - "errors" - "net" - "time" - "github.com/streadway/amqp" ) // OptionFunc definition for configuring the publisher in a functional way. -type OptionFunc func(*TracedPublisher) error +type OptionFunc func(*Publisher) error -// WithTimeout option for adjusting the timeout of the connection. -func WithTimeout(timeout time.Duration) OptionFunc { - return func(tp *TracedPublisher) error { - if timeout <= 0 { - return errors.New("timeout must be positive") - } - tp.cfg = amqp.Config{ - Dial: func(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, timeout) - }, - } +// WithConfig option for providing dial configuration. +func WithConfig(cfg amqp.Config) OptionFunc { + return func(p *Publisher) error { + p.cfg = &cfg return nil } } diff --git a/client/amqp/option_test.go b/client/amqp/option_test.go index 79b1a4db4..43f15428c 100644 --- a/client/amqp/option_test.go +++ b/client/amqp/option_test.go @@ -2,32 +2,17 @@ package amqp import ( "testing" - "time" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" ) func TestTimeout(t *testing.T) { - type args struct { - timeout time.Duration - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "success", args: args{timeout: time.Second}, wantErr: false}, - {name: "failure, invalid timeout", args: args{timeout: 0 * time.Second}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := TracedPublisher{cfg: defaultCfg} - err := WithTimeout(tt.args.timeout)(&p) - if tt.wantErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) + cfg := amqp.Config{ + Locale: "123", } + + p := Publisher{} + assert.NoError(t, WithConfig(cfg)(&p)) + assert.Equal(t, cfg, *p.cfg) } diff --git a/client/amqp/v2/amqp.go b/client/amqp/v2/amqp.go deleted file mode 100644 index a6ef7a316..000000000 --- a/client/amqp/v2/amqp.go +++ /dev/null @@ -1,135 +0,0 @@ -// Package v2 provides a client with included tracing capabilities. -package v2 - -import ( - "context" - "errors" - "fmt" - "strconv" - "time" - - "github.com/beatlabs/patron/correlation" - patronerrors "github.com/beatlabs/patron/errors" - "github.com/beatlabs/patron/log" - "github.com/beatlabs/patron/trace" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/prometheus/client_golang/prometheus" - "github.com/streadway/amqp" -) - -const ( - publisherComponent = "amqp-publisher" -) - -var publishDurationMetrics *prometheus.HistogramVec - -func init() { - publishDurationMetrics = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "client", - Subsystem: "amqp", - Name: "publish_duration_seconds", - Help: "AMQP publish completed by the client.", - }, - []string{"exchange", "success"}, - ) - prometheus.MustRegister(publishDurationMetrics) -} - -// Publisher defines a RabbitMQ publisher with tracing instrumentation. -type Publisher struct { - cfg *amqp.Config - connection *amqp.Connection - channel *amqp.Channel -} - -// New constructor. -func New(url string, oo ...OptionFunc) (*Publisher, error) { - if url == "" { - return nil, errors.New("url is required") - } - - var err error - pub := &Publisher{} - - for _, option := range oo { - err = option(pub) - if err != nil { - return nil, err - } - } - - var conn *amqp.Connection - - if pub.cfg == nil { - conn, err = amqp.Dial(url) - } else { - conn, err = amqp.DialConfig(url, *pub.cfg) - } - if err != nil { - return nil, fmt.Errorf("failed to open connection: %w", err) - } - - ch, err := conn.Channel() - if err != nil { - return nil, patronerrors.Aggregate(fmt.Errorf("failed to open channel: %w", err), conn.Close()) - } - - pub.connection = conn - pub.channel = ch - return pub, nil -} - -// Publish a message to an exchange. -func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - sp := injectTraceHeaders(ctx, exchange, &msg) - - start := time.Now() - err := tc.channel.Publish(exchange, key, mandatory, immediate, msg) - - observePublish(ctx, sp, start, exchange, err) - if err != nil { - return fmt.Errorf("failed to publish message: %w", err) - } - - return nil -} - -func injectTraceHeaders(ctx context.Context, exchange string, msg *amqp.Publishing) opentracing.Span { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, exchange), - publisherComponent, ext.SpanKindProducer, opentracing.Tag{Key: "exchange", Value: exchange}) - - if msg.Headers == nil { - msg.Headers = amqp.Table{} - } - - c := amqpHeadersCarrier(msg.Headers) - - if err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c); err != nil { - log.FromContext(ctx).Errorf("failed to inject tracing headers: %v", err) - } - msg.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx) - return sp -} - -// Close the channel and connection. -func (tc *Publisher) Close() error { - return patronerrors.Aggregate(tc.channel.Close(), tc.connection.Close()) -} - -type amqpHeadersCarrier map[string]interface{} - -// Set implements Set() of opentracing.TextMapWriter. -func (c amqpHeadersCarrier) Set(key, val string) { - c[key] = val -} - -func observePublish(ctx context.Context, span opentracing.Span, start time.Time, exchange string, err error) { - trace.SpanComplete(span, err) - - durationHistogram := trace.Histogram{ - Observer: publishDurationMetrics.WithLabelValues(exchange, strconv.FormatBool(err == nil)), - } - durationHistogram.Observe(ctx, time.Since(start).Seconds()) -} diff --git a/client/amqp/v2/amqp_test.go b/client/amqp/v2/amqp_test.go deleted file mode 100644 index 4a2955f2c..000000000 --- a/client/amqp/v2/amqp_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package v2 - -import ( - "context" - "testing" - - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/mocktracer" - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" -) - -func TestNew(t *testing.T) { - t.Parallel() - type args struct { - url string - } - tests := map[string]struct { - args args - expectedErr string - }{ - "fail, missing url": {args: args{}, expectedErr: "url is required"}, - } - for name, tt := range tests { - tt := tt - t.Run(name, func(t *testing.T) { - t.Parallel() - - got, err := New(tt.args.url) - if tt.expectedErr != "" { - assert.EqualError(t, err, tt.expectedErr) - assert.Nil(t, got) - } else { - assert.NoError(t, err) - assert.NotNil(t, got) - } - }) - } -} - -func Test_injectTraceHeaders(t *testing.T) { - mtr := mocktracer.New() - opentracing.SetGlobalTracer(mtr) - t.Cleanup(func() { mtr.Reset() }) - msg := amqp.Publishing{} - sp := injectTraceHeaders(context.Background(), "123", &msg) - assert.NotNil(t, sp) - assert.NotEmpty(t, msg.Headers) -} diff --git a/client/amqp/v2/option.go b/client/amqp/v2/option.go deleted file mode 100644 index a1e4a3f3f..000000000 --- a/client/amqp/v2/option.go +++ /dev/null @@ -1,16 +0,0 @@ -package v2 - -import ( - "github.com/streadway/amqp" -) - -// OptionFunc definition for configuring the publisher in a functional way. -type OptionFunc func(*Publisher) error - -// WithConfig option for providing dial configuration. -func WithConfig(cfg amqp.Config) OptionFunc { - return func(p *Publisher) error { - p.cfg = &cfg - return nil - } -} diff --git a/client/amqp/v2/option_test.go b/client/amqp/v2/option_test.go deleted file mode 100644 index f28dfee11..000000000 --- a/client/amqp/v2/option_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package v2 - -import ( - "testing" - - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" -) - -func TestTimeout(t *testing.T) { - cfg := amqp.Config{ - Locale: "123", - } - - p := Publisher{} - assert.NoError(t, WithConfig(cfg)(&p)) - assert.Equal(t, cfg, *p.cfg) -} diff --git a/component/amqp/integration_test.go b/component/amqp/integration_test.go index aa8f41dcf..d2ee44f6f 100644 --- a/component/amqp/integration_test.go +++ b/component/amqp/integration_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - v2 "github.com/beatlabs/patron/client/amqp/v2" + patronamqp "github.com/beatlabs/patron/client/amqp" "github.com/beatlabs/patron/correlation" "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/client_golang/prometheus/testutil" @@ -28,7 +28,7 @@ func TestRun(t *testing.T) { ctx, cnl := context.WithCancel(context.Background()) - pub, err := v2.New(endpoint) + pub, err := patronamqp.New(endpoint) require.NoError(t, err) sent := []string{"one", "two"} diff --git a/examples/kafka-legacy/main.go b/examples/kafka-legacy/main.go index e99eeb156..fe561c978 100644 --- a/examples/kafka-legacy/main.go +++ b/examples/kafka-legacy/main.go @@ -7,7 +7,7 @@ import ( "time" "github.com/beatlabs/patron" - patronamqp "github.com/beatlabs/patron/client/amqp/v2" + patronamqp "github.com/beatlabs/patron/client/amqp" "github.com/beatlabs/patron/component/async" "github.com/beatlabs/patron/component/async/kafka" "github.com/beatlabs/patron/component/async/kafka/group" diff --git a/examples/kafka/main.go b/examples/kafka/main.go index 12f017946..ced91cb41 100644 --- a/examples/kafka/main.go +++ b/examples/kafka/main.go @@ -8,7 +8,7 @@ import ( "github.com/Shopify/sarama" "github.com/beatlabs/patron" - patronamqp "github.com/beatlabs/patron/client/amqp/v2" + patronamqp "github.com/beatlabs/patron/client/amqp" "github.com/beatlabs/patron/component/kafka" "github.com/beatlabs/patron/component/kafka/group" "github.com/beatlabs/patron/encoding/json" From 670f0fa52cfbc4549fd5ceee89dc6feb9840de29 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Tue, 3 Jan 2023 16:27:07 +0200 Subject: [PATCH 2/4] Deprecate v1 clients --- client/kafka/async_producer.go | 51 +++--- client/kafka/builder.go | 205 ---------------------- client/kafka/builder_test.go | 150 ---------------- client/kafka/integration_test.go | 84 +++++++-- client/kafka/kafka.go | 192 ++++++++++++-------- client/kafka/kafka_test.go | 156 ++++++++-------- client/kafka/sync_producer.go | 85 ++++++--- client/kafka/v2/async_producer.go | 60 ------- client/kafka/v2/integration_test.go | 166 ------------------ client/kafka/v2/kafka.go | 179 ------------------- client/kafka/v2/kafka_test.go | 96 ---------- client/kafka/v2/sync_producer.go | 102 ----------- component/kafka/group/component_test.go | 1 + component/kafka/group/integration_test.go | 2 +- examples/http-sec/main.go | 2 +- 15 files changed, 351 insertions(+), 1180 deletions(-) delete mode 100644 client/kafka/builder.go delete mode 100644 client/kafka/builder_test.go delete mode 100644 client/kafka/v2/async_producer.go delete mode 100644 client/kafka/v2/integration_test.go delete mode 100644 client/kafka/v2/kafka.go delete mode 100644 client/kafka/v2/kafka_test.go delete mode 100644 client/kafka/v2/sync_producer.go diff --git a/client/kafka/async_producer.go b/client/kafka/async_producer.go index b11dcc5e0..64093447f 100644 --- a/client/kafka/async_producer.go +++ b/client/kafka/async_producer.go @@ -1,53 +1,48 @@ // Package kafka provides a client with included tracing capabilities. -// -// Deprecated: The Kafka client package is superseded by the `github.com/beatlabs/patron/client/kafka/v2` package. -// Please refer to the documents and the examples for the usage. -// -// This package is frozen and no new functionality will be added. package kafka import ( "context" "fmt" - "github.com/beatlabs/patron/trace" - "github.com/Shopify/sarama" - opentracing "github.com/opentracing/opentracing-go" + patronerrors "github.com/beatlabs/patron/errors" + "github.com/beatlabs/patron/trace" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" ) +var asyncTag = opentracing.Tag{Key: "type", Value: deliveryTypeAsync} + // AsyncProducer is an asynchronous Kafka producer. type AsyncProducer struct { baseProducer asyncProd sarama.AsyncProducer - chErr chan error } // Send a message to a topic, asynchronously. Producer errors are queued on the // channel obtained during the AsyncProducer creation. -func (ap *AsyncProducer) Send(ctx context.Context, msg *Message) error { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(asyncProducerComponent, msg.topic), - asyncProducerComponent, ext.SpanKindProducer, ap.tag, - opentracing.Tag{Key: "topic", Value: msg.topic}) - pm, err := ap.createProducerMessage(ctx, msg, sp) +func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) error { + sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeAsync, msg.Topic), componentTypeAsync, + ext.SpanKindProducer, asyncTag, opentracing.Tag{Key: "topic", Value: msg.Topic}) + + err := injectTracingAndCorrelationHeaders(ctx, msg, sp) if err != nil { - ap.statusCountInc(messageCreationErrors, msg.topic) + statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) - return err + return fmt.Errorf("failed to inject tracing headers: %w", err) } - ap.statusCountInc(messageSent, msg.topic) - ap.asyncProd.Input() <- pm + ap.asyncProd.Input() <- msg + statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1) trace.SpanSuccess(sp) - return nil } -func (ap *AsyncProducer) propagateError() { +func (ap *AsyncProducer) propagateError(chErr chan<- error) { for pe := range ap.asyncProd.Errors() { - ap.statusCountInc(messageSendErrors, pe.Msg.Topic) - ap.chErr <- fmt.Errorf("failed to send message: %w", pe) + statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, pe.Msg.Topic, 1) + chErr <- fmt.Errorf("failed to send message: %w", pe) } } @@ -55,16 +50,10 @@ func (ap *AsyncProducer) propagateError() { // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. func (ap *AsyncProducer) Close() error { - err := ap.asyncProd.Close() - if err != nil { - // always close client - _ = ap.prodClient.Close() - - return fmt.Errorf("failed to close async producer client: %w", err) + if err := ap.asyncProd.Close(); err != nil { + return patronerrors.Aggregate(fmt.Errorf("failed to close async producer client: %w", err), ap.prodClient.Close()) } - - err = ap.prodClient.Close() - if err != nil { + if err := ap.prodClient.Close(); err != nil { return fmt.Errorf("failed to close async producer: %w", err) } return nil diff --git a/client/kafka/builder.go b/client/kafka/builder.go deleted file mode 100644 index c8ea56710..000000000 --- a/client/kafka/builder.go +++ /dev/null @@ -1,205 +0,0 @@ -package kafka - -import ( - "errors" - "fmt" - "time" - - "github.com/beatlabs/patron/encoding" - "github.com/beatlabs/patron/encoding/json" - patronErrors "github.com/beatlabs/patron/errors" - "github.com/beatlabs/patron/internal/validation" - "github.com/beatlabs/patron/log" - - "github.com/Shopify/sarama" - "github.com/opentracing/opentracing-go" -) - -// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements -// it must see before responding. -type RequiredAcks int16 - -const ( - // NoResponse doesn't send any response, the TCP ACK is all you get. - NoResponse RequiredAcks = 0 - // WaitForLocal waits for only the local commit to succeed before responding. - WaitForLocal RequiredAcks = 1 - // WaitForAll waits for all in-sync replicas to commit before responding. - WaitForAll RequiredAcks = -1 -) - -const fieldSetMsg = "setting kafka property '%v' to '%v'" - -// Builder gathers all required and optional properties, in order -// to construct a Kafka AsyncProducer/SyncProducer. -type Builder struct { - brokers []string - cfg *sarama.Config - enc encoding.EncodeFunc - contentType string - errors []error -} - -// NewBuilder initiates the AsyncProducer/SyncProducer builder chain. -// The builder instantiates the component using default values for -// EncodeFunc and Content-Type header. -// -// Deprecated: The Kafka client package is superseded by the `github.com/beatlabs/patron/client/kafka/v2` package. -// Please refer to the documents and the examples for the usage. -// -// This package is frozen and no new functionality will be added. -func NewBuilder(brokers []string) *Builder { - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 - - var errs []error - if validation.IsStringSliceEmpty(brokers) { - errs = append(errs, errors.New("brokers are empty or have an empty value")) - } - - return &Builder{ - brokers: brokers, - cfg: cfg, - enc: json.Encode, - contentType: json.Type, - errors: errs, - } -} - -// WithTimeout sets the dial timeout for the sync or async producer. -func (ab *Builder) WithTimeout(dial time.Duration) *Builder { - if dial <= 0 { - ab.errors = append(ab.errors, errors.New("dial timeout has to be positive")) - return ab - } - ab.cfg.Net.DialTimeout = dial - log.Debugf(fieldSetMsg, "dial timeout", dial) - return ab -} - -// WithVersion sets the kafka versionfor the AsyncProducer/SyncProducer. -func (ab *Builder) WithVersion(version string) *Builder { - if version == "" { - ab.errors = append(ab.errors, errors.New("version is required")) - return ab - } - v, err := sarama.ParseKafkaVersion(version) - if err != nil { - ab.errors = append(ab.errors, errors.New("failed to parse kafka version")) - return ab - } - log.Debugf(fieldSetMsg, "version", version) - ab.cfg.Version = v - - return ab -} - -// WithRequiredAcksPolicy adjusts how many replica acknowledgements -// broker must see before responding. -func (ab *Builder) WithRequiredAcksPolicy(ack RequiredAcks) *Builder { - if !isValidRequiredAcks(ack) { - ab.errors = append(ab.errors, errors.New("invalid value for required acks policy provided")) - return ab - } - log.Debugf(fieldSetMsg, "required acks", ack) - ab.cfg.Producer.RequiredAcks = sarama.RequiredAcks(ack) - return ab -} - -// WithEncoder sets a specific encoder implementation and Content-Type string header; -// if no option is provided it defaults to json. -func (ab *Builder) WithEncoder(enc encoding.EncodeFunc, contentType string) *Builder { - if enc == nil { - ab.errors = append(ab.errors, errors.New("encoder is nil")) - } else { - log.Debugf(fieldSetMsg, "encoder", enc) - ab.enc = enc - } - if contentType == "" { - ab.errors = append(ab.errors, errors.New("content type is empty")) - } else { - log.Debugf(fieldSetMsg, "content type", contentType) - ab.contentType = contentType - } - - return ab -} - -// CreateAsync constructs the AsyncProducer component by applying the gathered properties. -func (ab *Builder) CreateAsync() (*AsyncProducer, <-chan error, error) { - if len(ab.errors) > 0 { - return nil, nil, patronErrors.Aggregate(ab.errors...) - } - - ap := AsyncProducer{ - baseProducer: baseProducer{ - messageStatus: messageStatus, - deliveryType: "async", - cfg: ab.cfg, - enc: ab.enc, - contentType: ab.contentType, - tag: opentracing.Tag{Key: "type", Value: "async"}, - }, - } - - var err error - ap.prodClient, err = sarama.NewClient(ab.brokers, ab.cfg) - if err != nil { - return nil, nil, fmt.Errorf("failed to create producer client: %w", err) - } - - ap.asyncProd, err = sarama.NewAsyncProducerFromClient(ap.prodClient) - if err != nil { - return nil, nil, fmt.Errorf("failed to create async producer: %w", err) - } - ap.chErr = make(chan error) - - go ap.propagateError() - - return &ap, ap.chErr, nil -} - -// CreateSync constructs the SyncProducer component by applying the gathered properties. -func (ab *Builder) CreateSync() (*SyncProducer, error) { - if len(ab.errors) > 0 { - return nil, patronErrors.Aggregate(ab.errors...) - } - - // required for any SyncProducer; 'Errors' is already true by default for both async/sync producers - ab.cfg.Producer.Return.Successes = true - - p := SyncProducer{ - baseProducer: baseProducer{ - messageStatus: messageStatus, - deliveryType: "sync", - cfg: ab.cfg, - enc: ab.enc, - contentType: ab.contentType, - tag: opentracing.Tag{Key: "type", Value: "sync"}, - }, - } - - var err error - p.prodClient, err = sarama.NewClient(ab.brokers, ab.cfg) - if err != nil { - return nil, fmt.Errorf("failed to create producer client: %w", err) - } - - p.syncProd, err = sarama.NewSyncProducerFromClient(p.prodClient) - if err != nil { - return nil, fmt.Errorf("failed to create sync producer: %w", err) - } - - return &p, nil -} - -func isValidRequiredAcks(ack RequiredAcks) bool { - switch ack { - case - NoResponse, - WaitForLocal, - WaitForAll: - return true - } - return false -} diff --git a/client/kafka/builder_test.go b/client/kafka/builder_test.go deleted file mode 100644 index 5103a472a..000000000 --- a/client/kafka/builder_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package kafka - -import ( - "testing" - "time" - - "github.com/beatlabs/patron/encoding" - "github.com/beatlabs/patron/encoding/json" - "github.com/beatlabs/patron/encoding/protobuf" - - "github.com/Shopify/sarama" - "github.com/stretchr/testify/assert" -) - -func TestVersion(t *testing.T) { - type args struct { - version string - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "success", args: args{version: sarama.V0_10_2_0.String()}, wantErr: false}, - {name: "failure, missing version", args: args{version: ""}, wantErr: true}, - {name: "failure, invalid version", args: args{version: "xxxxx"}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ab := NewBuilder([]string{"123"}).WithVersion(tt.args.version) - if tt.wantErr { - assert.NotEmpty(t, ab.errors) - } else { - assert.Empty(t, ab.errors) - v, err := sarama.ParseKafkaVersion(tt.args.version) - assert.NoError(t, err) - assert.Equal(t, v, ab.cfg.Version) - } - }) - } -} - -func TestTimeouts(t *testing.T) { - type args struct { - dial time.Duration - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "success", args: args{dial: time.Second}, wantErr: false}, - {name: "fail, zero timeout", args: args{dial: 0 * time.Second}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ab := NewBuilder([]string{"123"}).WithTimeout(tt.args.dial) - if tt.wantErr { - assert.NotEmpty(t, ab.errors) - } else { - assert.Empty(t, ab.errors) - assert.Equal(t, tt.args.dial, ab.cfg.Net.DialTimeout) - } - }) - } -} - -func TestRequiredAcksPolicy(t *testing.T) { - type args struct { - requiredAcks RequiredAcks - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "success", args: args{requiredAcks: NoResponse}, wantErr: false}, - {name: "success", args: args{requiredAcks: WaitForAll}, wantErr: false}, - {name: "success", args: args{requiredAcks: WaitForLocal}, wantErr: false}, - {name: "failure", args: args{requiredAcks: -5}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ab := NewBuilder([]string{"123"}).WithRequiredAcksPolicy(tt.args.requiredAcks) - if tt.wantErr { - assert.NotEmpty(t, ab.errors) - } else { - assert.Empty(t, ab.errors) - assert.EqualValues(t, tt.args.requiredAcks, ab.cfg.Producer.RequiredAcks) - } - }) - } -} - -func TestEncoder(t *testing.T) { - type args struct { - enc encoding.EncodeFunc - contentType string - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "json EncodeFunc", args: args{enc: json.Encode, contentType: json.Type}, wantErr: false}, - {name: "protobuf EncodeFunc", args: args{enc: protobuf.Encode, contentType: protobuf.Type}, wantErr: false}, - {name: "empty content type", args: args{enc: protobuf.Encode, contentType: ""}, wantErr: true}, - {name: "nil EncodeFunc", args: args{enc: nil}, wantErr: true}, - {name: "nil EncodeFunc w/ ct", args: args{enc: nil, contentType: protobuf.Type}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ab := NewBuilder([]string{"123"}).WithEncoder(tt.args.enc, tt.args.contentType) - if tt.wantErr { - assert.NotEmpty(t, ab.errors) - } else { - assert.Empty(t, ab.errors) - assert.NotNil(t, ab.enc) - assert.Equal(t, tt.args.contentType, ab.contentType) - } - }) - } -} - -func TestBrokers(t *testing.T) { - type args struct { - brokers []string - } - tests := []struct { - name string - args args - wantErr bool - }{ - {name: "single mock broker", args: args{brokers: []string{"123"}}, wantErr: false}, - {name: "multiple mock brokers", args: args{brokers: []string{"123", "123", "123"}}, wantErr: false}, - {name: "empty brokers list", args: args{brokers: []string{}}, wantErr: true}, - {name: "brokers list with an empty value", args: args{brokers: []string{" ", "value"}}, wantErr: true}, - {name: "nil brokers list", args: args{brokers: nil}, wantErr: true}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ab := NewBuilder(tt.args.brokers) - if tt.wantErr { - assert.NotEmpty(t, ab.errors) - } else { - assert.Empty(t, ab.errors) - } - }) - } -} diff --git a/client/kafka/integration_test.go b/client/kafka/integration_test.go index 92429bdd3..39f37d409 100644 --- a/client/kafka/integration_test.go +++ b/client/kafka/integration_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration package kafka @@ -7,6 +6,7 @@ import ( "context" "testing" + "github.com/Shopify/sarama" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/opentracing/opentracing-go/mocktracer" @@ -22,27 +22,40 @@ const ( var brokers = []string{"127.0.0.1:9093"} func TestNewAsyncProducer_Success(t *testing.T) { - ap, chErr, err := NewBuilder(brokers).CreateAsync() + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.Nil(t, err) + + ap, chErr, err := New(brokers, saramaCfg).CreateAsync() assert.NoError(t, err) assert.NotNil(t, ap) assert.NotNil(t, chErr) } func TestNewSyncProducer_Success(t *testing.T) { - p, err := NewBuilder(brokers).CreateSync() + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.Nil(t, err) + + p, err := New(brokers, saramaCfg).Create() assert.NoError(t, err) assert.NotNil(t, p) } func TestAsyncProducer_SendMessage_Close(t *testing.T) { + saramaCfg, err := DefaultProducerSaramaConfig("test-consumer", false) + require.Nil(t, err) + mtr := mocktracer.New() defer mtr.Reset() opentracing.SetGlobalTracer(mtr) - ap, chErr, err := NewBuilder(brokers).CreateAsync() + ap, chErr, err := New(brokers, saramaCfg).CreateAsync() assert.NoError(t, err) assert.NotNil(t, ap) assert.NotNil(t, chErr) - msg := NewMessage(clientTopic, "TEST") + msg := &sarama.ProducerMessage{ + Topic: clientTopic, + Value: sarama.StringEncoder("TEST"), + Headers: []sarama.RecordHeader{{Key: []byte("123"), Value: []byte("123")}}, + } err = ap.Send(context.Background(), msg) assert.NoError(t, err) assert.NoError(t, ap.Close()) @@ -57,20 +70,29 @@ func TestAsyncProducer_SendMessage_Close(t *testing.T) { "version": "dev", } assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) + // Metrics - assert.Equal(t, 1, testutil.CollectAndCount(messageStatus, "component_kafka_producer_message_status")) + assert.Equal(t, 1, testutil.CollectAndCount(messageStatus, "client_kafka_producer_message_status")) } func TestSyncProducer_SendMessage_Close(t *testing.T) { + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.NoError(t, err) + mtr := mocktracer.New() defer mtr.Reset() opentracing.SetGlobalTracer(mtr) - p, err := NewBuilder(brokers).CreateSync() + p, err := New(brokers, saramaCfg).Create() require.NoError(t, err) assert.NotNil(t, p) - msg := NewMessage(clientTopic, "TEST") - err = p.Send(context.Background(), msg) + msg := &sarama.ProducerMessage{ + Topic: clientTopic, + Value: sarama.StringEncoder("TEST"), + } + partition, offset, err := p.Send(context.Background(), msg) assert.NoError(t, err) + assert.True(t, partition >= 0) + assert.True(t, offset >= 0) assert.NoError(t, p.Close()) assert.Len(t, mtr.FinishedSpans(), 1) @@ -85,8 +107,45 @@ func TestSyncProducer_SendMessage_Close(t *testing.T) { assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) } +func TestSyncProducer_SendMessages_Close(t *testing.T) { + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.NoError(t, err) + + mtr := mocktracer.New() + defer mtr.Reset() + opentracing.SetGlobalTracer(mtr) + p, err := New(brokers, saramaCfg).Create() + require.NoError(t, err) + assert.NotNil(t, p) + msg1 := &sarama.ProducerMessage{ + Topic: clientTopic, + Value: sarama.StringEncoder("TEST1"), + } + msg2 := &sarama.ProducerMessage{ + Topic: clientTopic, + Value: sarama.StringEncoder("TEST2"), + } + err = p.SendBatch(context.Background(), []*sarama.ProducerMessage{msg1, msg2}) + assert.NoError(t, err) + assert.NoError(t, p.Close()) + assert.Len(t, mtr.FinishedSpans(), 2) + + expected := map[string]interface{}{ + "component": "kafka-sync-producer", + "error": false, + "span.kind": ext.SpanKindEnum("producer"), + "topic": "batch", + "type": "sync", + "version": "dev", + } + assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) +} + func TestAsyncProducerActiveBrokers(t *testing.T) { - ap, chErr, err := NewBuilder(brokers).CreateAsync() + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.NoError(t, err) + + ap, chErr, err := New(brokers, saramaCfg).CreateAsync() assert.NoError(t, err) assert.NotNil(t, ap) assert.NotNil(t, chErr) @@ -95,7 +154,10 @@ func TestAsyncProducerActiveBrokers(t *testing.T) { } func TestSyncProducerActiveBrokers(t *testing.T) { - ap, err := NewBuilder(brokers).CreateSync() + saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) + require.NoError(t, err) + + ap, err := New(brokers, saramaCfg).Create() assert.NoError(t, err) assert.NotNil(t, ap) assert.NotEmpty(t, ap.ActiveBrokers()) diff --git a/client/kafka/kafka.go b/client/kafka/kafka.go index 413250cdd..15a7a011f 100644 --- a/client/kafka/kafka.go +++ b/client/kafka/kafka.go @@ -4,59 +4,37 @@ import ( "context" "errors" "fmt" - - "github.com/beatlabs/patron/correlation" - "github.com/beatlabs/patron/encoding" + "os" "github.com/Shopify/sarama" + "github.com/beatlabs/patron/correlation" + patronerrors "github.com/beatlabs/patron/errors" + "github.com/beatlabs/patron/internal/validation" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" ) -const ( - asyncProducerComponent = "kafka-async-producer" - syncProducerComponent = "kafka-sync-producer" - messageCreationErrors = "creation-errors" - messageSendErrors = "send-errors" - messageSent = "sent" +type ( + deliveryStatus string ) -var messageStatus *prometheus.CounterVec - -// Producer interface for Kafka. -type Producer interface { - Send(ctx context.Context, msg *Message) error - Close() error -} +const ( + deliveryTypeSync = "sync" + deliveryTypeAsync = "async" -type baseProducer struct { - cfg *sarama.Config - prodClient sarama.Client - tag opentracing.Tag - enc encoding.EncodeFunc - contentType string - // deliveryType can be 'sync' or 'async' - deliveryType string - messageStatus *prometheus.CounterVec -} + deliveryStatusSent deliveryStatus = "sent" + deliveryStatusSendError deliveryStatus = "send-errors" -var ( - _ Producer = &AsyncProducer{} - _ Producer = &SyncProducer{} + componentTypeAsync = "kafka-async-producer" + componentTypeSync = "kafka-sync-producer" ) -// Message abstraction of a Kafka message. -type Message struct { - topic string - body interface{} - key *string - headers kafkaHeadersCarrier -} +var messageStatus *prometheus.CounterVec func init() { messageStatus = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "component", + Namespace: "client", Subsystem: "kafka_producer", Name: "message_status", Help: "Message status counter (produced, encoded, encoding-errors) classified by topic", @@ -66,28 +44,12 @@ func init() { prometheus.MustRegister(messageStatus) } -// NewMessage creates a new message. -func NewMessage(t string, b interface{}) *Message { - return &Message{topic: t, body: b} -} - -// SetHeader allows to set a message header. -// Multiple headers with the same key are supported. -// Headers are only set if Kafka is version 0.11+. -func (m *Message) SetHeader(key, value string) { - m.headers.Set(key, value) -} - -// NewMessageWithKey creates a new message with an associated key. -func NewMessageWithKey(t string, b interface{}, k string) (*Message, error) { - if k == "" { - return nil, errors.New("key string can not be null") - } - return &Message{topic: t, body: b, key: &k}, nil +func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) { + messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt)) } -func (p *baseProducer) statusCountInc(status, topic string) { - p.messageStatus.WithLabelValues(status, topic, p.deliveryType).Inc() +type baseProducer struct { + prodClient sarama.Client } // ActiveBrokers returns a list of active brokers' addresses. @@ -100,31 +62,102 @@ func (p *baseProducer) ActiveBrokers() []string { return activeBrokerAddresses } -func (p *baseProducer) createProducerMessage(ctx context.Context, msg *Message, sp opentracing.Span) (*sarama.ProducerMessage, error) { - c := msg.headers - err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, &c) +// Builder definition for creating sync and async producers. +type Builder struct { + brokers []string + cfg *sarama.Config + errs []error +} + +// New initiates the AsyncProducer/SyncProducer builder chain with the specified Sarama configuration. +func New(brokers []string, saramaConfig *sarama.Config) *Builder { + var ee []error + if validation.IsStringSliceEmpty(brokers) { + ee = append(ee, errors.New("brokers are empty or have an empty value")) + } + if saramaConfig == nil { + ee = append(ee, errors.New("no Sarama configuration specified")) + } + + return &Builder{ + brokers: brokers, + errs: ee, + cfg: saramaConfig, + } +} + +// DefaultProducerSaramaConfig creates a default Sarama configuration with idempotency enabled. +// See also: +// * https://pkg.go.dev/github.com/Shopify/sarama#RequiredAcks +// * https://pkg.go.dev/github.com/Shopify/sarama#Config +func DefaultProducerSaramaConfig(name string, idempotent bool) (*sarama.Config, error) { + host, err := os.Hostname() + if err != nil { + return nil, errors.New("failed to get hostname") + } + + cfg := sarama.NewConfig() + cfg.ClientID = fmt.Sprintf("%s-%s", host, name) + + if idempotent { + cfg.Net.MaxOpenRequests = 1 + cfg.Producer.Idempotent = true + } + cfg.Producer.RequiredAcks = sarama.WaitForAll + + return cfg, nil +} + +// Create a new synchronous producer. +func (b *Builder) Create() (*SyncProducer, error) { + if len(b.errs) > 0 { + return nil, patronerrors.Aggregate(b.errs...) + } + + // required for any SyncProducer; 'Errors' is already true by default for both async/sync producers + b.cfg.Producer.Return.Successes = true + + p := SyncProducer{} + + var err error + p.prodClient, err = sarama.NewClient(b.brokers, b.cfg) if err != nil { - return nil, fmt.Errorf("failed to inject tracing headers: %w", err) + return nil, fmt.Errorf("failed to create producer client: %w", err) } - c.Set(encoding.ContentTypeHeader, p.contentType) - var saramaKey sarama.Encoder - if msg.key != nil { - saramaKey = sarama.StringEncoder(*msg.key) + p.syncProd, err = sarama.NewSyncProducerFromClient(p.prodClient) + if err != nil { + return nil, fmt.Errorf("failed to create sync producer: %w", err) + } + + return &p, nil +} + +// CreateAsync a new asynchronous producer. +func (b Builder) CreateAsync() (*AsyncProducer, <-chan error, error) { + if len(b.errs) > 0 { + return nil, nil, patronerrors.Aggregate(b.errs...) + } + + ap := &AsyncProducer{ + baseProducer: baseProducer{}, + asyncProd: nil, } - b, err := p.enc(msg.body) + var err error + ap.prodClient, err = sarama.NewClient(b.brokers, b.cfg) if err != nil { - return nil, fmt.Errorf("failed to encode message body: %w", err) + return nil, nil, fmt.Errorf("failed to create producer client: %w", err) } - c.Set(correlation.HeaderID, correlation.IDFromContext(ctx)) - return &sarama.ProducerMessage{ - Topic: msg.topic, - Key: saramaKey, - Value: sarama.ByteEncoder(b), - Headers: c, - }, nil + ap.asyncProd, err = sarama.NewAsyncProducerFromClient(ap.prodClient) + if err != nil { + return nil, nil, fmt.Errorf("failed to create async producer: %w", err) + } + chErr := make(chan error) + go ap.propagateError(chErr) + + return ap, chErr, nil } type kafkaHeadersCarrier []sarama.RecordHeader @@ -133,3 +166,14 @@ type kafkaHeadersCarrier []sarama.RecordHeader func (c *kafkaHeadersCarrier) Set(key, val string) { *c = append(*c, sarama.RecordHeader{Key: []byte(key), Value: []byte(val)}) } + +func injectTracingAndCorrelationHeaders(ctx context.Context, msg *sarama.ProducerMessage, sp opentracing.Span) error { + msg.Headers = append(msg.Headers, sarama.RecordHeader{ + Key: []byte(correlation.HeaderID), + Value: []byte(correlation.IDFromContext(ctx)), + }) + c := kafkaHeadersCarrier(msg.Headers) + err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, &c) + msg.Headers = c + return err +} diff --git a/client/kafka/kafka_test.go b/client/kafka/kafka_test.go index 2033fb6ba..dad83e5c2 100644 --- a/client/kafka/kafka_test.go +++ b/client/kafka/kafka_test.go @@ -1,102 +1,96 @@ package kafka import ( + "context" + "fmt" + "strings" "testing" + "github.com/Shopify/sarama" + "github.com/beatlabs/patron/correlation" + "github.com/beatlabs/patron/trace" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestNewMessage(t *testing.T) { - m := NewMessage("TOPIC", []byte("TEST")) - assert.Equal(t, "TOPIC", m.topic) - assert.Equal(t, []byte("TEST"), m.body) -} - -func TestNewMessageWithKey(t *testing.T) { - tests := []struct { - name string - data []byte - key string - wantErr bool +func TestBuilder_Create(t *testing.T) { + t.Parallel() + type args struct { + brokers []string + cfg *sarama.Config + } + tests := map[string]struct { + args args + expectedErr string }{ - {name: "success", data: []byte("TEST"), key: "TEST"}, - {name: "failure due to empty message key", data: []byte("TEST"), key: "", wantErr: true}, + "missing brokers": {args: args{brokers: nil, cfg: sarama.NewConfig()}, expectedErr: "brokers are empty or have an empty value\n"}, + "missing config": {args: args{brokers: []string{"123"}, cfg: nil}, expectedErr: "no Sarama configuration specified\n"}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewMessageWithKey("TOPIC", tt.data, tt.key) - if tt.wantErr { - assert.Error(t, err) - assert.Nil(t, got) - } else { - assert.NoError(t, err) - assert.NotNil(t, got) - } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + got, err := New(tt.args.brokers, tt.args.cfg).Create() + + require.EqualError(t, err, tt.expectedErr) + require.Nil(t, got) }) } } -func TestNewAsyncProducer_Failure(t *testing.T) { - got, chErr, err := NewBuilder([]string{}).CreateAsync() - assert.Error(t, err) - assert.Nil(t, got) - assert.Nil(t, chErr) -} +func TestBuilder_CreateAsync(t *testing.T) { + t.Parallel() + type args struct { + brokers []string + cfg *sarama.Config + } + tests := map[string]struct { + args args + expectedErr string + }{ + "missing brokers": {args: args{brokers: nil, cfg: sarama.NewConfig()}, expectedErr: "brokers are empty or have an empty value\n"}, + "missing config": {args: args{brokers: []string{"123"}, cfg: nil}, expectedErr: "no Sarama configuration specified\n"}, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + got, chErr, err := New(tt.args.brokers, tt.args.cfg).CreateAsync() -func TestNewAsyncProducer_Option_Failure(t *testing.T) { - got, chErr, err := NewBuilder([]string{"xxx"}).WithVersion("xxxx").CreateAsync() - assert.Error(t, err) - assert.Nil(t, got) - assert.Nil(t, chErr) + require.EqualError(t, err, tt.expectedErr) + require.Nil(t, got) + require.Nil(t, chErr) + }) + } } -func TestNewSyncProducer_Failure(t *testing.T) { - got, err := NewBuilder([]string{}).CreateSync() - assert.Error(t, err) - assert.Nil(t, got) -} +func TestDefaultProducerSaramaConfig(t *testing.T) { + sc, err := DefaultProducerSaramaConfig("name", true) + require.NoError(t, err) + require.True(t, strings.HasSuffix(sc.ClientID, fmt.Sprintf("-%s", "name"))) + require.True(t, sc.Producer.Idempotent) -func TestNewSyncProducer_Option_Failure(t *testing.T) { - got, err := NewBuilder([]string{"xxx"}).WithVersion("xxxx").CreateSync() - assert.Error(t, err) - assert.Nil(t, got) + sc, err = DefaultProducerSaramaConfig("name", false) + require.NoError(t, err) + require.False(t, sc.Producer.Idempotent) } -func TestNewMessageWithHeader(t *testing.T) { - tests := []struct { - name string - data []byte - setHeaderKeys []string - setHeaderValues []string - expectedHeaderKeys []string - expectedHeaderValues []string - }{ - { - name: "2-headers", data: []byte("TEST"), - setHeaderKeys: []string{"header1", "header2"}, setHeaderValues: []string{"value1", "value2"}, - expectedHeaderKeys: []string{"header1", "header2"}, expectedHeaderValues: []string{"value1", "value2"}, - }, - { - name: "2-headers", data: []byte("TEST"), - setHeaderKeys: []string{"header1", "header1"}, setHeaderValues: []string{"value1", "value2"}, - expectedHeaderKeys: []string{"header1", "header1"}, expectedHeaderValues: []string{"value1", "value2"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - msg := NewMessage("TOPIC", tt.data) - - // set message headers - for i := 0; i < len(tt.setHeaderKeys); i++ { - msg.SetHeader(tt.setHeaderKeys[i], tt.setHeaderValues[i]) - } - - // verify - for i := 0; i < len(tt.expectedHeaderKeys); i++ { - assert.Equal(t, string(msg.headers[i].Key), tt.expectedHeaderKeys[i]) - assert.Equal(t, string(msg.headers[i].Value), tt.expectedHeaderValues[i]) - } - }) - } +func Test_injectTracingAndCorrelationHeaders(t *testing.T) { + mtr := mocktracer.New() + opentracing.SetGlobalTracer(mtr) + t.Cleanup(func() { mtr.Reset() }) + ctx := correlation.ContextWithID(context.Background(), "123") + sp, _ := trace.ChildSpan(context.Background(), trace.ComponentOpName(componentTypeAsync, "topic"), componentTypeAsync, + ext.SpanKindProducer, asyncTag, opentracing.Tag{Key: "topic", Value: "topic"}) + msg := sarama.ProducerMessage{} + assert.NoError(t, injectTracingAndCorrelationHeaders(ctx, &msg, sp)) + assert.Len(t, msg.Headers, 4) + assert.Equal(t, correlation.HeaderID, string(msg.Headers[0].Key)) + assert.Equal(t, "123", string(msg.Headers[0].Value)) + assert.Equal(t, "mockpfx-ids-traceid", string(msg.Headers[1].Key)) + assert.Equal(t, "mockpfx-ids-spanid", string(msg.Headers[2].Key)) + assert.Equal(t, "mockpfx-ids-sampled", string(msg.Headers[3].Key)) } diff --git a/client/kafka/sync_producer.go b/client/kafka/sync_producer.go index c7d610b33..612653ed0 100644 --- a/client/kafka/sync_producer.go +++ b/client/kafka/sync_producer.go @@ -2,44 +2,83 @@ package kafka import ( "context" + "errors" "fmt" - "github.com/beatlabs/patron/trace" - "github.com/Shopify/sarama" + patronerrors "github.com/beatlabs/patron/errors" + "github.com/beatlabs/patron/trace" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" ) +const batchTarget = "batch" + +var syncTag = opentracing.Tag{Key: "type", Value: deliveryTypeSync} + // SyncProducer is a synchronous Kafka producer. type SyncProducer struct { baseProducer - syncProd sarama.SyncProducer } // Send a message to a topic. -func (p *SyncProducer) Send(ctx context.Context, msg *Message) error { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(syncProducerComponent, msg.topic), - syncProducerComponent, ext.SpanKindProducer, p.tag, - opentracing.Tag{Key: "topic", Value: msg.topic}) - pm, err := p.createProducerMessage(ctx, msg, sp) +func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeSync, msg.Topic), componentTypeSync, + ext.SpanKindProducer, syncTag, opentracing.Tag{Key: "topic", Value: msg.Topic}) + + err = injectTracingAndCorrelationHeaders(ctx, msg, sp) if err != nil { - p.statusCountInc(messageCreationErrors, msg.topic) + statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) - return err + return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err) } - _, _, err = p.syncProd.SendMessage(pm) + partition, offset, err = p.syncProd.SendMessage(msg) if err != nil { - p.statusCountInc(messageCreationErrors, msg.topic) + statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) trace.SpanError(sp) - return err + return -1, -1, err } - p.statusCountInc(messageSent, msg.topic) + statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1) trace.SpanSuccess(sp) + return partition, offset, nil +} + +// SendBatch sends a batch to a topic. +func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.ProducerMessage) error { + if len(messages) == 0 { + return errors.New("messages are empty or nil") + } + + spans := make([]opentracing.Span, 0, len(messages)) + + for _, msg := range messages { + sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeSync, batchTarget), componentTypeSync, + ext.SpanKindProducer, syncTag, opentracing.Tag{Key: "topic", Value: batchTarget}) + if err := injectTracingAndCorrelationHeaders(ctx, msg, sp); err != nil { + statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages)) + trace.SpanError(sp) + return fmt.Errorf("failed to inject tracing headers: %w", err) + } + spans = append(spans, sp) + } + + if err := p.syncProd.SendMessages(messages); err != nil { + statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages) + for _, sp := range spans { + trace.SpanError(sp) + } + + return err + } + + statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages) + for _, sp := range spans { + trace.SpanSuccess(sp) + } return nil } @@ -47,17 +86,17 @@ func (p *SyncProducer) Send(ctx context.Context, msg *Message) error { // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. func (p *SyncProducer) Close() error { - err := p.syncProd.Close() - if err != nil { - // always close client - _ = p.prodClient.Close() - - return fmt.Errorf("failed to close sync producer client: %w", err) + if err := p.syncProd.Close(); err != nil { + return patronerrors.Aggregate(fmt.Errorf("failed to close sync producer client: %w", err), p.prodClient.Close()) } - - err = p.prodClient.Close() - if err != nil { + if err := p.prodClient.Close(); err != nil { return fmt.Errorf("failed to close sync producer: %w", err) } return nil } + +func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) { + for _, msg := range messages { + statusCountAdd(deliveryType, status, msg.Topic, 1) + } +} diff --git a/client/kafka/v2/async_producer.go b/client/kafka/v2/async_producer.go deleted file mode 100644 index 57ca2d60f..000000000 --- a/client/kafka/v2/async_producer.go +++ /dev/null @@ -1,60 +0,0 @@ -// Package v2 provides a client with included tracing capabilities. -package v2 - -import ( - "context" - "fmt" - - "github.com/Shopify/sarama" - patronerrors "github.com/beatlabs/patron/errors" - "github.com/beatlabs/patron/trace" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -var asyncTag = opentracing.Tag{Key: "type", Value: deliveryTypeAsync} - -// AsyncProducer is an asynchronous Kafka producer. -type AsyncProducer struct { - baseProducer - asyncProd sarama.AsyncProducer -} - -// Send a message to a topic, asynchronously. Producer errors are queued on the -// channel obtained during the AsyncProducer creation. -func (ap *AsyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) error { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeAsync, msg.Topic), componentTypeAsync, - ext.SpanKindProducer, asyncTag, opentracing.Tag{Key: "topic", Value: msg.Topic}) - - err := injectTracingAndCorrelationHeaders(ctx, msg, sp) - if err != nil { - statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, msg.Topic, 1) - trace.SpanError(sp) - return fmt.Errorf("failed to inject tracing headers: %w", err) - } - - ap.asyncProd.Input() <- msg - statusCountAdd(deliveryTypeAsync, deliveryStatusSent, msg.Topic, 1) - trace.SpanSuccess(sp) - return nil -} - -func (ap *AsyncProducer) propagateError(chErr chan<- error) { - for pe := range ap.asyncProd.Errors() { - statusCountAdd(deliveryTypeAsync, deliveryStatusSendError, pe.Msg.Topic, 1) - chErr <- fmt.Errorf("failed to send message: %w", pe) - } -} - -// Close shuts down the producer and waits for any buffered messages to be -// flushed. You must call this function before a producer object passes out of -// scope, as it may otherwise leak memory. -func (ap *AsyncProducer) Close() error { - if err := ap.asyncProd.Close(); err != nil { - return patronerrors.Aggregate(fmt.Errorf("failed to close async producer client: %w", err), ap.prodClient.Close()) - } - if err := ap.prodClient.Close(); err != nil { - return fmt.Errorf("failed to close async producer: %w", err) - } - return nil -} diff --git a/client/kafka/v2/integration_test.go b/client/kafka/v2/integration_test.go deleted file mode 100644 index dc1d1d0a3..000000000 --- a/client/kafka/v2/integration_test.go +++ /dev/null @@ -1,166 +0,0 @@ -//go:build integration -// +build integration - -package v2 - -import ( - "context" - "testing" - - "github.com/Shopify/sarama" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/opentracing/opentracing-go/mocktracer" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - clientTopic = "clientTopic" -) - -var brokers = []string{"127.0.0.1:9093"} - -func TestNewAsyncProducer_Success(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.Nil(t, err) - - ap, chErr, err := New(brokers, saramaCfg).CreateAsync() - assert.NoError(t, err) - assert.NotNil(t, ap) - assert.NotNil(t, chErr) -} - -func TestNewSyncProducer_Success(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.Nil(t, err) - - p, err := New(brokers, saramaCfg).Create() - assert.NoError(t, err) - assert.NotNil(t, p) -} - -func TestAsyncProducer_SendMessage_Close(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-consumer", false) - require.Nil(t, err) - - mtr := mocktracer.New() - defer mtr.Reset() - opentracing.SetGlobalTracer(mtr) - ap, chErr, err := New(brokers, saramaCfg).CreateAsync() - assert.NoError(t, err) - assert.NotNil(t, ap) - assert.NotNil(t, chErr) - msg := &sarama.ProducerMessage{ - Topic: clientTopic, - Value: sarama.StringEncoder("TEST"), - Headers: []sarama.RecordHeader{{Key: []byte("123"), Value: []byte("123")}}, - } - err = ap.Send(context.Background(), msg) - assert.NoError(t, err) - assert.NoError(t, ap.Close()) - assert.Len(t, mtr.FinishedSpans(), 1) - - expected := map[string]interface{}{ - "component": "kafka-async-producer", - "error": false, - "span.kind": ext.SpanKindEnum("producer"), - "topic": clientTopic, - "type": "async", - "version": "dev", - } - assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) - - // Metrics - assert.Equal(t, 1, testutil.CollectAndCount(messageStatus, "client_kafka_producer_message_status")) -} - -func TestSyncProducer_SendMessage_Close(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.NoError(t, err) - - mtr := mocktracer.New() - defer mtr.Reset() - opentracing.SetGlobalTracer(mtr) - p, err := New(brokers, saramaCfg).Create() - require.NoError(t, err) - assert.NotNil(t, p) - msg := &sarama.ProducerMessage{ - Topic: clientTopic, - Value: sarama.StringEncoder("TEST"), - } - partition, offset, err := p.Send(context.Background(), msg) - assert.NoError(t, err) - assert.True(t, partition >= 0) - assert.True(t, offset >= 0) - assert.NoError(t, p.Close()) - assert.Len(t, mtr.FinishedSpans(), 1) - - expected := map[string]interface{}{ - "component": "kafka-sync-producer", - "error": false, - "span.kind": ext.SpanKindEnum("producer"), - "topic": clientTopic, - "type": "sync", - "version": "dev", - } - assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) -} - -func TestSyncProducer_SendMessages_Close(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.NoError(t, err) - - mtr := mocktracer.New() - defer mtr.Reset() - opentracing.SetGlobalTracer(mtr) - p, err := New(brokers, saramaCfg).Create() - require.NoError(t, err) - assert.NotNil(t, p) - msg1 := &sarama.ProducerMessage{ - Topic: clientTopic, - Value: sarama.StringEncoder("TEST1"), - } - msg2 := &sarama.ProducerMessage{ - Topic: clientTopic, - Value: sarama.StringEncoder("TEST2"), - } - err = p.SendBatch(context.Background(), []*sarama.ProducerMessage{msg1, msg2}) - assert.NoError(t, err) - assert.NoError(t, p.Close()) - assert.Len(t, mtr.FinishedSpans(), 2) - - expected := map[string]interface{}{ - "component": "kafka-sync-producer", - "error": false, - "span.kind": ext.SpanKindEnum("producer"), - "topic": "batch", - "type": "sync", - "version": "dev", - } - assert.Equal(t, expected, mtr.FinishedSpans()[0].Tags()) -} - -func TestAsyncProducerActiveBrokers(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.NoError(t, err) - - ap, chErr, err := New(brokers, saramaCfg).CreateAsync() - assert.NoError(t, err) - assert.NotNil(t, ap) - assert.NotNil(t, chErr) - assert.NotEmpty(t, ap.ActiveBrokers()) - assert.NoError(t, ap.Close()) -} - -func TestSyncProducerActiveBrokers(t *testing.T) { - saramaCfg, err := DefaultProducerSaramaConfig("test-producer", true) - require.NoError(t, err) - - ap, err := New(brokers, saramaCfg).Create() - assert.NoError(t, err) - assert.NotNil(t, ap) - assert.NotEmpty(t, ap.ActiveBrokers()) - assert.NoError(t, ap.Close()) -} diff --git a/client/kafka/v2/kafka.go b/client/kafka/v2/kafka.go deleted file mode 100644 index b8229ef0e..000000000 --- a/client/kafka/v2/kafka.go +++ /dev/null @@ -1,179 +0,0 @@ -package v2 - -import ( - "context" - "errors" - "fmt" - "os" - - "github.com/Shopify/sarama" - "github.com/beatlabs/patron/correlation" - patronerrors "github.com/beatlabs/patron/errors" - "github.com/beatlabs/patron/internal/validation" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" -) - -type ( - deliveryStatus string -) - -const ( - deliveryTypeSync = "sync" - deliveryTypeAsync = "async" - - deliveryStatusSent deliveryStatus = "sent" - deliveryStatusSendError deliveryStatus = "send-errors" - - componentTypeAsync = "kafka-async-producer" - componentTypeSync = "kafka-sync-producer" -) - -var messageStatus *prometheus.CounterVec - -func init() { - messageStatus = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "client", - Subsystem: "kafka_producer", - Name: "message_status", - Help: "Message status counter (produced, encoded, encoding-errors) classified by topic", - }, []string{"status", "topic", "type"}, - ) - - prometheus.MustRegister(messageStatus) -} - -func statusCountAdd(deliveryType string, status deliveryStatus, topic string, cnt int) { - messageStatus.WithLabelValues(string(status), topic, deliveryType).Add(float64(cnt)) -} - -type baseProducer struct { - prodClient sarama.Client -} - -// ActiveBrokers returns a list of active brokers' addresses. -func (p *baseProducer) ActiveBrokers() []string { - brokers := p.prodClient.Brokers() - activeBrokerAddresses := make([]string, len(brokers)) - for i, b := range brokers { - activeBrokerAddresses[i] = b.Addr() - } - return activeBrokerAddresses -} - -// Builder definition for creating sync and async producers. -type Builder struct { - brokers []string - cfg *sarama.Config - errs []error -} - -// New initiates the AsyncProducer/SyncProducer builder chain with the specified Sarama configuration. -func New(brokers []string, saramaConfig *sarama.Config) *Builder { - var ee []error - if validation.IsStringSliceEmpty(brokers) { - ee = append(ee, errors.New("brokers are empty or have an empty value")) - } - if saramaConfig == nil { - ee = append(ee, errors.New("no Sarama configuration specified")) - } - - return &Builder{ - brokers: brokers, - errs: ee, - cfg: saramaConfig, - } -} - -// DefaultProducerSaramaConfig creates a default Sarama configuration with idempotency enabled. -// See also: -// * https://pkg.go.dev/github.com/Shopify/sarama#RequiredAcks -// * https://pkg.go.dev/github.com/Shopify/sarama#Config -func DefaultProducerSaramaConfig(name string, idempotent bool) (*sarama.Config, error) { - host, err := os.Hostname() - if err != nil { - return nil, errors.New("failed to get hostname") - } - - cfg := sarama.NewConfig() - cfg.ClientID = fmt.Sprintf("%s-%s", host, name) - - if idempotent { - cfg.Net.MaxOpenRequests = 1 - cfg.Producer.Idempotent = true - } - cfg.Producer.RequiredAcks = sarama.WaitForAll - - return cfg, nil -} - -// Create a new synchronous producer. -func (b *Builder) Create() (*SyncProducer, error) { - if len(b.errs) > 0 { - return nil, patronerrors.Aggregate(b.errs...) - } - - // required for any SyncProducer; 'Errors' is already true by default for both async/sync producers - b.cfg.Producer.Return.Successes = true - - p := SyncProducer{} - - var err error - p.prodClient, err = sarama.NewClient(b.brokers, b.cfg) - if err != nil { - return nil, fmt.Errorf("failed to create producer client: %w", err) - } - - p.syncProd, err = sarama.NewSyncProducerFromClient(p.prodClient) - if err != nil { - return nil, fmt.Errorf("failed to create sync producer: %w", err) - } - - return &p, nil -} - -// CreateAsync a new asynchronous producer. -func (b Builder) CreateAsync() (*AsyncProducer, <-chan error, error) { - if len(b.errs) > 0 { - return nil, nil, patronerrors.Aggregate(b.errs...) - } - - ap := &AsyncProducer{ - baseProducer: baseProducer{}, - asyncProd: nil, - } - - var err error - ap.prodClient, err = sarama.NewClient(b.brokers, b.cfg) - if err != nil { - return nil, nil, fmt.Errorf("failed to create producer client: %w", err) - } - - ap.asyncProd, err = sarama.NewAsyncProducerFromClient(ap.prodClient) - if err != nil { - return nil, nil, fmt.Errorf("failed to create async producer: %w", err) - } - chErr := make(chan error) - go ap.propagateError(chErr) - - return ap, chErr, nil -} - -type kafkaHeadersCarrier []sarama.RecordHeader - -// Set implements Set() of opentracing.TextMapWriter. -func (c *kafkaHeadersCarrier) Set(key, val string) { - *c = append(*c, sarama.RecordHeader{Key: []byte(key), Value: []byte(val)}) -} - -func injectTracingAndCorrelationHeaders(ctx context.Context, msg *sarama.ProducerMessage, sp opentracing.Span) error { - msg.Headers = append(msg.Headers, sarama.RecordHeader{ - Key: []byte(correlation.HeaderID), - Value: []byte(correlation.IDFromContext(ctx)), - }) - c := kafkaHeadersCarrier(msg.Headers) - err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, &c) - msg.Headers = c - return err -} diff --git a/client/kafka/v2/kafka_test.go b/client/kafka/v2/kafka_test.go deleted file mode 100644 index ef790d98a..000000000 --- a/client/kafka/v2/kafka_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package v2 - -import ( - "context" - "fmt" - "strings" - "testing" - - "github.com/Shopify/sarama" - "github.com/beatlabs/patron/correlation" - "github.com/beatlabs/patron/trace" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/opentracing/opentracing-go/mocktracer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestBuilder_Create(t *testing.T) { - t.Parallel() - type args struct { - brokers []string - cfg *sarama.Config - } - tests := map[string]struct { - args args - expectedErr string - }{ - "missing brokers": {args: args{brokers: nil, cfg: sarama.NewConfig()}, expectedErr: "brokers are empty or have an empty value\n"}, - "missing config": {args: args{brokers: []string{"123"}, cfg: nil}, expectedErr: "no Sarama configuration specified\n"}, - } - for name, tt := range tests { - tt := tt - t.Run(name, func(t *testing.T) { - t.Parallel() - got, err := New(tt.args.brokers, tt.args.cfg).Create() - - require.EqualError(t, err, tt.expectedErr) - require.Nil(t, got) - }) - } -} - -func TestBuilder_CreateAsync(t *testing.T) { - t.Parallel() - type args struct { - brokers []string - cfg *sarama.Config - } - tests := map[string]struct { - args args - expectedErr string - }{ - "missing brokers": {args: args{brokers: nil, cfg: sarama.NewConfig()}, expectedErr: "brokers are empty or have an empty value\n"}, - "missing config": {args: args{brokers: []string{"123"}, cfg: nil}, expectedErr: "no Sarama configuration specified\n"}, - } - for name, tt := range tests { - tt := tt - t.Run(name, func(t *testing.T) { - t.Parallel() - got, chErr, err := New(tt.args.brokers, tt.args.cfg).CreateAsync() - - require.EqualError(t, err, tt.expectedErr) - require.Nil(t, got) - require.Nil(t, chErr) - }) - } -} - -func TestDefaultProducerSaramaConfig(t *testing.T) { - sc, err := DefaultProducerSaramaConfig("name", true) - require.NoError(t, err) - require.True(t, strings.HasSuffix(sc.ClientID, fmt.Sprintf("-%s", "name"))) - require.True(t, sc.Producer.Idempotent) - - sc, err = DefaultProducerSaramaConfig("name", false) - require.NoError(t, err) - require.False(t, sc.Producer.Idempotent) -} - -func Test_injectTracingAndCorrelationHeaders(t *testing.T) { - mtr := mocktracer.New() - opentracing.SetGlobalTracer(mtr) - t.Cleanup(func() { mtr.Reset() }) - ctx := correlation.ContextWithID(context.Background(), "123") - sp, _ := trace.ChildSpan(context.Background(), trace.ComponentOpName(componentTypeAsync, "topic"), componentTypeAsync, - ext.SpanKindProducer, asyncTag, opentracing.Tag{Key: "topic", Value: "topic"}) - msg := sarama.ProducerMessage{} - assert.NoError(t, injectTracingAndCorrelationHeaders(ctx, &msg, sp)) - assert.Len(t, msg.Headers, 4) - assert.Equal(t, correlation.HeaderID, string(msg.Headers[0].Key)) - assert.Equal(t, "123", string(msg.Headers[0].Value)) - assert.Equal(t, "mockpfx-ids-traceid", string(msg.Headers[1].Key)) - assert.Equal(t, "mockpfx-ids-spanid", string(msg.Headers[2].Key)) - assert.Equal(t, "mockpfx-ids-sampled", string(msg.Headers[3].Key)) -} diff --git a/client/kafka/v2/sync_producer.go b/client/kafka/v2/sync_producer.go deleted file mode 100644 index 37d723730..000000000 --- a/client/kafka/v2/sync_producer.go +++ /dev/null @@ -1,102 +0,0 @@ -package v2 - -import ( - "context" - "errors" - "fmt" - - "github.com/Shopify/sarama" - patronerrors "github.com/beatlabs/patron/errors" - "github.com/beatlabs/patron/trace" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" -) - -const batchTarget = "batch" - -var syncTag = opentracing.Tag{Key: "type", Value: deliveryTypeSync} - -// SyncProducer is a synchronous Kafka producer. -type SyncProducer struct { - baseProducer - syncProd sarama.SyncProducer -} - -// Send a message to a topic. -func (p *SyncProducer) Send(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeSync, msg.Topic), componentTypeSync, - ext.SpanKindProducer, syncTag, opentracing.Tag{Key: "topic", Value: msg.Topic}) - - err = injectTracingAndCorrelationHeaders(ctx, msg, sp) - if err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) - trace.SpanError(sp) - return -1, -1, fmt.Errorf("failed to inject tracing headers: %w", err) - } - - partition, offset, err = p.syncProd.SendMessage(msg) - if err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, 1) - trace.SpanError(sp) - return -1, -1, err - } - - statusCountAdd(deliveryTypeSync, deliveryStatusSent, msg.Topic, 1) - trace.SpanSuccess(sp) - return partition, offset, nil -} - -// SendBatch sends a batch to a topic. -func (p *SyncProducer) SendBatch(ctx context.Context, messages []*sarama.ProducerMessage) error { - if len(messages) == 0 { - return errors.New("messages are empty or nil") - } - - spans := make([]opentracing.Span, 0, len(messages)) - - for _, msg := range messages { - sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(componentTypeSync, batchTarget), componentTypeSync, - ext.SpanKindProducer, syncTag, opentracing.Tag{Key: "topic", Value: batchTarget}) - - if err := injectTracingAndCorrelationHeaders(ctx, msg, sp); err != nil { - statusCountAdd(deliveryTypeSync, deliveryStatusSendError, msg.Topic, len(messages)) - trace.SpanError(sp) - return fmt.Errorf("failed to inject tracing headers: %w", err) - } - spans = append(spans, sp) - } - - if err := p.syncProd.SendMessages(messages); err != nil { - statusCountBatchAdd(deliveryTypeSync, deliveryStatusSendError, messages) - for _, sp := range spans { - trace.SpanError(sp) - } - - return err - } - - statusCountBatchAdd(deliveryTypeSync, deliveryStatusSent, messages) - for _, sp := range spans { - trace.SpanSuccess(sp) - } - return nil -} - -// Close shuts down the producer and waits for any buffered messages to be -// flushed. You must call this function before a producer object passes out of -// scope, as it may otherwise leak memory. -func (p *SyncProducer) Close() error { - if err := p.syncProd.Close(); err != nil { - return patronerrors.Aggregate(fmt.Errorf("failed to close sync producer client: %w", err), p.prodClient.Close()) - } - if err := p.prodClient.Close(); err != nil { - return fmt.Errorf("failed to close sync producer: %w", err) - } - return nil -} - -func statusCountBatchAdd(deliveryType string, status deliveryStatus, messages []*sarama.ProducerMessage) { - for _, msg := range messages { - statusCountAdd(deliveryType, status, msg.Topic, 1) - } -} diff --git a/component/kafka/group/component_test.go b/component/kafka/group/component_test.go index 6a9826810..83432ae66 100644 --- a/component/kafka/group/component_test.go +++ b/component/kafka/group/component_test.go @@ -109,6 +109,7 @@ func TestNew(t *testing.T) { }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() diff --git a/component/kafka/group/integration_test.go b/component/kafka/group/integration_test.go index 03e3d871b..64d1213dd 100644 --- a/component/kafka/group/integration_test.go +++ b/component/kafka/group/integration_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/Shopify/sarama" - kafkaclient "github.com/beatlabs/patron/client/kafka/v2" + kafkaclient "github.com/beatlabs/patron/client/kafka" "github.com/beatlabs/patron/component/kafka" "github.com/beatlabs/patron/correlation" testkafka "github.com/beatlabs/patron/test/kafka" diff --git a/examples/http-sec/main.go b/examples/http-sec/main.go index 91dbfe9f3..ed1e49bad 100644 --- a/examples/http-sec/main.go +++ b/examples/http-sec/main.go @@ -10,7 +10,7 @@ import ( "github.com/Shopify/sarama" "github.com/beatlabs/patron" clienthttp "github.com/beatlabs/patron/client/http" - patronkafka "github.com/beatlabs/patron/client/kafka/v2" + patronkafka "github.com/beatlabs/patron/client/kafka" "github.com/beatlabs/patron/component/http/auth/apikey" v2 "github.com/beatlabs/patron/component/http/v2" "github.com/beatlabs/patron/component/http/v2/router/httprouter" From f4874311c5107ca708d94933166866a9c6213723 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Tue, 3 Jan 2023 16:28:03 +0200 Subject: [PATCH 3/4] Deprecate v1 clients --- options_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/options_test.go b/options_test.go index 8053d174e..318f2035b 100644 --- a/options_test.go +++ b/options_test.go @@ -115,16 +115,11 @@ func TestRouter(t *testing.T) { type noopHTTPHandler struct{} func (noopHTTPHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - return } func TestSIGHUP(t *testing.T) { t.Parallel() - type args struct { - handler func() - } - t.Run("empty value for sighup handler", func(t *testing.T) { t.Parallel() svc := &Service{} From 16259354fabf05a29a8847652cbb61377915f5db Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Tue, 3 Jan 2023 17:13:13 +0200 Subject: [PATCH 4/4] Deprecate v1 clients --- Makefile | 2 +- log/zerolog/logger_test.go | 1 + service.go | 3 +-- testdata/test.md | 7 ------- 4 files changed, 3 insertions(+), 10 deletions(-) delete mode 100644 testdata/test.md diff --git a/Makefile b/Makefile index afc90549c..170873ec6 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ examples: $(MAKE) -C examples deps-start: - docker-compose up -d + docker-compose up -d && sleep 10 deps-stop: docker-compose down diff --git a/log/zerolog/logger_test.go b/log/zerolog/logger_test.go index ec636367e..e34deb9f6 100644 --- a/log/zerolog/logger_test.go +++ b/log/zerolog/logger_test.go @@ -48,6 +48,7 @@ func TestLogMetrics(t *testing.T) { } for name, tt := range tests { tt := tt + name := name t.Run(name, func(t *testing.T) { t.Parallel() assert.Equal(t, 0.0, testutil.ToFloat64(log.LevelCount(string(tt.lvl)))) diff --git a/service.go b/service.go index 55817b914..350b1feae 100644 --- a/service.go +++ b/service.go @@ -13,9 +13,8 @@ import ( "syscall" "time" - "github.com/beatlabs/patron/component/http/v2/router/httprouter" - v2 "github.com/beatlabs/patron/component/http/v2" + "github.com/beatlabs/patron/component/http/v2/router/httprouter" patronErrors "github.com/beatlabs/patron/errors" "github.com/beatlabs/patron/log" patronzerolog "github.com/beatlabs/patron/log/zerolog" diff --git a/testdata/test.md b/testdata/test.md deleted file mode 100644 index 82a8ff476..000000000 --- a/testdata/test.md +++ /dev/null @@ -1,7 +0,0 @@ -# Markdown: Syntax - -This is the first paragraph. - -## Overview - -This is the second paragraph. \ No newline at end of file