This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 85937d8ea8e [fix][broker] Unthrottle producers immediately when
publish rate limiting is disabled (#25502)
85937d8ea8e is described below
commit 85937d8ea8ecf07480f51df6c76654370e35c870
Author: Wenzhi Feng <[email protected]>
AuthorDate: Wed Apr 15 14:28:56 2026 +0800
[fix][broker] Unthrottle producers immediately when publish rate limiting
is disabled (#25502)
Co-authored-by: fengwenzhi <[email protected]>
---
.../broker/service/PublishRateLimiterImpl.java | 20 +++-
.../broker/service/PublishRateLimiterTest.java | 124 +++++++++++++++++++++
2 files changed, 142 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 096418191dc..5d8fb4687b2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -43,6 +43,12 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
private final AtomicBoolean processingQueuedProducers = new
AtomicBoolean(false);
+
+ /**
+ * Executor used for the last {@link #scheduleUnthrottling} from this
limiter (set when throttling starts).
+ * Used to schedule an immediate follow-up run after publish-rate limits
change.
+ */
+ private volatile ScheduledExecutorService lastUnthrottleExecutor;
private final Consumer<Producer> throttleAction;
private final Consumer<Producer> unthrottleAction;
@@ -88,6 +94,7 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
// this is to avoid scheduling unthrottling multiple times for
concurrent producers
if (throttledProducersCount.incrementAndGet() == 1) {
ScheduledExecutorService executor =
producer.getCnx().getBrokerService().executor().next();
+ lastUnthrottleExecutor = executor;
scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
}
}
@@ -167,12 +174,18 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
update(maxPublishRate);
}
+ private void scheduleImmediateUnthrottling() {
+ ScheduledExecutorService executor = lastUnthrottleExecutor;
+ if (executor != null) {
+ scheduleUnthrottling(executor, 0L);
+ }
+ }
+
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null) {
updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg,
maxPublishRate.publishThrottlingRateInByte);
} else {
- tokenBucketOnMessage = null;
- tokenBucketOnByte = null;
+ updateTokenBuckets(0L, 0L);
}
}
@@ -189,6 +202,9 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
} else {
tokenBucketOnByte = null;
}
+ // After any bucket rebuild, wake unthrottling:
+ // old scheduled delay may be invalid and cause unnecessary wait time
for producers to be unthrottled.
+ scheduleImmediateUnthrottling();
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 573e3980c73..270286419f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -19,19 +19,26 @@
package org.apache.pulsar.broker.service;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.ScheduledFuture;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.AfterMethod;
@@ -149,4 +156,121 @@ public class PublishRateLimiterTest {
});
future.get(5, TimeUnit.SECONDS);
}
+
+ /**
+ * When the token bucket is deeply depleted, the first scheduled
unthrottle uses a long delay. Disabling limits
+ * must schedule an immediate unthrottle (delay 0) so producers are not
stuck until that delay elapses.
+ */
+ @Test
+ public void
shouldUnthrottleImmediatelyAfterDisablingLimitsDespiteLongPendingDelay() {
+ AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
+ AtomicInteger unthrottleCalls = new AtomicInteger();
+
+ PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
+ manualClock::get,
+ p -> { },
+ p -> unthrottleCalls.incrementAndGet());
+
+ EventLoop scheduler = mock(EventLoop.class);
+ AtomicInteger longDelaySchedules = new AtomicInteger();
+ doAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ long delay = invocation.getArgument(1);
+ TimeUnit unit = invocation.getArgument(2);
+ long delayNanos = unit.toNanos(delay);
+ if (delayNanos == 0L) {
+ task.run();
+ } else {
+ longDelaySchedules.incrementAndGet();
+ }
+ @SuppressWarnings("unchecked")
+ ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
+ return scheduled;
+ }).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
+
+ Producer p = mock(Producer.class);
+ ServerCnx cnx = mock(ServerCnx.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ doAnswer(a -> ctx).when(cnx).ctx();
+ doAnswer(a -> cnx).when(p).getCnx();
+ when(p.getCnx()).thenReturn(cnx);
+ doAnswer(a -> {
+ ((Runnable) a.getArgument(0)).run();
+ return null;
+ }).when(cnx).execute(any(Runnable.class));
+
+ BrokerService brokerService = mock(BrokerService.class);
+ when(cnx.getBrokerService()).thenReturn(brokerService);
+ EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+ when(brokerService.executor()).thenReturn(eventLoopGroup);
+ when(eventLoopGroup.next()).thenReturn(scheduler);
+
+ limiter.update(new PublishRate(1, 0));
+ manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
+
+ limiter.handlePublishThrottling(p, 100_000, 0L);
+ assertEquals(unthrottleCalls.get(), 0);
+ assertTrue(longDelaySchedules.get() >= 1,
+ "Expected a long-delay unthrottle to be scheduled while the
bucket is deeply depleted");
+
+ limiter.update(new PublishRate(0, 0));
+ assertEquals(unthrottleCalls.get(), 1);
+ }
+
+ /**
+ * Relaxing only the byte limit still invalidates a previously scheduled
long unthrottle delay; an immediate
+ * unthrottle pass must run after buckets are rebuilt.
+ */
+ @Test
+ public void
shouldUnthrottleImmediatelyAfterRaisingByteLimitDespiteLongPendingDelay() {
+ AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
+ AtomicInteger unthrottleCalls = new AtomicInteger();
+
+ PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
+ manualClock::get,
+ p -> { },
+ p -> unthrottleCalls.incrementAndGet());
+
+ EventLoop scheduler = mock(EventLoop.class);
+ doAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ long delay = invocation.getArgument(1);
+ TimeUnit unit = invocation.getArgument(2);
+ if (unit.toNanos(delay) == 0L) {
+ task.run();
+ }
+ @SuppressWarnings("unchecked")
+ ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
+ return scheduled;
+ }).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
+
+ Producer p = mock(Producer.class);
+ ServerCnx cnx = mock(ServerCnx.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ doAnswer(a -> ctx).when(cnx).ctx();
+ doAnswer(a -> cnx).when(p).getCnx();
+ when(p.getCnx()).thenReturn(cnx);
+ doAnswer(a -> {
+ ((Runnable) a.getArgument(0)).run();
+ return null;
+ }).when(cnx).execute(any(Runnable.class));
+
+ BrokerService brokerService = mock(BrokerService.class);
+ when(cnx.getBrokerService()).thenReturn(brokerService);
+ EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+ when(brokerService.executor()).thenReturn(eventLoopGroup);
+ when(eventLoopGroup.next()).thenReturn(scheduler);
+
+ limiter.update(new PublishRate(0, 1));
+ manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
+
+ limiter.handlePublishThrottling(p, 0, 100_000L);
+ assertEquals(unthrottleCalls.get(), 0);
+
+ limiter.update(new PublishRate(0, 1_000_000));
+ assertEquals(unthrottleCalls.get(), 1);
+
+ AsyncTokenBucket byteBucket = limiter.getTokenBucketOnByte();
+ assertNotNull(byteBucket);
+ }
}