This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1ac5c64689d74952add2c4a1a1918200ed1b4ebf Author: Wenzhi Feng <[email protected]> AuthorDate: Wed Apr 15 14:28:56 2026 +0800 [fix][broker] Unthrottle producers immediately when publish rate limiting is disabled (#25502) Co-authored-by: fengwenzhi <[email protected]> (cherry picked from commit 85937d8ea8ecf07480f51df6c76654370e35c870) --- .../broker/service/PublishRateLimiterImpl.java | 20 +++- .../broker/service/PublishRateLimiterTest.java | 124 +++++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 096418191dc..5d8fb4687b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -43,6 +43,12 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { private final AtomicInteger throttledProducersCount = new AtomicInteger(0); private final AtomicBoolean processingQueuedProducers = new AtomicBoolean(false); + + /** + * Executor used for the last {@link #scheduleUnthrottling} from this limiter (set when throttling starts). + * Used to schedule an immediate follow-up run after publish-rate limits change. + */ + private volatile ScheduledExecutorService lastUnthrottleExecutor; private final Consumer<Producer> throttleAction; private final Consumer<Producer> unthrottleAction; @@ -88,6 +94,7 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { // this is to avoid scheduling unthrottling multiple times for concurrent producers if (throttledProducersCount.incrementAndGet() == 1) { ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next(); + lastUnthrottleExecutor = executor; scheduleUnthrottling(executor, calculateThrottlingDurationNanos()); } } @@ -167,12 +174,18 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { update(maxPublishRate); } + private void scheduleImmediateUnthrottling() { + ScheduledExecutorService executor = lastUnthrottleExecutor; + if (executor != null) { + scheduleUnthrottling(executor, 0L); + } + } + public void update(PublishRate maxPublishRate) { if (maxPublishRate != null) { updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg, maxPublishRate.publishThrottlingRateInByte); } else { - tokenBucketOnMessage = null; - tokenBucketOnByte = null; + updateTokenBuckets(0L, 0L); } } @@ -189,6 +202,9 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { } else { tokenBucketOnByte = null; } + // After any bucket rebuild, wake unthrottling: + // old scheduled delay may be invalid and cause unnecessary wait time for producers to be unthrottled. + scheduleImmediateUnthrottling(); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index 573e3980c73..270286419f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -19,19 +19,26 @@ package org.apache.pulsar.broker.service; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.ScheduledFuture; import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.testng.annotations.AfterMethod; @@ -149,4 +156,121 @@ public class PublishRateLimiterTest { }); future.get(5, TimeUnit.SECONDS); } + + /** + * When the token bucket is deeply depleted, the first scheduled unthrottle uses a long delay. Disabling limits + * must schedule an immediate unthrottle (delay 0) so producers are not stuck until that delay elapses. + */ + @Test + public void shouldUnthrottleImmediatelyAfterDisablingLimitsDespiteLongPendingDelay() { + AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100)); + AtomicInteger unthrottleCalls = new AtomicInteger(); + + PublishRateLimiterImpl limiter = new PublishRateLimiterImpl( + manualClock::get, + p -> { }, + p -> unthrottleCalls.incrementAndGet()); + + EventLoop scheduler = mock(EventLoop.class); + AtomicInteger longDelaySchedules = new AtomicInteger(); + doAnswer(invocation -> { + Runnable task = invocation.getArgument(0); + long delay = invocation.getArgument(1); + TimeUnit unit = invocation.getArgument(2); + long delayNanos = unit.toNanos(delay); + if (delayNanos == 0L) { + task.run(); + } else { + longDelaySchedules.incrementAndGet(); + } + @SuppressWarnings("unchecked") + ScheduledFuture<?> scheduled = mock(ScheduledFuture.class); + return scheduled; + }).when(scheduler).schedule(any(Runnable.class), anyLong(), any()); + + Producer p = mock(Producer.class); + ServerCnx cnx = mock(ServerCnx.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + doAnswer(a -> ctx).when(cnx).ctx(); + doAnswer(a -> cnx).when(p).getCnx(); + when(p.getCnx()).thenReturn(cnx); + doAnswer(a -> { + ((Runnable) a.getArgument(0)).run(); + return null; + }).when(cnx).execute(any(Runnable.class)); + + BrokerService brokerService = mock(BrokerService.class); + when(cnx.getBrokerService()).thenReturn(brokerService); + EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); + when(brokerService.executor()).thenReturn(eventLoopGroup); + when(eventLoopGroup.next()).thenReturn(scheduler); + + limiter.update(new PublishRate(1, 0)); + manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1)); + + limiter.handlePublishThrottling(p, 100_000, 0L); + assertEquals(unthrottleCalls.get(), 0); + assertTrue(longDelaySchedules.get() >= 1, + "Expected a long-delay unthrottle to be scheduled while the bucket is deeply depleted"); + + limiter.update(new PublishRate(0, 0)); + assertEquals(unthrottleCalls.get(), 1); + } + + /** + * Relaxing only the byte limit still invalidates a previously scheduled long unthrottle delay; an immediate + * unthrottle pass must run after buckets are rebuilt. + */ + @Test + public void shouldUnthrottleImmediatelyAfterRaisingByteLimitDespiteLongPendingDelay() { + AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100)); + AtomicInteger unthrottleCalls = new AtomicInteger(); + + PublishRateLimiterImpl limiter = new PublishRateLimiterImpl( + manualClock::get, + p -> { }, + p -> unthrottleCalls.incrementAndGet()); + + EventLoop scheduler = mock(EventLoop.class); + doAnswer(invocation -> { + Runnable task = invocation.getArgument(0); + long delay = invocation.getArgument(1); + TimeUnit unit = invocation.getArgument(2); + if (unit.toNanos(delay) == 0L) { + task.run(); + } + @SuppressWarnings("unchecked") + ScheduledFuture<?> scheduled = mock(ScheduledFuture.class); + return scheduled; + }).when(scheduler).schedule(any(Runnable.class), anyLong(), any()); + + Producer p = mock(Producer.class); + ServerCnx cnx = mock(ServerCnx.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + doAnswer(a -> ctx).when(cnx).ctx(); + doAnswer(a -> cnx).when(p).getCnx(); + when(p.getCnx()).thenReturn(cnx); + doAnswer(a -> { + ((Runnable) a.getArgument(0)).run(); + return null; + }).when(cnx).execute(any(Runnable.class)); + + BrokerService brokerService = mock(BrokerService.class); + when(cnx.getBrokerService()).thenReturn(brokerService); + EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); + when(brokerService.executor()).thenReturn(eventLoopGroup); + when(eventLoopGroup.next()).thenReturn(scheduler); + + limiter.update(new PublishRate(0, 1)); + manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1)); + + limiter.handlePublishThrottling(p, 0, 100_000L); + assertEquals(unthrottleCalls.get(), 0); + + limiter.update(new PublishRate(0, 1_000_000)); + assertEquals(unthrottleCalls.get(), 1); + + AsyncTokenBucket byteBucket = limiter.getTokenBucketOnByte(); + assertNotNull(byteBucket); + } }
