This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f9655b9c04 Allow configurable initial capacity for IndexedTable (#14620)
f9655b9c04 is described below

commit f9655b9c0412797cf43890caa8d4296e0f1b48f3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Dec 9 03:42:11 2024 -0800

    Allow configurable initial capacity for IndexedTable (#14620)
---
 .../org/apache/pinot/common/utils/HashUtil.java    |  10 +-
 .../common/utils/config/QueryOptionsUtils.java     |   6 +
 .../core/data/table/ConcurrentIndexedTable.java    |  10 +-
 .../apache/pinot/core/data/table/IndexedTable.java |  36 +----
 .../pinot/core/data/table/SimpleIndexedTable.java  |   9 +-
 .../table/UnboundedConcurrentIndexedTable.java     |   8 +-
 .../blocks/results/GroupByResultsBlock.java        |  10 ++
 .../operator/combine/GroupByCombineOperator.java   |  42 +-----
 .../streaming/StreamingGroupByCombineOperator.java |  41 +-----
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  64 +++++++--
 .../groupby/AggregationGroupByResult.java          |   4 +
 .../pinot/core/query/reduce/BaseReduceService.java |   4 +
 .../core/query/reduce/BrokerReduceService.java     |   7 +-
 .../core/query/reduce/DataTableReducerContext.java |   8 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  53 ++-----
 .../core/query/reduce/StreamingReduceService.java  |   7 +-
 .../core/query/request/context/QueryContext.java   |  10 ++
 .../org/apache/pinot/core/util/GroupByUtils.java   | 159 +++++++++++++++++++++
 .../accounting/ResourceManagerAccountingTest.java  |   4 +-
 .../pinot/core/data/table/IndexedTableTest.java    |  79 +++-------
 .../apache/pinot/core/util/GroupByUtilsTest.java   |  34 +++++
 .../org/apache/pinot/queries/ExprMinMaxTest.java   |   5 +-
 ...terSegmentAggregationMultiValueQueriesTest.java |   8 +-
 ...SegmentAggregationMultiValueRawQueriesTest.java |   8 +-
 ...erSegmentAggregationSingleValueQueriesTest.java |   8 +-
 .../InterSegmentGroupByMultiValueQueriesTest.java  |  10 +-
 ...nterSegmentGroupByMultiValueRawQueriesTest.java |  10 +-
 .../InterSegmentGroupBySingleValueQueriesTest.java |  10 +-
 .../tests/OfflineGRPCServerIntegrationTest.java    |   2 +-
 .../apache/pinot/perf/BenchmarkCombineGroupBy.java |   6 +-
 .../apache/pinot/perf/BenchmarkIndexedTable.java   |   6 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  15 ++
 .../apache/pinot/spi/utils/CommonConstants.java    |   8 +-
 33 files changed, 410 insertions(+), 291 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
index 9c8d227c9b..a8c5cc9985 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import com.google.common.primitives.Ints;
 import java.nio.ByteBuffer;
 import java.nio.IntBuffer;
 
@@ -44,9 +45,16 @@ public class HashUtil {
   /**
    * Returns a capacity that is sufficient to keep the map from being resized as long as it grows no larger than
    * expectedSize and the load factor is >= its default (0.75).
+   * NOTE: Borrowed from Guava's Maps library {@code int capacity(int expectedSize)}.
    */
   public static int getHashMapCapacity(int expectedSize) {
-    return (int) ((float) expectedSize / 0.75f + 1f);
+    if (expectedSize < 3) {
+      return expectedSize + 1;
+    }
+    if (expectedSize < Ints.MAX_POWER_OF_TWO) {
+      return (int) Math.ceil(expectedSize / 0.75);
+    }
+    return Integer.MAX_VALUE;
   }
 
   public static long compute(IntBuffer buff) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 1ac9e6fab8..8dbd4bb402 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -275,6 +275,12 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
   }
 
+  @Nullable
+  public static Integer getMinInitialIndexedTableCapacity(Map<String, String> 
queryOptions) {
+    String minInitialIndexedTableCapacity = 
queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+    return 
checkedParseIntPositive(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY, 
minInitialIndexedTableCapacity);
+  }
+
   public static boolean shouldDropResults(Map<String, String> queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 119d47c79e..871eea7c26 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -32,14 +32,10 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
   private final ReentrantReadWriteLock _readWriteLock = new 
ReentrantReadWriteLock();
 
-  public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext 
queryContext, int resultSize, int trimSize,
-      int trimThreshold) {
-    this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
-  }
-
   public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, 
QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold, new ConcurrentHashMap<>());
+      int trimSize, int trimThreshold, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold,
+        new ConcurrentHashMap<>(initialCapacity));
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index dd961a0154..bce224eb3a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -26,13 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -40,8 +37,6 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class IndexedTable extends BaseTable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexedTable.class);
-
   protected final Map<Key, Record> _lookupMap;
   protected final boolean _hasFinalInput;
   protected final int _resultSize;
@@ -83,31 +78,12 @@ public abstract class IndexedTable extends BaseTable {
     assert groupByExpressions != null;
     _numKeyColumns = groupByExpressions.size();
     _aggregationFunctions = queryContext.getAggregationFunctions();
-    List<OrderByExpressionContext> orderByExpressions = 
queryContext.getOrderByExpressions();
-    if (orderByExpressions != null) {
-      // GROUP BY with ORDER BY
-      _hasOrderBy = true;
-      _tableResizer = new TableResizer(dataSchema, hasFinalInput, 
queryContext);
-      _trimSize = trimSize;
-      // trimThreshold is lower bounded by (2 * trimSize) in order to avoid 
excessive trimming. We don't modify trimSize
-      // in order to maintain the desired accuracy
-      if (trimSize > trimThreshold / 2) {
-        // Handle potential overflow
-        _trimThreshold = (2 * trimSize) > 0 ? 2 * trimSize : Integer.MAX_VALUE;
-        LOGGER.debug("Overriding group trim threshold to {}, since the 
configured value {} is less than twice the "
-            + "trim size ({})", _trimThreshold, trimThreshold, trimSize);
-      } else {
-        _trimThreshold = trimThreshold;
-      }
-    } else {
-      // GROUP BY without ORDER BY
-      // NOTE: The indexed table stops accepting records once the map size 
reaches resultSize, and there is no
-      //       resize/trim during upsert.
-      _hasOrderBy = false;
-      _tableResizer = null;
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
+    _hasOrderBy = queryContext.getOrderByExpressions() != null;
+    _tableResizer = _hasOrderBy ? new TableResizer(dataSchema, hasFinalInput, 
queryContext) : null;
+    // NOTE: Trim should be disabled when there is no ORDER BY
+    assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == 
Integer.MAX_VALUE);
+    _trimSize = trimSize;
+    _trimThreshold = trimThreshold;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 2163620225..df89c3a8e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -30,14 +30,9 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 @NotThreadSafe
 public class SimpleIndexedTable extends IndexedTable {
 
-  public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize, int trimSize,
-      int trimThreshold) {
-    this(dataSchema, false, queryContext, resultSize, trimSize, trimThreshold);
-  }
-
   public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, 
QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold, new HashMap<>());
+      int trimSize, int trimThreshold, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold, new HashMap<>(initialCapacity));
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index 67f82b2011..f397ac0e8c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -35,13 +35,9 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
  */
 public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
 
-  public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext 
queryContext, int resultSize) {
-    this(dataSchema, false, queryContext, resultSize);
-  }
-
   public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean 
hasFinalInput, QueryContext queryContext,
-      int resultSize) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, 
Integer.MAX_VALUE, Integer.MAX_VALUE);
+      int resultSize, int initialCapacity) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, 
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index dfc5faa289..b1bf65b7eb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -120,6 +120,16 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     return _table;
   }
 
+  public int getNumGroups() {
+    assert _aggregationGroupByResult != null || _intermediateRecords != null
+        : "Should not call getNumGroups() on instance level results";
+    if (_aggregationGroupByResult != null) {
+      return _aggregationGroupByResult.getNumGroups();
+    } else {
+      return _intermediateRecords.size();
+    }
+  }
+
   public boolean isNumGroupsLimitReached() {
     return _numGroupsLimitReached;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index ecb0a56cbf..aeef763ad5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -26,15 +26,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -61,8 +57,6 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GroupByCombineOperator.class);
   private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
 
-  private final int _trimSize;
-  private final int _trimThreshold;
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
@@ -76,25 +70,6 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
   public GroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext, ExecutorService executorService) {
     super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
 
-    int minTrimSize = queryContext.getMinServerGroupTrimSize();
-    if (minTrimSize > 0) {
-      int limit = queryContext.getLimit();
-      if ((!queryContext.isServerReturnFinalResult() && 
queryContext.getOrderByExpressions() != null)
-          || queryContext.getHavingFilter() != null) {
-        _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
-      } else {
-        // TODO: Keeping only 'LIMIT' groups can cause inaccurate result 
because the groups are randomly selected
-        //       without ordering. Consider ordering on group-by columns if no 
ordering is specified.
-        _trimSize = limit;
-      }
-      int trimThreshold = queryContext.getGroupTrimThreshold();
-      _trimThreshold = trimThreshold > 0 ? trimThreshold : Integer.MAX_VALUE;
-    } else {
-      // Server trim is disabled
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
-
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
     assert aggregationFunctions != null;
     _numAggregationFunctions = aggregationFunctions.length;
@@ -136,22 +111,7 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
         if (_indexedTable == null) {
           synchronized (this) {
             if (_indexedTable == null) {
-              DataSchema dataSchema = resultsBlock.getDataSchema();
-              // NOTE: Use trimSize as resultSize on server side.
-              if (_numTasks == 1) {
-                _indexedTable = new SimpleIndexedTable(dataSchema, 
_queryContext, _trimSize, _trimSize, _trimThreshold);
-              } else {
-                if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-                  // special case of trim threshold where it is set to max 
value.
-                  // there won't be any trimming during upsert in this case.
-                  // thus we can avoid the overhead of read-lock and write-lock
-                  // in the upsert method.
-                  _indexedTable = new 
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
-                } else {
-                  _indexedTable =
-                      new ConcurrentIndexedTable(dataSchema, _queryContext, 
_trimSize, _trimSize, _trimThreshold);
-                }
-              }
+              _indexedTable = 
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, 
_numTasks);
             }
           }
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
index 1e8c88e9ce..13b06ae6f4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
@@ -27,15 +27,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
@@ -66,8 +62,6 @@ public class StreamingGroupByCombineOperator extends 
BaseStreamingCombineOperato
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingGroupByCombineOperator.class);
   private static final String EXPLAIN_NAME = "STREAMING_COMBINE_GROUP_BY";
 
-  private final int _trimSize;
-  private final int _trimThreshold;
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
@@ -83,24 +77,6 @@ public class StreamingGroupByCombineOperator extends 
BaseStreamingCombineOperato
       ExecutorService executorService) {
     super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
 
-    int minTrimSize = queryContext.getMinServerGroupTrimSize();
-    if (minTrimSize > 0) {
-      int limit = queryContext.getLimit();
-      if ((!queryContext.isServerReturnFinalResult() && 
queryContext.getOrderByExpressions() != null)
-          || queryContext.getHavingFilter() != null) {
-        _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
-      } else {
-        // TODO: Keeping only 'LIMIT' groups can cause inaccurate result 
because the groups are randomly selected
-        //       without ordering. Consider ordering on group-by columns if no 
ordering is specified.
-        _trimSize = limit;
-      }
-      _trimThreshold = queryContext.getGroupTrimThreshold();
-    } else {
-      // Server trim is disabled
-      _trimSize = Integer.MAX_VALUE;
-      _trimThreshold = Integer.MAX_VALUE;
-    }
-
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
     assert aggregationFunctions != null;
     _numAggregationFunctions = aggregationFunctions.length;
@@ -163,22 +139,7 @@ public class StreamingGroupByCombineOperator extends 
BaseStreamingCombineOperato
         if (_indexedTable == null) {
           synchronized (this) {
             if (_indexedTable == null) {
-              DataSchema dataSchema = resultsBlock.getDataSchema();
-              // NOTE: Use trimSize as resultSize on server side.
-              if (_numTasks == 1) {
-                _indexedTable = new SimpleIndexedTable(dataSchema, 
_queryContext, _trimSize, _trimSize, _trimThreshold);
-              } else {
-                if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-                  // special case of trim threshold where it is set to max 
value.
-                  // there won't be any trimming during upsert in this case.
-                  // thus we can avoid the overhead of read-lock and write-lock
-                  // in the upsert method.
-                  _indexedTable = new 
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
-                } else {
-                  _indexedTable =
-                      new ConcurrentIndexedTable(dataSchema, _queryContext, 
_trimSize, _trimSize, _trimThreshold);
-                }
-              }
+              _indexedTable = 
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, 
_numTasks);
             }
           }
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index e76a649886..cadce4bcf6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -71,6 +71,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = 
"max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
+  public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY = 
"min.init.indexed.table.capacity";
+  public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128;
   public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
 
@@ -93,6 +95,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
   private int _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS;
   private int _maxInitialResultHolderCapacity = 
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+  private int _minInitialIndexedTableCapacity = 
DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
   // Limit on number of groups stored for each segment, beyond which no new 
group will be created
   private int _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
   // Used for SQL GROUP BY (server combine)
@@ -103,25 +106,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker 
{
   public InstancePlanMakerImplV2() {
   }
 
-  @VisibleForTesting
-  public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int 
numGroupsLimit, int minSegmentGroupTrimSize,
-      int minServerGroupTrimSize, int groupByTrimThreshold) {
-    _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
-    _numGroupsLimit = numGroupsLimit;
-    _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
-    _minServerGroupTrimSize = minServerGroupTrimSize;
-    _groupByTrimThreshold = groupByTrimThreshold;
-  }
-
   @Override
   public void init(PinotConfiguration queryExecutorConfig) {
     _maxExecutionThreads = 
queryExecutorConfig.getProperty(MAX_EXECUTION_THREADS_KEY, 
DEFAULT_MAX_EXECUTION_THREADS);
     _maxInitialResultHolderCapacity = 
queryExecutorConfig.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY,
         DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+    _minInitialIndexedTableCapacity = 
queryExecutorConfig.getProperty(MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY,
+        DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
     _numGroupsLimit = queryExecutorConfig.getProperty(NUM_GROUPS_LIMIT_KEY, 
DEFAULT_NUM_GROUPS_LIMIT);
     Preconditions.checkState(_maxInitialResultHolderCapacity <= 
_numGroupsLimit,
         "Invalid configuration: maxInitialResultHolderCapacity: %d must be 
smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
+    Preconditions.checkState(_minInitialIndexedTableCapacity <= 
_numGroupsLimit,
+        "Invalid configuration: minInitialIndexedTableCapacity: %d must be 
smaller or equal to numGroupsLimit: %d",
+        _minInitialIndexedTableCapacity, _numGroupsLimit);
     _minSegmentGroupTrimSize =
         queryExecutorConfig.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY, 
DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
     _minServerGroupTrimSize =
@@ -135,6 +133,36 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         _minServerGroupTrimSize, _groupByTrimThreshold);
   }
 
+  @VisibleForTesting
+  public void setMaxInitialResultHolderCapacity(int 
maxInitialResultHolderCapacity) {
+    _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
+  }
+
+  @VisibleForTesting
+  public void setMinInitialIndexedTableCapacity(int 
minInitialIndexedTableCapacity) {
+    _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
+  }
+
+  @VisibleForTesting
+  public void setNumGroupsLimit(int numGroupsLimit) {
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @VisibleForTesting
+  public void setMinSegmentGroupTrimSize(int minSegmentGroupTrimSize) {
+    _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
+  }
+
+  @VisibleForTesting
+  public void setMinServerGroupTrimSize(int minServerGroupTrimSize) {
+    _minServerGroupTrimSize = minServerGroupTrimSize;
+  }
+
+  @VisibleForTesting
+  public void setGroupByTrimThreshold(int groupByTrimThreshold) {
+    _groupByTrimThreshold = groupByTrimThreshold;
+  }
+
   public Plan makeInstancePlan(List<SegmentContext> segmentContexts, 
QueryContext queryContext,
       ExecutorService executorService, ServerMetrics serverMetrics) {
     applyQueryOptions(queryContext);
@@ -196,12 +224,19 @@ public class InstancePlanMakerImplV2 implements PlanMaker 
{
     // Set group-by query options
     if (QueryContextUtils.isAggregationQuery(queryContext) && 
queryContext.getGroupByExpressions() != null) {
       // Set maxInitialResultHolderCapacity
-      Integer initResultCap = 
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
-      if (initResultCap != null) {
-        queryContext.setMaxInitialResultHolderCapacity(initResultCap);
+      Integer maxInitialResultHolderCapacity = 
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
+      if (maxInitialResultHolderCapacity != null) {
+        
queryContext.setMaxInitialResultHolderCapacity(maxInitialResultHolderCapacity);
       } else {
         
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
       }
+      // Set minInitialIndexedTableCapacity
+      Integer minInitialIndexedTableCapacity = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
+      if (minInitialIndexedTableCapacity != null) {
+        
queryContext.setMinInitialIndexedTableCapacity(minInitialIndexedTableCapacity);
+      } else {
+        
queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
+      }
       // Set numGroupsLimit
       Integer numGroupsLimit = 
QueryOptionsUtils.getNumGroupsLimit(queryOptions);
       if (numGroupsLimit != null) {
@@ -361,7 +396,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         .contains(overrideExpression.getIdentifier())) {
       return overrideExpression;
     }
-    expression.getFunction().getArguments()
+    expression.getFunction()
+        .getArguments()
         .replaceAll(argument -> overrideWithExpressionHints(argument, 
indexSegment, expressionOverrideHints));
     return expression;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
index e2933527af..49c2361c3c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByResult.java
@@ -40,6 +40,10 @@ public class AggregationGroupByResult {
     _resultHolders = resultHolders;
   }
 
+  public int getNumGroups() {
+    return _groupKeyGenerator.getNumKeys();
+  }
+
   /**
    * Returns an iterator of {@link GroupKeyGenerator.GroupKey}.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 9b44e0c405..05e9dae536 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -52,6 +52,7 @@ public abstract class BaseReduceService {
   protected final int _maxReduceThreadsPerQuery;
   protected final int _groupByTrimThreshold;
   protected final int _minGroupTrimSize;
+  protected final int _minInitialIndexedTableCapacity;
 
   public BaseReduceService(PinotConfiguration config) {
     _maxReduceThreadsPerQuery = 
config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
@@ -60,6 +61,9 @@ public abstract class BaseReduceService {
         CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
     _minGroupTrimSize = 
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE,
         CommonConstants.Broker.DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE);
+    _minInitialIndexedTableCapacity =
+        
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY,
+            
CommonConstants.Broker.DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
 
     int numThreadsInExecutorService = 
Runtime.getRuntime().availableProcessors();
     LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max 
reduce threads.",
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index b9b0f7bb51..d10e0811ed 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -142,18 +142,23 @@ public class BrokerReduceService extends 
BaseReduceService {
 
     Integer minGroupTrimSizeQueryOption = null;
     Integer groupTrimThresholdQueryOption = null;
+    Integer minInitialIndexedTableCapacityQueryOption = null;
     if (queryOptions != null) {
       minGroupTrimSizeQueryOption = 
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
       groupTrimThresholdQueryOption = 
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      minInitialIndexedTableCapacityQueryOption = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
     }
     int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? 
minGroupTrimSizeQueryOption : _minGroupTrimSize;
     int groupTrimThreshold =
         groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption 
: _groupByTrimThreshold;
+    int minInitialIndexedTableCapacity =
+        minInitialIndexedTableCapacityQueryOption != null ? 
minInitialIndexedTableCapacityQueryOption
+            : _minInitialIndexedTableCapacity;
 
     try {
       dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, 
dataTableMap, brokerResponseNative,
           new DataTableReducerContext(_reduceExecutorService, 
_maxReduceThreadsPerQuery, reduceTimeOutMs,
-              groupTrimThreshold, minGroupTrimSize), brokerMetrics);
+              groupTrimThreshold, minGroupTrimSize, 
minInitialIndexedTableCapacity), brokerMetrics);
     } catch (EarlyTerminationException e) {
       brokerResponseNative.addException(
           new 
QueryProcessingException(QueryException.QUERY_CANCELLATION_ERROR_CODE, 
e.toString()));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index d4b69e6c21..8c645a622b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -32,6 +32,7 @@ public class DataTableReducerContext {
   // used for SQL GROUP BY
   private final int _groupByTrimThreshold;
   private final int _minGroupTrimSize;
+  private final int _minInitialIndexedTableCapacity;
 
   /**
    * Constructor for the class.
@@ -42,12 +43,13 @@ public class DataTableReducerContext {
    * @param groupByTrimThreshold trim threshold for SQL group by
    */
   public DataTableReducerContext(ExecutorService executorService, int 
maxReduceThreadsPerQuery, long reduceTimeOutMs,
-      int groupByTrimThreshold, int minGroupTrimSize) {
+      int groupByTrimThreshold, int minGroupTrimSize, int 
minInitialIndexedTableCapacity) {
     _executorService = executorService;
     _maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
     _reduceTimeOutMs = reduceTimeOutMs;
     _groupByTrimThreshold = groupByTrimThreshold;
     _minGroupTrimSize = minGroupTrimSize;
+    _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
   }
 
   public ExecutorService getExecutorService() {
@@ -69,4 +71,8 @@ public class DataTableReducerContext {
   public int getMinGroupTrimSize() {
     return _minGroupTrimSize;
   }
+
+  public int getMinInitialIndexedTableCapacity() {
+    return _minInitialIndexedTableCapacity;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 34395febfb..d8ff92f908 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -51,12 +51,8 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -232,53 +228,22 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
       DataTableReducerContext reducerContext)
       throws TimeoutException {
     long start = System.currentTimeMillis();
-    int numDataTables = dataTablesToReduce.size();
+
+    assert !dataTablesToReduce.isEmpty();
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    int numDataTables = dataTables.size();
 
     // Get the number of threads to use for reducing.
-    // In case of single reduce thread, fall back to SimpleIndexedTable to 
avoid redundant locking/unlocking calls.
     int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, 
reducerContext.getMaxReduceThreadsPerQuery());
-    boolean hasFinalInput =
-        _queryContext.isServerReturnFinalResult() || 
_queryContext.isServerReturnFinalResultKeyUnpartitioned();
-    int limit = _queryContext.getLimit();
-    int minTrimSize = reducerContext.getMinGroupTrimSize();
-    int trimSize;
-    int trimThreshold;
-    if (minTrimSize > 0) {
-      trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
-      trimThreshold = reducerContext.getGroupByTrimThreshold();
-      if (trimThreshold <= 0) {
-        trimThreshold = Integer.MAX_VALUE;
-      }
-    } else {
-      // Broker trim is disabled
-      trimSize = Integer.MAX_VALUE;
-      trimThreshold = Integer.MAX_VALUE;
-    }
-    // NOTE: For query with HAVING clause, use trimSize as resultSize to 
ensure the result accuracy.
-    // TODO: Resolve the HAVING clause within the IndexedTable before 
returning the result
-    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : 
limit;
-    IndexedTable indexedTable;
-    if (numReduceThreadsToUse == 1) {
-      indexedTable =
-          new SimpleIndexedTable(dataSchema, hasFinalInput, _queryContext, 
resultSize, trimSize, trimThreshold);
-    } else {
-      if (trimThreshold >= GroupByCombineOperator.MAX_TRIM_THRESHOLD) {
-        // special case of trim threshold where it is set to max value.
-        // there won't be any trimming during upsert in this case.
-        // thus we can avoid the overhead of read-lock and write-lock
-        // in the upsert method.
-        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, 
hasFinalInput, _queryContext, resultSize);
-      } else {
-        indexedTable =
-            new ConcurrentIndexedTable(dataSchema, hasFinalInput, 
_queryContext, resultSize, trimSize, trimThreshold);
-      }
-    }
+
+    // Create an indexed table to perform the reduce.
+    IndexedTable indexedTable =
+        GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), 
_queryContext, reducerContext,
+            numReduceThreadsToUse);
 
     // Create groups of data tables that each thread can process concurrently.
     // Given that numReduceThreads is <= numDataTables, each group will have 
at least one data table.
-    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
     List<List<DataTable>> reduceGroups = new 
ArrayList<>(numReduceThreadsToUse);
-
     for (int i = 0; i < numReduceThreadsToUse; i++) {
       reduceGroups.add(new ArrayList<>());
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 653498aade..8b61a97d55 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -80,18 +80,23 @@ public class StreamingReduceService extends 
BaseReduceService {
 
     Integer minGroupTrimSizeQueryOption = null;
     Integer groupTrimThresholdQueryOption = null;
+    Integer minInitialIndexedTableCapacityQueryOption = null;
     if (queryOptions != null) {
       minGroupTrimSizeQueryOption = 
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
       groupTrimThresholdQueryOption = 
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      minInitialIndexedTableCapacityQueryOption = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
     }
     int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? 
minGroupTrimSizeQueryOption : _minGroupTrimSize;
     int groupTrimThreshold =
         groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption 
: _groupByTrimThreshold;
+    int minInitialIndexedTableCapacity =
+        minInitialIndexedTableCapacityQueryOption != null ? 
minInitialIndexedTableCapacityQueryOption
+            : _minInitialIndexedTableCapacity;
 
     // Process server response.
     DataTableReducerContext dataTableReducerContext =
         new DataTableReducerContext(_reduceExecutorService, 
_maxReduceThreadsPerQuery, reduceTimeOutMs,
-            groupTrimThreshold, minGroupTrimSize);
+            groupTrimThreshold, minGroupTrimSize, 
minInitialIndexedTableCapacity);
     StreamingReducer streamingReducer = 
ResultReducerFactory.getStreamingReducer(queryContext);
 
     streamingReducer.init(dataTableReducerContext);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 0aa233b43e..e1e3c37a8d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -114,6 +114,8 @@ public class QueryContext {
   // The following properties apply to group-by queries
   // Maximum initial capacity of the group-by result holder
   private int _maxInitialResultHolderCapacity = 
InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+  // Initial capacity of the indexed table
+  private int _minInitialIndexedTableCapacity = 
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
   // Limit of number of groups stored in each segment
   private int _numGroupsLimit = 
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT;
   // Minimum number of groups to keep per segment when trimming groups for SQL 
GROUP BY
@@ -368,6 +370,14 @@ public class QueryContext {
     _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
   }
 
+  public int getMinInitialIndexedTableCapacity() {
+    return _minInitialIndexedTableCapacity;
+  }
+
+  public void setMinInitialIndexedTableCapacity(int 
minInitialIndexedTableCapacity) {
+    _minInitialIndexedTableCapacity = minInitialIndexedTableCapacity;
+  }
+
   public int getNumGroupsLimit() {
     return _numGroupsLimit;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index e8551dab2c..313786cecf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -18,11 +18,25 @@
  */
 package org.apache.pinot.core.util;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.reduce.DataTableReducerContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
 public final class GroupByUtils {
   private GroupByUtils() {
   }
 
   public static final int DEFAULT_MIN_NUM_GROUPS = 5000;
+  public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
 
   /**
    * Returns the capacity of the table required by the given query.
@@ -41,4 +55,149 @@ public final class GroupByUtils {
     long capacityByLimit = limit * 5L;
     return capacityByLimit > Integer.MAX_VALUE ? Integer.MAX_VALUE : 
Math.max((int) capacityByLimit, minNumGroups);
   }
+
+  /**
+   * Returns the actual trim threshold used for the indexed table. Trim 
threshold should be at least (2 * trimSize) to
+   * avoid excessive trimming. When trim threshold is non-positive or higher 
than 10^9, trim is considered disabled,
+   * where {@code Integer.MAX_VALUE} is returned.
+   */
+  @VisibleForTesting
+  static int getIndexedTableTrimThreshold(int trimSize, int trimThreshold) {
+    if (trimThreshold <= 0 || trimThreshold > MAX_TRIM_THRESHOLD || trimSize > 
MAX_TRIM_THRESHOLD / 2) {
+      return Integer.MAX_VALUE;
+    }
+    return Math.max(trimThreshold, 2 * trimSize);
+  }
+
+  /**
+   * Returns the initial capacity of the indexed table required by the given 
query.
+   */
+  @VisibleForTesting
+  static int getIndexedTableInitialCapacity(int maxRowsToKeep, int 
minNumGroups, int minCapacity) {
+    // The upper bound of the initial capacity is the capacity required to 
hold all the required rows. The indexed table
+    // should never grow over this capacity.
+    int upperBound = HashUtil.getHashMapCapacity(maxRowsToKeep);
+    if (minCapacity > upperBound) {
+      return upperBound;
+    }
+    // The lower bound of the initial capacity is the capacity required by the 
min number of groups to be added to the
+    // table.
+    int lowerBound = HashUtil.getHashMapCapacity(minNumGroups);
+    if (lowerBound > upperBound) {
+      return upperBound;
+    }
+    return Math.max(minCapacity, lowerBound);
+  }
+
+  /**
+   * Creates an indexed table for the combine operator given a sample results 
block.
+   */
+  public static IndexedTable 
createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
+      QueryContext queryContext, int numThreads) {
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    int numGroups = resultsBlock.getNumGroups();
+    int limit = queryContext.getLimit();
+    boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
+    boolean hasHaving = queryContext.getHavingFilter() != null;
+    int minTrimSize = queryContext.getMinServerGroupTrimSize();
+    int minInitialIndexedTableCapacity = 
queryContext.getMinInitialIndexedTableCapacity();
+
+    // Disable trim when min trim size is non-positive
+    int trimSize = minTrimSize > 0 ? getTableCapacity(limit, minTrimSize) : 
Integer.MAX_VALUE;
+
+    // When there is no ORDER BY, trim is not required because the indexed 
table stops accepting new groups once the
+    // result size is reached
+    if (!hasOrderBy) {
+      int resultSize;
+      if (hasHaving) {
+        // Keep more groups when there is HAVING clause
+        resultSize = trimSize;
+      } else {
+        // TODO: Keeping only 'LIMIT' groups can cause inaccurate result 
because the groups are randomly selected
+        //       without ordering. Consider ordering on group-by columns if no 
ordering is specified.
+        resultSize = limit;
+      }
+      int initialCapacity = getIndexedTableInitialCapacity(resultSize, 
numGroups, minInitialIndexedTableCapacity);
+      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads);
+    }
+
+    int resultSize;
+    if (queryContext.isServerReturnFinalResult() && !hasHaving) {
+      // When server is asked to return final result and there is no HAVING 
clause, return only LIMIT groups
+      resultSize = limit;
+    } else {
+      resultSize = trimSize;
+    }
+    int trimThreshold = getIndexedTableTrimThreshold(trimSize, 
queryContext.getGroupTrimThreshold());
+    int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, 
numGroups, minInitialIndexedTableCapacity);
+    if (trimThreshold == Integer.MAX_VALUE) {
+      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads);
+    } else {
+      return getTrimEnabledIndexedTable(dataSchema, false, queryContext, 
resultSize, trimSize, trimThreshold,
+          initialCapacity, numThreads);
+    }
+  }
+
+  /**
+   * Creates an indexed table for the data table reducer given a sample data 
table.
+   */
+  public static IndexedTable createIndexedTableForDataTableReducer(DataTable 
dataTable, QueryContext queryContext,
+      DataTableReducerContext reducerContext, int numThreads) {
+    DataSchema dataSchema = dataTable.getDataSchema();
+    int numGroups = dataTable.getNumberOfRows();
+    int limit = queryContext.getLimit();
+    boolean hasOrderBy = queryContext.getOrderByExpressions() != null;
+    boolean hasHaving = queryContext.getHavingFilter() != null;
+    boolean hasFinalInput =
+        queryContext.isServerReturnFinalResult() || 
queryContext.isServerReturnFinalResultKeyUnpartitioned();
+    int minTrimSize = reducerContext.getMinGroupTrimSize();
+    int minInitialIndexedTableCapacity = 
reducerContext.getMinInitialIndexedTableCapacity();
+
+    // Disable trim when min trim size is non-positive
+    int trimSize = minTrimSize > 0 ? getTableCapacity(limit, minTrimSize) : 
Integer.MAX_VALUE;
+
+    // Keep more groups when there is HAVING clause
+    // TODO: Resolve the HAVING clause within the IndexedTable before 
returning the result
+    int resultSize = hasHaving ? trimSize : limit;
+
+    // When there is no ORDER BY, trim is not required because the indexed 
table stops accepting new groups once the
+    // result size is reached
+    if (!hasOrderBy) {
+      int initialCapacity = getIndexedTableInitialCapacity(resultSize, 
numGroups, minInitialIndexedTableCapacity);
+      return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity,
+          numThreads);
+    }
+
+    int trimThreshold = getIndexedTableTrimThreshold(trimSize, 
reducerContext.getGroupByTrimThreshold());
+    int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, 
numGroups, minInitialIndexedTableCapacity);
+    if (trimThreshold == Integer.MAX_VALUE) {
+      return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity,
+          numThreads);
+    } else {
+      return getTrimEnabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, trimSize, trimThreshold,
+          initialCapacity, numThreads);
+    }
+  }
+
+  private static IndexedTable getTrimDisabledIndexedTable(DataSchema 
dataSchema, boolean hasFinalInput,
+      QueryContext queryContext, int resultSize, int initialCapacity, int 
numThreads) {
+    if (numThreads == 1) {
+      return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, 
resultSize, Integer.MAX_VALUE,
+          Integer.MAX_VALUE, initialCapacity);
+    } else {
+      return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity);
+    }
+  }
+
+  private static IndexedTable getTrimEnabledIndexedTable(DataSchema 
dataSchema, boolean hasFinalInput,
+      QueryContext queryContext, int resultSize, int trimSize, int 
trimThreshold, int initialCapacity, int numThreads) {
+    assert trimThreshold != Integer.MAX_VALUE;
+    if (numThreads == 1) {
+      return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, 
resultSize, trimSize, trimThreshold,
+          initialCapacity);
+    } else {
+      return new ConcurrentIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, trimSize, trimThreshold,
+          initialCapacity);
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 852e847d5a..e77e644fc3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.SimpleIndexedTable;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -329,7 +330,8 @@ public class ResourceManagerAccountingTest {
         });
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, 
NUM_ROWS, 0);
     IndexedTable indexedTable =
-        new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS, 
Integer.MAX_VALUE, Integer.MAX_VALUE);
+        new SimpleIndexedTable(dataSchema, false, queryContext, NUM_ROWS, 
Integer.MAX_VALUE, Integer.MAX_VALUE,
+            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
     for (Object[] row : rows) {
       indexedTable.upsert(new Record(row));
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index efbcbe0225..af8d8cf2ff 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.testng.Assert;
@@ -45,6 +46,7 @@ import org.testng.annotations.Test;
 public class IndexedTableTest {
   private static final int TRIM_SIZE = 10;
   private static final int TRIM_THRESHOLD = 20;
+  private static final int INITIAL_CAPACITY = 
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
 
   @Test
   public void testConcurrentIndexedTable()
@@ -54,7 +56,8 @@ public class IndexedTableTest {
     DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", 
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
         ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
     });
-    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, 
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
+    IndexedTable indexedTable =
+        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
 
     // 3 threads upsert together
     // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times 
(20)
@@ -127,15 +130,19 @@ public class IndexedTableTest {
         });
 
     // Test SimpleIndexedTable
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, 
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
-    IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 
10, TRIM_SIZE, TRIM_THRESHOLD);
+    IndexedTable indexedTable =
+        new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
+    IndexedTable mergeTable =
+        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
 
     // Test ConcurrentIndexedTable
-    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD);
-    mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, 
TRIM_SIZE, TRIM_THRESHOLD);
+    indexedTable =
+        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+    mergeTable =
+        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
@@ -251,10 +258,13 @@ public class IndexedTableTest {
         ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
     });
 
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, 
queryContext, 5, TRIM_SIZE, TRIM_THRESHOLD);
+    IndexedTable indexedTable =
+        new SimpleIndexedTable(dataSchema, false, queryContext, 5, 
Integer.MAX_VALUE, Integer.MAX_VALUE,
+            INITIAL_CAPACITY);
     testNoMoreNewRecordsInTable(indexedTable);
 
-    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD);
+    indexedTable = new ConcurrentIndexedTable(dataSchema, false, queryContext, 
5, Integer.MAX_VALUE, Integer.MAX_VALUE,
+        INITIAL_CAPACITY);
     testNoMoreNewRecordsInTable(indexedTable);
   }
 
@@ -284,59 +294,4 @@ public class IndexedTableTest {
 
     checkEvicted(indexedTable, "f", "g");
   }
-
-  @Test
-  public void testAdaptiveTrimThreshold() {
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY 
SUM(m1)");
-    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", 
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
-        ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
-    });
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, 
queryContext, 5, 5, 6);
-
-    // Insert 7 records. Ensure that no trimming has been done since the trim 
threshold should adapt to be at least
-    // twice the trim size to avoid excessive trimming
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
-    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
-    Assert.assertEquals(indexedTable.size(), 2);
-
-    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 10d, 300d}));
-    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
-    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 10d, 500d}));
-    Assert.assertEquals(indexedTable.size(), 5);
-
-    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 10d, 300d}));
-    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
-    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 10d, 500d}));
-    Assert.assertEquals(indexedTable.size(), 5);
-
-    // No resizing / trimming should be done yet
-    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d, 10d, 600d}));
-    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d, 10d, 700d}));
-    Assert.assertEquals(indexedTable.size(), 7);
-
-    // Insert 3 more records - this should reach the trim threshold and 
trigger trimming
-    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 10d, 800d}));
-    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 10d, 900d}));
-    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d, 20d, 1000d}));
-    Assert.assertEquals(indexedTable.size(), 5);
-
-    indexedTable.finish(false);
-    // The 5 keys with the largest aggregated values for SUM(m1) should be 
evicted
-    checkEvicted(indexedTable, "a", "c", "d", "e", "j");
-  }
-
-  @Test
-  public void testAdaptiveTrimThresholdMaxValue() {
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY 
SUM(m1)");
-    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", 
"sum(m1)", "max(m2)"}, new ColumnDataType[]{
-        ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
-    });
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, 
queryContext, 1234567890, 1234567890, 1234567890);
-    // If 2 * trimSize exceeds the max integer value, the trim threshold 
should be bounded to the max integer value
-    Assert.assertEquals(indexedTable._trimThreshold, Integer.MAX_VALUE);
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
index 4370c43744..99d5cdf82f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/GroupByUtilsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.util;
 
+import org.apache.pinot.common.utils.HashUtil;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -37,4 +38,37 @@ public class GroupByUtilsTest {
     assertEquals(GroupByUtils.getTableCapacity(100000000), 500000000);
     assertEquals(GroupByUtils.getTableCapacity(1000000000), Integer.MAX_VALUE);
   }
+
+  @Test
+  public void getIndexedTableTrimThreshold() {
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, -1), 
Integer.MAX_VALUE);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 0), 
Integer.MAX_VALUE);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10), 10000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100), 10000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000), 10000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10000), 
10000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100000), 
100000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000), 
1000000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 10000000), 
10000000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 100000000), 
100000000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000000), 
1000000000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(5000, 1000000001), 
Integer.MAX_VALUE);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(Integer.MAX_VALUE, 
10), Integer.MAX_VALUE);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(500000000, 10), 
1000000000);
+    assertEquals(GroupByUtils.getIndexedTableTrimThreshold(500000001, 10), 
Integer.MAX_VALUE);
+  }
+
+  @Test
+  public void testGetIndexedTableInitialCapacity() {
+    
assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 10, 
128), 128);
+    
assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 
100, 128),
+        HashUtil.getHashMapCapacity(100));
+    
assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 
100, 256), 256);
+    
assertEquals(GroupByUtils.getIndexedTableInitialCapacity(Integer.MAX_VALUE, 
1000, 256),
+        HashUtil.getHashMapCapacity(1000));
+    assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 128), 
128);
+    assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 10, 256), 
HashUtil.getHashMapCapacity(100));
+    assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 100, 256), 
HashUtil.getHashMapCapacity(100));
+    assertEquals(GroupByUtils.getIndexedTableInitialCapacity(100, 1000, 256), 
HashUtil.getHashMapCapacity(100));
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
index 20d9fa80ab..1a22234ee3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
@@ -524,7 +524,8 @@ public class ExprMinMaxTest extends BaseQueriesTest {
     String query =
         "SELECT stringColumn, 
expr_min(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn), 
"
             + 
"expr_max(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn) 
"
-            + "FROM testTable WHERE mvIntColumn in 
(16,17,18,19,20,21,22,23,24,25,26,27) GROUP BY stringColumn";
+            + "FROM testTable WHERE mvIntColumn in 
(16,17,18,19,20,21,22,23,24,25,26,27) "
+            + "GROUP BY stringColumn ORDER BY stringColumn";
 
     BrokerResponse brokerResponse = getBrokerResponse(query);
     ResultTable resultTable = brokerResponse.getResultTable();
@@ -540,7 +541,7 @@ public class ExprMinMaxTest extends BaseQueriesTest {
     query =
         "SELECT stringColumn, 
expr_min(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn), 
"
             + 
"expr_max(VALUE_IN(mvIntColumn,16,17,18,19,20,21,22,23,24,25,26,27), intColumn) 
"
-            + "FROM testTable GROUP BY stringColumn";
+            + "FROM testTable GROUP BY stringColumn ORDER BY stringColumn";
 
     brokerResponse = getBrokerResponse(query);
     resultTable = brokerResponse.getResultTable();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index f974aa8542..f5d0be1a06 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -679,10 +679,10 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     assertFalse(brokerResponse.isNumGroupsLimitReached());
 
-    brokerResponse = getBrokerResponse(query,
-        new InstancePlanMakerImplV2(1000, 1000, 
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxInitialResultHolderCapacity(1000);
+    planMaker.setNumGroupsLimit(1000);
+    brokerResponse = getBrokerResponse(query, planMaker);
     assertTrue(brokerResponse.isNumGroupsLimitReached());
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index 591b5ffffa..62cbb0a3ef 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -661,10 +661,10 @@ public class 
InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     assertFalse(brokerResponse.isNumGroupsLimitReached());
 
-    brokerResponse = getBrokerResponse(query,
-        new InstancePlanMakerImplV2(1000, 1000, 
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxInitialResultHolderCapacity(1000);
+    planMaker.setNumGroupsLimit(1000);
+    brokerResponse = getBrokerResponse(query, planMaker);
     assertTrue(brokerResponse.isNumGroupsLimitReached());
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index a2c74071c9..6fcb909374 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -767,10 +767,10 @@ public class 
InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     assertFalse(brokerResponse.isNumGroupsLimitReached());
 
-    brokerResponse = getBrokerResponse(query,
-        new InstancePlanMakerImplV2(1000, 1000, 
InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-            InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD));
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxInitialResultHolderCapacity(1000);
+    planMaker.setNumGroupsLimit(1000);
+    brokerResponse = getBrokerResponse(query, planMaker);
     assertTrue(brokerResponse.isNumGroupsLimitReached());
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
index 6b876de3ae..070ff531cb 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueQueriesTest.java
@@ -33,11 +33,11 @@ import org.testng.annotations.Test;
  * Tests order by queries
  */
 public class InterSegmentGroupByMultiValueQueriesTest extends 
BaseMultiValueQueriesTest {
-  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
-      new 
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
-          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
-          InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-          InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new 
InstancePlanMakerImplV2();
+
+  static {
+    TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+  }
 
   @Test(dataProvider = "groupByOrderByDataProvider")
   public void testGroupByOrderBy(String query, long 
expectedNumEntriesScannedPostFilter,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
index ec9e0d951f..a75bce325d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupByMultiValueRawQueriesTest.java
@@ -32,11 +32,11 @@ import org.testng.annotations.Test;
  * Tests order by queries with MV RAW index
  */
 public class InterSegmentGroupByMultiValueRawQueriesTest extends 
BaseMultiValueRawQueriesTest {
-  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
-      new 
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
-          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
-          InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-          InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new 
InstancePlanMakerImplV2();
+
+  static {
+    TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+  }
 
   @Test(dataProvider = "groupByOrderByDataProvider")
   public void testGroupByOrderBy(String query, long 
expectedNumEntriesScannedPostFilter,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
index 846fbdbb8c..8cda3e1609 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentGroupBySingleValueQueriesTest.java
@@ -34,11 +34,11 @@ import org.testng.annotations.Test;
  * Tests order by queries
  */
 public class InterSegmentGroupBySingleValueQueriesTest extends 
BaseSingleValueQueriesTest {
-  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER =
-      new 
InstancePlanMakerImplV2(InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
-          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, 1,
-          InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE,
-          InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+  private static final InstancePlanMakerImplV2 TRIM_ENABLED_PLAN_MAKER = new 
InstancePlanMakerImplV2();
+
+  static {
+    TRIM_ENABLED_PLAN_MAKER.setMinSegmentGroupTrimSize(1);
+  }
 
   @Test(dataProvider = "groupByOrderByDataProvider")
   public void testGroupByOrderBy(String query, long 
expectedNumEntriesScannedPostFilter,
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 6408fd8f31..66d1437460 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -60,7 +60,7 @@ import static org.testng.Assert.*;
 public class OfflineGRPCServerIntegrationTest extends 
BaseClusterIntegrationTest {
   private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(2);
   private static final DataTableReducerContext DATATABLE_REDUCER_CONTEXT =
-      new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000);
+      new DataTableReducerContext(EXECUTOR_SERVICE, 2, 10000, 10000, 5000, 
128);
 
   @BeforeClass
   public void setUp()
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 79a0d9e6ff..579bd5b227 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -118,8 +118,10 @@ public class BenchmarkCombineGroupBy {
     int trimSize = GroupByUtils.getTableCapacity(_queryContext.getLimit());
 
     // make 1 concurrent table
-    IndexedTable concurrentIndexedTable = new 
ConcurrentIndexedTable(_dataSchema, _queryContext, trimSize, trimSize,
-        InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
+    IndexedTable concurrentIndexedTable =
+        new ConcurrentIndexedTable(_dataSchema, false, _queryContext, 
trimSize, trimSize,
+            InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD,
+            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
 
     List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
 
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 8ba8d2756e..6c9667533b 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -118,7 +118,8 @@ public class BenchmarkIndexedTable {
 
     // make 1 concurrent table
     IndexedTable concurrentIndexedTable =
-        new ConcurrentIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, 
TRIM_SIZE, TRIM_THRESHOLD);
+        new ConcurrentIndexedTable(_dataSchema, false, _queryContext, 
TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD,
+            TRIM_THRESHOLD);
 
     // 10 parallel threads putting 10k records into the table
 
@@ -167,7 +168,8 @@ public class BenchmarkIndexedTable {
 
       // make 10 indexed tables
       IndexedTable simpleIndexedTable =
-          new SimpleIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, 
TRIM_SIZE, TRIM_THRESHOLD);
+          new SimpleIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE, 
TRIM_SIZE, TRIM_THRESHOLD,
+              TRIM_THRESHOLD);
       simpleIndexedTables.add(simpleIndexedTable);
 
       // put 10k records in each indexed table, in parallel
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d62887b7be..0ca99b06cc 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -109,6 +109,8 @@ public class QueryRunner {
   private Integer _numGroupsLimit;
   @Nullable
   private Integer _maxInitialResultHolderCapacity;
+  @Nullable
+  private Integer _minInitialIndexedTableCapacity;
 
   // Join overflow settings
   @Nullable
@@ -142,6 +144,10 @@ public class QueryRunner {
         
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
     _maxInitialResultHolderCapacity =
         maxInitialGroupHolderCapacity != null ? 
Integer.parseInt(maxInitialGroupHolderCapacity) : null;
+    String minInitialIndexedTableCapacityStr =
+        
config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+    _minInitialIndexedTableCapacity =
+        minInitialIndexedTableCapacityStr != null ? 
Integer.parseInt(minInitialIndexedTableCapacityStr) : null;
     String maxRowsInJoinStr = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
     _maxRowsInJoin = maxRowsInJoinStr != null ? 
Integer.parseInt(maxRowsInJoinStr) : null;
     String joinOverflowModeStr = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
@@ -319,6 +325,15 @@ public class QueryRunner {
           Integer.toString(maxInitialResultHolderCapacity));
     }
 
+    Integer minInitialIndexedTableCapacity = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(opChainMetadata);
+    if (minInitialIndexedTableCapacity == null) {
+      minInitialIndexedTableCapacity = _minInitialIndexedTableCapacity;
+    }
+    if (minInitialIndexedTableCapacity != null) {
+      opChainMetadata.put(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
+          Integer.toString(minInitialIndexedTableCapacity));
+    }
+
     Integer maxRowsInJoin = 
QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
     if (maxRowsInJoin == null) {
       maxRowsInJoin = _maxRowsInJoin;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 06c7184f4e..1a6985084b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -51,7 +51,7 @@ public class CommonConstants {
   public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
       //"org.apache.pinot.plugin.metrics.compound.CompoundPinotMetricsFactory";
       "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
-      //"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory";
+  //"org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory";
   public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =
       "org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener";
 
@@ -311,6 +311,9 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
     public static final String CONFIG_OF_BROKER_MIN_GROUP_TRIM_SIZE = 
"pinot.broker.min.group.trim.size";
     public static final int DEFAULT_BROKER_MIN_GROUP_TRIM_SIZE = 5000;
+    public static final String 
CONFIG_OF_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
+        "pinot.broker.min.init.indexed.table.capacity";
+    public static final int DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY 
= 128;
 
     // Configure the request handler type used by broker to handler inbound 
query request.
     // NOTE: the request handler type refers to the communication between 
Broker and Server.
@@ -439,6 +442,7 @@ public class CommonConstants {
         public static final String MULTI_STAGE_LEAF_LIMIT = 
"multiStageLeafLimit";
         public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
         public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"maxInitialResultHolderCapacity";
+        public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = 
"minInitialIndexedTableCapacity";
         public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
         public static final String STAGE_PARALLELISM = "stageParallelism";
 
@@ -685,6 +689,8 @@ public class CommonConstants {
         "pinot.server.query.executor.num.groups.limit";
     public static final String 
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
         "pinot.server.query.executor.max.init.group.holder.capacity";
+    public static final String 
CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
+        "pinot.server.query.executor.min.init.indexed.table.capacity";
 
     public static final String CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR =
         "pinot.server.query.executor.multistage.executor";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to