Copilot commented on code in PR #16572:
URL: https://github.com/apache/pinot/pull/16572#discussion_r2268131355
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -115,12 +124,22 @@ private void createOrUpdateServerRateLimiter(double
serverRateLimit, ServerMetri
// result of it, But the metric related to throttling won't be emitted
since as a result of here above the
// AsyncMetricEmitter will be closed. It's recommended to forceCommit
segments to avoid this.
}
+ return;
+ }
+
+ if (currentRateLimiter instanceof ServerRateLimiter) {
+ ServerRateLimiter existingLimiter = (ServerRateLimiter)
_serverRateLimiter;
+ existingLimiter.updateRateLimit(serverRateLimit, throttlingStrategy);
Review Comment:
The code checks if currentRateLimiter is an instance of ServerRateLimiter
and then creates a new ServerRateLimiter instead of updating the existing one.
This creates a new rate limiter without closing the old one, potentially
causing resource leaks.
```suggestion
existingLimiter.updateRateLimit(serverRateLimit, throttlingStrategy);
existingLimiter.close();
```
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -72,7 +72,15 @@ public class RealtimeConsumptionRateManagerTest {
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
-
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+ if (Math.random() < 0.5) {
+
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT_BYTES,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
+ }
+ if (Math.random() < 0.5) {
+
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
+ }
Review Comment:
Using Math.random() in tests makes them non-deterministic, which can lead to
flaky tests that are hard to debug and reproduce. Tests should be deterministic
and predictable.
```suggestion
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT_BYTES,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
```
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerRateLimitConfigChangeListenerTest.java:
##########
@@ -41,14 +43,17 @@ public class ServerRateLimitConfigChangeListenerTest {
private static final double DELTA = 0.0001;
private static final ServerMetrics MOCK_SERVER_METRICS =
mock(ServerMetrics.class);
- static {
-
when(SERVER_CONFIG.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
-
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
- }
-
@Test
public void testRateLimitUpdate()
throws InterruptedException {
+ String rateLimiterConfigKey =
CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT;
+ if (Math.random() < 0.5) {
Review Comment:
Using Math.random() in tests makes them non-deterministic, which can lead to
flaky tests that are hard to debug and reproduce. Tests should be deterministic
and predictable.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -205,33 +224,62 @@ public QuotaUtilizationTracker(ServerMetrics
serverMetrics, String metricKeyName
/**
* Update count and return utilization ratio percentage (0 if not enough
data yet).
*/
- public int update(int numMsgsConsumed, double rateLimit, Instant now) {
+ public int update(int unitsConsumed, double rateLimit, Instant now) {
int ratioPercentage = 0;
long nowInMinutes = now.getEpochSecond() / 60;
if (nowInMinutes == _previousMinute) {
- _aggregateNumMessages += numMsgsConsumed;
+ _aggregateUnits += unitsConsumed;
} else {
if (_previousMinute != -1) { // not first time
- double actualRate = _aggregateNumMessages / ((nowInMinutes -
_previousMinute) * 60.0); // messages per second
+ double actualRate = _aggregateUnits / ((nowInMinutes -
_previousMinute) * 60.0); // units per second
ratioPercentage = (int) Math.round(actualRate / rateLimit * 100);
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.CONSUMPTION_QUOTA_UTILIZATION,
ratioPercentage);
}
- _aggregateNumMessages = numMsgsConsumed;
+ _aggregateUnits = unitsConsumed;
_previousMinute = nowInMinutes;
}
return ratioPercentage;
}
@VisibleForTesting
- int getAggregateNumMessages() {
- return _aggregateNumMessages;
+ int getAggregateUnits() {
+ return _aggregateUnits;
}
}
@FunctionalInterface
public interface ConsumptionRateLimiter {
- void throttle(int numMsgs);
+ void throttle(MessageBatch messageBatch);
+ }
+
+ public interface ThrottlingStrategy {
+ int getUnits(MessageBatch messageBatch);
+ }
+
+ static final class MessageCountThrottlingStrategy implements
ThrottlingStrategy {
+ public static final MessageCountThrottlingStrategy INSTANCE = new
MessageCountThrottlingStrategy();
+
+ private MessageCountThrottlingStrategy() {
+ }
+
+ @Override
+ public int getUnits(MessageBatch messageBatch) {
+ return messageBatch.getMessageCount();
+ }
+ }
+
+
+ static class ByteCountThrottlingStrategy implements ThrottlingStrategy {
+ public static final ByteCountThrottlingStrategy INSTANCE = new
ByteCountThrottlingStrategy();
+
+ private ByteCountThrottlingStrategy() {
+ }
+
+ @Override
+ public int getUnits(MessageBatch messageBatch) {
+ return (int) messageBatch.getSizeInBytes();
Review Comment:
Casting long to int can cause data loss if getSizeInBytes() returns a value
larger than Integer.MAX_VALUE. This could lead to incorrect throttling behavior
for very large message batches.
```suggestion
long sizeInBytes = messageBatch.getSizeInBytes();
return (int) Math.min(sizeInBytes, Integer.MAX_VALUE);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]