This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec56aea8370 [fix][broker] Fix forced topic/namespace deletion hanging or failing when compaction is in progress (#26016)
ec56aea8370 is described below
commit ec56aea8370e2dfdfda7aeeb1f5ee730d2d86866
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 13 08:53:57 2026 +0300
[fix][broker] Fix forced topic/namespace deletion hanging or failing when compaction is in progress (#26016)
---
.../broker/service/persistent/PersistentTopic.java | 27 ++++++----
.../apache/pulsar/client/impl/RawReaderImpl.java | 11 ++++
.../apache/pulsar/client/impl/RawReaderTest.java | 19 +++++++
.../apache/pulsar/compaction/CompactionTest.java | 58 ++++++++++++++++++++--
4 files changed, 100 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 37f4c56ed5e..1b6239047d1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1436,8 +1436,18 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return;
}
}
- // Unsubscribe compaction cursor and delete compacted ledger.
- currentCompaction.thenCompose(__ -> {
+ // Unsubscribe compaction cursor and delete compacted ledger. Wait for
any in-flight compaction to finish
+ // first, but don't let a compaction that completed exceptionally
block the cursor deletion: the deletion
+ // would otherwise fail on every retry until the topic instance is
reloaded (issue #24148). Note that a
+ // fenced topic makes the compactor's reader fail with an
unrecoverable error, so a forced deletion
+ // terminates an in-flight compaction exceptionally rather than
waiting for it to complete normally.
+ currentCompaction.exceptionally(compactionEx -> {
+ log.info()
+ .attr("subscription", subscriptionName)
+ .exceptionMessage(compactionEx)
+ .log("Last compaction task failed, proceeding to delete
the compaction cursor");
+ return null;
+ }).thenCompose(__ -> {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return unsubscribeFuture;
}).thenAccept(__ -> {
@@ -1459,15 +1469,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
disablingCompaction.compareAndSet(true, false);
}
}).exceptionally(ex -> {
- if (currentCompaction.isCompletedExceptionally()) {
- log.warn()
- .attr("subscription", subscriptionName)
- .log("Last compaction task failed");
- } else {
- log.warn()
- .attr("subscription", subscriptionName)
- .log("Failed to delete cursor task failed");
- }
+ log.warn()
+ .attr("subscription", subscriptionName)
+ .exceptionMessage(ex)
+ .log("Failed to delete the compaction cursor");
// Reset the variable: disablingCompaction,
disablingCompaction.compareAndSet(true, false);
unsubscribeFuture.completeExceptionally(ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 6530315da81..5975588290f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -230,6 +230,17 @@ public class RawReaderImpl implements RawReader {
CompletableFuture<RawMessage> result = new CompletableFuture<>();
pendingRawReceives.add(result);
tryCompletePending();
+ // Once the consumer has reached a terminal state (for example it
was closed after an
+ // unrecoverable error such as the topic or namespace being
deleted), no further message
+ // will arrive and no close callback will run for receives
enqueued from now on, so the
+ // future would never complete. Re-checking the state after
enqueueing closes the race
+ // with a concurrent close() draining the queue, since close()
drains only after moving to
+ // a terminal state. This matters for compaction: a
never-completing read leaves the
+ // compaction future pending, which in turn blocks forced
topic/namespace deletion.
+ State state = getState();
+ if (state == State.Closing || state == State.Closed || state ==
State.Failed) {
+ failPendingRawReceives();
+ }
return result;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 8b5732ef9dc..236d189d53a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -624,6 +624,25 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(topic, false);
}
+ @Test(timeOut = 30000)
+ public void testReadNextAsyncCompletesAfterConsumerClosed() throws
Exception {
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
+ admin.topics().createNonPartitionedTopic(topic);
+ RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
+
+ // Put the reader's underlying consumer into a terminal state. In
production this happens when a
+ // compaction's RawReader hits an unrecoverable error (e.g. the
topic/namespace is being deleted).
+ reader.closeAsync().get(5, TimeUnit.SECONDS);
+
+ // A read issued once the consumer has reached a terminal state must
complete instead of hanging
+ // forever: a never-completing read keeps the compaction future
pending, which blocks forced
+ // topic/namespace deletion (issue #24148).
+ CompletableFuture<RawMessage> readFuture = reader.readNextAsync();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertTrue(readFuture.isDone()));
+ assertTrue(readFuture.isCompletedExceptionally());
+ }
+
@Test(timeOut = 100000)
public void testPauseAndResume() throws Exception {
log.info("-- Starting testPauseAndResume test --");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 82c1f6e1bdc..60e0edaa4da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -136,6 +136,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setDispatcherMaxReadBatchSize(1);
+ conf.setForceDeleteNamespaceAllowed(true);
}
@BeforeClass
@@ -2538,10 +2539,59 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Thread.sleep(3000);
delayReadSignal.countDown();
- // Verify: topic deletion is successfully executed.
- Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
- assertTrue(deleteTopicFuture.isDone());
- });
+ // Verify: topic deletion is successfully executed. Asserting success
(not just completion) covers the
+ // case where fencing the topic terminates the in-flight compaction
exceptionally: the failed compaction
+ // must not fail the deletion (issue #24148).
+ deleteTopicFuture.get(15, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testForcedDeleteSucceedsAfterFailedCompaction() throws
Exception {
+ String topicName =
newUniqueName("persistent://my-tenant/my-ns/delete-after-failed-compaction");
+ admin.topics().createNonPartitionedTopic(topicName);
+ try (Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key("key" + (i % 2)).value("value-" +
i).send();
+ }
+ }
+
+ // Fail the compaction after the phase-two seek
+ AbstractTwoPhaseCompactor.injectionPhaseTwoSeek =
+ (reader, id) -> CompletableFuture.failedFuture(new
RuntimeException("injected compaction failure"));
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ persistentTopic.triggerCompaction();
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(persistentTopic.compactionStatus().status,
LongRunningProcessStatus.Status.ERROR));
+ AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
+
+ // The failed compaction must not block the deletion of the compaction
cursor
+ admin.topics().delete(topicName, true);
+ }
+
+ @Test
+ public void testForcedNamespaceDeleteWithInflightCompaction() throws
Exception {
+ String namespace = "my-tenant/my-ns-inflight-compaction";
+ admin.namespaces().createNamespace(namespace,
Set.of(configClusterName));
+ final String topicName = newUniqueName("persistent://" + namespace +
"/inflight-compaction");
+ admin.topics().createNonPartitionedTopic(topicName);
+ try (Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+ // dispatcherMaxReadBatchSize=1 makes the compactor read these one
at a time, keeping the compaction
+ // in-flight for several seconds while the namespace is deleted
+ for (int i = 0; i < 2000; i++) {
+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ }
+ }
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ persistentTopic.triggerCompactionWithCheckHasMoreMessages().join();
+ Awaitility.await().untilAsserted(() ->
+
assertEquals(persistentTopic.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(),
+ 1));
+
+ // Forced namespace deletion must succeed while the compaction is
in-flight: fencing the topic terminates
+ // the compaction exceptionally, which must not fail the deletion of
the compaction cursor (issue #24148)
+ deleteNamespaceWithRetry(namespace, true, admin);
}
@Test