This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new db69d3d54a Add FunnelMaxStepAggregationFunction and FunnelCompleteCountAggregationFunction (#13231) db69d3d54a is described below commit db69d3d54abaf82e3ee5dae7329ef53384326c04 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Tue Jun 4 10:09:15 2024 -0700 Add FunnelMaxStepAggregationFunction and FunnelCompleteCountAggregationFunction (#13231) --- .../common/function/TransformFunctionType.java | 3 + .../common/function/scalar/ArrayFunctions.java | 18 + .../function/FunctionDefinitionRegistryTest.java | 7 +- .../function/AggregationFunctionFactory.java | 8 +- .../FunnelBaseAggregationFunction.java} | 130 ++---- .../FunnelCompleteCountAggregationFunction.java | 110 +++++ .../window/FunnelMatchStepAggregationFunction.java | 127 ++++++ .../window/FunnelMaxStepAggregationFunction.java | 110 +++++ .../integration/tests/custom/WindowFunnelTest.java | 459 ++++++++++++++++++++- .../pinot/segment/spi/AggregationFunctionType.java | 19 + 10 files changed, 866 insertions(+), 125 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index f7740a1e35..b134365f17 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -193,6 +193,9 @@ public enum TransformFunctionType { ARRAY_MAX("arrayMax", ReturnTypes.cascade(opBinding -> positionalComponentReturnType(opBinding, 0), SqlTypeTransforms.FORCE_NULLABLE), OperandTypes.family(SqlTypeFamily.ARRAY), "array_max"), ARRAY_SUM("arraySum", ReturnTypes.DOUBLE, OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum"), + ARRAY_SUM_INT("arraySumInt", ReturnTypes.INTEGER, OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum_int"), + ARRAY_SUM_LONG("arraySumLong", ReturnTypes.BIGINT, OperandTypes.family(SqlTypeFamily.ARRAY), "array_sum_long"), + VALUE_IN("valueIn", "value_in"), MAP_VALUE("mapValue", ReturnTypes.cascade(opBinding -> opBinding.getOperandType(2).getComponentType(), SqlTypeTransforms.FORCE_NULLABLE), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java index 07fc94cebd..52997d0926 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java @@ -230,6 +230,24 @@ public class ArrayFunctions { return idx > 0 && idx <= arr.length ? arr[idx - 1] : NullValuePlaceHolder.STRING; } + @ScalarFunction + public static int arraySumInt(int[] arr) { + int sum = 0; + for (int value : arr) { + sum += value; + } + return sum; + } + + @ScalarFunction + public static long arraySumLong(long[] arr) { + long sum = 0; + for (long value : arr) { + sum += value; + } + return sum; + } + @ScalarFunction(names = {"array", "arrayValueConstructor"}, isVarArg = true) public static Object arrayValueConstructor(Object... arr) { if (arr == null || arr.length == 0) { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java index 819c8b84c2..d2771bc626 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java @@ -47,9 +47,10 @@ public class FunctionDefinitionRegistryTest { // Scalar function "scalar", // Functions without scalar function counterpart as of now - "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "clpencodedvarsmatch", "groovy", - "inidset", "jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert", - "valuein", "datetimeconvertwindowhop", + "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "arraysumint", "arraysumlong", + "clpdecode", "clpencodedvarsmatch", "groovy", "inidset", + "jsonextractscalar", "jsonextractindex", "jsonextractkey", + "lookup", "mapvalue", "timeconvert", "valuein", "datetimeconvertwindowhop", // functions not needed for register b/c they are in std sql table or they will not be composed directly. "in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false" ); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 5ef12fc661..c2e57ce54f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -36,7 +36,9 @@ import org.apache.pinot.core.query.aggregation.function.array.ArrayAggStringFunc import org.apache.pinot.core.query.aggregation.function.array.ListAggDistinctFunction; import org.apache.pinot.core.query.aggregation.function.array.ListAggFunction; import org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory; -import org.apache.pinot.core.query.aggregation.function.funnel.FunnelMaxStepAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMaxStepAggregationFunction; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.exception.BadQueryRequestException; @@ -455,6 +457,10 @@ public class AggregationFunctionFactory { return new FunnelCountAggregationFunctionFactory(arguments).get(); case FUNNELMAXSTEP: return new FunnelMaxStepAggregationFunction(arguments); + case FUNNELMATCHSTEP: + return new FunnelMatchStepAggregationFunction(arguments); + case FUNNELCOMPLETECOUNT: + return new FunnelCompleteCountAggregationFunction(arguments); case FREQUENTSTRINGSSKETCH: return new FrequentStringsSketchAggregationFunction(arguments); case FREQUENTLONGSSKETCH: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java similarity index 69% rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java rename to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java index 9f87130dc7..4df7a83a88 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelMaxStepAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.query.aggregation.function.funnel; +package org.apache.pinot.core.query.aggregation.function.funnel.window; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -32,45 +31,40 @@ import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; -import org.apache.pinot.segment.spi.AggregationFunctionType; -public class FunnelMaxStepAggregationFunction implements AggregationFunction<PriorityQueue<FunnelStepEvent>, Long> { - private final ExpressionContext _timestampExpression; - private final long _windowSize; - private final List<ExpressionContext> _stepExpressions; - private final FunnelModes _modes = new FunnelModes(); - private final int _numSteps; +public abstract class FunnelBaseAggregationFunction<F extends Comparable> + implements AggregationFunction<PriorityQueue<FunnelStepEvent>, F> { + protected final ExpressionContext _timestampExpression; + protected final long _windowSize; + protected final List<ExpressionContext> _stepExpressions; + protected final FunnelModes _modes = new FunnelModes(); + protected final int _numSteps; - public FunnelMaxStepAggregationFunction(List<ExpressionContext> arguments) { + public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) { int numArguments = arguments.size(); - Preconditions.checkArgument(numArguments > 2, - "FUNNELMAXSTEP expects >= 3 arguments, got: %s. The function can be used as " - + "funnelMaxStep(timestampExpression, windowSize, ARRAY[stepExpression, ..], [mode, [mode, ... ]])", + Preconditions.checkArgument(numArguments > 3, + "FUNNEL_AGG_FUNC expects >= 4 arguments, got: %s. The function can be used as " + + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, " + + "[stepExpression, ..], [mode, [mode, ... ]])", numArguments); _timestampExpression = arguments.get(0); _windowSize = arguments.get(1).getLiteral().getLongValue(); Preconditions.checkArgument(_windowSize > 0, "Window size must be > 0"); - ExpressionContext stepExpressionContext = arguments.get(2); - if (stepExpressionContext.getFunction() != null) { - // LEAF stage init this function like funnelmaxstep($1,'1000',arrayValueConstructor($2,$3,$4,...)) - _stepExpressions = stepExpressionContext.getFunction().getArguments(); - } else { - // Intermediate Stage init this function like funnelmaxstep($1,'1000',__PLACEHOLDER__) - _stepExpressions = ImmutableList.of(); - } - if (numArguments > 3) { - arguments.subList(3, numArguments) + _numSteps = arguments.get(2).getLiteral().getIntValue(); + Preconditions.checkArgument(numArguments >= 3 + _numSteps, + "FUNNEL_AGG_FUNC expects >= " + (3 + _numSteps) + " arguments, got: %s. The function can be used as " + + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, " + + "[stepExpression, ..], [mode, [mode, ... ]])", + numArguments); + _stepExpressions = arguments.subList(3, 3 + _numSteps); + if (numArguments > 3 + _numSteps) { + arguments.subList(3 + _numSteps, numArguments) .forEach(arg -> _modes.add(Mode.valueOf(arg.getLiteral().getStringValue().toUpperCase()))); } - _numSteps = _stepExpressions.size(); - } - - @Override - public AggregationFunctionType getType() { - return AggregationFunctionType.FUNNELMAXSTEP; } @Override @@ -198,35 +192,6 @@ public class FunnelMaxStepAggregationFunction implements AggregationFunction<Pri return DataSchema.ColumnDataType.OBJECT; } - @Override - public DataSchema.ColumnDataType getFinalResultColumnType() { - return DataSchema.ColumnDataType.LONG; - } - - @Override - public Long extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) { - long finalMaxStep = 0; - if (stepEvents == null || stepEvents.isEmpty()) { - return finalMaxStep; - } - ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>(); - while (!stepEvents.isEmpty()) { - fillWindow(stepEvents, slidingWindow); - if (slidingWindow.isEmpty()) { - break; - } - int maxSteps = processWindow(slidingWindow); - finalMaxStep = Math.max(finalMaxStep, maxSteps); - if (finalMaxStep == _numSteps) { - break; - } - if (!slidingWindow.isEmpty()) { - slidingWindow.pollFirst(); - } - } - return finalMaxStep; - } - /** * Fill the sliding window with the events that fall into the window. * Note that the events from stepEvents are dequeued and added to the sliding window. @@ -234,7 +199,7 @@ public class FunnelMaxStepAggregationFunction implements AggregationFunction<Pri * @param stepEvents The priority queue of step events * @param slidingWindow The sliding window with events that fall into the window */ - private void fillWindow(PriorityQueue<FunnelStepEvent> stepEvents, ArrayDeque<FunnelStepEvent> slidingWindow) { + protected void fillWindow(PriorityQueue<FunnelStepEvent> stepEvents, ArrayDeque<FunnelStepEvent> slidingWindow) { // Ensure for the sliding window, the first event is the first step while ((!slidingWindow.isEmpty()) && slidingWindow.peek().getStep() != 0) { slidingWindow.pollFirst(); @@ -256,51 +221,10 @@ public class FunnelMaxStepAggregationFunction implements AggregationFunction<Pri } } - private int processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) { - int maxStep = 0; - long previousTimestamp = -1; - for (FunnelStepEvent event : slidingWindow) { - int currentEventStep = event.getStep(); - // If the same condition holds for the sequence of events, then such repeating event interrupts further - // processing. - if (_modes.hasStrictDeduplication()) { - if (currentEventStep == maxStep - 1) { - return maxStep; - } - } - // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D - // and the max event level is 2. - if (_modes.hasStrictOrder()) { - if (currentEventStep != maxStep) { - return maxStep; - } - } - // Apply conditions only to events with strictly increasing timestamps. - if (_modes.hasStrictIncrease()) { - if (previousTimestamp == event.getTimestamp()) { - continue; - } - } - previousTimestamp = event.getTimestamp(); - if (maxStep == currentEventStep) { - maxStep++; - } - if (maxStep == _numSteps) { - break; - } - } - return maxStep; - } - - @Override - public Long mergeFinalResult(Long finalResult1, Long finalResult2) { - return Math.max(finalResult1, finalResult2); - } - @Override public String toExplainString() { //@formatter:off - return "WindowFunnelAggregationFunction{" + return getType().getName() + "{" + "timestampExpression=" + _timestampExpression + ", windowSize=" + _windowSize + ", stepExpressions=" + _stepExpressions @@ -308,7 +232,7 @@ public class FunnelMaxStepAggregationFunction implements AggregationFunction<Pri //@formatter:on } - enum Mode { + protected enum Mode { STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4); private final int _value; @@ -322,7 +246,7 @@ public class FunnelMaxStepAggregationFunction implements AggregationFunction<Pri } } - static class FunnelModes { + protected static class FunnelModes { private int _bitmask = 0; public void add(Mode mode) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java new file mode 100644 index 0000000000..0f1d12e269 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelCompleteCountAggregationFunction.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel.window; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.PriorityQueue; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class FunnelCompleteCountAggregationFunction extends FunnelBaseAggregationFunction<Integer> { + + public FunnelCompleteCountAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FUNNELCOMPLETECOUNT; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.INT; + } + + @Override + public Integer extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) { + int totalCompletedRounds = 0; + if (stepEvents == null || stepEvents.isEmpty()) { + return totalCompletedRounds; + } + ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>(); + while (!stepEvents.isEmpty()) { + fillWindow(stepEvents, slidingWindow); + if (slidingWindow.isEmpty()) { + break; + } + + long windowStart = slidingWindow.peek().getTimestamp(); + + int maxStep = 0; + long previousTimestamp = -1; + for (FunnelStepEvent event : slidingWindow) { + int currentEventStep = event.getStep(); + // If the same condition holds for the sequence of events, then such repeating event interrupts further + // processing. + if (_modes.hasStrictDeduplication()) { + if (currentEventStep == maxStep - 1) { + maxStep = 0; + } + } + // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D + // and the max event level is 2. + if (_modes.hasStrictOrder()) { + if (currentEventStep != maxStep) { + maxStep = 0; + } + } + // Apply conditions only to events with strictly increasing timestamps. + if (_modes.hasStrictIncrease()) { + if (previousTimestamp == event.getTimestamp()) { + continue; + } + } + previousTimestamp = event.getTimestamp(); + if (maxStep == currentEventStep) { + maxStep++; + } + if (maxStep == _numSteps) { + totalCompletedRounds++; + maxStep = 0; + windowStart = event.getTimestamp(); + } + } + if (!slidingWindow.isEmpty()) { + slidingWindow.pollFirst(); + } + // sliding window should pop until current event: + while (!slidingWindow.isEmpty() && slidingWindow.peek().getTimestamp() < windowStart) { + slidingWindow.pollFirst(); + } + } + return totalCompletedRounds; + } + + @Override + public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) { + return finalResult1 + finalResult2; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java new file mode 100644 index 0000000000..044157fa21 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMatchStepAggregationFunction.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel.window; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import java.util.ArrayDeque; +import java.util.List; +import java.util.PriorityQueue; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class FunnelMatchStepAggregationFunction extends FunnelBaseAggregationFunction<IntArrayList> { + + public FunnelMatchStepAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FUNNELMATCHSTEP; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.INT_ARRAY; + } + + @Override + public IntArrayList extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) { + int finalMaxStep = 0; + IntArrayList result = new IntArrayList(_numSteps); + for (int i = 0; i < _numSteps; i++) { + result.add(0); + } + if (stepEvents == null || stepEvents.isEmpty()) { + return result; + } + ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>(); + while (!stepEvents.isEmpty()) { + fillWindow(stepEvents, slidingWindow); + if (slidingWindow.isEmpty()) { + break; + } + int maxSteps = processWindow(slidingWindow); + finalMaxStep = Math.max(finalMaxStep, maxSteps); + if (finalMaxStep == _numSteps) { + break; + } + if (!slidingWindow.isEmpty()) { + slidingWindow.pollFirst(); + } + } + for (int i = 0; i < finalMaxStep; i++) { + result.set(i, 1); + } + return result; + } + + protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) { + int maxStep = 0; + long previousTimestamp = -1; + for (FunnelStepEvent event : slidingWindow) { + int currentEventStep = event.getStep(); + // If the same condition holds for the sequence of events, then such repeating event interrupts further + // processing. + if (_modes.hasStrictDeduplication()) { + if (currentEventStep == maxStep - 1) { + return maxStep; + } + } + // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D + // and the max event level is 2. + if (_modes.hasStrictOrder()) { + if (currentEventStep != maxStep) { + return maxStep; + } + } + // Apply conditions only to events with strictly increasing timestamps. + if (_modes.hasStrictIncrease()) { + if (previousTimestamp == event.getTimestamp()) { + continue; + } + } + if (maxStep == currentEventStep) { + maxStep++; + previousTimestamp = event.getTimestamp(); + } + if (maxStep == _numSteps) { + break; + } + } + return maxStep; + } + + @Override + public IntArrayList mergeFinalResult(IntArrayList finalResult1, IntArrayList finalResult2) { + // Return the longest 1s sequence from both results + for (int i = 0; i < _numSteps; i++) { + if (finalResult1.getInt(i) == 0) { + return finalResult2; + } + if (finalResult2.getInt(i) == 0) { + return finalResult1; + } + } + return finalResult1; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java new file mode 100644 index 0000000000..73684ad460 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelMaxStepAggregationFunction.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function.funnel.window; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.PriorityQueue; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class FunnelMaxStepAggregationFunction extends FunnelBaseAggregationFunction<Integer> { + + public FunnelMaxStepAggregationFunction(List<ExpressionContext> arguments) { + super(arguments); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.FUNNELMAXSTEP; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.INT; + } + + @Override + public Integer extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) { + int finalMaxStep = 0; + if (stepEvents == null || stepEvents.isEmpty()) { + return finalMaxStep; + } + ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>(); + while (!stepEvents.isEmpty()) { + fillWindow(stepEvents, slidingWindow); + if (slidingWindow.isEmpty()) { + break; + } + int maxSteps = processWindow(slidingWindow); + finalMaxStep = Math.max(finalMaxStep, maxSteps); + if (finalMaxStep == _numSteps) { + break; + } + if (!slidingWindow.isEmpty()) { + slidingWindow.pollFirst(); + } + } + return finalMaxStep; + } + + protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) { + int maxStep = 0; + long previousTimestamp = -1; + for (FunnelStepEvent event : slidingWindow) { + int currentEventStep = event.getStep(); + // If the same condition holds for the sequence of events, then such repeating event interrupts further + // processing. + if (_modes.hasStrictDeduplication()) { + if (currentEventStep == maxStep - 1) { + return maxStep; + } + } + // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D + // and the max event level is 2. + if (_modes.hasStrictOrder()) { + if (currentEventStep != maxStep) { + return maxStep; + } + } + // Apply conditions only to events with strictly increasing timestamps. + if (_modes.hasStrictIncrease()) { + if (previousTimestamp == event.getTimestamp()) { + continue; + } + } + if (maxStep == currentEventStep) { + maxStep++; + previousTimestamp = event.getTimestamp(); + } + if (maxStep == _numSteps) { + break; + } + } + return maxStep; + } + + @Override + public Integer mergeFinalResult(Integer finalResult1, Integer finalResult2) { + return Math.max(finalResult1, finalResult2); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java index 4b373d5fac..9ccf4b547e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java @@ -51,13 +51,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { setUseMultiStageQueryEngine(useMultiStageQueryEngine); String query = String.format("SELECT " - + "funnelMaxStep(timestampCol, '1000', " - + "ARRAY[ " + + "funnelMaxStep(timestampCol, '1000', 4, " + "url = '/product/search', " + "url = '/cart/add', " + "url = '/checkout/start', " + "url = '/checkout/confirmation' " - + "] ) " + + ") " + "FROM %s LIMIT %d", getTableName(), getCountStarResult()); JsonNode jsonNode = postQuery(query); JsonNode rows = jsonNode.get("resultTable").get("rows"); @@ -73,13 +72,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { setUseMultiStageQueryEngine(useMultiStageQueryEngine); String query = String.format("SELECT " - + "userId, funnelMaxStep(timestampCol, '1000', " - + "ARRAY[ " + + "userId, funnelMaxStep(timestampCol, '1000', 4, " + "url = '/product/search', " + "url = '/cart/add', " + "url = '/checkout/start', " + "url = '/checkout/confirmation' " - + "] ) " + + ") " + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); JsonNode jsonNode = postQuery(query); JsonNode rows = jsonNode.get("resultTable").get("rows"); @@ -113,13 +111,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { setUseMultiStageQueryEngine(useMultiStageQueryEngine); String query = String.format("SELECT " - + "userId, funnelMaxStep(timestampCol, '1000', " - + "ARRAY[ " + + "userId, funnelMaxStep(timestampCol, '1000', 4, " + "url = '/product/search', " + "url = '/cart/add', " + "url = '/checkout/start', " - + "url = '/checkout/confirmation' " - + "], 'strict_order' ) " + + "url = '/checkout/confirmation', " + + "'strict_order' ) " + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); JsonNode jsonNode = postQuery(query); JsonNode rows = jsonNode.get("resultTable").get("rows"); @@ -148,13 +145,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { query = String.format("SELECT " - + "userId, funnelMaxStep(timestampCol, '1000', " - + "ARRAY[ " + + "userId, funnelMaxStep(timestampCol, '1000', 4, " + "url = '/product/search', " + "url = '/cart/add', " + "url = '/checkout/start', " - + "url = '/checkout/confirmation' " - + "], 'strict_deduplication' ) " + + "url = '/checkout/confirmation', " + + "'strict_deduplication' ) " + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); jsonNode = postQuery(query); rows = jsonNode.get("resultTable").get("rows"); @@ -183,13 +179,12 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { query = String.format("SELECT " - + "userId, funnelMaxStep(timestampCol, '1000', " - + "ARRAY[ " + + "userId, funnelMaxStep(timestampCol, '1000', 4, " + "url = '/product/search', " + "url = '/cart/add', " + "url = '/checkout/start', " - + "url = '/checkout/confirmation' " - + "], 'strict_increase' ) " + + "url = '/checkout/confirmation', " + + "'strict_increase' ) " + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); jsonNode = postQuery(query); rows = jsonNode.get("resultTable").get("rows"); @@ -217,6 +212,434 @@ public class WindowFunnelTest extends CustomDataQueryClusterIntegrationTest { } } + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_order' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 3); + break; + case 1: + assertEquals(sumSteps, 3); + break; + case 2: + assertEquals(sumSteps, 2); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_deduplication' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 3); + break; + case 2: + assertEquals(sumSteps, 2); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_increase' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 2); + break; + case 2: + assertEquals(sumSteps, 3); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + } + + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_order' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 3); + break; + case 1: + assertEquals(sumSteps, 3); + break; + case 2: + assertEquals(sumSteps, 2); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_deduplication' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 3); + break; + case 2: + assertEquals(sumSteps, 2); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_increase' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + int sumSteps = 0; + for (JsonNode step : row.get(1)) { + sumSteps += step.intValue(); + } + switch (i / 10) { + case 0: + assertEquals(sumSteps, 4); + break; + case 1: + assertEquals(sumSteps, 2); + break; + case 2: + assertEquals(sumSteps, 3); + break; + case 3: + assertEquals(sumSteps, 1); + break; + default: + throw new IllegalStateException(); + } + } + } + + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelMatchStepGroupByQueriesWithModeAndSum(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_order' )) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 3); + break; + case 1: + assertEquals(row.get(1).intValue(), 3); + break; + case 2: + assertEquals(row.get(1).intValue(), 2); + break; + case 3: + assertEquals(row.get(1).intValue(), 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_deduplication' )) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 4); + break; + case 1: + assertEquals(row.get(1).intValue(), 3); + break; + case 2: + assertEquals(row.get(1).intValue(), 2); + break; + case 3: + assertEquals(row.get(1).intValue(), 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, arraySumInt(funnelMatchStep(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_increase' )) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 4); + break; + case 1: + assertEquals(row.get(1).intValue(), 2); + break; + case 2: + assertEquals(row.get(1).intValue(), 3); + break; + case 3: + assertEquals(row.get(1).intValue(), 1); + break; + default: + throw new IllegalStateException(); + } + } + } + + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelCompleteCountGroupByQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, funnelCompleteCount(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation' " + + ") " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 1); + break; + case 1: + assertEquals(row.get(1).intValue(), 0); + break; + case 2: + assertEquals(row.get(1).intValue(), 0); + break; + case 3: + assertEquals(row.get(1).intValue(), 0); + break; + default: + throw new IllegalStateException(); + } + } + } + + + @Test(dataProvider = "useV2QueryEngine") + public void testFunnelCompleteCountGroupByQueriesSkipLeaf(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */" + + "userId, funnelCompleteCount(timestampCol, '1000', 4, " + + "url = '/product/search', " + + "url = '/cart/add', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation' " + + ") " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 1); + break; + case 1: + assertEquals(row.get(1).intValue(), 0); + break; + case 2: + assertEquals(row.get(1).intValue(), 0); + break; + case 3: + assertEquals(row.get(1).intValue(), 0); + break; + default: + throw new IllegalStateException(); + } + } + } + @Override public String getTableName() { return DEFAULT_TABLE_NAME; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 877ac7f232..62060254ac 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -26,8 +26,11 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlOperandTypeChecker; @@ -329,6 +332,10 @@ public enum AggregationFunctionType { // funnel aggregate functions FUNNELMAXSTEP("funnelMaxStep", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.VARIADIC, ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)), + FUNNELCOMPLETECOUNT("funnelCompleteCount", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, + OperandTypes.VARIADIC, ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)), + FUNNELMATCHSTEP("funnelMatchStep", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, + OperandTypes.VARIADIC, IntArrayReturnTypeInference.INSTANCE, ReturnTypes.explicit(SqlTypeName.OTHER)), // TODO: revisit support for funnel count in V2 FUNNELCOUNT("funnelCount"); @@ -503,4 +510,16 @@ public enum AggregationFunctionType { } } } + + static class IntArrayReturnTypeInference implements SqlReturnTypeInference { + static final IntArrayReturnTypeInference INSTANCE = new IntArrayReturnTypeInference(); + + @Override + public RelDataType inferReturnType( + SqlOperatorBinding opBinding) { + RelDataTypeFactory typeFactory = opBinding.getTypeFactory(); + RelDataType elementType = typeFactory.createSqlType(SqlTypeName.INTEGER); + return typeFactory.createArrayType(elementType, -1); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org