This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3f22cd190ab9ae54e7a6d9a8aeceeeda464021e0
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 12 11:34:33 2026 +0800

    [fix][broker] Fix compacted read could be stuck forever or message loss due 
to cursor mark delete (#25998)
    
    (cherry picked from commit 7a9fefb4c4a25b1aa2c0fb335d99aa7b00ee3c91)
---
 .../service/persistent/PersistentSubscription.java | 18 +++++
 .../apache/pulsar/compaction/CompactionTest.java   | 85 ++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d24a3e9dec9..4880cf24f84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.AbstractSubscription;
 import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -434,6 +435,23 @@ public class PersistentSubscription extends 
AbstractSubscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, 
position);
             }
+            if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer 
singleConsumerDispatcher) {
+                // For a compacted consumer, we should ignore a position whose ledger 
does not exist in the managed ledger;
+                // otherwise, the `asyncMarkDelete` call could jump the read 
position to the active ledger, which would
+                // skip all entries present in the compacted ledger but not 
present in the managed ledger.
+                final var consumer = 
singleConsumerDispatcher.getActiveConsumer();
+                final var ml = cursor.getManagedLedger();
+                if (consumer != null
+                        && consumer.readCompacted()
+                        && !cursor.isDurable()
+                        && 
ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) {
+                    if (ml.getFirstPosition() == null || 
position.getLedgerId() > ml.getFirstPosition().getLedgerId()) {
+                        log.warn("Received an ACK whose position is " + 
position + ", valid ledgers: "
+                                + ml.getLedgersInfo().keySet());
+                    }
+                    return;
+                }
+            }
             cursor.asyncMarkDelete(position, mergeCursorProperties(properties),
                     markDeleteCallback, previousMarkDeletePosition);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c882fe460fd..1d4856fe053 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
@@ -70,12 +71,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -87,6 +91,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -124,6 +129,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
     private PublishingOrderCompactor compactor;
+    private volatile 
java.util.function.Consumer<org.apache.pulsar.broker.service.Consumer> 
consumerCreated = __ -> {};
 
     @Override
     protected void doInitConf() throws Exception {
@@ -135,6 +141,14 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     @Override
     public void setup() throws Exception {
         super.internalSetup();
+        pulsar.getBrokerService().setInterceptor(new MockBrokerInterceptor() {
+
+            @Override
+            public void consumerCreated(ServerCnx cnx, 
org.apache.pulsar.broker.service.Consumer consumer,
+                                        Map<String, String> metadata) {
+                consumerCreated.accept(consumer);
+            }
+        });
 
         admin.clusters().createCluster(configClusterName,
                 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -164,6 +178,8 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().removeRetention("my-tenant/my-ns");
         AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
         AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
+        consumerCreated = __ -> {};
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);
     }
 
     protected long compact(String topic) throws ExecutionException, 
InterruptedException {
@@ -2650,4 +2666,73 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> assertEquals(
                 admin.topics().compactionStatus(topic).status, 
LongRunningProcessStatus.Status.SUCCESS));
     }
+
+    @Test
+    public void testReaderReadOnDeletedLedger() throws Exception {
+        final var topic = 
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+        try (final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            for (int i = 0; i < 3; i++) {
+                producer.newMessage().key("key-" + i).value("value-" + 
i).send();
+            }
+        }
+        // Trigger the ledger rollover so that the topic has 2 ledgers
+        var ml = (ManagedLedgerImpl) ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .orElseThrow()).getManagedLedger();
+        ml.getConfig().setMaxEntriesPerLedger(1);
+        ml.getConfig().setMaxSizePerLedgerMb(0);
+        ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ml.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> 
assertEquals(ml.getLedgersInfo().size(), 2));
+
+        final var subName = "sub-" + System.currentTimeMillis();
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest).create();
+
+        // Slow down the pre-fetching (500 ms delay on mock BookKeeper reads)
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+        // Receive 1 message so that the startMessageId will be reset to 
ledger_id:0 after reconnection
+        assertTrue(reader.hasMessageAvailable());
+        final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+        assertNotNull(firstMsg);
+
+        triggerAndWaitCompaction(topic);
+
+        // Simulate the pending cumulative acknowledgment being flushed after the 
consumer is created
+        // We would not need such interception if we could control the 
acknowledgment flush for reader.
+        final var firstTime = new AtomicBoolean(true);
+        consumerCreated = serverConsumer -> {
+            final var subscription = serverConsumer.getSubscription();
+            if (subscription.getName().contains(subName) && 
firstTime.compareAndSet(true, false)) {
+                final var msgId = (MessageIdAdv) firstMsg.getMessageId();
+                
subscription.acknowledgeMessage(List.of(PositionFactory.create(msgId.getLedgerId(),
+                        msgId.getEntryId())), CommandAck.AckType.Cumulative, 
Map.of());
+            }
+        };
+
+        // Unload the namespace to trigger the reconnection, then trim the first ledger.
+        admin.namespaces().unload("my-tenant/my-ns");
+        admin.lookups().lookupTopic(topic);
+        final var persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topic, true).get()
+                .orElseThrow();
+        final var trimFuture = new CompletableFuture<Void>();
+        
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.get();
+        
assertEquals(persistentTopic.getManagedLedger().getLedgersInfo().size(), 1);
+
+        
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);
+
+        while (reader.hasMessageAvailable()) {
+            final var msg = reader.readNextAsync().get(3, TimeUnit.SECONDS);
+            log.info("read id={} key={} value={}", msg.getMessageId(), 
msg.getKey(), msg.getValue());
+        }
+
+        final var serverConsumer = 
persistentTopic.getSubscription(subName).getDispatcher().getConsumers().get(0);
+        assertEquals(((MessageIdAdv) 
serverConsumer.getStartMessageId()).getEntryId(), 0L);
+
+        final var emptyLedgerId = 
persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey();
+        
assertEquals(persistentTopic.getTopicCompactionService().getLastCompactedPosition().get(),
+                PositionFactory.create(emptyLedgerId, -1L));
+    }
 }

Reply via email to