This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d32542e06 [fix][broker] Prevent dedup recovery race from allowing duplicate messages (#25953)
40d32542e06 is described below

commit 40d32542e06507e752a8725f03dbfb3de25f086c
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Jun 8 07:04:35 2026 +0800

    [fix][broker] Prevent dedup recovery race from allowing duplicate messages (#25953)
---
 .../service/persistent/MessageDeduplication.java   | 35 +++++++++---
 .../broker/BrokerMessageDeduplicationTest.java     | 64 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 2db01ccc946..a980556f49b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -104,6 +104,7 @@ public class MessageDeduplication {
 
 
     private volatile Status status;
+    private CompletableFuture<Void> statusChangeFuture = 
CompletableFuture.completedFuture(null);
 
     // Map that contains the highest sequenceId that have been sent by each 
producers. The map will be updated before
     // the messages are persisted
@@ -159,7 +160,10 @@ public class MessageDeduplication {
     public CompletableFuture<Void> checkStatus() {
         boolean shouldBeEnabled = topic.isDeduplicationEnabled();
         synchronized (this) {
-            if (status == Status.Recovering || status == Status.Removing) {
+            if (status == Status.Recovering) {
+                return statusChangeFuture;
+            }
+            if (status == Status.Removing) {
                 // If there's already a transition happening, check later for 
status
                 pulsar.getExecutor().schedule(this::checkStatus, 1, 
TimeUnit.MINUTES);
                 return CompletableFuture.completedFuture(null);
@@ -224,17 +228,27 @@ public class MessageDeduplication {
                         }, null);
 
                 return future;
-            } else if ((status == Status.Disabled || status == 
Status.Initialized) && shouldBeEnabled) {
+            } else if ((status == Status.Disabled || status == 
Status.Initialized || status == Status.Failed)
+                    && shouldBeEnabled) {
                 // Enable deduping
-                final var future = openCursor(managedLedger, 
PersistentTopic.DEDUPLICATION_CURSOR_NAME)
-                        .thenCompose(this::replayCursor);
-                future.exceptionally(e -> {
+                status = Status.Recovering;
+                final CompletableFuture<Void> future;
+                try {
+                    future = openCursor(managedLedger, 
PersistentTopic.DEDUPLICATION_CURSOR_NAME)
+                            .thenCompose(this::replayCursor);
+                } catch (Throwable e) {
                     status = Status.Failed;
+                    statusChangeFuture = CompletableFuture.failedFuture(e);
                     log.error().exception(e).log("Failed to enable 
deduplication");
-                    future.completeExceptionally(e);
-                    return null;
+                    return statusChangeFuture;
+                }
+                statusChangeFuture = future.whenComplete((__, e) -> {
+                    if (e != null) {
+                        status = Status.Failed;
+                        log.error().exception(e).log("Failed to enable 
deduplication");
+                    }
                 });
-                return future;
+                return statusChangeFuture;
             } else {
                 // Nothing to do, we are in the correct state
                 return CompletableFuture.completedFuture(null);
@@ -244,6 +258,11 @@ public class MessageDeduplication {
 
     private CompletableFuture<Void> replayCursor(ManagedCursor cursor) {
         managedCursor = cursor;
+        cursor.rewind();
+        snapshotCounter = 0;
+        highestSequencedPushed.clear();
+        highestSequencedPersisted.clear();
+        inactiveProducers.clear();
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
             producerRemoved(k);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
index c83803d9b9f..f36caf87e6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
@@ -25,7 +25,10 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
@@ -35,6 +38,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -129,5 +134,64 @@ public class BrokerMessageDeduplicationTest {
             assertTrue(e.getCause() instanceof RuntimeException);
             assertTrue(e.getMessage().contains("asyncReadEntries failed"));
         }
+        assertEquals(String.valueOf(deduplication.getStatus()), "Failed");
+    }
+
+    @Test
+    public void checkStatusDoesNotStartMultipleRecoveries() throws Exception {
+        final var cursor = mock(ManagedCursor.class);
+        final var openCursorCallback = new 
AtomicReference<AsyncCallbacks.OpenCursorCallback>();
+        doAnswer(invocation -> {
+            openCursorCallback.set(invocation.getArgument(1));
+            return null;
+        }).when(managedLedger).asyncOpenCursor(any(), any(), any());
+        doReturn(Map.of()).when(cursor).getProperties();
+        doReturn(false).when(cursor).hasMoreEntries();
+
+        final var firstCheckStatus = deduplication.checkStatus();
+        assertFalse(firstCheckStatus.isDone());
+        assertEquals(String.valueOf(deduplication.getStatus()), "Recovering");
+
+        final var secondCheckStatus = deduplication.checkStatus();
+        assertFalse(secondCheckStatus.isDone());
+        verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());
+
+        openCursorCallback.get().openCursorComplete(cursor, null);
+        firstCheckStatus.get(3, TimeUnit.SECONDS);
+        secondCheckStatus.get(3, TimeUnit.SECONDS);
+        assertEquals(String.valueOf(deduplication.getStatus()), "Enabled");
+        verify(managedLedger, times(1)).asyncOpenCursor(any(), any(), any());
+    }
+
+    @Test
+    public void checkStatusRetriesAfterFailedEnable() throws Exception {
+        final var cursor = mock(ManagedCursor.class);
+        doAnswer(invocation -> {
+            ((AsyncCallbacks.OpenCursorCallback) 
invocation.getArgument(1)).openCursorComplete(cursor, null);
+            return null;
+        }).when(managedLedger).asyncOpenCursor(any(), any(), any());
+        doReturn(Map.of("from-snapshot", 10L)).when(cursor).getProperties();
+
+        final var hasMoreEntriesCalls = new AtomicInteger();
+        doAnswer(invocation -> hasMoreEntriesCalls.getAndIncrement() == 
0).when(cursor).hasMoreEntries();
+        doAnswer(invocation -> {
+            throw new RuntimeException("asyncReadEntries failed");
+        }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), 
any());
+
+        try {
+            deduplication.checkStatus().get(3, TimeUnit.SECONDS);
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getMessage().contains("asyncReadEntries failed"));
+        }
+        assertEquals(String.valueOf(deduplication.getStatus()), "Failed");
+
+        final var retry = deduplication.checkStatus();
+        retry.get(3, TimeUnit.SECONDS);
+        assertEquals(String.valueOf(deduplication.getStatus()), "Enabled");
+        
assertEquals(deduplication.getLastPublishedSequenceId("from-snapshot"), 10L);
+        verify(cursor, times(2)).rewind();
+        verify(managedLedger, times(2)).asyncOpenCursor(any(), any(), any());
     }
 }

Reply via email to