Skip to content

Commit

Permalink
feat(executor/rabbitmq): Add support for RPC (#798)
Browse files Browse the repository at this point in the history
* feat(executor/rabbitmq): Add support for RPC

Introduced new client type to use for testing RPC pattern using Direct Reply-to.

Signed-off-by: Tomaž Završnik <[email protected]>

* docs(executor/rabbitmq): Update RabbitMQ executor docs.

Signed-off-by: Tomaž Završnik <[email protected]>
  • Loading branch information
docknight authored Aug 16, 2024
1 parent cdb500d commit 8a094a8
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 45 deletions.
44 changes: 40 additions & 4 deletions executors/rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Venom - Executor RabbitMQ

Step to use publish / subscribe on a RabbitMQ
Three types of execution are supported:
- **publisher**: publish a message to a queue or to an exchange.
- **subscriber**: bind to a queue or an exchange (using routing key) and wait for message(s) to be consumed.
- **client**: publish a message to a queue or to an exchange and wait for the response message to be received on the [reply-to](https://www.rabbitmq.com/docs/direct-reply-to) queue.

Steps to use publish / subscribe on a RabbitMQ:

## Input
In your yaml file, you can use:
Expand All @@ -12,7 +17,7 @@ In your yaml file, you can use:
- user optional (default guest)
- password optional (default guest)

- clientType mandatory (publisher or subscriber)
- clientType mandatory (publisher, subscriber or client)

# RabbitMQ Q configuration
- qName mandatory
Expand All @@ -22,10 +27,10 @@ In your yaml file, you can use:
- exchangeType optional (default "fanout")
- exchange optional (default "")

# For subscriber only
# For subscriber and client only
- messageLimit optional (default 1)

# For publisher only
# For publisher and client only
- messages
- durable optional (true or false) (default false)
- contentType optional
Expand Down Expand Up @@ -140,3 +145,34 @@ vars:
- result.messages.messages0.contentencoding ShouldEqual utf8
- result.messages.messages0.contenttype ShouldEqual application/json
```
### Client (pubsub RPC)
```yaml
name: TestSuite RabbitMQ
vars:
addrs: 'amqp://localhost:5672'
user:
password:
testcases:
- name: RabbitMQ request/reply
steps:
- type: rabbitmq
addrs: "{{.addrs}}"
user: "{{.user}}"
password: "{{.password}}"
clientType: client
exchange: exchange_test
routingKey: pubsub_test
messages:
- value: '{"a": "b"}'
contentType: application/json
contentEncoding: utf8
persistent: false
headers:
myCustomHeader: value
myCustomHeader2: value2
messageLimit: 1
assertions:
- result.bodyjson.bodyjson0 ShouldContainKey Status
- result.bodyjson.bodyjson0.Status ShouldEqual Succeeded
```
119 changes: 78 additions & 41 deletions executors/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Message struct {
Persistent bool `json:"persistent" yaml:"persistent"`
ContentType string `json:"content_type" yaml:"contentType"`
ContentEncoding string `json:"content_encoding" yaml:"contentEncoding"`
ReplyTo string `json:"reply_to" yaml:"replyTo"`
}

// Executor represents a Test Exec
Expand Down Expand Up @@ -103,7 +104,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
switch e.ClientType {
case "publisher":
workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err := e.publishMessages(ctx, workdir)
err := e.publishMessages(ctx, workdir, nil, nil, false)
if err != nil {
result.Err = err.Error()
return nil, err
Expand All @@ -115,8 +116,35 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
result.Err = err.Error()
return nil, err
}
case "client":
var conn, ch, err = e.openChannel(ctx)
if err != nil {
result.Err = err.Error()
return nil, err
}
defer ch.Close()
defer conn.Close()
var delivery, consumererr = ch.Consume("amq.rabbitmq.reply-to", "", true, false, false, false, nil)
if consumererr != nil {
return nil, consumererr
}
venom.Info(ctx, "Reply consumer started.")

workdir := venom.StringVarFromCtx(ctx, "venom.testsuite.workdir")
err = e.publishMessages(ctx, workdir, conn, ch, true)
if err != nil {
result.Err = err.Error()
return nil, err
}

var d = <-delivery
body := []string{}
bodyJSON := []interface{}{}
body, bodyJSON = e.processMessage(ctx, d, true, body, bodyJSON)
result.Body = body
result.BodyJSON = bodyJSON
default:
return nil, fmt.Errorf("clientType %q must be publisher or subscriber", e.ClientType)
return nil, fmt.Errorf("clientType %q must be publisher or subscriber or client", e.ClientType)
}

elapsed := time.Since(start)
Expand All @@ -125,26 +153,19 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro
return result, nil
}

func (e Executor) publishMessages(ctx context.Context, workdir string) error {
uri, err := amqp.ParseURI(e.Addrs)
if err != nil {
return err
}
uri.Username = e.User
uri.Password = e.Password

conn, err := amqp.Dial(uri.String())
if err != nil {
return err
}
venom.Debug(ctx, "connection opened")
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
func (e Executor) publishMessages(ctx context.Context, workdir string, connection *amqp.Connection, channel *amqp.Channel, rpc bool) error {
var ch *amqp.Channel
var err error
if connection == nil || channel == nil {
ch, conn, err := e.openChannel(ctx)
if err != nil {
return err
}
defer conn.Close()
defer ch.Close()
} else {
ch = channel
}
venom.Debug(ctx, "channel opened")
defer ch.Close()

// If an exchange if defined
routingKey := e.RoutingKey
Expand Down Expand Up @@ -186,6 +207,10 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
if !e.Messages[i].Persistent {
deliveryMode = amqp.Transient
}
var replyTo string = e.Messages[i].ReplyTo
if rpc {
replyTo = "amq.rabbitmq.reply-to"
}
err = ch.Publish(
e.Exchange, // exchange
routingKey, // routing key
Expand All @@ -195,6 +220,7 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
DeliveryMode: deliveryMode,
ContentType: e.Messages[i].ContentType,
ContentEncoding: e.Messages[i].ContentEncoding,
ReplyTo: replyTo,
Body: []byte(e.Messages[i].Value),
Headers: e.Messages[i].Headers,
})
Expand All @@ -208,26 +234,51 @@ func (e Executor) publishMessages(ctx context.Context, workdir string) error {
return nil
}

func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
func (e Executor) openChannel(ctx context.Context) (*amqp.Connection, *amqp.Channel, error) {
uri, err := amqp.ParseURI(e.Addrs)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
uri.Username = e.User
uri.Password = e.Password

conn, err := amqp.Dial(uri.String())
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
venom.Debug(ctx, "connection opened")
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
venom.Debug(ctx, "channel opened")
return conn, ch, nil
}

func (e Executor) processMessage(ctx context.Context, msg amqp.Delivery, ok bool, body []string, bodyJSON []interface{}) ([]string, []interface{}) {
venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType)
venom.Debug(ctx, "receive: %s", string(msg.Body))
body = append(body, string(msg.Body))

bodyJSONArray := []interface{}{}
if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil {
bodyJSONMap := map[string]interface{}{}
venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint
bodyJSON = append(bodyJSON, bodyJSONMap)
} else {
bodyJSON = append(bodyJSON, bodyJSONArray)
}

return body, bodyJSON
}

func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{}, []interface{}, []amqp.Table, error) {
conn, ch, err := e.openChannel(ctx)
if err != nil {
return nil, nil, nil, nil, err
}
defer conn.Close()
defer ch.Close()

q, err := ch.QueueDeclare(
Expand Down Expand Up @@ -287,21 +338,7 @@ func (e Executor) consumeMessages(ctx context.Context) ([]string, []interface{},

headers = append(headers, msg.Headers)
messages = append(messages, msg)

venom.Debug(ctx, "message: %t %s %s %s", ok, msg.RoutingKey, msg.MessageId, msg.ContentType)

venom.Debug(ctx, "receive: %s", string(msg.Body))
body = append(body, string(msg.Body))

bodyJSONArray := []interface{}{}
if err := venom.JSONUnmarshal(msg.Body, &bodyJSONArray); err != nil {
bodyJSONMap := map[string]interface{}{}
venom.JSONUnmarshal(msg.Body, &bodyJSONMap) //nolint
bodyJSON = append(bodyJSON, bodyJSONMap)
} else {
bodyJSON = append(bodyJSON, bodyJSONArray)
}

body, bodyJSON = e.processMessage(ctx, msg, ok, body, bodyJSON)
}

return body, bodyJSON, messages, headers, err
Expand Down

0 comments on commit 8a094a8

Please sign in to comment.