This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1b5a0dbd8d [fix][broker] Prevent subscribe rate limit from stalling
compaction and blocking forced deletion (#26015)
a1b5a0dbd8d is described below
commit a1b5a0dbd8d67f3fcb5db4db53f73e7acd8c825c
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 13 04:55:20 2026 +0300
[fix][broker] Prevent subscribe rate limit from stalling compaction and
blocking forced deletion (#26015)
---
.../broker/service/persistent/PersistentTopic.java | 11 ++++++--
.../apache/pulsar/compaction/CompactionTest.java | 29 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c9bcad341fc..37f4c56ed5e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1010,8 +1010,15 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
new NamingException("Subscription with reserved
subscription name attempted"));
}
- if (cnx.clientAddress() != null &&
cnx.clientAddress().toString().contains(":")
- && subscribeRateLimiter.isPresent()) {
+ // The subscribe rate limit must not apply to the broker-internal
compaction subscription: the
+ // compactor's reader re-subscribes after the phase-two seek, and
throttling that re-subscribe stalls
+ // the compaction, which in turn blocks forced topic/namespace
deletion waiting on the in-flight
+ // compaction. System topics are exempt as well, consistent with
the publish/dispatch rate limiters,
+ // since throttling broker-internal readers (e.g. on
__change_events) can stall topic policy updates.
+ if (subscribeRateLimiter.isPresent()
+ && !isSystemTopic()
+ && !isCompactionSubscription(subscriptionName)
+ && cnx.clientAddress() != null &&
cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new
SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0],
consumerName, consumerId);
if (!subscribeRateLimiter.get().subscribeAvailable(consumer)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index b9aff08119b..82c1f6e1bdc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -110,6 +110,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
@@ -196,6 +197,34 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
return compactor;
}
+ @Test
+ public void testCompactionNotBlockedBySubscribeRateLimit() throws
Exception {
+ String namespace = "my-tenant/my-ns";
+ String topic = "persistent://" + namespace +
"/compaction-with-subscribe-rate-limit";
+
+ try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(false).create()) {
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key("key" + (i %
2)).value(("my-message-" + i).getBytes()).send();
+ }
+ }
+
+ // Allow a single subscribe per consumer within a long period. The
compactor's reader consumes the only
+ // token when it first subscribes, so the re-subscribe triggered by
the phase-two seek would be throttled
+ // if the limit applied to the compaction subscription, stalling the
compaction.
+ admin.namespaces().setSubscribeRate(namespace, new SubscribeRate(1,
3600));
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+ assertTrue(persistentTopic.getSubscribeRateLimiter().isPresent());
+ });
+
+ try {
+ compactor.compact(topic).get(30, TimeUnit.SECONDS);
+ } finally {
+ admin.namespaces().removeSubscribeRate(namespace);
+ }
+ }
+
@Test
public void testCompaction() throws Exception {
String topic = "persistent://my-tenant/my-ns/compaction";