Jackie-Jiang commented on code in PR #14273:
URL: https://github.com/apache/pinot/pull/14273#discussion_r1814225259


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java:
##########
@@ -30,6 +30,7 @@ public class WindowNode extends BasePlanNode {
   private final List<RelFieldCollation> _collations;
   private final List<RexExpression.FunctionCall> _aggCalls;
   private final WindowFrameType _windowFrameType;
+  // Both these bounds are relative to current row; 0 means current row, -1 
means previous row, 1 means next row, etc.

Review Comment:
   Also document `Integer.MIN_VALUE` as preceding unbounded and 
`Integer.MAX_VALUE` as following unbounded



##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java:
##########
@@ -144,60 +147,88 @@ public void onMatch(RelOptRuleCall call) {
         List.of(windowGroup)));
   }
 
+  /**
+   * Replaces the reference to literal arguments in the window group with the 
actual literal values.
+   * NOTE: {@link Window} has a field called "constants" which contains the 
literal values. If the input reference is
+   * beyond the window input size, it is a reference to the constants.
+   */
   private Window.Group updateLiteralArgumentsInWindowGroup(Window window) {
     Window.Group oldWindowGroup = window.groups.get(0);
-    int windowInputSize = window.getInput().getRowType().getFieldCount();
-    ImmutableList<Window.RexWinAggCall> oldAggCalls = oldWindowGroup.aggCalls;
-    List<Window.RexWinAggCall> newAggCallWindow = new 
ArrayList<>(oldAggCalls.size());
-    boolean aggCallChanged = false;
-    for (Window.RexWinAggCall oldAggCall : oldAggCalls) {
+    RelNode input = ((HepRelVertex) window.getInput()).getCurrentRel();
+    int numInputFields = input.getRowType().getFieldCount();
+    List<RexNode> projects = input instanceof Project ? ((Project) 
input).getProjects() : null;
+
+    List<Window.RexWinAggCall> newAggCallWindow = new 
ArrayList<>(oldWindowGroup.aggCalls.size());
+    boolean windowChanged = false;
+    for (Window.RexWinAggCall oldAggCall : oldWindowGroup.aggCalls) {
       boolean changed = false;
-      List<RexNode> oldAggCallArgList = oldAggCall.getOperands();
-      List<RexNode> rexList = new ArrayList<>(oldAggCallArgList.size());
-      for (RexNode rexNode : oldAggCallArgList) {
-        RexNode newRexNode = rexNode;
-        if (rexNode instanceof RexInputRef) {
-          RexInputRef inputRef = (RexInputRef) rexNode;
-          int inputRefIndex = inputRef.getIndex();
-          // If the input reference is greater than the window input size, it 
is a reference to the constants
-          if (inputRefIndex >= windowInputSize) {
-            newRexNode = window.constants.get(inputRefIndex - windowInputSize);
-            changed = true;
-            aggCallChanged = true;
-          } else {
-            RelNode windowInputRelNode = ((HepRelVertex) 
window.getInput()).getCurrentRel();
-            if (windowInputRelNode instanceof LogicalProject) {
-              RexNode inputRefRexNode = ((LogicalProject) 
windowInputRelNode).getProjects().get(inputRefIndex);
-              if (inputRefRexNode instanceof RexLiteral) {
-                // If the input reference is a literal, replace it with the 
literal value
-                newRexNode = inputRefRexNode;
-                changed = true;
-                aggCallChanged = true;
-              }
-            }
-          }
+      List<RexNode> oldOperands = oldAggCall.getOperands();
+      List<RexNode> newOperands = new ArrayList<>(oldOperands.size());
+      for (RexNode oldOperand : oldOperands) {
+        RexLiteral literal = getLiteral(oldOperand, numInputFields, 
window.constants, projects);
+        if (literal != null) {
+          newOperands.add(literal);
+          changed = true;
+          windowChanged = true;
+        } else {
+          newOperands.add(oldOperand);
         }
-        rexList.add(newRexNode);
       }
       if (changed) {
         newAggCallWindow.add(
-            new Window.RexWinAggCall((SqlAggFunction) 
oldAggCall.getOperator(), oldAggCall.type, rexList,
+            new Window.RexWinAggCall((SqlAggFunction) 
oldAggCall.getOperator(), oldAggCall.type, newOperands,
                 oldAggCall.ordinal, oldAggCall.distinct, 
oldAggCall.ignoreNulls));
       } else {
         newAggCallWindow.add(oldAggCall);
       }
     }
-    if (aggCallChanged) {
-      return new Window.Group(oldWindowGroup.keys, oldWindowGroup.isRows, 
oldWindowGroup.lowerBound,
-          oldWindowGroup.upperBound, oldWindowGroup.orderKeys, 
newAggCallWindow);
+
+    RexWindowBound lowerBound = oldWindowGroup.lowerBound;
+    RexNode offset = lowerBound.getOffset();
+    if (offset != null) {
+      RexLiteral literal = getLiteral(offset, numInputFields, 
window.constants, projects);

Review Comment:
   Could this return `null`? Should we throw exception when the literal cannot 
be read?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java:
##########
@@ -18,113 +18,158 @@
  */
 package org.apache.pinot.query.runtime.operator.window.aggregate;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
-import org.apache.pinot.query.runtime.operator.utils.AggregationUtils.Merger;
+import org.apache.pinot.query.runtime.operator.window.WindowFrame;
 import org.apache.pinot.query.runtime.operator.window.WindowFunction;
 
 
 public class AggregateWindowFunction extends WindowFunction {
-  private final Merger _merger;
+  private final WindowValueAggregator<Object> _windowValueAggregator;
+  private final String _functionName;
 
   public AggregateWindowFunction(RexExpression.FunctionCall aggCall, 
DataSchema inputSchema,
-      List<RelFieldCollation> collations, boolean partitionByOnly) {
-    super(aggCall, inputSchema, collations, partitionByOnly);
-    String functionName = aggCall.getFunctionName();
-    Function<ColumnDataType, Merger> mergerCreator = 
AggregationUtils.Accumulator.MERGERS.get(functionName);
-    Preconditions.checkArgument(mergerCreator != null, "Unsupported aggregate 
function: %s", functionName);
-    _merger = mergerCreator.apply(_dataType);
+      List<RelFieldCollation> collations, WindowFrame windowFrame) {
+    super(aggCall, inputSchema, collations, windowFrame);
+    _functionName = aggCall.getFunctionName();
+    _windowValueAggregator = 
WindowValueAggregatorFactory.getWindowValueAggregator(_functionName, _dataType,

Review Comment:
   For unbounded case, we shouldn't need removal support



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/SumWindowValueAggregator.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.window.aggregate;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Window value aggregator for SUM window function.
+ */
+public class SumWindowValueAggregator implements WindowValueAggregator<Object> 
{
+  private Double _sum = 0.0;

Review Comment:
   Use primitive `double`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxWindowValueAggregator.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.window.aggregate;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import javax.annotation.Nullable;
+
+
+/**
+ * Window value aggregator for MAX window function.
+ */
+public class MaxWindowValueAggregator implements WindowValueAggregator<Object> 
{
+
+  private final boolean _supportRemoval;
+  private final Deque<Double> _deque = new LinkedList<>();

Review Comment:
   Consider using primitive queue (e.g. `DoubleArrayFIFOQueue`) to save memory



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to