This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f773208  Add Query Options for GroupBy In-Segment Trim  (#7052)
f773208 is described below

commit f7732082dde93a590488d1e5db2b583050c9fab1
Author: wuwenw <55009204+wuw...@users.noreply.github.com>
AuthorDate: Fri Jun 18 16:49:39 2021 -0400

    Add Query Options for GroupBy In-Segment Trim  (#7052)
    
    Adds query options for group-by in-segment trim. Two options are added:
    minSegmentTrimSize and enableSegmentTrim.
    
    If minSegmentTrimSize is set to a positive value in the query options, it
    overrides the server config and triggers the trim based on the given number.
    Otherwise, if enableSegmentTrim is set to true in the query options, the trim
    is triggered based on the server config trim size if one is configured, or on
    the default trim size (5000) if not.
    
    Example:
    minTrimSize in server config: 0
    Query: "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0
    ORDER BY max(metric_1) DESC LIMIT 50 OPTION(minSegmentTrimSize=1000)"
    => trim size = max(5 * limit, 1000) = max(250, 1000) = 1000
    => so the expected size from each segment is 1000
---
 .../query/AggregationGroupByOrderByOperator.java   | 27 +++++++++++++++++++--
 .../org/apache/pinot/core/util/QueryOptions.java   | 28 ++++++++++++++++++++++
 .../groupby/GroupByInSegmentTrimTest.java          | 14 +++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  2 ++
 4 files changed, 69 insertions(+), 2 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index ebc6947..c091d44 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import java.util.Collection;
+import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.IntermediateRecord;
@@ -33,6 +34,8 @@ import 
org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.QueryOptions;
 
 import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity;
 
@@ -117,12 +120,13 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
       groupByExecutor.process(transformBlock);
     }
 
+    int minSegmentTrimSize = calculateMinSegmentTrimSize();
     // There is no OrderBy or minSegmentTrimSize is set to be negative or 0
-    if (_queryContext.getOrderByExpressions() == null || _minSegmentTrimSize 
<= 0) {
+    if (_queryContext.getOrderByExpressions() == null || minSegmentTrimSize <= 
0) {
       // Build intermediate result block based on aggregation group-by result 
from the executor
       return new IntermediateResultsBlock(_aggregationFunctions, 
groupByExecutor.getResult(), _dataSchema);
     }
-    int trimSize = getTableCapacity(_queryContext.getLimit(), 
_minSegmentTrimSize);
+    int trimSize = getTableCapacity(_queryContext.getLimit(), 
minSegmentTrimSize);
     // Num of groups hasn't reached the threshold
     if (groupByExecutor.getNumGroups() <= trimSize) {
       return new IntermediateResultsBlock(_aggregationFunctions, 
groupByExecutor.getResult(), _dataSchema);
@@ -145,4 +149,23 @@ public class AggregationGroupByOrderByOperator extends 
BaseOperator<Intermediate
     return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
         _numTotalDocs);
   }
+
+  /**
+   * In query option, if a positive min trim size is given, we use it to 
override the server settings. Otherwise
+   * check if a simple boolean option is given and use default trim size.
+   */
+  private int calculateMinSegmentTrimSize() {
+    Map<String, String> options = _queryContext.getQueryOptions();
+    if (options == null) {
+      return _minSegmentTrimSize;
+    }
+    boolean queryOptionEnableTrim = QueryOptions.isEnableSegmentTrim(options);
+    int queryOptionTrimSize = QueryOptions.getMinSegmentTrimSize(options);
+    if (queryOptionTrimSize > 0) {
+      return queryOptionTrimSize;
+    } else if (queryOptionEnableTrim && _minSegmentTrimSize <= 0) {
+      return GroupByUtils.DEFAULT_MIN_NUM_GROUPS;
+    }
+    return _minSegmentTrimSize;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
index cd7a285..77e75d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
@@ -33,6 +33,8 @@ public class QueryOptions {
   private final boolean _responseFormatSQL;
   private final boolean _preserveType;
   private final boolean _skipUpsert;
+  private final boolean _enableSegmentTrim;
+  private final int _minSegmentTrimSize;
 
   public QueryOptions(@Nullable Map<String, String> queryOptions) {
     if (queryOptions != null) {
@@ -41,12 +43,16 @@ public class QueryOptions {
       _responseFormatSQL = 
Request.SQL.equalsIgnoreCase(queryOptions.get(Request.QueryOptionKey.RESPONSE_FORMAT));
       _preserveType = 
Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.PRESERVE_TYPE));
       _skipUpsert = 
Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT));
+      _enableSegmentTrim = isEnableSegmentTrim(queryOptions);
+      _minSegmentTrimSize = getMinSegmentTrimSize(queryOptions);
     } else {
       _timeoutMs = null;
       _groupByModeSQL = false;
       _responseFormatSQL = false;
       _preserveType = false;
       _skipUpsert = false;
+      _enableSegmentTrim = false;
+      _minSegmentTrimSize = -1;
     }
   }
 
@@ -72,6 +78,16 @@ public class QueryOptions {
   }
 
   @Nullable
+  public Boolean isEnableSegmentTrim() {
+    return _enableSegmentTrim;
+  }
+
+  @Nullable
+  public Integer getMinSegmentTrimSize() {
+    return _minSegmentTrimSize;
+  }
+
+  @Nullable
   public static Long getTimeoutMs(Map<String, String> queryOptions) {
     String timeoutMsString = 
queryOptions.get(Request.QueryOptionKey.TIMEOUT_MS);
     if (timeoutMsString != null) {
@@ -82,4 +98,16 @@ public class QueryOptions {
       return null;
     }
   }
+
+  public static boolean isEnableSegmentTrim(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.ENABLE_SEGMENT_TRIM));
+  }
+
+  public static int getMinSegmentTrimSize(Map<String, String> queryOptions) {
+    String minSegmentTrimSize = 
queryOptions.get(Request.QueryOptionKey.MIN_SEGMENT_TRIM_SIZE);
+    if (minSegmentTrimSize != null) {
+      return Integer.parseInt(minSegmentTrimSize);
+    }
+    return -1;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
index c50ef8e..823c0d8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
@@ -265,6 +265,20 @@ public class GroupByInSegmentTrimTest {
     expectedSize = 1000;
     data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), 
queryContext});
 
+    // Testcase4: low limit + low server trim size + query option size
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+        "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER 
BY max(metric_1) DESC LIMIT 50 OPTION(minSegmentTrimSize=1000)");
+    trimSize = 0;
+    expectedSize = 1000;
+    data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), 
queryContext});
+
+    // Testcase5: low limit + low server trim size + query option enable
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+        "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER 
BY max(metric_1) DESC LIMIT 50 OPTION(enableSegmentTrim=true)");
+    trimSize = 0;
+    expectedSize = 1000;
+    data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), 
queryContext});
+
     return data.toArray(new Object[data.size()][]);
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 147c551..478d555 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -213,6 +213,8 @@ public class CommonConstants {
         public static final String RESPONSE_FORMAT = "responseFormat";
         public static final String GROUP_BY_MODE = "groupByMode";
         public static final String SKIP_UPSERT = "skipUpsert";
+        public static final String ENABLE_SEGMENT_TRIM = "enableSegmentTrim";
+        public static final String MIN_SEGMENT_TRIM_SIZE = 
"minSegmentTrimSize";
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to