Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fixed #11] Fix for closing and recreating a subscription #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ public class MessageConsumerActiveMQImpl implements CommonMessageConsumer {

private Connection connection;
private Session session;
private List<javax.jms.MessageConsumer> consumers = new ArrayList<>();
private List<Future<Void>> processingFutures = new ArrayList<>();
private List<Subscription> subscriptions = new ArrayList<>();
private Map<String, ChannelType> messageModes;

private AtomicBoolean runFlag = new AtomicBoolean(true);

public MessageConsumerActiveMQImpl(String url,
Optional<String> user,
Optional<String> password) {
Expand Down Expand Up @@ -59,7 +56,8 @@ public MessageConsumerActiveMQImpl(String url,
public Subscription subscribe(String subscriberId, Set<String> channels, ActiveMQMessageHandler handler) {
try {
logger.info("Subscribing: subscriberId: {}, channels: {}", subscriberId, channels);
List<javax.jms.MessageConsumer> subscriptionConsumers = new ArrayList<>();
List<Future<Void>> processingFutures = new ArrayList<>();
AtomicBoolean runFlag = new AtomicBoolean(true);
for (String channel : channels) {
40rn05lyv marked this conversation as resolved.
Show resolved Hide resolved
ChannelType mode = messageModes.getOrDefault(channel, ChannelType.TOPIC);

Expand All @@ -72,26 +70,28 @@ public Subscription subscribe(String subscriberId, Set<String> channels, ActiveM

logger.info("Creating consumer: {}", destination);
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
consumers.add(consumer);
subscriptionConsumers.add(consumer);

processingFutures.add(CompletableFuture.supplyAsync(() -> process(subscriberId, consumer, handler)));
processingFutures.add(CompletableFuture.supplyAsync(() -> process(runFlag, subscriberId, consumer, handler)));
logger.info("Subscribed: subscriberId: {}, channels: {}", subscriberId, channels);
}

return new Subscription(() -> {
logger.info("closing consumers");
subscriptionConsumers.forEach(consumer -> {
Subscription subscription = new Subscription(() -> {
if (!runFlag.get()) {
return;
}

runFlag.set(false);

processingFutures.forEach(f -> {
try {
consumer.close();
} catch (JMSException e) {
logger.error("closing consumer failed", e);
throw new RuntimeException(e);
f.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Getting data from future failed", e);
}
});
logger.info("closed consumers");
});

this.subscriptions.add(subscription);
return subscription;
} catch (JMSException e) {
logger.error("Subscription failed", e);
throw new RuntimeException(e);
Expand All @@ -109,7 +109,7 @@ private ActiveMQConnectionFactory createActiveMQConnectionFactory(String url, Op
.orElseGet(() -> new ActiveMQConnectionFactory(url));
}

private Void process(String subscriberId,
private Void process(AtomicBoolean runFlag, String subscriberId,
javax.jms.MessageConsumer consumer,
ActiveMQMessageHandler handler) {
logger.info("starting processing");
Expand Down Expand Up @@ -161,15 +161,9 @@ private void acknowledge(TextMessage textMessage) {
}

public void close() {
runFlag.set(false);

processingFutures.forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Getting data from future failed", e);
}
});
logger.info("closing subscriptions");
this.subscriptions.forEach(Subscription::close);
logger.info("closed subscriptions");

try {
logger.info("closing session and connection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public String uniquePostfix() {
}

@Bean
public MessageConsumerActiveMQImpl messageConsumerKafka(EventuateActiveMQConfigurationProperties eventuateActiveMQConfigurationProperties,
@Qualifier("uniquePostfix") String uniquePostfix,
@Qualifier("testChannelType") ChannelType channelType) {
public MessageConsumerActiveMQImpl activeMQMessageConsumer(EventuateActiveMQConfigurationProperties eventuateActiveMQConfigurationProperties,
@Qualifier("uniquePostfix") String uniquePostfix,
@Qualifier("testChannelType") ChannelType channelType) {
return new MessageConsumerActiveMQImpl(eventuateActiveMQConfigurationProperties.getUrl(),
Collections.singletonMap("destination" + uniquePostfix, channelType),
Optional.ofNullable(eventuateActiveMQConfigurationProperties.getUser()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.eventuate.messaging.activemq.spring.integrationtests;

import io.eventuate.messaging.activemq.spring.common.ChannelType;
import io.eventuate.messaging.activemq.spring.consumer.Subscription;
import io.eventuate.util.test.async.Eventually;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -89,6 +90,29 @@ private void testSeveralSubscribers(String destination) {
Eventually.eventually(() -> Assert.assertEquals(messages * consumers, concurrentLinkedQueue.size()));
}

@Test
public void testCloseSubscriptionAndResubscribe() {
String subscriberId = "subscriber1";
String destination = "destination" + uniquePostfix;

ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
Subscription subscription = messageConsumerActiveMQ.subscribe(subscriberId, Collections.singleton(destination), message ->
concurrentLinkedQueue.add(Integer.parseInt(message.getPayload())));
eventuateActiveMQProducer.send(destination, "key", String.valueOf(1));
Eventually.eventually(() -> Assert.assertEquals(1, concurrentLinkedQueue.size()));

subscription.close();
eventuateActiveMQProducer.send(destination, "key", String.valueOf(1));
Eventually.eventually(() -> Assert.assertEquals(1, concurrentLinkedQueue.size()));

messageConsumerActiveMQ.subscribe(subscriberId, Collections.singleton(destination), message ->
concurrentLinkedQueue.add(Integer.parseInt(message.getPayload())));
Eventually.eventually(() -> Assert.assertEquals(2, concurrentLinkedQueue.size()));

eventuateActiveMQProducer.send(destination, "key", String.valueOf(1));
Eventually.eventually(() -> Assert.assertEquals(3, concurrentLinkedQueue.size()));
}

@Test
public void testSubscriberWithPeriods() {

Expand Down