Copilot commented on code in PR #14698:
URL: https://github.com/apache/pinot/pull/14698#discussion_r3000169045
########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/NonblockingGroupByCombineOperator.java: ##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using
+ * all threads

Review Comment:
The class-level TODO/comment says this operator is "using all threads", but the actual parallelism is already bounded via `maxExecutionThreads`/`_numTasks` (inherited from `GroupByCombineOperator`). Please update/remove this TODO to avoid misleading future readers, or make it specific about what is still missing.
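To make the bounded-parallelism point concrete, here is a minimal, generic sketch (hypothetical class and method names, not Pinot's `GroupByCombineOperator` code). It assumes the operator submits `numTasks = min(numOperators, maxExecutionThreads)` tasks and that each task claims the next operator index from a shared `AtomicInteger`, so at most `numTasks` threads work on the query no matter how large the executor's pool is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch, NOT Pinot's GroupByCombineOperator: parallelism is bounded by
// submitting at most maxExecutionThreads tasks that share one work counter.
public class BoundedCombineSketch {

  /** Number of tasks: never more than the operators to process or the thread limit. */
  static int numTasks(int numOperators, int maxExecutionThreads) {
    return Math.min(numOperators, Math.max(1, maxExecutionThreads));
  }

  /** Applies {@code work} to every operator index and returns the sum of the results. */
  static long combine(ExecutorService executor, int numOperators, int maxExecutionThreads,
      IntUnaryOperator work) throws InterruptedException, ExecutionException {
    int tasks = numTasks(numOperators, maxExecutionThreads);
    AtomicInteger nextOperatorId = new AtomicInteger();
    List<Future<Long>> futures = new ArrayList<>(tasks);
    for (int i = 0; i < tasks; i++) {
      futures.add(executor.submit(() -> {
        long partial = 0;
        int operatorId;
        // Each task keeps claiming the next unprocessed operator until none remain.
        while ((operatorId = nextOperatorId.getAndIncrement()) < numOperators) {
          partial += work.applyAsInt(operatorId);
        }
        return partial;
      }));
    }
    long total = 0;
    for (Future<Long> future : futures) {
      total += future.get();
    }
    return total;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(8);
    try {
      // 10 operators (e.g. segments), but only 3 tasks are submitted to the 8-thread pool.
      long total = combine(executor, 10, 3, id -> id);
      System.out.println(numTasks(10, 3) + " tasks, total = " + total); // 3 tasks, total = 45
    } finally {
      executor.shutdown();
    }
  }
}
```

The shared counter also load-balances: a task that finishes a cheap operator simply claims the next one instead of idling.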
```suggestion
 * Parallelism is bounded by the configured max execution threads via {@link GroupByCombineOperator}.
```

########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/PartitionedGroupByCombineOperator.java: ##########
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using
+ * all threads

Review Comment:
The class-level TODO/comment says this operator is "using all threads", but the actual parallelism is already bounded via `maxExecutionThreads`/`_numTasks` (inherited from `GroupByCombineOperator`). Please update/remove this TODO to avoid misleading future readers, or make it specific about what is still missing (and ideally link a tracking issue).
```suggestion
 * Combine operator for group-by queries using a partitioned {@link IndexedTable}.
 * <p>
 * Parallelism for this operator is bounded by the configuration in {@link GroupByCombineOperator},
 * which derives the number of tasks from the query context and execution thread limits.
```

########## pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java: ##########
@@ -263,9 +263,30 @@ public int size() {
   @Override
   public Iterator<Record> iterator() {
+    if (_topRecords == null) {
+      return _lookupMap.values().iterator();
+    }
     return _topRecords.iterator();
   }
+
+  public void mergePartitionTable(Table table) {
+    _topRecords = null;
+    if (table instanceof IndexedTable) {
+      _lookupMap.putAll(((IndexedTable) table)._lookupMap);

Review Comment:
`mergePartitionTable()` uses `putAll()` when merging another `IndexedTable`, which bypasses the normal aggregation merge semantics for duplicate keys (it will overwrite). This is only correct if the caller can guarantee the merged tables are from disjoint partitions (no overlapping keys). Please document/enforce that precondition (e.g., Javadoc + narrower signature/visibility) to prevent accidental misuse that would silently produce wrong group-by results.

```suggestion
  /**
   * Merges all records from the given {@link Table} into this table.
   * <p>
   * When {@code table} is an {@link IndexedTable}, this method assumes that the two tables have disjoint key spaces,
   * i.e. there are no group keys present in both tables. This method does not perform aggregation for duplicate keys;
   * instead, it will throw an {@link IllegalStateException} if an overlapping key is detected when merging another
   * {@code IndexedTable}. Callers must ensure this precondition holds (for example, by only merging tables from
   * disjoint partitions).
   */
  public void mergePartitionTable(Table table) {
    _topRecords = null;
    if (table instanceof IndexedTable) {
      IndexedTable indexedTable = (IndexedTable) table;
      for (Map.Entry<Key, Record> entry : indexedTable._lookupMap.entrySet()) {
        Key key = entry.getKey();
        // Enforce the precondition that the key space of the two tables must be disjoint.
        Preconditions.checkState(!_lookupMap.containsKey(key),
            "Found overlapping key while merging partition tables; aggregation semantics are not supported here");
        _lookupMap.put(key, entry.getValue());
      }
```
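Regarding the `mergePartitionTable()` precondition, here is a minimal sketch of why a disjoint key space is what makes a plain copy safe. It uses `HashMap<String, Long>` with SUM aggregation as a stand-in for `IndexedTable` and hash-routes each group key to exactly one partition. The class and method names are hypothetical, not Pinot APIs, and `mergeDisjoint` uses `putIfAbsent`, a single-call variant of the `containsKey`/`put` check in the suggestion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: HashMap<String, Long> (SUM aggregation) stands in for IndexedTable.
public class PartitionMergeSketch {

  /** Routes a group key to a partition; equal keys always land in the same partition. */
  static int partitionOf(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  /** Aggregates (key, value) rows into numPartitions maps whose key spaces are disjoint. */
  static List<Map<String, Long>> aggregatePartitioned(List<Map.Entry<String, Long>> rows, int numPartitions) {
    List<Map<String, Long>> partitions = new ArrayList<>(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new HashMap<>());
    }
    for (Map.Entry<String, Long> row : rows) {
      // Duplicate keys are aggregated (summed) inside their single owning partition.
      partitions.get(partitionOf(row.getKey(), numPartitions)).merge(row.getKey(), row.getValue(), Long::sum);
    }
    return partitions;
  }

  /** Copies source into target; throws on the first key already present (earlier keys stay copied). */
  static void mergeDisjoint(Map<String, Long> target, Map<String, Long> source) {
    for (Map.Entry<String, Long> entry : source.entrySet()) {
      if (target.putIfAbsent(entry.getKey(), entry.getValue()) != null) {
        throw new IllegalStateException("Found overlapping key while merging partition tables: " + entry.getKey());
      }
    }
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> rows = List.of(
        Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L), Map.entry("c", 4L));

    // Disjoint partitions: the checked merge succeeds and keeps every aggregate.
    Map<String, Long> combined = new HashMap<>();
    for (Map<String, Long> partition : aggregatePartitioned(rows, 2)) {
      mergeDisjoint(combined, partition);
    }
    System.out.println(combined.get("a")); // 4

    // Overlapping tables: putAll() overwrites instead of aggregating.
    Map<String, Long> left = new HashMap<>(Map.of("a", 1L));
    left.putAll(Map.of("a", 3L));
    System.out.println(left.get("a")); // 3, not 4
  }
}
```

Because equal keys always hash to the same partition, per-partition results never collide and `mergeDisjoint` never throws for them; the final `putAll()` example in `main` shows the failure mode the review describes, where only one partial aggregate survives.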
