Skip to content

Commit

Permalink
refactor rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Mar 20, 2022
1 parent 254cf9c commit d472d93
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 91 deletions.
10 changes: 6 additions & 4 deletions example/rabbitmq/admin/admin.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package main

import (
"github.com/zeromicro/go-queue/rabbitmq"
"log"

"github.com/zeromicro/go-queue/rabbitmq"
)

func main() {

conf := rabbitmq.RabbitMqConf{
conf := rabbitmq.RabbitConf{
Host: "192.168.253.100",
Port: 5672,
Username: "guest",
Password: "guest",
}
admin := rabbitmq.MustNewRabbitMqAdmin(conf)
admin := rabbitmq.MustNewAdmin(conf)
exchangeConf := rabbitmq.ExchangeConf{
ExchangeName: "jiang",
Type: "direct",
Expand All @@ -27,6 +27,7 @@ func main() {
if err != nil {
log.Fatal(err)
}

queueConf := rabbitmq.QueueConf{
Name: "jxj",
Durable: true,
Expand All @@ -38,6 +39,7 @@ func main() {
if err != nil {
log.Fatal(err)
}

err = admin.Bind("jxj", "jxj", "jiang", false, nil)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion example/rabbitmq/listener/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package config
import "github.com/zeromicro/go-queue/rabbitmq"

type Config struct {
ListenerConf rabbitmq.RabbitMqListenerConf
ListenerConf rabbitmq.RabbitListenerConf
}
6 changes: 2 additions & 4 deletions example/rabbitmq/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package main
import (
"flag"
"fmt"

"github.com/zeromicro/go-queue/example/rabbitmq/listener/config"
"github.com/zeromicro/go-queue/rabbitmq"

"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
)
Expand All @@ -17,13 +17,11 @@ func main() {
var c config.Config
conf.MustLoad(*configFile, &c)

listener := rabbitmq.MustNewRabbitMqListener(c.ListenerConf, Handler{})

listener := rabbitmq.MustNewListener(c.ListenerConf, Handler{})
serviceGroup := service.NewServiceGroup()
serviceGroup.Add(listener)
defer serviceGroup.Stop()
serviceGroup.Start()

}

type Handler struct {
Expand Down
11 changes: 6 additions & 5 deletions example/rabbitmq/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package main

import (
"encoding/json"
"github.com/zeromicro/go-queue/rabbitmq"
"log"

"github.com/zeromicro/go-queue/rabbitmq"
)

func main() {

conf := rabbitmq.RabbitMqSenderConf{RabbitMqConf: rabbitmq.RabbitMqConf{
conf := rabbitmq.RabbitSenderConf{RabbitConf: rabbitmq.RabbitConf{
Host: "192.168.253.100",
Port: 5672,
Username: "guest",
Password: "guest",
}, ContentType: "application/json"}
sender := rabbitmq.MustNewRabbitMqSender(conf)
sender := rabbitmq.MustNewSender(conf)
data := map[string]interface{}{
"msg": "json test 111",
}
Expand All @@ -23,8 +23,9 @@ func main() {
if err != nil {
log.Fatal(err)
}

conf.ContentType = "text/plain"
sender = rabbitmq.MustNewRabbitMqSender(conf)
sender = rabbitmq.MustNewSender(conf)
message := "test message"
err = sender.Send("exchange.direct", "gozero", []byte(message))
if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions rabbitmq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,32 @@ package rabbitmq

import "fmt"

type RabbitMqConf struct {
type RabbitConf struct {
Username string
Password string
Host string
Port int
VHost string `json:",optional"`
}

type RabbitMqListenerConf struct {
RabbitMqConf
type RabbitListenerConf struct {
RabbitConf
ListenerQueues []ConsumerConf
}

type ConsumerConf struct {
Name string
AutoAck bool `json:",default=true"`
Exclusive bool `json:",default=false"`
NoLocal bool `json:",default=false"` // Set to true, which means that messages sent by producers in the same connection cannot be delivered to consumers in this connection
NoWait bool `json:",default=false"` // Whether to block processing
// Set to true, which means that messages sent by producers in the same connection
// cannot be delivered to consumers in this connection.
NoLocal bool `json:",default=false"`
// Whether to block processing
NoWait bool `json:",default=false"`
}

type RabbitMqSenderConf struct {
RabbitMqConf
type RabbitSenderConf struct {
RabbitConf
ContentType string `json:",default=text/plain"` // MIME content type
}

Expand All @@ -46,6 +49,7 @@ type ExchangeConf struct {
Queues []QueueConf
}

func getRabbitMqURL(rabbitConf RabbitMqConf) string {
return fmt.Sprintf("amqp://%s:%s@%s:%d/%s", rabbitConf.Username, rabbitConf.Password, rabbitConf.Host, rabbitConf.Port, rabbitConf.VHost)
func getRabbitURL(rabbitConf RabbitConf) string {
return fmt.Sprintf("amqp://%s:%s@%s:%d/%s", rabbitConf.Username, rabbitConf.Password,
rabbitConf.Host, rabbitConf.Port, rabbitConf.VHost)
}
40 changes: 16 additions & 24 deletions rabbitmq/listener.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package rabbitmq

import (
"fmt"
"log"

"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/queue"
"log"
)

type (
Expand All @@ -20,40 +20,28 @@ type (
channel *amqp.Channel
forever chan bool
handler ConsumeHandler
queues RabbitMqListenerConf
queues RabbitListenerConf
}
)

func MustNewRabbitMqListener(listenerConf RabbitMqListenerConf, handler ConsumeHandler) queue.MessageQueue {
q, err := newRabbitMq(listenerConf, handler)
func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler) queue.MessageQueue {
listener := RabbitListener{queues: listenerConf, handler: handler, forever: make(chan bool)}
conn, err := amqp.Dial(getRabbitURL(listenerConf.RabbitConf))
if err != nil {
log.Fatal(err)
log.Fatalf("failed to connect rabbitmq, error: %v", err)
}

return q
}

func newRabbitMq(listenerConf RabbitMqListenerConf, handler ConsumeHandler) (queue.MessageQueue, error) {
listener := RabbitListener{queues: listenerConf, handler: handler, forever: make(chan bool)}
conn, err := amqp.Dial(getRabbitMqURL(listenerConf.RabbitMqConf))
listener.ErrorHandler(err, "failed to connect rabbitmq!")
listener.conn = conn

channel, err := listener.conn.Channel()
listener.ErrorHandler(err, "failed to open a channel")
listener.channel = channel
return listener, nil
}

func (q RabbitListener) ErrorHandler(err error, message string) {
if err != nil {
logx.Errorf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
log.Fatalf("failed to open a channel: %v", err)
}

listener.channel = channel
return listener
}

func (q RabbitListener) Start() {

for _, que := range q.queues.ListenerQueues {
msg, err := q.channel.Consume(
que.Name,
Expand All @@ -64,7 +52,10 @@ func (q RabbitListener) Start() {
que.NoWait,
nil,
)
q.ErrorHandler(err, "failed to listener")
if err != nil {
log.Fatalf("failed to listener, error: %v", err)
}

go func() {
for d := range msg {
if err := q.handler.Consume(string(d.Body)); err != nil {
Expand All @@ -73,6 +64,7 @@ func (q RabbitListener) Start() {
}
}()
}

<-q.forever
}

Expand Down
50 changes: 22 additions & 28 deletions rabbitmq/rabbitmqadmin.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package rabbitmq

import (
"fmt"
"log"

"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx"
)

type (
RabbitMqAdmin struct {
conn *amqp.Connection
channel *amqp.Channel
type Admin struct {
conn *amqp.Connection
channel *amqp.Channel
}

func MustNewAdmin(rabbitMqConf RabbitConf) *Admin {
var admin Admin
conn, err := amqp.Dial(getRabbitURL(rabbitMqConf))
if err != nil {
log.Fatalf("failed to connect rabbitmq, error: %v", err)
}
)

func MustNewRabbitMqAdmin(rabbitMqConf RabbitMqConf) *RabbitMqAdmin {
admin := &RabbitMqAdmin{}
conn, err := amqp.Dial(getRabbitMqURL(rabbitMqConf))
admin.ErrorHandler(err, "failed to connect rabbitmq!")
admin.conn = conn

channel, err := admin.conn.Channel()
admin.ErrorHandler(err, "failed to open a channel")
admin.channel = channel
return admin
}

func (q *RabbitMqAdmin) ErrorHandler(err error, message string) {
if err != nil {
logx.Errorf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
log.Fatalf("failed to open a channel, error: %v", err)
}

admin.channel = channel
return &admin
}

func (q *RabbitMqAdmin) DeclareExchange(conf ExchangeConf, args amqp.Table) error {
err := q.channel.ExchangeDeclare(
func (q *Admin) DeclareExchange(conf ExchangeConf, args amqp.Table) error {
return q.channel.ExchangeDeclare(
conf.ExchangeName,
conf.Type,
conf.Durable,
Expand All @@ -42,11 +38,9 @@ func (q *RabbitMqAdmin) DeclareExchange(conf ExchangeConf, args amqp.Table) erro
conf.NoWait,
args,
)

return err
}

func (q *RabbitMqAdmin) DeclareQueue(conf QueueConf, args amqp.Table) error {
func (q *Admin) DeclareQueue(conf QueueConf, args amqp.Table) error {
_, err := q.channel.QueueDeclare(
conf.Name,
conf.Durable,
Expand All @@ -55,16 +49,16 @@ func (q *RabbitMqAdmin) DeclareQueue(conf QueueConf, args amqp.Table) error {
conf.NoWait,
args,
)

return err
}

func (q *RabbitMqAdmin) Bind(queueName string, routekey string, exchange string, notWait bool, args amqp.Table) error {
err := q.channel.QueueBind(
func (q *Admin) Bind(queueName string, routekey string, exchange string, notWait bool, args amqp.Table) error {
return q.channel.QueueBind(
queueName,
routekey,
exchange,
notWait,
args,
)
return err
}
30 changes: 14 additions & 16 deletions rabbitmq/sender.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package rabbitmq

import (
"fmt"
"log"

"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx"
)

type (
Expand All @@ -18,26 +18,25 @@ type (
}
)

func MustNewRabbitMqSender(rabbitMqConf RabbitMqSenderConf) Sender {
func MustNewSender(rabbitMqConf RabbitSenderConf) Sender {
sender := &RabbitMqSender{ContentType: rabbitMqConf.ContentType}
conn, err := amqp.Dial(getRabbitMqURL(rabbitMqConf.RabbitMqConf))
sender.ErrorHandler(err, "failed to connect rabbitmq!")
sender.conn = conn
conn, err := amqp.Dial(getRabbitURL(rabbitMqConf.RabbitConf))
if err != nil {
log.Fatalf("failed to connect rabbitmq, error: %v", err)
}

sender.conn = conn
channel, err := sender.conn.Channel()
sender.ErrorHandler(err, "failed to open a channel")
sender.channel = channel
return sender
}
func (q *RabbitMqSender) ErrorHandler(err error, message string) {
if err != nil {
logx.Errorf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
log.Fatalf("failed to open a channel, error: %v", err)
}

sender.channel = channel
return sender
}
func (q *RabbitMqSender) Send(exchange string, routeKey string, msg []byte) error {

err := q.channel.Publish(
func (q *RabbitMqSender) Send(exchange string, routeKey string, msg []byte) error {
return q.channel.Publish(
exchange,
routeKey,
false,
Expand All @@ -47,5 +46,4 @@ func (q *RabbitMqSender) Send(exchange string, routeKey string, msg []byte) erro
Body: msg,
},
)
return err
}

0 comments on commit d472d93

Please sign in to comment.