Skip to content

Commit

Permalink
rabbitmq: re-construct the module
Browse files Browse the repository at this point in the history
  • Loading branch information
romberli committed Dec 13, 2023
1 parent a508110 commit de4c766
Show file tree
Hide file tree
Showing 10 changed files with 1,474 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rabbitmq
package client

import (
"fmt"
Expand Down Expand Up @@ -54,34 +54,9 @@ func newConfig(addr, user, pass, vhost, tag string) *Config {
}
}

// GetAddr returns the address
func (c *Config) GetAddr() string {
return c.Addr
}

// GetUser returns the user
func (c *Config) GetUser() string {
return c.User
}

// GetPass returns the password
func (c *Config) GetPass() string {
return c.Pass
}

// GetVhost returns the vhost
func (c *Config) GetVhost() string {
return c.Vhost
}

// GetTag returns the tag
func (c *Config) GetTag() string {
return c.Tag
}

// GetURL returns the URL
func (c *Config) GetURL() string {
return fmt.Sprintf("amqp://%s:%s@%s%s", c.GetUser(), c.GetPass(), c.GetAddr(), c.GetVhost())
return fmt.Sprintf("amqp://%s:%s@%s%s", c.User, c.Pass, c.Addr, c.Vhost)
}

type Conn struct {
Expand Down Expand Up @@ -114,17 +89,7 @@ func NewConnWithConfig(config *Config) (*Conn, error) {
}, nil
}

// GetConfig returns the config
func (c *Conn) GetConfig() *Config {
return c.Config
}

// GetConnection returns the connection
func (c *Conn) GetConnection() *amqp.Connection {
return c.Connection
}

// Close closes the connection
func (c *Conn) Close() error {
return errors.Trace(c.GetConnection().Close())
return errors.Trace(c.Connection.Close())
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rabbitmq
package client

import (
"github.com/romberli/log"
Expand All @@ -10,21 +10,6 @@ const (
testPass = "guest"
testVhost = "/"
testTag = "test_consumer"

testExchangeName = "test_exchange"
testExchangeType = "topic"
testQueueName = "test_queue"
testKey = "test_key"
testMessage = `{"dbs": {"id": 1, "db_name": "test_db", "cluster_id": 1}}`
testMessageTemplate = `{"dbs": {"id": %d, "db_name": "test_db", "cluster_id": 1}}`
testExpiration = 1000 * 60 * 60 * 5 // 5 minutes
testPublishCount = 5

testPrefetchCount = 3
testGlobal = true
testExclusive = true
testMultiple = true
testRequeue = true
)

var testConn *Conn
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package rabbitmq
package consumer

import (
"fmt"

"github.com/pingcap/errors"

"github.com/romberli/go-util/constant"
"github.com/romberli/go-util/middleware/rabbitmq/client"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -18,24 +19,24 @@ const (
)

type Consumer struct {
Conn *Conn
Conn *client.Conn
Chan *amqp.Channel
Queue amqp.Queue
}

// NewConsumer returns a new *Consumer
func NewConsumer(addr, user, pass, vhost, tag string) (*Consumer, error) {
return NewConsumerWithConfig(NewConfig(addr, user, pass, vhost, tag))
return NewConsumerWithConfig(client.NewConfig(addr, user, pass, vhost, tag))
}

// NewConsumerWithDefault returns a new *Consumer with default config
func NewConsumerWithDefault(addr, user, pass string) (*Consumer, error) {
return NewConsumerWithConfig(NewConfigWithDefault(addr, user, pass))
return NewConsumerWithConfig(client.NewConfigWithDefault(addr, user, pass))
}

// NewConsumerWithConfig returns a new *Consumer with given config
func NewConsumerWithConfig(config *Config) (*Consumer, error) {
conn, err := NewConnWithConfig(config)
func NewConsumerWithConfig(config *client.Config) (*Consumer, error) {
conn, err := client.NewConnWithConfig(config)
if err != nil {
return nil, err
}
Expand All @@ -44,8 +45,8 @@ func NewConsumerWithConfig(config *Config) (*Consumer, error) {
}

// NewConsumerWithConn returns a new *Consumer with given connection
func NewConsumerWithConn(conn *Conn) (*Consumer, error) {
channel, err := conn.GetConnection().Channel()
func NewConsumerWithConn(conn *client.Conn) (*Consumer, error) {
channel, err := conn.Connection.Channel()
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -56,51 +57,43 @@ func NewConsumerWithConn(conn *Conn) (*Consumer, error) {
}, nil
}

// GetConn returns the connection
func (c *Consumer) GetConn() *Conn {
return c.Conn
}

// GetChannel returns the channel
func (c *Consumer) GetChannel() *amqp.Channel {
return c.Chan
}

// GetQueue returns the queue
func (c *Consumer) GetQueue() amqp.Queue {
return c.Queue
}

// CloseChannel closes the channel
func (c *Consumer) CloseChannel() error {
return errors.Trace(c.GetChannel().Close())
}

// Close disconnects the rabbitmq server
func (c *Consumer) Close() error {
if c.GetChannel() != nil && !c.GetChannel().IsClosed() {
err := c.CloseChannel()
if err != nil {
return err
}
if c.Chan != nil && !c.Chan.IsClosed() {
return errors.Trace(c.Chan.Close())
}

return nil
}

// Disconnect disconnects the rabbitmq server
func (c *Consumer) Disconnect() error {
err := c.Close()
if err != nil {
return err
}

return c.GetConn().Close()
return c.Conn.Close()
}

// Channel returns the amqp channel,
// if the channel of the consumer is nil or had been closed, a new channel will be opened,
// otherwise the existing channel will be returned
func (c *Consumer) Channel() (*amqp.Channel, error) {
if c.GetChannel() == nil || c.GetChannel().IsClosed() {
if c.Chan == nil || c.Chan.IsClosed() {
var err error
c.Chan, err = c.GetConn().Channel()
c.Chan, err = c.Conn.Channel()
if err != nil {
return nil, err
}
}

return c.GetChannel(), nil
return c.Chan, nil
}

// ExchangeDeclare declares an exchange
Expand Down Expand Up @@ -156,7 +149,7 @@ func (c *Consumer) Consume(queue string, exclusive bool) (<-chan amqp.Delivery,
return nil, err
}

deliveryChan, err := channel.Consume(queue, c.GetConn().GetConfig().GetTag(), false, exclusive, false, false, nil)
deliveryChan, err := channel.Consume(queue, c.Conn.Config.Tag, false, exclusive, false, false, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -171,7 +164,7 @@ func (c *Consumer) Cancel() error {
return err
}

return errors.Trace(channel.Cancel(c.GetConn().GetConfig().GetTag(), false))
return errors.Trace(channel.Cancel(c.Conn.Config.Tag, false))
}

// Ack acknowledges a delivery
Expand Down Expand Up @@ -205,7 +198,7 @@ func (c *Consumer) IsExclusiveUseError(queue string, err error) bool {
return false
}

message := fmt.Sprintf(ErrQueueExclusiveUseReasonTemplate, queue, c.GetConn().GetConfig().GetVhost())
message := fmt.Sprintf(ErrQueueExclusiveUseReasonTemplate, queue, c.Conn.Config.Vhost)

return e.Code == ErrQueueExclusiveUseCode && e.Reason == message
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rabbitmq
package consumer

import (
"testing"
Expand All @@ -8,16 +8,57 @@ import (
"github.com/stretchr/testify/assert"

"github.com/romberli/go-util/common"
"github.com/romberli/go-util/middleware/rabbitmq/client"
)

var testConsumer *Consumer
const (
testAddr = "192.168.137.11:5672"
testUser = "guest"
testPass = "guest"
testVhost = "/"
testTag = "test_consumer"

testExchangeName = "test_exchange"
testExchangeType = "topic"
testQueueName = "test_queue"
testKey = "test_key"
testMessage = `{"dbs": {"id": 1, "db_name": "test_db", "cluster_id": 1}}`
testMessageTemplate = `{"dbs": {"id": %d, "db_name": "test_db", "cluster_id": 1}}`
testExpiration = 1000 * 60 * 60 * 5 // 5 minutes
testPublishCount = 5

testPrefetchCount = 3
testGlobal = true
testExclusive = true
testMultiple = true
testRequeue = true

testMaxWaitTime = 10 * time.Second
)

var (
testConn *client.Conn
testConsumer *Consumer
)

func init() {
testConn = testCreateConn(testAddr, testUser, testPass)
testConsumer = testCreateConsumer(testConn)
}

func testCreateConsumer(conn *Conn) *Consumer {
// testCreateConn returns a new *Conn with given address, user and password
func testCreateConn(addr, user, pass string) *client.Conn {
var err error

testConn, err = client.NewConn(addr, user, pass, testVhost, testTag)
if err != nil {
log.Errorf("creating new Connection failed. %s", err)
}

return testConn
}

func testCreateConsumer(conn *client.Conn) *Consumer {
var err error

testConsumer, err = NewConsumerWithConn(conn)
Expand Down Expand Up @@ -116,6 +157,24 @@ func TestConsumer_Ack(t *testing.T) {
}
}

func TestConsumer_AckAndWait(t *testing.T) {
asst := assert.New(t)

deliveryChan, err := testConsumer.Consume(testQueueName, testExclusive)
asst.Nil(err, common.CombineMessageWithError("test Ack() failed", err))
for {
select {
case d := <-deliveryChan:
log.Infof("%s", d.Body)
err = testConsumer.Ack(d.DeliveryTag, testMultiple)
asst.Nil(err, common.CombineMessageWithError("test Ack() failed", err))
default:
log.Infof("no message to consume, will check later")
time.Sleep(time.Second * 3)
}
}
}

func TestConsumer_Nack(t *testing.T) {
asst := assert.New(t)

Expand Down
Loading

0 comments on commit de4c766

Please sign in to comment.