This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 102a216366 Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp (#10729) 102a216366 is described below commit 102a216366de72108429c4007ce634320854097a Author: Jia Guo <jia...@linkedin.com> AuthorDate: Mon May 8 22:23:43 2023 -0700 Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp (#10729) * Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp * Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp * Trigger Test * Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp * Enhance the instrumentation for a corner case where the query doesn't go through DocIdSetOp --- .../query/DictionaryBasedDistinctOperator.java | 28 ++++++++++++++-------- ...fflineClusterMemBasedServerQueryKilingTest.java | 23 ++++++++++++++---- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java index e8f1e015ca..edb5f8a109 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java @@ -34,6 +34,7 @@ import org.apache.pinot.core.query.distinct.DistinctTable; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.trace.Tracing; /** @@ -98,23 +99,16 @@ public class DictionaryBasedDistinctOperator extends BaseOperator<DistinctResult records = new ArrayList<>(actualLimit); _numDocsScanned = actualLimit; - - for (int i = 0; i < actualLimit; i++) { - records.add(new Record(new Object[]{_dictionary.getInternal(i)})); - } + iterateOnDictionary(dictLength, actualLimit, records); } else { if (_dictionary.isSorted()) { records = new ArrayList<>(actualLimit); if (_isAscending) { _numDocsScanned = actualLimit; - for (int i = 0; i < actualLimit; i++) { - records.add(new Record(new Object[]{_dictionary.getInternal(i)})); - } + iterateOnDictionary(dictLength, actualLimit, records); } else { _numDocsScanned = actualLimit; - for (int i = dictLength - 1; i >= (dictLength - actualLimit); i--) { - records.add(new Record(new Object[]{_dictionary.getInternal(i)})); - } + iterateOnDictionaryDesc(dictLength, actualLimit, records); } } else { // DictionaryBasedDistinctOperator cannot handle nulls. @@ -134,6 +128,20 @@ public class DictionaryBasedDistinctOperator extends BaseOperator<DistinctResult return new DistinctTable(dataSchema, records, _nullHandlingEnabled); } + private void iterateOnDictionary(int dictLength, int actualLimit, List<Record> records) { + for (int i = 0; i < actualLimit; i++) { + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(i); + records.add(new Record(new Object[]{_dictionary.getInternal(i)})); + } + } + + private void iterateOnDictionaryDesc(int dictLength, int actualLimit, List<Record> records) { + for (int i = dictLength - 1, j = 0; i >= (dictLength - actualLimit); i--, j++) { + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(j); + records.add(new Record(new Object[]{_dictionary.getInternal(i)})); + } + } + @Override public String toExplainString() { return EXPLAIN_NAME; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java index bfcf96c962..8a07cc7c53 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java @@ -68,10 +68,16 @@ public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterInte public static final String BOOLEAN_DIM_SV1 = "booleanDimSV1"; private static final int NUM_BROKERS = 1; private static final int NUM_SERVERS = 1; +private static final int NUM_DOCS = 3_000_000; + private static final String OOM_QUERY = "SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest, intDimSV1 FROM mytable GROUP BY intDimSV1" + " ORDER BY digest LIMIT 30000"; + private static final String OOM_QUERY_2 = + "SELECT stringDimSV2 FROM mytable GROUP BY stringDimSV2" + + " ORDER BY stringDimSV2 LIMIT 1000000"; + private static final String DIGEST_QUERY_1 = "SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest FROM mytable"; private static final String COUNT_STAR_QUERY = @@ -151,7 +157,7 @@ public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterInte serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.0f); serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." - + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.60f); + + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.25f); serverConf.setProperty( CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory"); @@ -188,7 +194,7 @@ public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterInte protected long getCountStarResult() { - return 3_000_000; + return NUM_DOCS * 3; } protected String getTimeColumnName() { @@ -214,6 +220,15 @@ public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterInte Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because")); } + @Test + public void testDigestOOM2() + throws Exception { + JsonNode queryResponse = postQuery(OOM_QUERY_2); + LOGGER.info("testDigestOOM: {}", queryResponse); + Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException")); + Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because")); + } + @Test public void testDigestOOMMultipleQueries() throws Exception { @@ -284,14 +299,14 @@ public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterInte try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { fileWriter.create(avroSchema, avroFile); - int numDocs = 1_000_000; + int numDocs = NUM_DOCS; int randBound = numDocs / 2; Random random = new Random(0); IntStream randomInt = random.ints(0, 100_000); for (int docId = 0; docId < numDocs; docId++) { GenericData.Record record = new GenericData.Record(avroSchema); record.put(STRING_DIM_SV1, "test query killing"); - record.put(STRING_DIM_SV2, "test query killing"); + record.put(STRING_DIM_SV2, "test query killing" + docId); record.put(INT_DIM_SV1, random.nextInt(randBound)); record.put(LONG_DIM_SV1, random.nextLong()); record.put(DOUBLE_DIM_SV1, random.nextDouble()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org