nodece commented on code in PR #25502:
URL: https://github.com/apache/pulsar/pull/25502#discussion_r3056833100
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java:
##########
@@ -88,6 +89,7 @@ private void scheduleDecrementThrottleCount(Producer
producer) {
// this is to avoid scheduling unthrottling multiple times for
concurrent producers
if (throttledProducersCount.incrementAndGet() == 1) {
ScheduledExecutorService executor =
producer.getCnx().getBrokerService().executor().next();
Review Comment:
You should provide (and reuse) a dedicated `Executor` when creating the rate
limiter, for example:
```java
private final Executor executor;
public PublishRateLimiterImpl(MonotonicClock monotonicClock,
Consumer<Producer> throttleAction,
Consumer<Producer> unthrottleAction,
Executor executor) {
this.monotonicClock = monotonicClock;
this.throttleAction = throttleAction;
this.unthrottleAction = unthrottleAction;
this.executor = executor;
}
```
This ensures that all scheduling (e.g., unthrottling tasks) runs on a
controlled and consistent execution context, rather than relying on ad-hoc or
shared threads.
When publish rate limiting is disabled, you can immediately release any
throttled producers by invoking:
```java
scheduleUnthrottling(executor, 0L);
```
Using a delay of `0L` guarantees that the unthrottling task is executed
asynchronously but without delay, ensuring producers are promptly resumed
without blocking the caller thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]