This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec56aea8370 [fix][broker] Fix forced topic/namespace deletion hanging 
or failing when compaction is in progress (#26016)
ec56aea8370 is described below

commit ec56aea8370e2dfdfda7aeeb1f5ee730d2d86866
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 13 08:53:57 2026 +0300

    [fix][broker] Fix forced topic/namespace deletion hanging or failing when 
compaction is in progress (#26016)
---
 .../broker/service/persistent/PersistentTopic.java | 27 ++++++----
 .../apache/pulsar/client/impl/RawReaderImpl.java   | 11 ++++
 .../apache/pulsar/client/impl/RawReaderTest.java   | 19 +++++++
 .../apache/pulsar/compaction/CompactionTest.java   | 58 ++++++++++++++++++++--
 4 files changed, 100 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 37f4c56ed5e..1b6239047d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1436,8 +1436,18 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 return;
             }
         }
-        // Unsubscribe compaction cursor and delete compacted ledger.
-        currentCompaction.thenCompose(__ -> {
+        // Unsubscribe compaction cursor and delete compacted ledger. Wait for 
any in-flight compaction to finish
+        // first, but don't let a compaction that completed exceptionally 
block the cursor deletion: the deletion
+        // would otherwise fail on every retry until the topic instance is 
reloaded (issue #24148). Note that a
+        // fenced topic makes the compactor's reader fail with an 
unrecoverable error, so a forced deletion
+        // terminates an in-flight compaction exceptionally rather than 
waiting for it to complete normally.
+        currentCompaction.exceptionally(compactionEx -> {
+            log.info()
+                    .attr("subscription", subscriptionName)
+                    .exceptionMessage(compactionEx)
+                    .log("Last compaction task failed, proceeding to delete 
the compaction cursor");
+            return null;
+        }).thenCompose(__ -> {
             asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             return unsubscribeFuture;
         }).thenAccept(__ -> {
@@ -1459,15 +1469,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 disablingCompaction.compareAndSet(true, false);
             }
         }).exceptionally(ex -> {
-            if (currentCompaction.isCompletedExceptionally()) {
-                log.warn()
-                        .attr("subscription", subscriptionName)
-                        .log("Last compaction task failed");
-            } else {
-                log.warn()
-                        .attr("subscription", subscriptionName)
-                        .log("Failed to delete cursor task failed");
-            }
+            log.warn()
+                    .attr("subscription", subscriptionName)
+                    .exceptionMessage(ex)
+                    .log("Failed to delete the compaction cursor");
             // Reset the variable: disablingCompaction,
             disablingCompaction.compareAndSet(true, false);
             unsubscribeFuture.completeExceptionally(ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 6530315da81..5975588290f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -230,6 +230,17 @@ public class RawReaderImpl implements RawReader {
             CompletableFuture<RawMessage> result = new CompletableFuture<>();
             pendingRawReceives.add(result);
             tryCompletePending();
+            // Once the consumer has reached a terminal state (for example it 
was closed after an
+            // unrecoverable error such as the topic or namespace being 
deleted), no further message
+            // will arrive and no close callback will run for receives 
enqueued from now on, so the
+            // future would never complete. Re-checking the state after 
enqueueing closes the race
+            // with a concurrent close() draining the queue, since close() 
drains only after moving to
+            // a terminal state. This matters for compaction: a 
never-completing read leaves the
+            // compaction future pending, which in turn blocks forced 
topic/namespace deletion.
+            State state = getState();
+            if (state == State.Closing || state == State.Closed || state == 
State.Failed) {
+                failPendingRawReceives();
+            }
             return result;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 8b5732ef9dc..236d189d53a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -624,6 +624,25 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topic, false);
     }
 
+    @Test(timeOut = 30000)
+    public void testReadNextAsyncCompletesAfterConsumerClosed() throws 
Exception {
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
+        admin.topics().createNonPartitionedTopic(topic);
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+
+        // Put the reader's underlying consumer into a terminal state. In 
production this happens when a
+        // compaction's RawReader hits an unrecoverable error (e.g. the 
topic/namespace is being deleted).
+        reader.closeAsync().get(5, TimeUnit.SECONDS);
+
+        // A read issued once the consumer has reached a terminal state must 
complete instead of hanging
+        // forever: a never-completing read keeps the compaction future 
pending, which blocks forced
+        // topic/namespace deletion (issue #24148).
+        CompletableFuture<RawMessage> readFuture = reader.readNextAsync();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(readFuture.isDone()));
+        assertTrue(readFuture.isCompletedExceptionally());
+    }
+
     @Test(timeOut = 100000)
     public void testPauseAndResume() throws Exception {
         log.info("-- Starting testPauseAndResume test --");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 82c1f6e1bdc..60e0edaa4da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -136,6 +136,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     protected void doInitConf() throws Exception {
         super.doInitConf();
         conf.setDispatcherMaxReadBatchSize(1);
+        conf.setForceDeleteNamespaceAllowed(true);
     }
 
     @BeforeClass
@@ -2538,10 +2539,59 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         Thread.sleep(3000);
         delayReadSignal.countDown();
 
-        // Verify: topic deletion is successfully executed.
-        Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
-            assertTrue(deleteTopicFuture.isDone());
-        });
+        // Verify: topic deletion is successfully executed. Asserting success 
(not just completion) covers the
+        // case where fencing the topic terminates the in-flight compaction 
exceptionally: the failed compaction
+        // must not fail the deletion (issue #24148).
+        deleteTopicFuture.get(15, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testForcedDeleteSucceedsAfterFailedCompaction() throws 
Exception {
+        String topicName = 
newUniqueName("persistent://my-tenant/my-ns/delete-after-failed-compaction");
+        admin.topics().createNonPartitionedTopic(topicName);
+        try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            for (int i = 0; i < 10; i++) {
+                producer.newMessage().key("key" + (i % 2)).value("value-" + 
i).send();
+            }
+        }
+
+        // Fail the compaction after the phase-two seek
+        AbstractTwoPhaseCompactor.injectionPhaseTwoSeek =
+                (reader, id) -> CompletableFuture.failedFuture(new 
RuntimeException("injected compaction failure"));
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(persistentTopic.compactionStatus().status, 
LongRunningProcessStatus.Status.ERROR));
+        AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
+
+        // The failed compaction must not block the deletion of the compaction 
cursor
+        admin.topics().delete(topicName, true);
+    }
+
+    @Test
+    public void testForcedNamespaceDeleteWithInflightCompaction() throws 
Exception {
+        String namespace = "my-tenant/my-ns-inflight-compaction";
+        admin.namespaces().createNamespace(namespace, 
Set.of(configClusterName));
+        final String topicName = newUniqueName("persistent://" + namespace + 
"/inflight-compaction");
+        admin.topics().createNonPartitionedTopic(topicName);
+        try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            // dispatcherMaxReadBatchSize=1 makes the compactor read these one 
at a time, keeping the compaction
+            // in-flight for several seconds while the namespace is deleted
+            for (int i = 0; i < 2000; i++) {
+                
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+            }
+        }
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        persistentTopic.triggerCompactionWithCheckHasMoreMessages().join();
+        Awaitility.await().untilAsserted(() ->
+                
assertEquals(persistentTopic.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(),
+                        1));
+
+        // Forced namespace deletion must succeed while the compaction is 
in-flight: fencing the topic terminates
+        // the compaction exceptionally, which must not fail the deletion of 
the compaction cursor (issue #24148)
+        deleteNamespaceWithRetry(namespace, true, admin);
     }
 
     @Test

Reply via email to