Copilot commented on code in PR #16572:
URL: https://github.com/apache/pinot/pull/16572#discussion_r2268131355


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -115,12 +124,22 @@ private void createOrUpdateServerRateLimiter(double serverRateLimit, ServerMetri
         // result of it, But the metric related to throttling won't be emitted since as a result of here above the
         // AsyncMetricEmitter will be closed. It's recommended to forceCommit segments to avoid this.
       }
+      return;
+    }
+
+    if (currentRateLimiter instanceof ServerRateLimiter) {
+      ServerRateLimiter existingLimiter = (ServerRateLimiter) _serverRateLimiter;
+      existingLimiter.updateRateLimit(serverRateLimit, throttlingStrategy);

Review Comment:
   The code checks if currentRateLimiter is an instance of ServerRateLimiter 
and then creates a new ServerRateLimiter instead of updating the existing one. 
This creates a new rate limiter without closing the old one, potentially 
causing resource leaks.
   ```suggestion
         existingLimiter.updateRateLimit(serverRateLimit, throttlingStrategy);
         existingLimiter.close();
   ```
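The distinction behind this comment can be sketched as follows: updating a limiter in place keeps the same instance live, so there is nothing to close, while swapping in a new instance leaves the old one to be closed explicitly. This is only a minimal illustration; `DemoLimiter` and `DemoLimiterHolder` are hypothetical names, not Pinot classes, and the real `ServerRateLimiter` may manage its resources differently.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a rate limiter that owns a closeable resource.
final class DemoLimiter implements AutoCloseable {
  private volatile double _rate;
  private volatile boolean _closed;

  DemoLimiter(double rate) {
    _rate = rate;
  }

  void updateRate(double rate) {
    _rate = rate;
  }

  double getRate() {
    return _rate;
  }

  boolean isClosed() {
    return _closed;
  }

  @Override
  public void close() {
    _closed = true;
  }
}

// Hypothetical holder showing the two paths: update in place vs. replace and close.
final class DemoLimiterHolder {
  private final AtomicReference<DemoLimiter> _current = new AtomicReference<>();

  // Update in place: the existing instance stays live, so it must not be closed.
  DemoLimiter createOrUpdate(double rate) {
    DemoLimiter existing = _current.get();
    if (existing != null) {
      existing.updateRate(rate);
      return existing;
    }
    DemoLimiter created = new DemoLimiter(rate);
    _current.set(created);
    return created;
  }

  // Replace: the old instance is no longer referenced by the holder, so close it.
  DemoLimiter replace(double rate) {
    DemoLimiter created = new DemoLimiter(rate);
    DemoLimiter old = _current.getAndSet(created);
    if (old != null) {
      old.close();
    }
    return created;
  }
}
```

In this sketch, closing is tied to the replace path only; calling `close()` on an instance that remains the live limiter would leave the holder pointing at a closed object.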



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -72,7 +72,15 @@ public class RealtimeConsumptionRateManagerTest {
 
     when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
         CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
-    when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+    if (Math.random() < 0.5) {
+      when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT_BYTES,
+          CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
+    }
+    if (Math.random() < 0.5) {
+      when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+          CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
+    }

Review Comment:
   Math.random() makes this test non-deterministic: each stub is installed in 
only about half of the runs, so a failure tied to one configuration may not 
reproduce on a rerun. Set up the stubs unconditionally so every run exercises 
the same configuration.
   ```suggestion
       when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT_BYTES,
           CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
       when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
           CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
   ```



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerRateLimitConfigChangeListenerTest.java:
##########
@@ -41,14 +43,17 @@ public class ServerRateLimitConfigChangeListenerTest {
   private static final double DELTA = 0.0001;
   private static final ServerMetrics MOCK_SERVER_METRICS = mock(ServerMetrics.class);
 
-  static {
-    when(SERVER_CONFIG.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
-        CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
-  }
-
   @Test
   public void testRateLimitUpdate()
       throws InterruptedException {
+    String rateLimiterConfigKey = CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT;
+    if (Math.random() < 0.5) {

Review Comment:
   Math.random() makes this test non-deterministic: the branch taken is chosen 
at random on each run, so a failure tied to one branch may not reproduce on a 
rerun. Prefer covering each case explicitly, for example with a parameterized 
test that runs once per config key.
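One deterministic alternative is to run the same check once per config key in a fixed order, instead of picking a key at random. The sketch below uses plain Java and made-up key strings; `BothKeysDemo`, `MESSAGE_KEY`, `BYTES_KEY`, and `readLimit` are hypothetical and only stand in for the real config keys and the code under test. In a real test class the same idea would more likely be expressed with the test framework's parameterization support (for example a TestNG `@DataProvider`).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class BothKeysDemo {
  // Hypothetical key names standing in for the two server config keys.
  static final String MESSAGE_KEY = "demo.consumption.rate.limit";
  static final String BYTES_KEY = "demo.consumption.rate.limit.bytes";

  // Stand-in for the code under test: reads the limit stored under the given key.
  static double readLimit(Map<String, Double> config, String key, double defaultValue) {
    return config.getOrDefault(key, defaultValue);
  }

  // Runs the same check once per key, in a fixed order, and records what was observed.
  static Map<String, Double> runForEveryKey(double configuredLimit) {
    Map<String, Double> observed = new LinkedHashMap<>();
    for (String key : Arrays.asList(MESSAGE_KEY, BYTES_KEY)) {
      Map<String, Double> config = new HashMap<>();
      config.put(key, configuredLimit);
      observed.put(key, readLimit(config, key, 0.0));
    }
    return observed;
  }
}
```

Every run then covers both keys, so a regression in either path fails consistently rather than in roughly half of the runs.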



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java:
##########
@@ -205,33 +224,62 @@ public QuotaUtilizationTracker(ServerMetrics serverMetrics, String metricKeyName
     /**
      * Update count and return utilization ratio percentage (0 if not enough data yet).
      */
-    public int update(int numMsgsConsumed, double rateLimit, Instant now) {
+    public int update(int unitsConsumed, double rateLimit, Instant now) {
       int ratioPercentage = 0;
       long nowInMinutes = now.getEpochSecond() / 60;
       if (nowInMinutes == _previousMinute) {
-        _aggregateNumMessages += numMsgsConsumed;
+        _aggregateUnits += unitsConsumed;
       } else {
         if (_previousMinute != -1) { // not first time
-          double actualRate = _aggregateNumMessages / ((nowInMinutes - _previousMinute) * 60.0); // messages per second
+          double actualRate = _aggregateUnits / ((nowInMinutes - _previousMinute) * 60.0); // units per second
           ratioPercentage = (int) Math.round(actualRate / rateLimit * 100);
           _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.CONSUMPTION_QUOTA_UTILIZATION,
               ratioPercentage);
         }
-        _aggregateNumMessages = numMsgsConsumed;
+        _aggregateUnits = unitsConsumed;
         _previousMinute = nowInMinutes;
       }
       return ratioPercentage;
     }
 
     @VisibleForTesting
-    int getAggregateNumMessages() {
-      return _aggregateNumMessages;
+    int getAggregateUnits() {
+      return _aggregateUnits;
     }
   }
 
   @FunctionalInterface
   public interface ConsumptionRateLimiter {
-    void throttle(int numMsgs);
+    void throttle(MessageBatch messageBatch);
+  }
+
+  public interface ThrottlingStrategy {
+    int getUnits(MessageBatch messageBatch);
+  }
+
+  static final class MessageCountThrottlingStrategy implements ThrottlingStrategy {
+    public static final MessageCountThrottlingStrategy INSTANCE = new MessageCountThrottlingStrategy();
+
+    private MessageCountThrottlingStrategy() {
+    }
+
+    @Override
+    public int getUnits(MessageBatch messageBatch) {
+      return messageBatch.getMessageCount();
+    }
+  }
+
+
+  static class ByteCountThrottlingStrategy implements ThrottlingStrategy {
+    public static final ByteCountThrottlingStrategy INSTANCE = new ByteCountThrottlingStrategy();
+
+    private ByteCountThrottlingStrategy() {
+    }
+
+    @Override
+    public int getUnits(MessageBatch messageBatch) {
+      return (int) messageBatch.getSizeInBytes();

Review Comment:
   Casting long to int silently discards the high 32 bits, so if 
getSizeInBytes() returns more than Integer.MAX_VALUE the unit count wraps to a 
wrong, possibly negative, value and the batch is throttled incorrectly. Clamp 
the value instead.
   ```suggestion
         long sizeInBytes = messageBatch.getSizeInBytes();
         return (int) Math.min(sizeInBytes, Integer.MAX_VALUE);
   ```
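To make the difference concrete, here is a small standalone sketch comparing the plain cast, the clamped conversion from the suggestion, and a fail-fast variant using `Math.toIntExact`. The class and method names (`ByteUnitsDemo`, `truncate`, `clamp`, `exact`) are illustrative only and not part of the PR.

```java
final class ByteUnitsDemo {
  // Plain narrowing cast: keeps only the low 32 bits, so large values wrap.
  static int truncate(long sizeInBytes) {
    return (int) sizeInBytes;
  }

  // Saturating conversion, as in the suggestion: values above Integer.MAX_VALUE clamp to it.
  static int clamp(long sizeInBytes) {
    return (int) Math.min(sizeInBytes, Integer.MAX_VALUE);
  }

  // Fail-fast alternative: throws ArithmeticException when the value does not fit in an int.
  static int exact(long sizeInBytes) {
    return Math.toIntExact(sizeInBytes);
  }
}
```

For example, `truncate(3_000_000_000L)` yields `-1294967296`, while `clamp(3_000_000_000L)` yields `Integer.MAX_VALUE`. Clamping still under-counts a batch larger than about 2 GiB, so it bounds the error rather than removing it; whether that is acceptable, or whether the unit count should be widened to long, is a design call for the PR author.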



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
