lhotari commented on code in PR #25502:
URL: https://github.com/apache/pulsar/pull/25502#discussion_r3077548261
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java:
##########
@@ -167,12 +169,22 @@ public void update(Policies policies, String clusterName) {
update(maxPublishRate);
}
+ private void unthrottleImmediately() {
Review Comment:
Rename this to `scheduleImmediateUnthrottling` so that the name better conveys
what the method does. `updateTokenBuckets` should call this method whenever the
rates change.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java:
##########
@@ -189,6 +201,9 @@ protected void updateTokenBuckets(long publishThrottlingRateInMsg, long publishT
} else {
tokenBucketOnByte = null;
}
+ if (tokenBucketOnByte == null && tokenBucketOnMessage == null) {
Review Comment:
Remove the condition, since either limit might be affecting the throttling.
For example, if publishing is throttled on bytes and only that limit is
increased, the change affects the throttling decision. `scheduleUnthrottling`
will handle it correctly.
The call to `scheduleImmediateUnthrottling` (new name) should be made in
every case. The modification in the `update` method also relies on this.
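
A minimal sketch of what calling the renamed method unconditionally could look like. `SketchRateLimiter`, its `Long` fields, and the request counter are hypothetical stand-ins for illustration only; they are not the actual `PublishRateLimiterImpl` code, which uses token buckets and an executor.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the limiter; not the actual Pulsar class.
class SketchRateLimiter {
    // Stand-ins for tokenBucketOnMessage / tokenBucketOnByte: null means "no limit".
    Long messageLimit;
    Long byteLimit;
    // Counts immediate-unthrottle requests; exists only to make the behavior observable.
    final AtomicInteger immediateUnthrottleRequests = new AtomicInteger();

    void updateTokenBuckets(long rateInMsg, long rateInByte) {
        messageLimit = rateInMsg > 0 ? Long.valueOf(rateInMsg) : null;
        byteLimit = rateInByte > 0 ? Long.valueOf(rateInByte) : null;
        // No null check on the limits: raising or removing either limit can
        // change the throttling decision, so the wake-up is requested every time.
        scheduleImmediateUnthrottling();
    }

    void scheduleImmediateUnthrottling() {
        // Assumption: the real method would schedule the unthrottle task with
        // zero delay on the last-used executor, if there is one.
        immediateUnthrottleRequests.incrementAndGet();
    }
}
```

In this sketch, raising only the byte limit still triggers a wake-up request, which is the case the null check on both buckets would have missed.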
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java:
##########
@@ -167,12 +169,22 @@ public void update(Policies policies, String clusterName) {
update(maxPublishRate);
}
+ private void unthrottleImmediately() {
+ ScheduledExecutorService executor = lastUnthrottleExecutor;
+ if (executor != null) {
+ // Wake the existing unthrottle path without waiting on an old delay.
+ scheduleUnthrottling(executor, 0L);
+ }
+ // If executor is null, no throttle has happened yet on this limiter
+ }
+
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null) {
updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg,
maxPublishRate.publishThrottlingRateInByte);
} else {
tokenBucketOnMessage = null;
tokenBucketOnByte = null;
+ unthrottleImmediately();
Review Comment:
This branch should be changed to `updateTokenBuckets(0, 0);` to avoid code
duplication.
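
A rough sketch of the delegation, assuming that `updateTokenBuckets` treats a rate of 0 as "no limit" and clears the corresponding bucket. `DelegatingLimiterSketch`, its nested `Rate` class, and the counter are hypothetical names used only for illustration.

```java
// Hypothetical, simplified model; names only loosely mirror the diff.
class DelegatingLimiterSketch {
    // Minimal stand-in for PublishRate (assumption: only these two fields matter here).
    static final class Rate {
        final long publishThrottlingRateInMsg;
        final long publishThrottlingRateInByte;

        Rate(long publishThrottlingRateInMsg, long publishThrottlingRateInByte) {
            this.publishThrottlingRateInMsg = publishThrottlingRateInMsg;
            this.publishThrottlingRateInByte = publishThrottlingRateInByte;
        }
    }

    Long messageLimit; // null means "no limit"
    Long byteLimit;    // null means "no limit"
    int unthrottleRequests; // illustration only

    void update(Rate maxPublishRate) {
        if (maxPublishRate != null) {
            updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg,
                    maxPublishRate.publishThrottlingRateInByte);
        } else {
            // Delegate instead of nulling both fields and unthrottling here.
            updateTokenBuckets(0, 0);
        }
    }

    void updateTokenBuckets(long rateInMsg, long rateInByte) {
        messageLimit = rateInMsg > 0 ? Long.valueOf(rateInMsg) : null;
        byteLimit = rateInByte > 0 ? Long.valueOf(rateInByte) : null;
        // Single place where the immediate unthrottle is requested.
        unthrottleRequests++;
    }
}
```

With this shape, the `null` branch and the normal branch share one code path, so the unthrottle request cannot be forgotten in one of them.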