Copilot commented on code in PR #14698:
URL: https://github.com/apache/pinot/pull/14698#discussion_r3000169045


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/NonblockingGroupByCombineOperator.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the 
parallelism of the query instead of using
+ *       all threads

Review Comment:
   The class-level TODO/comment says this operator is "using all threads", but 
the actual parallelism is already bounded via `maxExecutionThreads`/`_numTasks` 
(inherited from `GroupByCombineOperator`). Please update/remove this TODO to 
avoid misleading future readers, or make it specific about what is still 
missing.
   ```suggestion
    * Parallelism is bounded by the configured max execution threads via {@link 
GroupByCombineOperator}.
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/PartitionedGroupByCombineOperator.java:
##########
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the 
parallelism of the query instead of using
+ *       all threads

Review Comment:
   The class-level TODO/comment says this operator is "using all threads", but 
the actual parallelism is already bounded via `maxExecutionThreads`/`_numTasks` 
(inherited from `GroupByCombineOperator`). Please update/remove this TODO to 
avoid misleading future readers, or make it specific about what is still 
missing (and ideally link a tracking issue).
   ```suggestion
    * Combine operator for group-by queries using a partitioned {@link 
IndexedTable}.
    * <p>
    * Parallelism for this operator is bounded by the configuration in {@link 
GroupByCombineOperator},
    * which derives the number of tasks from the query context and execution 
thread limits.
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java:
##########
@@ -263,9 +263,30 @@ public int size() {
 
   @Override
   public Iterator<Record> iterator() {
+    if (_topRecords == null) {
+      return _lookupMap.values().iterator();
+    }
     return _topRecords.iterator();
   }
 
+
+  public void mergePartitionTable(Table table) {
+    _topRecords = null;
+    if (table instanceof IndexedTable) {
+      _lookupMap.putAll(((IndexedTable) table)._lookupMap);

Review Comment:
   `mergePartitionTable()` uses `putAll()` when merging another `IndexedTable`, 
which bypasses the normal aggregation merge semantics for duplicate keys (it 
will overwrite). This is only correct if the caller can guarantee the merged 
tables are from disjoint partitions (no overlapping keys). Please 
document/enforce that precondition (e.g., Javadoc + narrower 
signature/visibility) to prevent accidental misuse that would silently produce 
wrong group-by results.
   ```suggestion
     /**
      * Merges all records from the given {@link Table} into this table.
      * <p>
      * When {@code table} is an {@link IndexedTable}, this method assumes that 
the two tables have disjoint key spaces,
      * i.e. there are no group keys present in both tables. This method does 
not perform aggregation for duplicate keys;
      * instead, it will throw an {@link IllegalStateException} if an 
overlapping key is detected when merging another
      * {@code IndexedTable}. Callers must ensure this precondition holds (for 
example, by only merging tables from
      * disjoint partitions).
      */
     public void mergePartitionTable(Table table) {
       _topRecords = null;
       if (table instanceof IndexedTable) {
         IndexedTable indexedTable = (IndexedTable) table;
         for (Map.Entry<Key, Record> entry : 
indexedTable._lookupMap.entrySet()) {
           Key key = entry.getKey();
           // Enforce the precondition that the key space of the two tables 
must be disjoint.
           Preconditions.checkState(!_lookupMap.containsKey(key),
               "Found overlapping key while merging partition tables; 
aggregation semantics are not supported here");
           _lookupMap.put(key, entry.getValue());
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to