diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index be82b190ffb32..35204e7af72bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -217,15 +217,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE consumerList.remove(consumer); log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size()); if (consumerList.isEmpty()) { - cancelPendingRead(); - - redeliveryMessages.clear(); - redeliveryTracker.clear(); - if (closeFuture != null) { - log.info("[{}] All consumers removed. Subscription is disconnected", name); - closeFuture.complete(null); - } - totalAvailablePermits = 0; + clearComponentsAfterRemovedAllConsumers(); } else { if (log.isDebugEnabled()) { log.debug("[{}] Consumer are left, reading more entries", name); @@ -242,8 +234,29 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE readMoreEntries(); } } else { - log.info("[{}] Trying to remove a non-connected consumer: {}", name, consumer); + /** + * This is not an expected scenario, it will never happen in expected. + * Just add a defensive code to avoid the topic can not be unloaded anymore: remove the consumers which + * are not mismatch with {@link #consumerSet}. See more detail: https://github.com/apache/pulsar/pull/22270. + */ + log.error("[{}] Trying to remove a non-connected consumer: {}", name, consumer); + consumerList.removeIf(c -> consumer.equals(c)); + if (consumerList.isEmpty()) { + clearComponentsAfterRemovedAllConsumers(); + } + } + } + + private synchronized void clearComponentsAfterRemovedAllConsumers() { + cancelPendingRead(); + + redeliveryMessages.clear(); + redeliveryTracker.clear(); + if (closeFuture != null) { + log.info("[{}] All consumers removed. Subscription is disconnected", name); + closeFuture.complete(null); } + totalAvailablePermits = 0; } @Override @@ -554,6 +567,9 @@ public synchronized CompletableFuture disconnectAllConsumers( if (consumerList.isEmpty()) { closeFuture.complete(null); } else { + // Iterator of CopyOnWriteArrayList uses the internal array to do the for-each, and CopyOnWriteArrayList + // will create a new internal array when adding/removing a new item. So remove items in the for-each + // block is safety when the for-each and add/remove are using a same lock. consumerList.forEach(consumer -> consumer.disconnect(isResetCursor, assignedBrokerLookupData)); cancelPendingRead(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java new file mode 100644 index 0000000000000..f24c5c5933e5b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.carrotsearch.hppc.ObjectSet; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class PersistentDispatcherMultipleConsumersTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30 * 1000) + public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscription, MessageId.earliest); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + // Make an error that "consumerSet" is mismatch with "consumerList". + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subscription).getDispatcher(); + ObjectSet consumerSet = + WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + List consumerList = + WhiteboxImpl.getInternalState(dispatcher, "consumerList"); + + org.apache.pulsar.broker.service.Consumer serviceConsumer = consumerList.get(0); + consumerSet.add(serviceConsumer); + consumerList.add(serviceConsumer); + + // Verify: the topic can be deleted successfully. + consumer.close(); + admin.topics().delete(topicName, false); + } + + @Test(timeOut = 30 * 1000) + public void testTopicDeleteIfConsumerSetMismatchConsumerList2() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscription, MessageId.earliest); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + // Make an error that "consumerSet" is mismatch with "consumerList". + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subscription).getDispatcher(); + ObjectSet consumerSet = + WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + consumerSet.clear(); + + // Verify: the topic can be deleted successfully. + consumer.close(); + admin.topics().delete(topicName, false); + } +}