xiangfu0 commented on code in PR #14698:
URL: https://github.com/apache/pinot/pull/14698#discussion_r3000126483
##########
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java:
##########
@@ -257,6 +262,23 @@ private void applyQueryOptions(QueryContext queryContext) {
} else {
queryContext.setMinInitialIndexedTableCapacity(_minInitialIndexedTableCapacity);
}
+ String groupByAlgorithm =
+ QueryOptionsUtils.normalizeGroupByAlgorithm(QueryOptionsUtils.getGroupByAlgorithm(queryOptions));
+ if (StringUtils.isNotEmpty(groupByAlgorithm)) {
+ queryContext.setGroupByAlgorithm(groupByAlgorithm);
+ } else {
+ groupByAlgorithm = DEFAULT_GROUP_BY_ALGORITHM;
+ queryContext.setGroupByAlgorithm(DEFAULT_GROUP_BY_ALGORITHM);
+ }
+ Integer numGroupByPartitions = QueryOptionsUtils.getNumGroupByPartitions(queryOptions);
+ if (numGroupByPartitions != null) {
+ queryContext.setNumGroupByPartitions(numGroupByPartitions);
+ } else if (PartitionedGroupByCombineOperator.ALGORITHM.equals(groupByAlgorithm)) {
+ queryContext.setNumGroupByPartitions(Math.max(DEFAULT_NUM_GROUP_BY_PARTITIONS,
+ queryContext.getMaxExecutionThreads()));
+ } else {
Review Comment:
Fixed in 3dbe195191. The partition default now derives from effective query
parallelism: when maxExecutionThreads is auto/non-positive, it falls back to
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY before choosing the
partition count. I also added a regression test in CombinePlanNodeTest to cover the
auto-thread case.
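A minimal Java sketch of the fallback described in this comment, for illustration only. GroupByPartitionDefaults and its methods are hypothetical, and the two constant values are assumptions standing in for DEFAULT_NUM_GROUP_BY_PARTITIONS and QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY, whose real values may differ. The sketch also omits the check that the partitioned group-by algorithm is selected.

```java
// Hypothetical sketch, not Pinot code: resolving the group-by partition count
// from effective query parallelism. Constant values below are assumptions.
final class GroupByPartitionDefaults {
  // Assumed stand-in for DEFAULT_NUM_GROUP_BY_PARTITIONS (illustrative value).
  static final int DEFAULT_NUM_GROUP_BY_PARTITIONS = 4;
  // Assumed stand-in for QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY (illustrative value).
  static final int MAX_NUM_THREADS_PER_QUERY = 10;

  private GroupByPartitionDefaults() {
  }

  // A non-positive maxExecutionThreads means "auto", so substitute the per-query thread cap.
  static int effectiveThreads(int maxExecutionThreads) {
    return maxExecutionThreads > 0 ? maxExecutionThreads : MAX_NUM_THREADS_PER_QUERY;
  }

  // An explicit numGroupByPartitions option wins; otherwise use at least one partition per thread.
  static int resolveNumPartitions(Integer requestedPartitions, int maxExecutionThreads) {
    if (requestedPartitions != null) {
      return requestedPartitions;
    }
    return Math.max(DEFAULT_NUM_GROUP_BY_PARTITIONS, effectiveThreads(maxExecutionThreads));
  }
}
```

With these assumed values, an auto setting of -1 resolves to 10 partitions, whereas feeding the raw setting into Math.max(4, -1) would return 4 no matter how many threads the query actually uses.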
##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -263,9 +263,29 @@ public int size() {
@Override
public Iterator<Record> iterator() {
+ if (_topRecords == null) {
+ return _lookupMap.values().iterator();
+ }
return _topRecords.iterator();
}
+
+ public void mergePartitionTable(Table table) {
+ if (table instanceof IndexedTable) {
+ _lookupMap.putAll(((IndexedTable) table)._lookupMap);
+ } else {
+ Iterator<Record> iterator = table.iterator();
+ while (iterator.hasNext()) {
+ // NOTE: For performance concern, does not check the return value of the upsert(). Override this method if
+ // upsert() can return false.
+ upsert(iterator.next());
+ }
+ }
+ if (_lookupMap.size() >= _trimThreshold) {
+ resize();
+ }
+ }
Review Comment:
Fixed in 3dbe195191. mergePartitionTable() now invalidates the cached
_topRecords view before mutating the backing lookup map, so post-finish merges
cannot serve stale ORDER BY results. IndexedTableTest now covers both the
invalidation path and a re-finish that picks the new top record.
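The invalidation described above can be sketched roughly as follows. This is a simplified, hypothetical model: CachedTopTable, its String keys with summed long values, and its finish() method are illustrative assumptions that only loosely mirror IndexedTable, and trimming via resize() is omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (not Pinot's IndexedTable): group key -> summed value,
// plus a cached top-N view that finish() builds for ORDER BY value DESC.
final class CachedTopTable {
  private final Map<String, Long> _lookupMap = new HashMap<>();
  private final int _topN;
  // null means the view was never built or has been invalidated.
  private List<Map.Entry<String, Long>> _topRecords;

  CachedTopTable(int topN) {
    _topN = topN;
  }

  void upsert(String key, long value) {
    _topRecords = null; // any mutation makes the cached view stale
    _lookupMap.merge(key, value, Long::sum);
  }

  // Invalidate the cached view before touching the backing map, so a merge
  // after finish() can never be served from the old top-N snapshot.
  void mergePartitionTable(CachedTopTable other) {
    _topRecords = null;
    for (Map.Entry<String, Long> entry : other._lookupMap.entrySet()) {
      _lookupMap.merge(entry.getKey(), entry.getValue(), Long::sum);
    }
  }

  // Snapshot the top-N groups by value, highest first (copies entries so later merges cannot alter them).
  void finish() {
    List<Map.Entry<String, Long>> sorted = new ArrayList<>();
    for (Map.Entry<String, Long> entry : _lookupMap.entrySet()) {
      sorted.add(Map.entry(entry.getKey(), entry.getValue()));
    }
    sorted.sort(Map.Entry.comparingByValue(Comparator.reverseOrder()));
    _topRecords = new ArrayList<>(sorted.subList(0, Math.min(_topN, sorted.size())));
  }

  // Like the diff's iterator(): fall back to the raw groups when no valid view exists.
  Iterator<Map.Entry<String, Long>> iterator() {
    return _topRecords == null ? _lookupMap.entrySet().iterator() : _topRecords.iterator();
  }
}
```

Clearing the cache eagerly keeps iterator() from returning a stale ORDER BY view, at the cost of rebuilding the view on the next finish().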
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]