This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e509574e5c [fix][client] Match logical topic when removing unacked 
messages (#25921)
4e509574e5c is described below

commit 4e509574e5c50c3edc872dd5a2c65c1b0c35909b
Author: Dream95 <[email protected]>
AuthorDate: Fri Jun 5 18:33:10 2026 +0800

    [fix][client] Match logical topic when removing unacked messages (#25921)
    
    Signed-off-by: Dream95 <[email protected]>
---
 .../apache/pulsar/client/api/TopicMessageId.java   | 13 +++++++
 .../pulsar/client/impl/TopicMessageIdImpl.java     |  7 ++++
 .../impl/UnAckedTopicMessageRedeliveryTracker.java |  8 ++---
 .../client/impl/UnAckedTopicMessageTracker.java    |  4 +--
 .../pulsar/client/impl/TopicMessageIdImplTest.java | 17 +++++++++
 .../UnAckedTopicMessageRedeliveryTrackerTest.java  | 40 +++++++++++++++++++++-
 6 files changed, 82 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
index 4d02a7f4096..e2791ebf26a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -41,6 +41,19 @@ public interface TopicMessageId extends MessageId {
      */
     String getOwnerTopic();
 
+    /**
+     * Checks if this message's owner topic and the given topic refer to the 
same base
+     * partitioned topic by comparing their base partitioned topic names.
+     *
+     * <p>For example, {@code 
persistent://public/default/my-topic-partition-0} matches
+     * {@code persistent://public/default/my-topic} or any other partition of 
that topic.
+     * Topics sharing only a name prefix (e.g., {@code my-topic} vs {@code 
my-topic-v2}) do not match.
+     *
+     * @param topicName a full topic name (non-partitioned, partitioned, or 
specific partition)
+     * @return {@code true} if both topics resolve to the same base 
partitioned topic name
+     */
+    boolean hasSameBasePartitionedTopic(String topicName);
+
     static TopicMessageId create(String topic, MessageId messageId) {
         if (messageId instanceof TopicMessageId) {
             return (TopicMessageId) messageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 6380c2a8787..79905de1ade 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.TraceableMessageId;
+import org.apache.pulsar.common.naming.TopicName;
 
 public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId, 
TraceableMessageId {
     private static final long serialVersionUID = 1L;
@@ -92,6 +93,12 @@ public class TopicMessageIdImpl implements MessageIdAdv, 
TopicMessageId, Traceab
         return ownerTopic;
     }
 
+    @Override
+    public boolean hasSameBasePartitionedTopic(String topicName) {
+        return TopicName.get(getOwnerTopic()).getPartitionedTopicName()
+                .equals(TopicName.get(topicName).getPartitionedTopicName());
+    }
+
     @Override
     public long getLedgerId() {
         return msgId.getLedgerId();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 393557cd89a..6d142c1cfde 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -42,8 +42,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
                 Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>> 
entry = iterator.next();
                 UnackMessageIdWrapper messageIdWrapper = entry.getKey();
                 MessageId messageId = messageIdWrapper.getMessageId();
-                if (messageId instanceof TopicMessageId
-                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
+                if (messageId instanceof TopicMessageId topicMessageId
+                        && 
topicMessageId.hasSameBasePartitionedTopic(topicName)) {
                     entry.getValue().remove(messageIdWrapper);
                     iterator.remove();
                     messageIdWrapper.recycle();
@@ -54,8 +54,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
             Iterator<MessageId> iteratorAckTimeOut = 
ackTimeoutMessages.keySet().iterator();
             while (iteratorAckTimeOut.hasNext()) {
                 MessageId messageId = iteratorAckTimeOut.next();
-                if (messageId instanceof TopicMessageId
-                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
+                if (messageId instanceof TopicMessageId topicMessageId
+                        && 
topicMessageId.hasSameBasePartitionedTopic(topicName)) {
                     iteratorAckTimeOut.remove();
                     removed++;
                 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index 1cbab584404..bac68a9c5dd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -40,8 +40,8 @@ public class UnAckedTopicMessageTracker extends 
UnAckedMessageTracker {
             while (iterator.hasNext()) {
                 Entry<MessageId, HashSet<MessageId>> entry = iterator.next();
                 MessageId messageId = entry.getKey();
-                if (messageId instanceof TopicMessageId
-                        && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
+                if (messageId instanceof TopicMessageId topicMessageId
+                        && 
topicMessageId.hasSameBasePartitionedTopic(topicName)) {
                     entry.getValue().remove(messageId);
                     iterator.remove();
                     removed++;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
index 399c9a1dafc..c1e2740a6cf 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
 import org.testng.annotations.Test;
 
 public class TopicMessageIdImplTest {
@@ -54,6 +56,21 @@ public class TopicMessageIdImplTest {
         assertNotEquals(topicMsgId1, topicMsgId2);
     }
 
+    @Test
+    public void testHasSameBasePartitionedTopic() {
+        MessageIdImpl msgId = new MessageIdImpl(0, 0, 0);
+        TopicMessageIdImpl partitionMsgId = new TopicMessageIdImpl(
+                "persistent://public/default/my-topic-partition-0", msgId);
+        assertTrue(partitionMsgId.hasSameBasePartitionedTopic(
+                "persistent://public/default/my-topic-partition-1"));
+        assertTrue(partitionMsgId.hasSameBasePartitionedTopic(
+                "persistent://public/default/my-topic"));
+        assertFalse(partitionMsgId.hasSameBasePartitionedTopic(
+                "persistent://public/default/my-topic-v2"));
+        assertFalse(partitionMsgId.hasSameBasePartitionedTopic(
+                "persistent://public/default/my-topic-v2-partition-0"));
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testDeprecatedMethods() {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
index 9fdab39145b..e0ad1f9111b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
@@ -70,10 +70,48 @@ public class UnAckedTopicMessageRedeliveryTrackerTest {
         tracker.ackTimeoutMessages.put(msgInAckTimeout, 
System.currentTimeMillis() + 1_000_000L);
         assertEquals(tracker.size(), 2);
 
-        assertEquals(tracker.removeTopicMessages("my-topic"), 2);
+        
assertEquals(tracker.removeTopicMessages("persistent://public/default/my-topic-partition-0"),
 2);
         assertTrue(tracker.isEmpty());
 
         tracker.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRemoveTopicMessagesDoesNotMatchPrefixTopic() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(connectionPool);
+        @Cleanup("stop")
+        Timer timer = new HashedWheelTimer(
+                new DefaultThreadFactory("pulsar-timer", 
Thread.currentThread().isDaemon()),
+                1, TimeUnit.MILLISECONDS);
+        when(client.timer()).thenReturn(timer);
+
+        ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+        doNothing().when(consumer).onAckTimeoutSend(any());
+        doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setAckTimeoutMillis(1_000_000);
+        conf.setTickDurationMillis(100_000);
+        
conf.setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build());
+
+        UnAckedTopicMessageRedeliveryTracker tracker =
+                new UnAckedTopicMessageRedeliveryTracker(client, consumer, 
conf);
+
+        String ownerTopic = 
"persistent://public/default/my-topic-v2-partition-0";
+        TopicMessageIdImpl msgInPartition =
+                new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(1L, 0L, 
-1));
+
+        assertTrue(tracker.add(msgInPartition));
+        assertEquals(tracker.size(), 1);
+
+        
assertEquals(tracker.removeTopicMessages("persistent://public/default/my-topic"),
 0);
+        assertEquals(tracker.size(), 1);
+
+        tracker.close();
+    }
+
 }

Reply via email to