yashmayya commented on code in PR #14273:
URL: https://github.com/apache/pinot/pull/14273#discussion_r1814556802


##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java:
##########
@@ -144,60 +147,88 @@ public void onMatch(RelOptRuleCall call) {
         List.of(windowGroup)));
   }
 
+  /**
+   * Replaces the reference to literal arguments in the window group with the 
actual literal values.
+   * NOTE: {@link Window} has a field called "constants" which contains the 
literal values. If the input reference is
+   * beyond the window input size, it is a reference to the constants.
+   */
   private Window.Group updateLiteralArgumentsInWindowGroup(Window window) {
     Window.Group oldWindowGroup = window.groups.get(0);
-    int windowInputSize = window.getInput().getRowType().getFieldCount();
-    ImmutableList<Window.RexWinAggCall> oldAggCalls = oldWindowGroup.aggCalls;
-    List<Window.RexWinAggCall> newAggCallWindow = new 
ArrayList<>(oldAggCalls.size());
-    boolean aggCallChanged = false;
-    for (Window.RexWinAggCall oldAggCall : oldAggCalls) {
+    RelNode input = ((HepRelVertex) window.getInput()).getCurrentRel();
+    int numInputFields = input.getRowType().getFieldCount();
+    List<RexNode> projects = input instanceof Project ? ((Project) 
input).getProjects() : null;
+
+    List<Window.RexWinAggCall> newAggCallWindow = new 
ArrayList<>(oldWindowGroup.aggCalls.size());
+    boolean windowChanged = false;
+    for (Window.RexWinAggCall oldAggCall : oldWindowGroup.aggCalls) {
       boolean changed = false;
-      List<RexNode> oldAggCallArgList = oldAggCall.getOperands();
-      List<RexNode> rexList = new ArrayList<>(oldAggCallArgList.size());
-      for (RexNode rexNode : oldAggCallArgList) {
-        RexNode newRexNode = rexNode;
-        if (rexNode instanceof RexInputRef) {
-          RexInputRef inputRef = (RexInputRef) rexNode;
-          int inputRefIndex = inputRef.getIndex();
-          // If the input reference is greater than the window input size, it 
is a reference to the constants
-          if (inputRefIndex >= windowInputSize) {
-            newRexNode = window.constants.get(inputRefIndex - windowInputSize);
-            changed = true;
-            aggCallChanged = true;
-          } else {
-            RelNode windowInputRelNode = ((HepRelVertex) 
window.getInput()).getCurrentRel();
-            if (windowInputRelNode instanceof LogicalProject) {
-              RexNode inputRefRexNode = ((LogicalProject) 
windowInputRelNode).getProjects().get(inputRefIndex);
-              if (inputRefRexNode instanceof RexLiteral) {
-                // If the input reference is a literal, replace it with the 
literal value
-                newRexNode = inputRefRexNode;
-                changed = true;
-                aggCallChanged = true;
-              }
-            }
-          }
+      List<RexNode> oldOperands = oldAggCall.getOperands();
+      List<RexNode> newOperands = new ArrayList<>(oldOperands.size());
+      for (RexNode oldOperand : oldOperands) {
+        RexLiteral literal = getLiteral(oldOperand, numInputFields, 
window.constants, projects);
+        if (literal != null) {
+          newOperands.add(literal);
+          changed = true;
+          windowChanged = true;
+        } else {
+          newOperands.add(oldOperand);
         }
-        rexList.add(newRexNode);
       }
       if (changed) {
         newAggCallWindow.add(
-            new Window.RexWinAggCall((SqlAggFunction) 
oldAggCall.getOperator(), oldAggCall.type, rexList,
+            new Window.RexWinAggCall((SqlAggFunction) 
oldAggCall.getOperator(), oldAggCall.type, newOperands,
                 oldAggCall.ordinal, oldAggCall.distinct, 
oldAggCall.ignoreNulls));
       } else {
         newAggCallWindow.add(oldAggCall);
       }
     }
-    if (aggCallChanged) {
-      return new Window.Group(oldWindowGroup.keys, oldWindowGroup.isRows, 
oldWindowGroup.lowerBound,
-          oldWindowGroup.upperBound, oldWindowGroup.orderKeys, 
newAggCallWindow);
+
+    RexWindowBound lowerBound = oldWindowGroup.lowerBound;
+    RexNode offset = lowerBound.getOffset();
+    if (offset != null) {
+      RexLiteral literal = getLiteral(offset, numInputFields, 
window.constants, projects);

Review Comment:
   Good point - if offset is non-null, it is guaranteed here to be a valid 
literal (it can be null only for the `UNBOUNDED` and `CURRENT ROW` cases). 
For `ROWS` type window frames it has to be an integer literal, and for `RANGE` 
it can be either a numeric literal or a date / time literal like 
`INTERVAL '1' DAY` (very few databases currently support this, though). 
However, we currently don't support offset `FOLLOWING / PRECEDING` with `RANGE` 
type window frames at all, so we don't need to worry about this right now. 
   
   I've updated this to throw an exception with a useful error message if we 
are unable to read the literal for some reason here.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxWindowValueAggregator.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.window.aggregate;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import javax.annotation.Nullable;
+
+
+/**
+ * Window value aggregator for MAX window function.
+ */
+public class MaxWindowValueAggregator implements WindowValueAggregator<Object> 
{
+
+  private final boolean _supportRemoval;
+  private final Deque<Double> _deque = new LinkedList<>();

Review Comment:
   Makes sense, updated.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java:
##########
@@ -18,113 +18,158 @@
  */
 package org.apache.pinot.query.runtime.operator.window.aggregate;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
-import org.apache.pinot.query.runtime.operator.utils.AggregationUtils.Merger;
+import org.apache.pinot.query.runtime.operator.window.WindowFrame;
 import org.apache.pinot.query.runtime.operator.window.WindowFunction;
 
 
 public class AggregateWindowFunction extends WindowFunction {
-  private final Merger _merger;
+  private final WindowValueAggregator<Object> _windowValueAggregator;
+  private final String _functionName;
 
   public AggregateWindowFunction(RexExpression.FunctionCall aggCall, 
DataSchema inputSchema,
-      List<RelFieldCollation> collations, boolean partitionByOnly) {
-    super(aggCall, inputSchema, collations, partitionByOnly);
-    String functionName = aggCall.getFunctionName();
-    Function<ColumnDataType, Merger> mergerCreator = 
AggregationUtils.Accumulator.MERGERS.get(functionName);
-    Preconditions.checkArgument(mergerCreator != null, "Unsupported aggregate 
function: %s", functionName);
-    _merger = mergerCreator.apply(_dataType);
+      List<RelFieldCollation> collations, WindowFrame windowFrame) {
+    super(aggCall, inputSchema, collations, windowFrame);
+    _functionName = aggCall.getFunctionName();
+    _windowValueAggregator = 
WindowValueAggregatorFactory.getWindowValueAggregator(_functionName, _dataType,

Review Comment:
   Good catch, fixed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to