This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b4066fbd40 Fix flaky quota integration tests (#14602) b4066fbd40 is described below commit b4066fbd40cce1e035a70968382fc24b3b7f4427 Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com> AuthorDate: Tue Dec 10 23:06:35 2024 +0100 Fix flaky quota integration tests (#14602) --- .../tests/QueryQuotaClusterIntegrationTest.java | 53 ++++++++++++++-------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index 8ac736e507..a40cbdf290 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -234,30 +235,43 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest * Then runs the query load with double the max rate and expects queries to fail due to quota breach. * @param maxRate max rate allowed by the quota */ - void testQueryRate(float maxRate) - throws Exception { + void testQueryRate(int maxRate) { verifyQuotaUpdate(maxRate); runQueries(maxRate, false); //increase the qps and some of the queries should be throttled. runQueries(maxRate * 2, true); } - void testQueryRateOnBroker(float maxRate) - throws Exception { + void testQueryRateOnBroker(float maxRate) { verifyQuotaUpdate(maxRate); runQueriesOnBroker(maxRate, false); //increase the qps and some of the queries should be throttled. runQueriesOnBroker(maxRate * 2, true); } + // adjust sleep time depending on deadline and number of iterations left + private static void sleep(long deadline, double iterationsLeft) { + long time = System.currentTimeMillis(); + long sleepDeadline = time + (long) Math.max(Math.ceil((deadline - time) / iterationsLeft), 0); + + while (time < sleepDeadline) { + try { + Thread.sleep(sleepDeadline - time); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + time = System.currentTimeMillis(); + } + } + // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps - private void runQueries(double qps, boolean shouldFail) - throws Exception { + private void runQueries(int qps, boolean shouldFail) { int failCount = 0; - long sleepMillis = (long) (1000 / qps); - Thread.sleep(sleepMillis); - for (int i = 0; i < qps * 2; i++) { + long deadline = System.currentTimeMillis() + 1000; + + for (int i = 0; i < qps; i++) { + sleep(deadline, qps - i); ResultSetGroup resultSetGroup = _pinotConnection.execute("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); for (PinotClientException exception : resultSetGroup.getExceptions()) { @@ -266,12 +280,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest break; } } - Thread.sleep(sleepMillis); } + if (shouldFail) { - assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps); + assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps); } else { - assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps); + Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps); } } @@ -316,12 +330,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest return _pinotClientTransport.executeQuery(_brokerHostPort, query); } - private void runQueriesOnBroker(double qps, boolean shouldFail) - throws Exception { + private void runQueriesOnBroker(float qps, boolean shouldFail) { int failCount = 0; - long sleepMillis = (long) (1000 / qps); - Thread.sleep(sleepMillis); - for (int i = 0; i < qps * 2; i++) { + long deadline = System.currentTimeMillis() + 1000; + + for (int i = 0; i < qps; i++) { + sleep(deadline, qps - i); BrokerResponse resultSetGroup = executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) { @@ -331,13 +345,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest break; } } - Thread.sleep(sleepMillis); } if (shouldFail) { - assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps); + assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps); } else { - assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps); + Assert.assertEquals(failCount, 0, "Expected 0 failures for qps: " + qps); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org