Skip to content

Commit

Permalink
GH-2478 Handle conversion exception in AsyncRabbitTemplate
Browse files Browse the repository at this point in the history
Previously, conversion errors in AsyncRabbitTemplate lead to AmqpReplyTimeoutException
  • Loading branch information
BenEfrati committed Dec 19, 2024
1 parent d139c98 commit 91f9980
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
Expand Down Expand Up @@ -89,6 +90,7 @@
* @author Artem Bilan
* @author FengYang Su
* @author Ngoc Nhan
* @author Ben Efrati
*
* @since 1.6
*/
Expand Down Expand Up @@ -604,12 +606,17 @@ public void onMessage(Message message, Channel channel) {
if (future instanceof RabbitConverterFuture) {
MessageConverter messageConverter = this.template.getMessageConverter();
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
Object converted = rabbitFuture.getReturnType() != null
try {
Object converted = rabbitFuture.getReturnType() != null
&& messageConverter instanceof SmartMessageConverter smart
? smart.fromMessage(message,
rabbitFuture.getReturnType())
: messageConverter.fromMessage(message);
rabbitFuture.complete(converted);
rabbitFuture.complete(converted);
}
catch (MessageConversionException e) {
rabbitFuture.completeExceptionally(e);
}
}
else {
((RabbitMessageFuture) future).complete(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
Expand All @@ -72,6 +73,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Ben Efrati
*
* @since 1.6
*/
Expand Down Expand Up @@ -394,6 +396,29 @@ public void testStopCancelled() throws Exception {
assertThat(callback.result).isNull();
}

@Test
@DirtiesContext
public void testConversionException() throws InterruptedException {
this.asyncTemplate.getRabbitTemplate().setMessageConverter(new SimpleMessageConverter() {
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new MessageConversionException("Failed to convert message");
}
});

RabbitConverterFuture<String> replyFuture = this.asyncTemplate.convertSendAndReceive("conversionException");

final CountDownLatch cdl = new CountDownLatch(1);
final AtomicReference<Object> resultRef = new AtomicReference<>();
replyFuture.whenComplete((result, ex) -> {
resultRef.set(result);
cdl.countDown();
});
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(replyFuture).isCompletedExceptionally();
assertThat(resultRef.get()).isNull();
}

@Test
void ctorCoverage() {
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
Expand Down

0 comments on commit 91f9980

Please sign in to comment.