From 8b9d6ac31dc694c7a7480570c8f9d32f5eeaebb0 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 9 Aug 2023 12:07:10 +0800 Subject: [PATCH 1/2] add stop --- rabbitmq/sender.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rabbitmq/sender.go b/rabbitmq/sender.go index af7f1b6..257ed6d 100644 --- a/rabbitmq/sender.go +++ b/rabbitmq/sender.go @@ -49,3 +49,8 @@ func (q *RabbitMqSender) Send(exchange string, routeKey string, msg []byte) erro }, ) } + +func (q *RabbitMqSender) Stop() { + q.channel.Close() + q.conn.Close() +} From 7647654ce8cdb781a7ae5f83c74da547b77ed957 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 21 Feb 2024 00:22:51 +0800 Subject: [PATCH 2/2] service --- dq/consumer.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/dq/consumer.go b/dq/consumer.go index daf4c1c..37eaefe 100644 --- a/dq/consumer.go +++ b/dq/consumer.go @@ -6,6 +6,7 @@ import ( "github.com/zeromicro/go-zero/core/hash" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/queue" "github.com/zeromicro/go-zero/core/service" "github.com/zeromicro/go-zero/core/stores/redis" ) @@ -19,6 +20,10 @@ const ( var maxCheckBytes = getMaxTimeLen() type ( + ConsumeHandler interface { + Consume(message string) error + } + Consume func(body []byte) Consumer interface { @@ -42,6 +47,43 @@ func NewConsumer(c DqConf) Consumer { } } +func NewService(c DqConf, handler ConsumeHandler) []queue.MessageQueue { + var nodes []*consumerNode + for _, node := range c.Beanstalks { + nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube)) + } + cluster := &consumerCluster{ + nodes: nodes, + red: c.Redis.NewRedis(), + } + guardedConsume := func(body []byte) { + key := hash.Md5Hex(body) + body, ok := cluster.unwrap(body) + if !ok { + logx.Errorf("discarded: %q", string(body)) + return + } + + ok, err := cluster.red.SetnxEx(key, guardValue, expiration) + if err != nil { + logx.Error(err) + } else if ok { + handler.Consume(string(body)) + } + } + + var services []queue.MessageQueue + + for _, node := range nodes { + services = append(services, consumeService{ + c: node, + consume: guardedConsume, + }) + } + + return services +} + func (c *consumerCluster) Consume(consume Consume) { guardedConsume := func(body []byte) { key := hash.Md5Hex(body)