This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b5a0dbd8d [fix][broker] Prevent subscribe rate limit from stalling compaction and blocking forced deletion (#26015)
a1b5a0dbd8d is described below

commit a1b5a0dbd8d67f3fcb5db4db53f73e7acd8c825c
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 13 04:55:20 2026 +0300

    [fix][broker] Prevent subscribe rate limit from stalling compaction and blocking forced deletion (#26015)
---
 .../broker/service/persistent/PersistentTopic.java | 11 ++++++--
 .../apache/pulsar/compaction/CompactionTest.java   | 29 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c9bcad341fc..37f4c56ed5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1010,8 +1010,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         new NamingException("Subscription with reserved subscription name attempted"));
             }
 
-            if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")
-                    && subscribeRateLimiter.isPresent()) {
+            // The subscribe rate limit must not apply to the broker-internal compaction subscription: the
+            // compactor's reader re-subscribes after the phase-two seek, and throttling that re-subscribe stalls
+            // the compaction, which in turn blocks forced topic/namespace deletion waiting on the in-flight
+            // compaction. System topics are exempt as well, consistent with the publish/dispatch rate limiters,
+            // since throttling broker-internal readers (e.g. on __change_events) can stall topic policy updates.
+            if (subscribeRateLimiter.isPresent()
+                    && !isSystemTopic()
+                    && !isCompactionSubscription(subscriptionName)
+                    && cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
                 SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
                         cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
                 if (!subscribeRateLimiter.get().subscribeAvailable(consumer)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index b9aff08119b..82c1f6e1bdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -110,6 +110,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -196,6 +197,34 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         return compactor;
     }
 
+    @Test
+    public void testCompactionNotBlockedBySubscribeRateLimit() throws Exception {
+        String namespace = "my-tenant/my-ns";
+        String topic = "persistent://" + namespace + "/compaction-with-subscribe-rate-limit";
+
+        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
+            for (int i = 0; i < 10; i++) {
+                producer.newMessage().key("key" + (i % 2)).value(("my-message-" + i).getBytes()).send();
+            }
+        }
+
+        // Allow a single subscribe per consumer within a long period. The compactor's reader consumes the only
+        // token when it first subscribes, so the re-subscribe triggered by the phase-two seek would be throttled
+        // if the limit applied to the compaction subscription, stalling the compaction.
+        admin.namespaces().setSubscribeRate(namespace, new SubscribeRate(1, 3600));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+            assertTrue(persistentTopic.getSubscribeRateLimiter().isPresent());
+        });
+
+        try {
+            compactor.compact(topic).get(30, TimeUnit.SECONDS);
+        } finally {
+            admin.namespaces().removeSubscribeRate(namespace);
+        }
+    }
+
     @Test
     public void testCompaction() throws Exception {
         String topic = "persistent://my-tenant/my-ns/compaction";

Reply via email to