This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new db69d3d54a Add FunnelMaxStepAggregationFunction and FunnelCompleteCountAggregationFunction (#13231)
db69d3d54a is described below

commit db69d3d54abaf82e3ee5dae7329ef53384326c04
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Jun 4 10:09:15 2024 -0700

    Add FunnelMaxStepAggregationFunction and FunnelCompleteCountAggregationFunction (#13231)
---
 .../common/function/TransformFunctionType.java     |   3 +
 .../common/function/scalar/ArrayFunctions.java     |  18 +
 .../function/FunctionDefinitionRegistryTest.java   |   7 +-
 .../function/AggregationFunctionFactory.java       |   8 +-
 .../FunnelBaseAggregationFunction.java}            | 130 ++----
 .../FunnelCompleteCountAggregationFunction.java    | 110 +++++
 .../window/FunnelMatchStepAggregationFunction.java | 127 ++++++
 .../window/FunnelMaxStepAggregationFunction.java   | 110 +++++
 .../integration/tests/custom/WindowFunnelTest.java | 459 ++++++++++++++++++++-
 .../pinot/segment/spi/AggregationFunctionType.java |  19 +
 10 files changed, 866 insertions(+), 125 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index f7740a1e35..b134365f17 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -193,6 +193,9 @@ public enum TransformFunctionType {
   ARRAY_MAX("arrayMax", ReturnTypes.cascade(opBinding -> 
positionalComponentReturnType(opBinding, 0),
       SqlTypeTransforms.FORCE_NULLABLE), 
OperandTypes.family(SqlTypeFamily.ARRAY), "array_max"),
   ARRAY_SUM("arraySum", ReturnTypes.DOUBLE, 
OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum"),
+  ARRAY_SUM_INT("arraySumInt", ReturnTypes.INTEGER, 
OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum_int"),
+  ARRAY_SUM_LONG("arraySumLong", ReturnTypes.BIGINT, 
OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum_long"),
+
   VALUE_IN("valueIn", "value_in"),
   MAP_VALUE("mapValue", ReturnTypes.cascade(opBinding ->
       opBinding.getOperandType(2).getComponentType(), 
SqlTypeTransforms.FORCE_NULLABLE),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
index 07fc94cebd..52997d0926 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
@@ -230,6 +230,24 @@ public class ArrayFunctions {
     return idx > 0 && idx <= arr.length ? arr[idx - 1] : 
NullValuePlaceHolder.STRING;
   }
 
+  @ScalarFunction
+  public static int arraySumInt(int[] arr) {
+    int sum = 0;
+    for (int value : arr) {
+      sum += value;
+    }
+    return sum;
+  }
+
+  @ScalarFunction
+  public static long arraySumLong(long[] arr) {
+    long sum = 0;
+    for (long value : arr) {
+      sum += value;
+    }
+    return sum;
+  }
+
   @ScalarFunction(names = {"array", "arrayValueConstructor"}, isVarArg = true)
   public static Object arrayValueConstructor(Object... arr) {
     if (arr == null || arr.length == 0) {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
index 819c8b84c2..d2771bc626 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
@@ -47,9 +47,10 @@ public class FunctionDefinitionRegistryTest {
       // Scalar function
       "scalar",
       // Functions without scalar function counterpart as of now
-      "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", 
"clpdecode", "clpencodedvarsmatch", "groovy",
-      "inidset", "jsonextractscalar", "jsonextractindex", "jsonextractkey", 
"lookup", "mapvalue", "timeconvert",
-      "valuein", "datetimeconvertwindowhop",
+      "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", 
"arraysumint", "arraysumlong",
+      "clpdecode", "clpencodedvarsmatch", "groovy", "inidset",
+      "jsonextractscalar", "jsonextractindex", "jsonextractkey",
+      "lookup", "mapvalue", "timeconvert", "valuein", 
"datetimeconvertwindowhop",
       // functions not needed for register b/c they are in std sql table or 
they will not be composed directly.
       "in", "not_in", "and", "or", "range", "extract", "is_true", 
"is_not_true", "is_false", "is_not_false"
   );
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 5ef12fc661..c2e57ce54f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -36,7 +36,9 @@ import 
org.apache.pinot.core.query.aggregation.function.array.ArrayAggStringFunc
 import 
org.apache.pinot.core.query.aggregation.function.array.ListAggDistinctFunction;
 import org.apache.pinot.core.query.aggregation.function.array.ListAggFunction;
 import 
org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
-import 
org.apache.pinot.core.query.aggregation.function.funnel.FunnelMaxStepAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMaxStepAggregationFunction;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -455,6 +457,10 @@ public class AggregationFunctionFactory {
             return new FunnelCountAggregationFunctionFactory(arguments).get();
           case FUNNELMAXSTEP:
             return new FunnelMaxStepAggregationFunction(arguments);
+          case FUNNELMATCHSTEP:
+            return new FunnelMatchStepAggregationFunction(arguments);
+          case FUNNELCOMPLETECOUNT:
+            return new FunnelCompleteCountAggregationFunction(arguments);
           case FREQUENTSTRINGSSKETCH:
             return new FrequentStringsSketchAggregationFunction(arguments);
           case FREQUENTLONGSSKETCH:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
similarity index 69%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java
rename to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
index 9f87130dc7..4df7a83a88 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation.function.funnel;
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,45 +31,40 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
-public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<PriorityQueue<FunnelStepEvent>, Long> {
-  private final ExpressionContext _timestampExpression;
-  private final long _windowSize;
-  private final List<ExpressionContext> _stepExpressions;
-  private final FunnelModes _modes = new FunnelModes();
-  private final int _numSteps;
+public abstract class FunnelBaseAggregationFunction<F extends Comparable>
+    implements AggregationFunction<PriorityQueue<FunnelStepEvent>, F> {
+  protected final ExpressionContext _timestampExpression;
+  protected final long _windowSize;
+  protected final List<ExpressionContext> _stepExpressions;
+  protected final FunnelModes _modes = new FunnelModes();
+  protected final int _numSteps;
 
-  public FunnelMaxStepAggregationFunction(List<ExpressionContext> arguments) {
+  public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) {
     int numArguments = arguments.size();
-    Preconditions.checkArgument(numArguments > 2,
-        "FUNNELMAXSTEP expects >= 3 arguments, got: %s. The function can be 
used as "
-            + "funnelMaxStep(timestampExpression, windowSize, 
ARRAY[stepExpression, ..], [mode, [mode, ... ]])",
+    Preconditions.checkArgument(numArguments > 3,
+        "FUNNEL_AGG_FUNC expects >= 4 arguments, got: %s. The function can be 
used as "
+            + getType().getName() + "(timestampExpression, windowSize, 
numberSteps, stepExpression, "
+            + "[stepExpression, ..], [mode, [mode, ... ]])",
         numArguments);
     _timestampExpression = arguments.get(0);
     _windowSize = arguments.get(1).getLiteral().getLongValue();
     Preconditions.checkArgument(_windowSize > 0, "Window size must be > 0");
-    ExpressionContext stepExpressionContext = arguments.get(2);
-    if (stepExpressionContext.getFunction() != null) {
-      // LEAF stage init this function like 
funnelmaxstep($1,'1000',arrayValueConstructor($2,$3,$4,...))
-      _stepExpressions = stepExpressionContext.getFunction().getArguments();
-    } else {
-      // Intermediate Stage init this function like 
funnelmaxstep($1,'1000',__PLACEHOLDER__)
-      _stepExpressions = ImmutableList.of();
-    }
-    if (numArguments > 3) {
-      arguments.subList(3, numArguments)
+    _numSteps = arguments.get(2).getLiteral().getIntValue();
+    Preconditions.checkArgument(numArguments >= 3 + _numSteps,
+        "FUNNEL_AGG_FUNC expects >= " + (3 + _numSteps) + " arguments, got: 
%s. The function can be used as "
+            + getType().getName() + "(timestampExpression, windowSize, 
numberSteps, stepExpression, "
+            + "[stepExpression, ..], [mode, [mode, ... ]])",
+        numArguments);
+    _stepExpressions = arguments.subList(3, 3 + _numSteps);
+    if (numArguments > 3 + _numSteps) {
+      arguments.subList(3 + _numSteps, numArguments)
           .forEach(arg -> 
_modes.add(Mode.valueOf(arg.getLiteral().getStringValue().toUpperCase())));
     }
-    _numSteps = _stepExpressions.size();
-  }
-
-  @Override
-  public AggregationFunctionType getType() {
-    return AggregationFunctionType.FUNNELMAXSTEP;
   }
 
   @Override
@@ -198,35 +192,6 @@ public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<Pri
     return DataSchema.ColumnDataType.OBJECT;
   }
 
-  @Override
-  public DataSchema.ColumnDataType getFinalResultColumnType() {
-    return DataSchema.ColumnDataType.LONG;
-  }
-
-  @Override
-  public Long extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) {
-    long finalMaxStep = 0;
-    if (stepEvents == null || stepEvents.isEmpty()) {
-      return finalMaxStep;
-    }
-    ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>();
-    while (!stepEvents.isEmpty()) {
-      fillWindow(stepEvents, slidingWindow);
-      if (slidingWindow.isEmpty()) {
-        break;
-      }
-      int maxSteps = processWindow(slidingWindow);
-      finalMaxStep = Math.max(finalMaxStep, maxSteps);
-      if (finalMaxStep == _numSteps) {
-        break;
-      }
-      if (!slidingWindow.isEmpty()) {
-        slidingWindow.pollFirst();
-      }
-    }
-    return finalMaxStep;
-  }
-
   /**
    * Fill the sliding window with the events that fall into the window.
    * Note that the events from stepEvents are dequeued and added to the 
sliding window.
@@ -234,7 +199,7 @@ public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<Pri
    * @param stepEvents The priority queue of step events
    * @param slidingWindow The sliding window with events that fall into the 
window
    */
-  private void fillWindow(PriorityQueue<FunnelStepEvent> stepEvents, 
ArrayDeque<FunnelStepEvent> slidingWindow) {
+  protected void fillWindow(PriorityQueue<FunnelStepEvent> stepEvents, 
ArrayDeque<FunnelStepEvent> slidingWindow) {
     // Ensure for the sliding window, the first event is the first step
     while ((!slidingWindow.isEmpty()) && slidingWindow.peek().getStep() != 0) {
       slidingWindow.pollFirst();
@@ -256,51 +221,10 @@ public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<Pri
     }
   }
 
-  private int processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
-    int maxStep = 0;
-    long previousTimestamp = -1;
-    for (FunnelStepEvent event : slidingWindow) {
-      int currentEventStep = event.getStep();
-      // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
-      // processing.
-      if (_modes.hasStrictDeduplication()) {
-        if (currentEventStep == maxStep - 1) {
-          return maxStep;
-        }
-      }
-      // Don't allow interventions of other events. E.g. in the case of 
A->B->D->C, it stops finding A->B->C at the D
-      // and the max event level is 2.
-      if (_modes.hasStrictOrder()) {
-        if (currentEventStep != maxStep) {
-          return maxStep;
-        }
-      }
-      // Apply conditions only to events with strictly increasing timestamps.
-      if (_modes.hasStrictIncrease()) {
-        if (previousTimestamp == event.getTimestamp()) {
-          continue;
-        }
-      }
-      previousTimestamp = event.getTimestamp();
-      if (maxStep == currentEventStep) {
-        maxStep++;
-      }
-      if (maxStep == _numSteps) {
-        break;
-      }
-    }
-    return maxStep;
-  }
-
-  @Override
-  public Long mergeFinalResult(Long finalResult1, Long finalResult2) {
-    return Math.max(finalResult1, finalResult2);
-  }
-
   @Override
   public String toExplainString() {
     //@formatter:off
-    return "WindowFunnelAggregationFunction{"
+    return getType().getName() + "{"
         + "timestampExpression=" + _timestampExpression
         + ", windowSize=" + _windowSize
         + ", stepExpressions=" + _stepExpressions
@@ -308,7 +232,7 @@ public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<Pri
     //@formatter:on
   }
 
-  enum Mode {
+  protected enum Mode {
     STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4);
 
     private final int _value;
@@ -322,7 +246,7 @@ public class FunnelMaxStepAggregationFunction implements 
AggregationFunction<Pri
     }
   }
 
-  static class FunnelModes {
+  protected static class FunnelModes {
     private int _bitmask = 0;
 
     public void add(Mode mode) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java
new file mode 100644
index 0000000000..0f1d12e269
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FunnelCompleteCountAggregationFunction extends 
FunnelBaseAggregationFunction<Integer> {
+
+  public FunnelCompleteCountAggregationFunction(List<ExpressionContext> 
arguments) {
+    super(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELCOMPLETECOUNT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.INT;
+  }
+
+  @Override
+  public Integer extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) 
{
+    int totalCompletedRounds = 0;
+    if (stepEvents == null || stepEvents.isEmpty()) {
+      return totalCompletedRounds;
+    }
+    ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>();
+    while (!stepEvents.isEmpty()) {
+      fillWindow(stepEvents, slidingWindow);
+      if (slidingWindow.isEmpty()) {
+        break;
+      }
+
+      long windowStart = slidingWindow.peek().getTimestamp();
+
+      int maxStep = 0;
+      long previousTimestamp = -1;
+      for (FunnelStepEvent event : slidingWindow) {
+        int currentEventStep = event.getStep();
+        // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
+        // processing.
+        if (_modes.hasStrictDeduplication()) {
+          if (currentEventStep == maxStep - 1) {
+            maxStep = 0;
+          }
+        }
+        // Don't allow interventions of other events. E.g. in the case of 
A->B->D->C, it stops finding A->B->C at the D
+        // and the max event level is 2.
+        if (_modes.hasStrictOrder()) {
+          if (currentEventStep != maxStep) {
+            maxStep = 0;
+          }
+        }
+        // Apply conditions only to events with strictly increasing timestamps.
+        if (_modes.hasStrictIncrease()) {
+          if (previousTimestamp == event.getTimestamp()) {
+            continue;
+          }
+        }
+        previousTimestamp = event.getTimestamp();
+        if (maxStep == currentEventStep) {
+          maxStep++;
+        }
+        if (maxStep == _numSteps) {
+          totalCompletedRounds++;
+          maxStep = 0;
+          windowStart = event.getTimestamp();
+        }
+      }
+      if (!slidingWindow.isEmpty()) {
+        slidingWindow.pollFirst();
+      }
+      // sliding window should pop until current event:
+      while (!slidingWindow.isEmpty() && slidingWindow.peek().getTimestamp() < 
windowStart) {
+        slidingWindow.pollFirst();
+      }
+    }
+    return totalCompletedRounds;
+  }
+
+  @Override
+  public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
+    return finalResult1 + finalResult2;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java
new file mode 100644
index 0000000000..044157fa21
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FunnelMatchStepAggregationFunction extends 
FunnelBaseAggregationFunction<IntArrayList> {
+
+  public FunnelMatchStepAggregationFunction(List<ExpressionContext> arguments) 
{
+    super(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELMATCHSTEP;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.INT_ARRAY;
+  }
+
+  @Override
+  public IntArrayList extractFinalResult(PriorityQueue<FunnelStepEvent> 
stepEvents) {
+    int finalMaxStep = 0;
+    IntArrayList result = new IntArrayList(_numSteps);
+    for (int i = 0; i < _numSteps; i++) {
+      result.add(0);
+    }
+    if (stepEvents == null || stepEvents.isEmpty()) {
+      return result;
+    }
+    ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>();
+    while (!stepEvents.isEmpty()) {
+      fillWindow(stepEvents, slidingWindow);
+      if (slidingWindow.isEmpty()) {
+        break;
+      }
+      int maxSteps = processWindow(slidingWindow);
+      finalMaxStep = Math.max(finalMaxStep, maxSteps);
+      if (finalMaxStep == _numSteps) {
+        break;
+      }
+      if (!slidingWindow.isEmpty()) {
+        slidingWindow.pollFirst();
+      }
+    }
+    for (int i = 0; i < finalMaxStep; i++) {
+      result.set(i, 1);
+    }
+    return result;
+  }
+
+  protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
+    int maxStep = 0;
+    long previousTimestamp = -1;
+    for (FunnelStepEvent event : slidingWindow) {
+      int currentEventStep = event.getStep();
+      // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
+      // processing.
+      if (_modes.hasStrictDeduplication()) {
+        if (currentEventStep == maxStep - 1) {
+          return maxStep;
+        }
+      }
+      // Don't allow interventions of other events. E.g. in the case of 
A->B->D->C, it stops finding A->B->C at the D
+      // and the max event level is 2.
+      if (_modes.hasStrictOrder()) {
+        if (currentEventStep != maxStep) {
+          return maxStep;
+        }
+      }
+      // Apply conditions only to events with strictly increasing timestamps.
+      if (_modes.hasStrictIncrease()) {
+        if (previousTimestamp == event.getTimestamp()) {
+          continue;
+        }
+      }
+      if (maxStep == currentEventStep) {
+        maxStep++;
+        previousTimestamp = event.getTimestamp();
+      }
+      if (maxStep == _numSteps) {
+        break;
+      }
+    }
+    return maxStep;
+  }
+
+  @Override
+  public IntArrayList mergeFinalResult(IntArrayList finalResult1, IntArrayList 
finalResult2) {
+    // Return the longest 1s sequence from both results
+    for (int i = 0; i < _numSteps; i++) {
+      if (finalResult1.getInt(i) == 0) {
+        return finalResult2;
+      }
+      if (finalResult2.getInt(i) == 0) {
+        return finalResult1;
+      }
+    }
+    return finalResult1;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java
new file mode 100644
index 0000000000..73684ad460
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FunnelMaxStepAggregationFunction extends 
FunnelBaseAggregationFunction<Integer> {
+
+  public FunnelMaxStepAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELMAXSTEP;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.INT;
+  }
+
+  @Override
+  public Integer extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) 
{
+    int finalMaxStep = 0;
+    if (stepEvents == null || stepEvents.isEmpty()) {
+      return finalMaxStep;
+    }
+    ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>();
+    while (!stepEvents.isEmpty()) {
+      fillWindow(stepEvents, slidingWindow);
+      if (slidingWindow.isEmpty()) {
+        break;
+      }
+      int maxSteps = processWindow(slidingWindow);
+      finalMaxStep = Math.max(finalMaxStep, maxSteps);
+      if (finalMaxStep == _numSteps) {
+        break;
+      }
+      if (!slidingWindow.isEmpty()) {
+        slidingWindow.pollFirst();
+      }
+    }
+    return finalMaxStep;
+  }
+
+  protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
+    int maxStep = 0;
+    long previousTimestamp = -1;
+    for (FunnelStepEvent event : slidingWindow) {
+      int currentEventStep = event.getStep();
+      // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
+      // processing.
+      if (_modes.hasStrictDeduplication()) {
+        if (currentEventStep == maxStep - 1) {
+          return maxStep;
+        }
+      }
+      // Don't allow interventions of other events. E.g. in the case of 
A->B->D->C, it stops finding A->B->C at the D
+      // and the max event level is 2.
+      if (_modes.hasStrictOrder()) {
+        if (currentEventStep != maxStep) {
+          return maxStep;
+        }
+      }
+      // Apply conditions only to events with strictly increasing timestamps.
+      if (_modes.hasStrictIncrease()) {
+        if (previousTimestamp == event.getTimestamp()) {
+          continue;
+        }
+      }
+      if (maxStep == currentEventStep) {
+        maxStep++;
+        previousTimestamp = event.getTimestamp();
+      }
+      if (maxStep == _numSteps) {
+        break;
+      }
+    }
+    return maxStep;
+  }
+
+  @Override
+  public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) {
+    return Math.max(finalResult1, finalResult2);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index 4b373d5fac..9ccf4b547e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -51,13 +51,12 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
         String.format("SELECT "
-            + "funnelMaxStep(timestampCol, '1000', "
-            + "ARRAY[ "
+            + "funnelMaxStep(timestampCol, '1000', 4, "
             + "url = '/product/search', "
             + "url = '/cart/add', "
             + "url = '/checkout/start', "
             + "url = '/checkout/confirmation' "
-            + "] ) "
+            + ") "
             + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
     JsonNode jsonNode = postQuery(query);
     JsonNode rows = jsonNode.get("resultTable").get("rows");
@@ -73,13 +72,12 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
         String.format("SELECT "
-            + "userId, funnelMaxStep(timestampCol, '1000', "
-            + "ARRAY[ "
+            + "userId, funnelMaxStep(timestampCol, '1000', 4, "
             + "url = '/product/search', "
             + "url = '/cart/add', "
             + "url = '/checkout/start', "
             + "url = '/checkout/confirmation' "
-            + "] ) "
+            + ") "
             + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", 
getTableName(), getCountStarResult());
     JsonNode jsonNode = postQuery(query);
     JsonNode rows = jsonNode.get("resultTable").get("rows");
@@ -113,13 +111,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
         String.format("SELECT "
-            + "userId, funnelMaxStep(timestampCol, '1000', "
-            + "ARRAY[ "
+            + "userId, funnelMaxStep(timestampCol, '1000', 4, "
             + "url = '/product/search', "
             + "url = '/cart/add', "
             + "url = '/checkout/start', "
-            + "url = '/checkout/confirmation' "
-            + "], 'strict_order' ) "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order' ) "
             + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
     JsonNode jsonNode = postQuery(query);
     JsonNode rows = jsonNode.get("resultTable").get("rows");
@@ -148,13 +145,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest {
 
     query =
         String.format("SELECT "
-            + "userId, funnelMaxStep(timestampCol, '1000', "
-            + "ARRAY[ "
+            + "userId, funnelMaxStep(timestampCol, '1000', 4, "
             + "url = '/product/search', "
             + "url = '/cart/add', "
             + "url = '/checkout/start', "
-            + "url = '/checkout/confirmation' "
-            + "], 'strict_deduplication' ) "
+            + "url = '/checkout/confirmation', "
+            + "'strict_deduplication' ) "
             + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
@@ -183,13 +179,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest {
 
     query =
         String.format("SELECT "
-            + "userId, funnelMaxStep(timestampCol, '1000', "
-            + "ARRAY[ "
+            + "userId, funnelMaxStep(timestampCol, '1000', 4, "
             + "url = '/product/search', "
             + "url = '/cart/add', "
             + "url = '/checkout/start', "
-            + "url = '/checkout/confirmation' "
-            + "], 'strict_increase' ) "
+            + "url = '/checkout/confirmation', "
+            + "'strict_increase' ) "
             + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
@@ -217,6 +212,434 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest {
     }
   }
 
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 3);
+          break;
+        case 1:
+          assertEquals(sumSteps, 3);
+          break;
+        case 2:
+          assertEquals(sumSteps, 2);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_deduplication' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 4);
+          break;
+        case 1:
+          assertEquals(sumSteps, 3);
+          break;
+        case 2:
+          assertEquals(sumSteps, 2);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_increase' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 4);
+          break;
+        case 1:
+          assertEquals(sumSteps, 2);
+          break;
+        case 2:
+          assertEquals(sumSteps, 3);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 3);
+          break;
+        case 1:
+          assertEquals(sumSteps, 3);
+          break;
+        case 2:
+          assertEquals(sumSteps, 2);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_deduplication' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 4);
+          break;
+        case 1:
+          assertEquals(sumSteps, 3);
+          break;
+        case 2:
+          assertEquals(sumSteps, 2);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_increase' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 4);
+          break;
+        case 1:
+          assertEquals(sumSteps, 2);
+          break;
+        case 2:
+          assertEquals(sumSteps, 3);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelMatchStepGroupByQueriesWithModeAndSum(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order' )) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 3);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 3);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 2);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_deduplication' )) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 4);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 3);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 2);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_increase' )) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 4);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 2);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 3);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelCompleteCountGroupByQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "userId, funnelCompleteCount(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation' "
+            + ") "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testFunnelCompleteCountGroupByQueriesSkipLeaf(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
+            + "userId, funnelCompleteCount(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation' "
+            + ") "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 0);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
   @Override
   public String getTableName() {
     return DEFAULT_TABLE_NAME;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 877ac7f232..62060254ac 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -26,8 +26,11 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandTypeChecker;
@@ -329,6 +332,10 @@ public enum AggregationFunctionType {
   // funnel aggregate functions
   FUNNELMAXSTEP("funnelMaxStep", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.VARIADIC, ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  FUNNELCOMPLETECOUNT("funnelCompleteCount", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.VARIADIC, ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  FUNNELMATCHSTEP("funnelMatchStep", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.VARIADIC, IntArrayReturnTypeInference.INSTANCE, ReturnTypes.explicit(SqlTypeName.OTHER)),
   // TODO: revisit support for funnel count in V2
   FUNNELCOUNT("funnelCount");
 
@@ -503,4 +510,16 @@ public enum AggregationFunctionType {
       }
     }
   }
+
+  static class IntArrayReturnTypeInference implements SqlReturnTypeInference {
+    static final IntArrayReturnTypeInference INSTANCE = new IntArrayReturnTypeInference();
+
+    @Override
+    public RelDataType inferReturnType(
+        SqlOperatorBinding opBinding) {
+      RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+      RelDataType elementType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+      return typeFactory.createArrayType(elementType, -1);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to