This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 45cd065d32f09eb85cf71a6fde4ffbc562a34cce
Author: Dream95 <[email protected]>
AuthorDate: Fri Jun 5 18:33:24 2026 +0800

    [improve][client] Clean up unacked message tracker when topics are removed 
in multi-topic consumers (#25923)
    
    Signed-off-by: Dream95 <[email protected]>
    (cherry picked from commit 26cf550bb9635a8107e5628800b465778c847a91)
---
 .../client/impl/MultiTopicsConsumerImpl.java       | 17 +++++---
 .../impl/PatternMultiTopicsConsumerImpl.java       |  1 +
 .../client/impl/MultiTopicsConsumerImplTest.java   | 51 ++++++++++++++++++++++
 .../impl/PatternMultiTopicsConsumerImplTest.java   | 41 +++++++++++++++++
 4 files changed, 104 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f1a543e95d4..f611abb179f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -974,6 +974,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
     }
 
+    protected void removeTopicMessagesFromUnackedTracker(String topicName) {
+        if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker 
tracker) {
+            tracker.removeTopicMessages(topicName);
+        } else if (unAckedMessageTracker instanceof 
UnAckedTopicMessageRedeliveryTracker tracker) {
+            tracker.removeTopicMessages(topicName);
+        }
+    }
+
     /***
      * Subscribe one more given topic.
      * @param topicName topic name without the partition suffix.
@@ -1283,11 +1291,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     });
 
                     removeTopic(topicName);
-                    if (unAckedMessageTracker instanceof 
UnAckedTopicMessageTracker tracker) {
-                        tracker.removeTopicMessages(topicName);
-                    } else if (unAckedMessageTracker instanceof 
UnAckedTopicMessageRedeliveryTracker tracker){
-                        tracker.removeTopicMessages(topicName);
-                    }
+                    removeTopicMessagesFromUnackedTracker(topicName);
 
                     unsubscribeFuture.complete(null);
                     log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, 
allTopicPartitionsNumber: {}",
@@ -1414,7 +1418,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     }
                 }
 
-                return FutureUtil.waitForAll(futures);
+                return FutureUtil.waitForAll(futures)
+                        .thenRun(() -> 
removeTopicMessagesFromUnackedTracker(topicName));
             } else if (oldPartitionNumber < currentPartitionNumber) {
                 allTopicPartitionsNumber.addAndGet(currentPartitionNumber - 
oldPartitionNumber);
                 partitionedTopics.put(topicName, currentPartitionNumber);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 3db37864986..cbfe4832907 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -367,6 +367,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
                             
removedPartitionedTopicsForLog.add(String.format("%s with %s partitions",
                                     groupedTopicRemoved, partitions));
                             partitionedTopics.remove(groupedTopicRemoved, 
partitions);
+                            
removeTopicMessagesFromUnackedTracker(groupedTopicRemoved);
                         }
                     }
                 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 54175613e3b..b4d007d855f 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -37,6 +37,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -266,4 +267,54 @@ public class MultiTopicsConsumerImplTest {
         verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), 
anyBoolean(), anyBoolean());
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnTopicsExtendedRemovedTopicCleansUnackedMessages() {
+        String topicName = "persistent://public/default/deleted-topic";
+        String topicPartition0 = topicName + "-partition-0";
+        String topicPartition1 = topicName + "-partition-1";
+        String otherTopicPartition = 
"persistent://public/default/other-topic-partition-0";
+
+        ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
+        consumerConfData.setSubscriptionName("subscriptionName");
+        consumerConfData.setAutoUpdatePartitions(true);
+        consumerConfData.setAutoUpdatePartitionsIntervalSeconds(60);
+        consumerConfData.setAckTimeoutMillis(1000);
+
+        MultiTopicsConsumerImpl<byte[]> impl = 
createMultiTopicsConsumer(consumerConfData);
+
+        impl.partitionedTopics.put(topicName, 2);
+        impl.allTopicPartitionsNumber.set(2);
+
+        ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>) 
mock(ConsumerImpl.class);
+        ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>) 
mock(ConsumerImpl.class);
+        when(partitionConsumer0.getTopic()).thenReturn(topicPartition0);
+        when(partitionConsumer1.getTopic()).thenReturn(topicPartition1);
+        
when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+        
when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+        impl.consumers.put(topicPartition0, partitionConsumer0);
+        impl.consumers.put(topicPartition1, partitionConsumer1);
+
+        TopicMessageIdImpl removedTopicMessageId = new 
TopicMessageIdImpl(topicPartition0, new MessageIdImpl(1, 1, 0));
+        TopicMessageIdImpl otherTopicMessageId =
+                new TopicMessageIdImpl(otherTopicPartition, new 
MessageIdImpl(2, 2, 0));
+        impl.getUnAckedMessageTracker().add(removedTopicMessageId);
+        impl.getUnAckedMessageTracker().add(otherTopicMessageId);
+        assertEquals(impl.getUnAckedMessageTracker().size(), 2);
+
+        when(impl.client.getPartitionsForTopic(topicName, 
false)).thenReturn(CompletableFuture.completedFuture(
+                Collections.emptyList()));
+
+        PartitionsChangedListener listener = 
impl.topicsPartitionChangedListener;
+        listener.onTopicsExtended(Collections.singleton(topicName)).join();
+
+        assertTrue(impl.getConsumers().isEmpty());
+        assertEquals(impl.partitionedTopics.get(topicName), 
Integer.valueOf(0));
+        assertEquals(impl.allTopicPartitionsNumber.get(), 0);
+        assertEquals(impl.getUnAckedMessageTracker().size(), 1);
+        
assertTrue(impl.getUnAckedMessageTracker().remove(otherTopicMessageId));
+        verify(partitionConsumer0).closeAsync();
+        verify(partitionConsumer1).closeAsync();
+    }
+
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
index cd799edb3bd..84fdca77feb 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
@@ -310,6 +310,47 @@ public class PatternMultiTopicsConsumerImplTest {
         assertThat(invocationCount.get()).isEqualTo(5);
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
testOnTopicsRemovedCleansUnackedMessagesForRemovedPartitionedTopic() {
+        String partitionedTopic = 
"persistent://tenant/namespace/deleted-topic";
+        String partition0 = partitionedTopic + "-partition-0";
+        String partition1 = partitionedTopic + "-partition-1";
+        String otherTopicPartition = 
"persistent://tenant/namespace/other-topic-partition-0";
+        TopicsPattern topicsPattern =
+                
TopicsPatternFactory.create("persistent://tenant/namespace/.*", 
TopicsPattern.RegexImplementation.JDK);
+        ConsumerConfigurationData<byte[]> consumerConfData = 
createConsumerConfigurationData();
+        consumerConfData.setAckTimeoutMillis(1000);
+
+        PatternMultiTopicsConsumerImpl<byte[]> consumer =
+                createPatternMultiTopicsConsumer(consumerConfData, 
topicsPattern);
+
+        consumer.partitionedTopics.put(partitionedTopic, 2);
+
+        ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>) 
mock(ConsumerImpl.class);
+        ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>) 
mock(ConsumerImpl.class);
+        
when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+        
when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+        consumer.consumers.put(partition0, partitionConsumer0);
+        consumer.consumers.put(partition1, partitionConsumer1);
+
+        TopicMessageIdImpl removedTopicMessageId = new 
TopicMessageIdImpl(partition0, new MessageIdImpl(1, 1, 0));
+        TopicMessageIdImpl otherTopicMessageId =
+                new TopicMessageIdImpl(otherTopicPartition, new 
MessageIdImpl(2, 2, 0));
+        consumer.getUnAckedMessageTracker().add(removedTopicMessageId);
+        consumer.getUnAckedMessageTracker().add(otherTopicMessageId);
+        assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(2);
+
+        
consumer.topicsChangeListener.onTopicsRemoved(Arrays.asList(partition0, 
partition1)).join();
+
+        
assertThat(consumer.partitionedTopics.containsKey(partitionedTopic)).isFalse();
+        assertThat(consumer.consumers).doesNotContainKeys(partition0, 
partition1);
+        assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(1);
+        
assertThat(consumer.getUnAckedMessageTracker().remove(otherTopicMessageId)).isTrue();
+        verify(partitionConsumer0).closeAsync();
+        verify(partitionConsumer1).closeAsync();
+    }
+
     private static void runTimerTasks(Deque<TimerTask> tasks) throws Exception 
{
         // first drain the queue to a list to avoid an infinite loop
         List<TimerTask> taskList = new ArrayList<>();

Reply via email to