From afb88a8c38edd4fcba41268bf3e70f4ac822b72c Mon Sep 17 00:00:00 2001 From: Andrii Pitukh Date: Thu, 28 Jul 2022 17:12:22 +0300 Subject: [PATCH] [fixed #11] Fix for closing and recreating a subscription --- .../consumer/MessageConsumerActiveMQImpl.java | 48 ++++++++----------- .../CommonQueueTopicTestConfiguration.java | 6 +-- .../spring/integrationtests/TopicTest.java | 24 ++++++++++ 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/eventuate-messaging-activemq-spring-consumer/src/main/java/io/eventuate/messaging/activemq/spring/consumer/MessageConsumerActiveMQImpl.java b/eventuate-messaging-activemq-spring-consumer/src/main/java/io/eventuate/messaging/activemq/spring/consumer/MessageConsumerActiveMQImpl.java index a68916d..67d3f27 100644 --- a/eventuate-messaging-activemq-spring-consumer/src/main/java/io/eventuate/messaging/activemq/spring/consumer/MessageConsumerActiveMQImpl.java +++ b/eventuate-messaging-activemq-spring-consumer/src/main/java/io/eventuate/messaging/activemq/spring/consumer/MessageConsumerActiveMQImpl.java @@ -23,12 +23,9 @@ public class MessageConsumerActiveMQImpl implements CommonMessageConsumer { private Connection connection; private Session session; - private List consumers = new ArrayList<>(); - private List> processingFutures = new ArrayList<>(); + private List subscriptions = new ArrayList<>(); private Map messageModes; - private AtomicBoolean runFlag = new AtomicBoolean(true); - public MessageConsumerActiveMQImpl(String url, Optional user, Optional password) { @@ -59,7 +56,8 @@ public MessageConsumerActiveMQImpl(String url, public Subscription subscribe(String subscriberId, Set channels, ActiveMQMessageHandler handler) { try { logger.info("Subscribing: subscriberId: {}, channels: {}", subscriberId, channels); - List subscriptionConsumers = new ArrayList<>(); + List> processingFutures = new ArrayList<>(); + AtomicBoolean runFlag = new AtomicBoolean(true); for (String channel : channels) { ChannelType mode = messageModes.getOrDefault(channel, ChannelType.TOPIC); @@ -72,26 +70,28 @@ public Subscription subscribe(String subscriberId, Set channels, ActiveM logger.info("Creating consumer: {}", destination); javax.jms.MessageConsumer consumer = session.createConsumer(destination); - consumers.add(consumer); - subscriptionConsumers.add(consumer); - processingFutures.add(CompletableFuture.supplyAsync(() -> process(subscriberId, consumer, handler))); + processingFutures.add(CompletableFuture.supplyAsync(() -> process(runFlag, subscriberId, consumer, handler))); logger.info("Subscribed: subscriberId: {}, channels: {}", subscriberId, channels); } - return new Subscription(() -> { - logger.info("closing consumers"); - subscriptionConsumers.forEach(consumer -> { + Subscription subscription = new Subscription(() -> { + if (!runFlag.get()) { + return; + } + + runFlag.set(false); + + processingFutures.forEach(f -> { try { - consumer.close(); - } catch (JMSException e) { - logger.error("closing consumer failed", e); - throw new RuntimeException(e); + f.get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("Getting data from future failed", e); } }); - logger.info("closed consumers"); }); - + this.subscriptions.add(subscription); + return subscription; } catch (JMSException e) { logger.error("Subscription failed", e); throw new RuntimeException(e); @@ -109,7 +109,7 @@ private ActiveMQConnectionFactory createActiveMQConnectionFactory(String url, Op .orElseGet(() -> new ActiveMQConnectionFactory(url)); } - private Void process(String subscriberId, + private Void process(AtomicBoolean runFlag, String subscriberId, javax.jms.MessageConsumer consumer, ActiveMQMessageHandler handler) { logger.info("starting processing"); @@ -161,15 +161,9 @@ private void acknowledge(TextMessage textMessage) { } public void close() { - runFlag.set(false); - - processingFutures.forEach(f -> { - try { - f.get(); - } catch (InterruptedException | ExecutionException e) { - logger.error("Getting data from future failed", e); - } - }); + logger.info("closing subscriptions"); + this.subscriptions.forEach(Subscription::close); + logger.info("closed subscriptions"); try { logger.info("closing session and connection"); diff --git a/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/CommonQueueTopicTestConfiguration.java b/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/CommonQueueTopicTestConfiguration.java index bbb6942..e87337b 100644 --- a/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/CommonQueueTopicTestConfiguration.java +++ b/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/CommonQueueTopicTestConfiguration.java @@ -27,9 +27,9 @@ public String uniquePostfix() { } @Bean - public MessageConsumerActiveMQImpl messageConsumerKafka(EventuateActiveMQConfigurationProperties eventuateActiveMQConfigurationProperties, - @Qualifier("uniquePostfix") String uniquePostfix, - @Qualifier("testChannelType") ChannelType channelType) { + public MessageConsumerActiveMQImpl activeMQMessageConsumer(EventuateActiveMQConfigurationProperties eventuateActiveMQConfigurationProperties, + @Qualifier("uniquePostfix") String uniquePostfix, + @Qualifier("testChannelType") ChannelType channelType) { return new MessageConsumerActiveMQImpl(eventuateActiveMQConfigurationProperties.getUrl(), Collections.singletonMap("destination" + uniquePostfix, channelType), Optional.ofNullable(eventuateActiveMQConfigurationProperties.getUser()), diff --git a/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/TopicTest.java b/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/TopicTest.java index ab4c33f..0561eb8 100644 --- a/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/TopicTest.java +++ b/eventuate-messaging-activemq-spring-integration-tests/src/test/java/io/eventuate/messaging/activemq/spring/integrationtests/TopicTest.java @@ -1,6 +1,7 @@ package io.eventuate.messaging.activemq.spring.integrationtests; import io.eventuate.messaging.activemq.spring.common.ChannelType; +import io.eventuate.messaging.activemq.spring.consumer.Subscription; import io.eventuate.util.test.async.Eventually; import org.junit.Assert; import org.junit.Test; @@ -89,6 +90,29 @@ private void testSeveralSubscribers(String destination) { Eventually.eventually(() -> Assert.assertEquals(messages * consumers, concurrentLinkedQueue.size())); } + @Test + public void testCloseSubscriptionAndResubscribe() { + String subscriberId = "subscriber1"; + String destination = "destination" + uniquePostfix; + + ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue<>(); + Subscription subscription = messageConsumerActiveMQ.subscribe(subscriberId, Collections.singleton(destination), message -> + concurrentLinkedQueue.add(Integer.parseInt(message.getPayload()))); + eventuateActiveMQProducer.send(destination, "key", String.valueOf(1)); + Eventually.eventually(() -> Assert.assertEquals(1, concurrentLinkedQueue.size())); + + subscription.close(); + eventuateActiveMQProducer.send(destination, "key", String.valueOf(1)); + Eventually.eventually(() -> Assert.assertEquals(1, concurrentLinkedQueue.size())); + + messageConsumerActiveMQ.subscribe(subscriberId, Collections.singleton(destination), message -> + concurrentLinkedQueue.add(Integer.parseInt(message.getPayload()))); + Eventually.eventually(() -> Assert.assertEquals(2, concurrentLinkedQueue.size())); + + eventuateActiveMQProducer.send(destination, "key", String.valueOf(1)); + Eventually.eventually(() -> Assert.assertEquals(3, concurrentLinkedQueue.size())); + } + @Test public void testSubscriberWithPeriods() {