This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e509574e5c [fix][client] Match logical topic when removing unacked messages (#25921)
4e509574e5c is described below
commit 4e509574e5c50c3edc872dd5a2c65c1b0c35909b
Author: Dream95 <[email protected]>
AuthorDate: Fri Jun 5 18:33:10 2026 +0800
[fix][client] Match logical topic when removing unacked messages (#25921)
Signed-off-by: Dream95 <[email protected]>
---
.../apache/pulsar/client/api/TopicMessageId.java | 13 +++++++
.../pulsar/client/impl/TopicMessageIdImpl.java | 7 ++++
.../impl/UnAckedTopicMessageRedeliveryTracker.java | 8 ++---
.../client/impl/UnAckedTopicMessageTracker.java | 4 +--
.../pulsar/client/impl/TopicMessageIdImplTest.java | 17 +++++++++
.../UnAckedTopicMessageRedeliveryTrackerTest.java | 40 +++++++++++++++++++++-
6 files changed, 82 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
index 4d02a7f4096..e2791ebf26a 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -41,6 +41,19 @@ public interface TopicMessageId extends MessageId {
*/
String getOwnerTopic();
+ /**
+ * Checks if this message's owner topic and the given topic refer to the same base
+ * partitioned topic by comparing their base partitioned topic names.
+ *
+ * <p>For example, {@code persistent://public/default/my-topic-partition-0} matches
+ * {@code persistent://public/default/my-topic} or any other partition of that topic.
+ * Topics sharing only a name prefix (e.g., {@code my-topic} vs {@code my-topic-v2}) do not match.
+ *
+ * @param topicName a full topic name (non-partitioned, partitioned, or specific partition)
+ * @return {@code true} if both topics resolve to the same base partitioned topic name
+ */
+ boolean hasSameBasePartitionedTopic(String topicName);
+
static TopicMessageId create(String topic, MessageId messageId) {
if (messageId instanceof TopicMessageId) {
return (TopicMessageId) messageId;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 6380c2a8787..79905de1ade 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TraceableMessageId;
+import org.apache.pulsar.common.naming.TopicName;
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId, TraceableMessageId {
private static final long serialVersionUID = 1L;
@@ -92,6 +93,12 @@ public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId, Traceab
return ownerTopic;
}
+ @Override
+ public boolean hasSameBasePartitionedTopic(String topicName) {
+ return TopicName.get(getOwnerTopic()).getPartitionedTopicName()
+ .equals(TopicName.get(topicName).getPartitionedTopicName());
+ }
+
@Override
public long getLedgerId() {
return msgId.getLedgerId();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 393557cd89a..6d142c1cfde 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -42,8 +42,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends UnAckedMessageRedelive
Entry<UnackMessageIdWrapper, HashSet<UnackMessageIdWrapper>> entry = iterator.next();
UnackMessageIdWrapper messageIdWrapper = entry.getKey();
MessageId messageId = messageIdWrapper.getMessageId();
- if (messageId instanceof TopicMessageId
- && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) {
+ if (messageId instanceof TopicMessageId topicMessageId
+ && topicMessageId.hasSameBasePartitionedTopic(topicName)) {
entry.getValue().remove(messageIdWrapper);
iterator.remove();
messageIdWrapper.recycle();
@@ -54,8 +54,8 @@ public class UnAckedTopicMessageRedeliveryTracker extends UnAckedMessageRedelive
Iterator<MessageId> iteratorAckTimeOut = ackTimeoutMessages.keySet().iterator();
while (iteratorAckTimeOut.hasNext()) {
MessageId messageId = iteratorAckTimeOut.next();
- if (messageId instanceof TopicMessageId
- && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) {
+ if (messageId instanceof TopicMessageId topicMessageId
+ && topicMessageId.hasSameBasePartitionedTopic(topicName)) {
iteratorAckTimeOut.remove();
removed++;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index 1cbab584404..bac68a9c5dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -40,8 +40,8 @@ public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
while (iterator.hasNext()) {
Entry<MessageId, HashSet<MessageId>> entry = iterator.next();
MessageId messageId = entry.getKey();
- if (messageId instanceof TopicMessageId
- && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) {
+ if (messageId instanceof TopicMessageId topicMessageId
+ && topicMessageId.hasSameBasePartitionedTopic(topicName)) {
entry.getValue().remove(messageId);
iterator.remove();
removed++;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
index 399c9a1dafc..c1e2740a6cf 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
public class TopicMessageIdImplTest {
@@ -54,6 +56,21 @@ public class TopicMessageIdImplTest {
assertNotEquals(topicMsgId1, topicMsgId2);
}
+ @Test
+ public void testHasSameBasePartitionedTopic() {
+ MessageIdImpl msgId = new MessageIdImpl(0, 0, 0);
+ TopicMessageIdImpl partitionMsgId = new TopicMessageIdImpl(
+ "persistent://public/default/my-topic-partition-0", msgId);
+ assertTrue(partitionMsgId.hasSameBasePartitionedTopic(
+ "persistent://public/default/my-topic-partition-1"));
+ assertTrue(partitionMsgId.hasSameBasePartitionedTopic(
+ "persistent://public/default/my-topic"));
+ assertFalse(partitionMsgId.hasSameBasePartitionedTopic(
+ "persistent://public/default/my-topic-v2"));
+ assertFalse(partitionMsgId.hasSameBasePartitionedTopic(
+ "persistent://public/default/my-topic-v2-partition-0"));
+ }
+
@SuppressWarnings("deprecation")
@Test
public void testDeprecatedMethods() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
index 9fdab39145b..e0ad1f9111b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
@@ -70,10 +70,48 @@ public class UnAckedTopicMessageRedeliveryTrackerTest {
tracker.ackTimeoutMessages.put(msgInAckTimeout, System.currentTimeMillis() + 1_000_000L);
assertEquals(tracker.size(), 2);
- assertEquals(tracker.removeTopicMessages("my-topic"), 2);
+ assertEquals(tracker.removeTopicMessages("persistent://public/default/my-topic-partition-0"), 2);
assertTrue(tracker.isEmpty());
tracker.close();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRemoveTopicMessagesDoesNotMatchPrefixTopic() {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(connectionPool);
+ @Cleanup("stop")
+ Timer timer = new HashedWheelTimer(
+ new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
+ 1, TimeUnit.MILLISECONDS);
+ when(client.timer()).thenReturn(timer);
+
+ ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+ doNothing().when(consumer).onAckTimeoutSend(any());
+ doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setAckTimeoutMillis(1_000_000);
+ conf.setTickDurationMillis(100_000);
+ conf.setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build());
+
+ UnAckedTopicMessageRedeliveryTracker tracker =
+ new UnAckedTopicMessageRedeliveryTracker(client, consumer, conf);
+
+ String ownerTopic = "persistent://public/default/my-topic-v2-partition-0";
+ TopicMessageIdImpl msgInPartition =
+ new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(1L, 0L, -1));
+
+ assertTrue(tracker.add(msgInPartition));
+ assertEquals(tracker.size(), 1);
+
+ assertEquals(tracker.removeTopicMessages("persistent://public/default/my-topic"), 0);
+ assertEquals(tracker.size(), 1);
+
+ tracker.close();
+ }
+
}