This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9fefb4c4a [fix][broker] Fix compacted read could be stuck forever or
message loss due to cursor mark delete (#25998)
7a9fefb4c4a is described below
commit 7a9fefb4c4a25b1aa2c0fb335d99aa7b00ee3c91
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jun 12 11:34:33 2026 +0800
[fix][broker] Fix compacted read could be stuck forever or message loss due
to cursor mark delete (#25998)
---
.../service/persistent/PersistentSubscription.java | 18 +++++
.../apache/pulsar/compaction/CompactionTest.java | 86 ++++++++++++++++++++++
2 files changed, 104 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 236a5516234..c7278257f8a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -471,6 +472,23 @@ public class PersistentSubscription extends
AbstractSubscription {
.attr("position", position)
.log("Cumulative ack on");
AckCallback callback = new AckCallback(previousMarkDeletePosition,
future);
+ if (dispatcher instanceof AbstractDispatcherSingleActiveConsumer
singleConsumerDispatcher) {
+ // For a compacted consumer, ignore an ACK position whose ledger
no longer exists in the managed ledger.
+ // Otherwise, the `asyncMarkDelete` call could jump the read
position to the active ledger, which would
+ // skip all entries present in the compacted ledger but not
present in the managed ledger.
+ final var consumer =
singleConsumerDispatcher.getActiveConsumer();
+ final var ml = cursor.getManagedLedger();
+ if (consumer != null
+ && consumer.readCompacted()
+ && !cursor.isDurable()
+ &&
ml.getOptionalLedgerInfo(position.getLedgerId()).isEmpty()) {
+ if (ml.getFirstPosition() == null ||
position.getLedgerId() > ml.getFirstPosition().getLedgerId()) {
+ log.warn("Received an ACK whose position is " +
position + ", valid ledgers: "
+ + ml.getLedgersInfo().keySet());
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ }
cursor.asyncMarkDelete(position, mergeCursorProperties(properties),
callback, callback);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 6b625d1cd65..b9aff08119b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
@@ -70,12 +71,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -87,6 +91,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -124,6 +129,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
private PublishingOrderCompactor compactor;
+ private volatile
java.util.function.Consumer<org.apache.pulsar.broker.service.Consumer>
consumerCreated = __ -> {};
@Override
protected void doInitConf() throws Exception {
@@ -135,6 +141,14 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
super.internalSetup();
+ pulsar.getBrokerService().setInterceptor(new MockBrokerInterceptor() {
+
+ @Override
+ public void consumerCreated(ServerCnx cnx,
org.apache.pulsar.broker.service.Consumer consumer,
+ Map<String, String> metadata) {
+ consumerCreated.accept(consumer);
+ }
+ });
admin.clusters().createCluster(configClusterName,
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -164,6 +178,8 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().removeRetention("my-tenant/my-ns");
AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
+ consumerCreated = __ -> {};
+
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);
}
protected long compact(String topic) throws ExecutionException,
InterruptedException {
@@ -2648,4 +2664,74 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() -> assertEquals(
admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS));
}
+
+ @Test
+ public void testReaderReadOnDeletedLedger() throws Exception {
+ final var topic =
"persistent://my-tenant/my-ns/reader-read-on-deleted-ledger";
+ try (final var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+ for (int i = 0; i < 3; i++) {
+ producer.newMessage().key("key-" + i).value("value-" +
i).send();
+ }
+ }
+ // Trigger the ledger rollover
+ var ml = (ManagedLedgerImpl) ((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get()
+ .orElseThrow()).getManagedLedger();
+ ml.getConfig().setMaxEntriesPerLedger(1);
+ ml.getConfig().setMaxSizePerLedgerMb(0);
+ ml.getConfig().setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ ml.rollCurrentLedgerIfFull();
+ Awaitility.await().untilAsserted(() ->
assertEquals(ml.getLedgersInfo().size(), 2));
+
+ final var subName = "sub-" + System.currentTimeMillis();
+ @Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).readCompacted(true).topic(topic)
+ .subscriptionName(subName)
+ .startMessageId(MessageId.earliest).create();
+
+ // Slow down the pre-fetching
+
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(500);
+
+ // Receive 1 message so that the startMessageId will be reset to
ledger_id:0 after reconnection
+ assertTrue(reader.hasMessageAvailable());
+ final var firstMsg = reader.readNext(3, TimeUnit.SECONDS);
+ assertNotNull(firstMsg);
+
+ triggerAndWaitCompaction(topic);
+
+ // Simulate the pending cumulative acknowledgment being flushed after the
consumer is created.
+ // This interception would be unnecessary if the reader supported controlling the
acknowledgment flush.
+ final var firstTime = new AtomicBoolean(true);
+ consumerCreated = serverConsumer -> {
+ final var subscription = serverConsumer.getSubscription();
+ if (subscription.getName().contains(subName) &&
firstTime.compareAndSet(true, false)) {
+ final var msgId = (MessageIdAdv) firstMsg.getMessageId();
+
subscription.acknowledgeMessageAsync(List.of(PositionFactory.create(msgId.getLedgerId(),
+ msgId.getEntryId())), CommandAck.AckType.Cumulative,
Map.of());
+ }
+ };
+
+ // Trigger the reconnection and trim the first ledger.
+ admin.namespaces().unload("my-tenant/my-ns");
+ admin.lookups().lookupTopic(topic);
+ final var persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopic(topic, true).get()
+ .orElseThrow();
+ final var trimFuture = new CompletableFuture<Void>();
+
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.get();
+
assertEquals(persistentTopic.getManagedLedger().getLedgersInfo().size(), 1);
+
+
pulsarTestContext.getMockBookKeeper().setDefaultReadEntriesDelayMillis(1);
+
+ while (reader.hasMessageAvailable()) {
+ final var msg = reader.readNextAsync().get(3, TimeUnit.SECONDS);
+ log.info().attr("id", msg.getMessageId()).attr("key", msg.getKey())
+ .attr("value", msg.getValue()).log("read");
+ }
+
+ final var serverConsumer =
persistentTopic.getSubscription(subName).getDispatcher().getConsumers().get(0);
+ assertEquals(((MessageIdAdv)
serverConsumer.getStartMessageId()).getEntryId(), 0L);
+
+ final var emptyLedgerId =
persistentTopic.getManagedLedger().getLedgersInfo().lastEntry().getKey();
+
assertEquals(persistentTopic.getTopicCompactionService().getLastCompactedPosition().get(),
+ PositionFactory.create(emptyLedgerId, -1L));
+ }
}