This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b4066fbd40 Fix flaky quota integration tests (#14602)
b4066fbd40 is described below

commit b4066fbd40cce1e035a70968382fc24b3b7f4427
Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com>
AuthorDate: Tue Dec 10 23:06:35 2024 +0100

    Fix flaky quota integration tests (#14602)
---
 .../tests/QueryQuotaClusterIntegrationTest.java    | 53 ++++++++++++++--------
 1 file changed, 33 insertions(+), 20 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index 8ac736e507..a40cbdf290 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -234,30 +235,43 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
    * Then runs the query load with double the max rate and expects queries to fail due to quota breach.
    * @param maxRate max rate allowed by the quota
    */
-  void testQueryRate(float maxRate)
-      throws Exception {
+  void testQueryRate(int maxRate) {
     verifyQuotaUpdate(maxRate);
     runQueries(maxRate, false);
     //increase the qps and some of the queries should be throttled.
     runQueries(maxRate * 2, true);
   }
 
-  void testQueryRateOnBroker(float maxRate)
-      throws Exception {
+  void testQueryRateOnBroker(float maxRate) {
     verifyQuotaUpdate(maxRate);
     runQueriesOnBroker(maxRate, false);
     //increase the qps and some of the queries should be throttled.
     runQueriesOnBroker(maxRate * 2, true);
   }
 
+  // adjust sleep time depending on deadline and number of iterations left
+  private static void sleep(long deadline, double iterationsLeft) {
+    long time = System.currentTimeMillis();
+    long sleepDeadline = time + (long) Math.max(Math.ceil((deadline - time) / iterationsLeft), 0);
+
+    while (time < sleepDeadline) {
+      try {
+        Thread.sleep(sleepDeadline - time);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      }
+      time = System.currentTimeMillis();
+    }
+  }
+
   // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis
   // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps
-  private void runQueries(double qps, boolean shouldFail)
-      throws Exception {
+  private void runQueries(int qps, boolean shouldFail) {
     int failCount = 0;
-    long sleepMillis = (long) (1000 / qps);
-    Thread.sleep(sleepMillis);
-    for (int i = 0; i < qps * 2; i++) {
+    long deadline = System.currentTimeMillis() + 1000;
+
+    for (int i = 0; i < qps; i++) {
+      sleep(deadline, qps - i);
       ResultSetGroup resultSetGroup =
           _pinotConnection.execute("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName());
       for (PinotClientException exception : resultSetGroup.getExceptions()) {
@@ -266,12 +280,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
           break;
         }
       }
-      Thread.sleep(sleepMillis);
     }
+
     if (shouldFail) {
-      assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+      assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
     } else {
-      assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+      Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps);
     }
   }
 
@@ -316,12 +330,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
     return _pinotClientTransport.executeQuery(_brokerHostPort, query);
   }
 
-  private void runQueriesOnBroker(double qps, boolean shouldFail)
-      throws Exception {
+  private void runQueriesOnBroker(float qps, boolean shouldFail) {
     int failCount = 0;
-    long sleepMillis = (long) (1000 / qps);
-    Thread.sleep(sleepMillis);
-    for (int i = 0; i < qps * 2; i++) {
+    long deadline = System.currentTimeMillis() + 1000;
+
+    for (int i = 0; i < qps; i++) {
+      sleep(deadline, qps - i);
       BrokerResponse resultSetGroup =
           executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName());
       for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) {
@@ -331,13 +345,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest
           break;
         }
       }
-      Thread.sleep(sleepMillis);
     }
 
     if (shouldFail) {
-      assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+      assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps);
     } else {
-      assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+      Assert.assertEquals(failCount, 0, "Expected 0 failures for qps: " + qps);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to