Skip to content

Commit

Permalink
Merge pull request #73 from zeromicro/chore/pushwithkey-tests
Browse files Browse the repository at this point in the history
feat: support push with key
  • Loading branch information
kevwan authored Jul 24, 2024
2 parents 319b750 + e93e27f commit 77e9086
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 31 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/redis/go-redis/v9 v9.5.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
Expand Down
43 changes: 14 additions & 29 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ type (
PushOption func(options *pushOptions)

Pusher struct {
producer *kafka.Writer
topic string
producer kafkaWriter
executor *executors.ChunkExecutor
}

kafkaWriter interface {
Close() error
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

pushOptions struct {
// kafka.Writer options
allowAutoTopicCreation bool
Expand Down Expand Up @@ -138,23 +143,17 @@ func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error {
}
}

// SetWriterBalancer set kafka-go custom writer balancer.
func (p *Pusher) SetWriterBalancer(balancer kafka.Balancer) {
if p.producer != nil {
p.producer.Balancer = balancer
// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
func WithAllowAutoTopicCreation() PushOption {
return func(options *pushOptions) {
options.allowAutoTopicCreation = true
}
}

// PushWithKey sends a message to the Kafka topic with custom message key.
func (p *Pusher) PushWithKey(k, v string) error {
msg := kafka.Message{
Key: []byte(k), // custom message key
Value: []byte(v),
}
if p.executor != nil {
return p.executor.Add(msg, len(v))
} else {
return p.producer.WriteMessages(context.Background(), msg)
// WithBalancer customizes the Pusher with the given balancer.
func WithBalancer(balancer kafka.Balancer) PushOption {
return func(options *pushOptions) {
options.balancer = balancer
}
}

Expand All @@ -172,20 +171,6 @@ func WithFlushInterval(interval time.Duration) PushOption {
}
}

// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
func WithAllowAutoTopicCreation() PushOption {
return func(options *pushOptions) {
options.allowAutoTopicCreation = true
}
}

// WithBalancer customizes the Pusher with the given balancer.
func WithBalancer(balancer kafka.Balancer) PushOption {
return func(options *pushOptions) {
options.balancer = balancer
}
}

// WithSyncPush enables the Pusher to push messages synchronously.
func WithSyncPush() PushOption {
return func(options *pushOptions) {
Expand Down
141 changes: 141 additions & 0 deletions kq/pusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package kq

import (
"context"
"errors"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

// mockKafkaWriter is a mock for kafka.Writer
type mockKafkaWriter struct {
mock.Mock
}

func (m *mockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
args := m.Called(ctx, msgs)
return args.Error(0)
}

func (m *mockKafkaWriter) Close() error {
args := m.Called()
return args.Error(0)
}

func TestNewPusher(t *testing.T) {
addrs := []string{"localhost:9092"}
topic := "test-topic"

t.Run("DefaultOptions", func(t *testing.T) {
pusher := NewPusher(addrs, topic)
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.producer)
assert.Equal(t, topic, pusher.topic)
assert.NotNil(t, pusher.executor)
})

t.Run("WithSyncPush", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithSyncPush())
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.producer)
assert.Equal(t, topic, pusher.topic)
assert.Nil(t, pusher.executor)
})

t.Run("WithChunkSize", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithChunkSize(100))
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.executor)
})

t.Run("WithFlushInterval", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithFlushInterval(time.Second))
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.executor)
})

t.Run("WithAllowAutoTopicCreation", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithAllowAutoTopicCreation())
assert.NotNil(t, pusher)
assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation)
})
}

func TestPusher_Close(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
}

mockWriter.On("Close").Return(nil)

err := pusher.Close()
assert.NoError(t, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_Name(t *testing.T) {
topic := "test-topic"
pusher := &Pusher{topic: topic}

assert.Equal(t, topic, pusher.Name())
}

func TestPusher_Push(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
value := "test-value"

mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(nil)

err := pusher.Push(ctx, value)
assert.NoError(t, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_PushWithKey(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
key := "test-key"
value := "test-value"

mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(nil)

err := pusher.PushWithKey(ctx, key, value)
assert.NoError(t, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_PushWithKey_Error(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
key := "test-key"
value := "test-value"

expectedError := errors.New("write error")
mockWriter.On("WriteMessages", mock.Anything, mock.AnythingOfType("[]kafka.Message")).Return(expectedError)

err := pusher.PushWithKey(ctx, key, value)
assert.Error(t, err)
assert.Equal(t, expectedError, err)
mockWriter.AssertExpectations(t)
}
10 changes: 8 additions & 2 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type (
Consume(ctx context.Context, key, value string) error
}

kafkaReader interface {
FetchMessage(ctx context.Context) (kafka.Message, error)
CommitMessages(ctx context.Context, msgs ...kafka.Message) error
Close() error
}

queueOptions struct {
commitInterval time.Duration
queueCapacity int
Expand All @@ -54,7 +60,7 @@ type (

kafkaQueue struct {
c KqConf
consumer *kafka.Reader
consumer kafkaReader
handler ConsumeHandler
channel chan kafka.Message
producerRoutines *threading.RoutineGroup
Expand Down Expand Up @@ -191,7 +197,7 @@ func (q *kafkaQueue) Start() {
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()

logx.Infof("Consumer %s is closed", q.c.Name)
}
}
Expand Down
Loading

0 comments on commit 77e9086

Please sign in to comment.