This is a sample project to experiment with RabbitMQ.
This project requires a running RabbitMQ server.
The applications in this repository use the implicit default configuration for RabbitMQ:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualhost: vhost
If changed, these values need to be set in the application.yml
configuration files of the services that are supposed to connect to that RabbitMQ server.
Both message producer and consumer require configuration of the exchanges to use.
Currently, the support a topic exchange and a fanout exchange:
messaging:
exchange:
fanout: as.fanout
topic: as.topic
These exchanges don’t need to exist before the applications start, i.e., they don’t need to be set up in the Rabbit MQ management interface on port 15672 manually. The applications are able to create them as needed. The same applies to queues and queue bindings.
The producer service running on port 8080 provides a REST API to send messages to both the fanout exchange as well as the topic exchange.
If the message was sent successfully, the endpoints return a 204 No Content HTTP response.
Both endpoints acccept a request body in JSON format with the following fields:
JSON Field | Description |
---|---|
|
some arbitrary String value, e.g., some identifier of the event that’s being sent as a message |
|
an arbitrary JSON data structure |
In fact, the entire JSON structure in the REST request is only relevant for the payload of the Rabbit MQ message, i.e., the specified key has no relevance for the routing. The structure moreover is just an example for a "complex" data structure as message payload.
Broadcast messages are sent to the fanout exchange configured by the property messaging.exchange.fanout
.
POST localhost:8080/broadcast
{
"key": "my-event",
"payload": {
"flag": true,
"count": 4,
"elements": [
"value"
]
}
}
Topic messages are sent to the topic exchange configured by the property messaging.exchange.topic
.
In addition to the topic, a routing key will be set. It is defined by the routingKey
path variable, but also uses the exchange name as a prefix.
POST localhost:8080/send/topic/{routingKey}
{
"key": "my-event",
"payload": {
"flag": true,
"count": 4,
"elements": [
"value"
]
}
}
In contrast to the producer, the consumer service uses a random port. This allows for starting several instances of the same service.
Each instance of the consumer service creates a Rabbit MQ queue and binds it to the fanout exchange, so that all consumer services instances receive a message.
INFO c.a.h.c.service.ConsumerServiceRabbit : Received message object on fanout: Event(key=Fanout, payload=Hello World) INFO c.a.h.c.service.ConsumerServiceRabbit : Headers: {amqp_receivedDeliveryMode=PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=as.fanout, amqp_deliveryTag=2, amqp_consumerQueue=8d750f5a-5278-4c1d-a4c3-6fcd26cbd50c, amqp_redelivered=false, id=900d424a-633d-3980-a681-30769fad53a0, amqp_consumerTag=amq.ctag-z5TpXftS1oYL0HTszLsGiQ, contentType=application/json, __TypeId__=cc.andreassiegel.hellorabbitmq.common.model.Event, timestamp=1508935384693}
Different than the individual queues bound to the fanout exchange,
all consumer service instances bind to the same queue (log-consumer
in this case).
With this setup, only one instance of a service receives a particular message from the topic exchange.
In addition to that, message consumption also depends on the routing key,
but the current implementation binds to the routing key ${messaging.exchange.topic}.#
.
Hence, consumers will receive all messages that were sent to the topic exchange with the exchange name as prefix.
INFO c.a.h.c.service.ConsumerServiceRabbit : Received message object on topic exchange: Event(key=Topic, payload=Hello World) INFO c.a.h.c.service.ConsumerServiceRabbit : Headers: {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=as.topic.hello.world, amqp_contentEncoding=UTF-8, amqp_receivedExchange=as.topic, amqp_deliveryTag=3, amqp_consumerQueue=log-consumer, amqp_redelivered=false, id=f558aed7-9337-f9da-bbe5-bc9c583a2acf, amqp_consumerTag=amq.ctag-ogyT6TxHCCl1k6sXs3DwOA, contentType=application/json, __TypeId__=cc.andreassiegel.hellorabbitmq.common.model.Event, timestamp=1508935795826}