This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85937d8ea8e [fix][broker] Unthrottle producers immediately when publish rate limiting is disabled (#25502)
85937d8ea8e is described below

commit 85937d8ea8ecf07480f51df6c76654370e35c870
Author: Wenzhi Feng <[email protected]>
AuthorDate: Wed Apr 15 14:28:56 2026 +0800

    [fix][broker] Unthrottle producers immediately when publish rate limiting is disabled (#25502)
    
    Co-authored-by: fengwenzhi <[email protected]>
---
 .../broker/service/PublishRateLimiterImpl.java     |  20 +++-
 .../broker/service/PublishRateLimiterTest.java     | 124 +++++++++++++++++++++
 2 files changed, 142 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 096418191dc..5d8fb4687b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -43,6 +43,12 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
 
     private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
     private final AtomicBoolean processingQueuedProducers = new 
AtomicBoolean(false);
+
+    /**
+     * Executor used for the last {@link #scheduleUnthrottling} from this limiter (set when throttling starts).
+     * Used to schedule an immediate follow-up run after publish-rate limits change.
+     */
+    private volatile ScheduledExecutorService lastUnthrottleExecutor;
     private final Consumer<Producer> throttleAction;
     private final Consumer<Producer> unthrottleAction;
 
@@ -88,6 +94,7 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
         // this is to avoid scheduling unthrottling multiple times for concurrent producers
         if (throttledProducersCount.incrementAndGet() == 1) {
             ScheduledExecutorService executor = 
producer.getCnx().getBrokerService().executor().next();
+            lastUnthrottleExecutor = executor;
             scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
         }
     }
@@ -167,12 +174,18 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
         update(maxPublishRate);
     }
 
+    private void scheduleImmediateUnthrottling() {
+        ScheduledExecutorService executor = lastUnthrottleExecutor;
+        if (executor != null) {
+            scheduleUnthrottling(executor, 0L);
+        }
+    }
+
     public void update(PublishRate maxPublishRate) {
         if (maxPublishRate != null) {
             updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg, 
maxPublishRate.publishThrottlingRateInByte);
         } else {
-            tokenBucketOnMessage = null;
-            tokenBucketOnByte = null;
+            updateTokenBuckets(0L, 0L);
         }
     }
 
@@ -189,6 +202,9 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
         } else {
             tokenBucketOnByte = null;
         }
+        // After any bucket rebuild, wake unthrottling:
+        // old scheduled delay may be invalid and cause unnecessary wait time for producers to be unthrottled.
+        scheduleImmediateUnthrottling();
     }
 
     @VisibleForTesting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 573e3980c73..270286419f7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -19,19 +19,26 @@
 
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.ScheduledFuture;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.testng.annotations.AfterMethod;
@@ -149,4 +156,121 @@ public class PublishRateLimiterTest {
         });
         future.get(5, TimeUnit.SECONDS);
     }
+
+    /**
+     * When the token bucket is deeply depleted, the first scheduled unthrottle uses a long delay. Disabling limits
+     * must schedule an immediate unthrottle (delay 0) so producers are not stuck until that delay elapses.
+     */
+    @Test
+    public void 
shouldUnthrottleImmediatelyAfterDisablingLimitsDespiteLongPendingDelay() {
+        AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
+        AtomicInteger unthrottleCalls = new AtomicInteger();
+
+        PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
+                manualClock::get,
+                p -> { },
+                p -> unthrottleCalls.incrementAndGet());
+
+        EventLoop scheduler = mock(EventLoop.class);
+        AtomicInteger longDelaySchedules = new AtomicInteger();
+        doAnswer(invocation -> {
+            Runnable task = invocation.getArgument(0);
+            long delay = invocation.getArgument(1);
+            TimeUnit unit = invocation.getArgument(2);
+            long delayNanos = unit.toNanos(delay);
+            if (delayNanos == 0L) {
+                task.run();
+            } else {
+                longDelaySchedules.incrementAndGet();
+            }
+            @SuppressWarnings("unchecked")
+            ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
+            return scheduled;
+        }).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
+
+        Producer p = mock(Producer.class);
+        ServerCnx cnx = mock(ServerCnx.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        doAnswer(a -> ctx).when(cnx).ctx();
+        doAnswer(a -> cnx).when(p).getCnx();
+        when(p.getCnx()).thenReturn(cnx);
+        doAnswer(a -> {
+            ((Runnable) a.getArgument(0)).run();
+            return null;
+        }).when(cnx).execute(any(Runnable.class));
+
+        BrokerService brokerService = mock(BrokerService.class);
+        when(cnx.getBrokerService()).thenReturn(brokerService);
+        EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+        when(brokerService.executor()).thenReturn(eventLoopGroup);
+        when(eventLoopGroup.next()).thenReturn(scheduler);
+
+        limiter.update(new PublishRate(1, 0));
+        manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
+
+        limiter.handlePublishThrottling(p, 100_000, 0L);
+        assertEquals(unthrottleCalls.get(), 0);
+        assertTrue(longDelaySchedules.get() >= 1,
+                "Expected a long-delay unthrottle to be scheduled while the 
bucket is deeply depleted");
+
+        limiter.update(new PublishRate(0, 0));
+        assertEquals(unthrottleCalls.get(), 1);
+    }
+
+    /**
+     * Relaxing only the byte limit still invalidates a previously scheduled long unthrottle delay; an immediate
+     * unthrottle pass must run after buckets are rebuilt.
+     */
+    @Test
+    public void 
shouldUnthrottleImmediatelyAfterRaisingByteLimitDespiteLongPendingDelay() {
+        AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
+        AtomicInteger unthrottleCalls = new AtomicInteger();
+
+        PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
+                manualClock::get,
+                p -> { },
+                p -> unthrottleCalls.incrementAndGet());
+
+        EventLoop scheduler = mock(EventLoop.class);
+        doAnswer(invocation -> {
+            Runnable task = invocation.getArgument(0);
+            long delay = invocation.getArgument(1);
+            TimeUnit unit = invocation.getArgument(2);
+            if (unit.toNanos(delay) == 0L) {
+                task.run();
+            }
+            @SuppressWarnings("unchecked")
+            ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
+            return scheduled;
+        }).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
+
+        Producer p = mock(Producer.class);
+        ServerCnx cnx = mock(ServerCnx.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        doAnswer(a -> ctx).when(cnx).ctx();
+        doAnswer(a -> cnx).when(p).getCnx();
+        when(p.getCnx()).thenReturn(cnx);
+        doAnswer(a -> {
+            ((Runnable) a.getArgument(0)).run();
+            return null;
+        }).when(cnx).execute(any(Runnable.class));
+
+        BrokerService brokerService = mock(BrokerService.class);
+        when(cnx.getBrokerService()).thenReturn(brokerService);
+        EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+        when(brokerService.executor()).thenReturn(eventLoopGroup);
+        when(eventLoopGroup.next()).thenReturn(scheduler);
+
+        limiter.update(new PublishRate(0, 1));
+        manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
+
+        limiter.handlePublishThrottling(p, 0, 100_000L);
+        assertEquals(unthrottleCalls.get(), 0);
+
+        limiter.update(new PublishRate(0, 1_000_000));
+        assertEquals(unthrottleCalls.get(), 1);
+
+        AsyncTokenBucket byteBucket = limiter.getTokenBucketOnByte();
+        assertNotNull(byteBucket);
+    }
 }

Reply via email to