atris commented on code in PR #11092:
URL: https://github.com/apache/pinot/pull/11092#discussion_r1272617169


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step 
conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being 
partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition 
key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of 
conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = 
'/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for 
example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct 
count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link 
DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link 
DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link 
SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in 
combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements 
AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements 
AggregationFunction<Object, LongArrayList> {

Review Comment:
   We are losing some type information here by moving to Object. Is it 
possible to create an abstract type specific to our functions, and use it 
here?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -198,27 +278,58 @@ private int[][] getSteps(Map<ExpressionContext, 
BlockValSet> blockValSetMap) {
   }
 
   private boolean isSorted(Map<ExpressionContext, BlockValSet> blockValSetMap) 
{
-    final Dictionary primaryCorrelationDictionary = 
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
-    if (primaryCorrelationDictionary == null) {
-      throw new IllegalArgumentException(
-          "CORRELATE_BY column in FUNNELCOUNT aggregation function not 
supported, please use a dictionary encoded "
-              + "column.");
+    return getDictionary(blockValSetMap).isSorted();
+  }
+
+  private AggregationStrategy getAggregationStrategy(Map<ExpressionContext, 
BlockValSet> blockValSetMap) {
+    if (_partitionSetting && _sortingSetting && isSorted(blockValSetMap)) {
+      return _sortedAggregationStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchAggregationStrategy;
     }
-    return primaryCorrelationDictionary.isSorted();
+    // default
+    return _bitmapAggregationStrategy;
   }
 
-  private SegmentAggregationStrategy<?, List<Long>> 
getAggregationStrategyByBlockValSetMap(
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    return isSorted(blockValSetMap) ? _sortedAggregationStrategy : 
_bitmapAggregationStrategy;
+  private ResultExtractionStrategy getResultExtractionStrategy(Object 
aggResult) {
+    if (_partitionSetting) {
+        if (_sortingSetting && aggResult instanceof SortedAggregationResult) {
+          return _sortedPartitionedResultExtractionStrategy;
+        }
+        if (_thetaSketchSetting) {
+          return _thetaSketchPartitionedResultExtractionStrategy;
+        }
+        return _bitmapPartitionedResultExtractionStrategy;
+    }
+    if (_thetaSketchSetting) {
+      return _thetaSketchResultExtractionStrategy;

Review Comment:
   This IMO looks a bit scary. Are the existing tests exercising the code?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -109,47 +185,52 @@ public GroupByResultHolder createGroupByResultHolder(int 
initialCapacity, int ma
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregate(length, 
aggregationResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregate(length, aggregationResultHolder, blockValSetMap);
   }
 
   @Override
   public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupBySV(length,
 groupKeyArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
   }
 
   @Override
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    
getAggregationStrategyByBlockValSetMap(blockValSetMap).aggregateGroupByMV(length,
 groupKeysArray,
-        groupByResultHolder, blockValSetMap);
+    getAggregationStrategy(blockValSetMap)
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
   }
 
   @Override
-  public List<Long> extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
-    return 
getAggregationStrategyByAggregationResult(aggregationResultHolder.getResult()).extractAggregationResult(
-        aggregationResultHolder);
+  public Object extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return getResultExtractionStrategy(aggregationResultHolder.getResult())
+        .extractAggregationResult(aggregationResultHolder);
   }
-
   @Override
-  public List<Long> extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
-    return 
getAggregationStrategyByAggregationResult(groupByResultHolder.getResult(groupKey)).extractGroupByResult(
-        groupByResultHolder, groupKey);
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, 
int groupKey) {
+    return getResultExtractionStrategy(groupByResultHolder.getResult(groupKey))
+        .extractGroupByResult(groupByResultHolder, groupKey);
   }
 
   @Override
-  public List<Long> merge(List<Long> a, List<Long> b) {
-    int length = a.size();
-    Preconditions.checkState(length == b.size(), "The two operand arrays are 
not of the same size! provided %s, %s",
-        length, b.size());
+  public Object merge(Object a, Object b) {

Review Comment:
   Can this be an abstract type, known to this class and MergeStrategy?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -180,8 +256,12 @@ public String toExplainString() {
     return stringBuilder.append(')').toString();
   }
 
-  private static LongArrayList toLongArrayList(List<Long> longList) {
-    return longList instanceof LongArrayList ? ((LongArrayList) 
longList).clone() : new LongArrayList(longList);
+  private Dictionary getDictionary(Map<ExpressionContext, BlockValSet> 
blockValSetMap) {
+    final Dictionary primaryCorrelationDictionary = 
blockValSetMap.get(_primaryCorrelationCol).getDictionary();
+    Preconditions.checkArgument(primaryCorrelationDictionary != null,
+        "CORRELATE_BY column in FUNNELCOUNT aggregation function not 
supported, please use a dictionary encoded "

Review Comment:
   Is this a temporary limitation?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -35,46 +50,107 @@
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * The {@code FunnelCountAggregationFunction} calculates the number of step 
conversions for a given partition column and
- * a list of boolean expressions.
- * <p>IMPORTANT: This function relies on the partition column being 
partitioned for each segment, where there are no
- * common values across different segments.
- * <p>This function calculates the exact number of step matches per partition 
key within the segment, then sums up the
- * results from different segments.
+ * The {@code FunnelCountAggregationFunction} calculates the number of 
conversions for a given correlation column and
+ * a list of steps as boolean expressions.
  *
  * Example:
  *   SELECT
  *    dateTrunc('day', timestamp) AS ts,
  *    FUNNEL_COUNT(
  *      STEPS(url = '/addToCart', url = '/checkout', url = 
'/orderConfirmation'),
- *      CORRELATED_BY(user)
+ *      CORRELATED_BY(user_id)
  *    ) as step_counts
  *    FROM user_log
  *    WHERE url in ('/addToCart', '/checkout', '/orderConfirmation')
  *    GROUP BY 1
+ *
+ *  Counting strategies can be controlled via optional SETTINGS options, for 
example:
+ *
+ *  FUNNEL_COUNT(
+ *    STEPS(url = '/addToCart', url = '/checkout', url = '/orderConfirmation'),
+ *    CORRELATED_BY(user_id),
+ *    SETTINGS('theta_sketch','nominalEntries=4096')
+ *  )
+ *
+ *  There are 5 strategies available, mirroring the corresponding distinct 
count implementations as per below.
+ *  <p><ul>
+ *  <li>'set': See DISTINCTCOUNT at {@link DistinctCountAggregationFunction}
+ *  <li>'bitmap' (default): See DISTINCTCOUNTBITMAP at {@link 
DistinctCountBitmapAggregationFunction}
+ *  <li>'theta_sketch': See DISTINCTCOUNTTHETASKETCH at {@link 
DistinctCountThetaSketchAggregationFunction}
+ *  <li>'partitioned': See SEGMENTPARTITIONEDDISTINCTCOUNT {@link 
SegmentPartitionedDistinctCountAggregationFunction}
+ *  <li>'sorted': sorted counts per segment then sums up. Only available in 
combination with 'partitioned'.
+ *  <li>'nominalEntries=4096': theta sketch configuration, default is 4096.
+ *  </ul><p>
  */
-public class FunnelCountAggregationFunction implements 
AggregationFunction<List<Long>, LongArrayList> {
+public class FunnelCountAggregationFunction implements 
AggregationFunction<Object, LongArrayList> {
+  private static final Sketch EMPTY_SKETCH = new 
UpdateSketchBuilder().build().compact();
   final List<ExpressionContext> _expressions;
   final List<ExpressionContext> _stepExpressions;
   final List<ExpressionContext> _correlateByExpressions;
   final ExpressionContext _primaryCorrelationCol;
   final int _numSteps;
-
-  final SegmentAggregationStrategy<?, List<Long>> _sortedAggregationStrategy;
-  final SegmentAggregationStrategy<?, List<Long>> _bitmapAggregationStrategy;
+  final int _nominalEntries;
+  final boolean _partitionSetting;
+  final boolean _sortingSetting;
+  final boolean _thetaSketchSetting;
+  final boolean _setSetting;
+
+  final AggregationStrategy _thetaSketchAggregationStrategy;
+  final AggregationStrategy _bitmapAggregationStrategy;
+  final AggregationStrategy _sortedAggregationStrategy;
+
+  final MergeStrategy _thetaSketchMergeStrategy;

Review Comment:
   Nit: Can this be moved to a child class for better readability?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FunnelCountAggregationFunction.java:
##########
@@ -239,48 +350,112 @@ Optional<ExpressionContext> find(List<ExpressionContext> 
expressions) {
 
     public List<ExpressionContext> getInputExpressions(List<ExpressionContext> 
expressions) {
       return this.find(expressions).map(exp -> 
exp.getFunction().getArguments())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT requires " 
+ _name));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT 
requires " + _name));
     }
 
     public ExpressionContext getFirstInputExpression(List<ExpressionContext> 
expressions) {
       return this.find(expressions)
           .flatMap(exp -> 
exp.getFunction().getArguments().stream().findFirst())
-          .orElseThrow(() -> new IllegalStateException("FUNNELCOUNT: " + _name 
+ " requires an argument."));
+          .orElseThrow(() -> new IllegalArgumentException("FUNNELCOUNT: " + 
_name + " requires an argument."));
+    }
+
+    public List<String> getLiterals(List<ExpressionContext> expressions) {
+      List<ExpressionContext> inputExpressions = find(expressions).map(exp -> 
exp.getFunction().getArguments())
+          .orElseGet(Collections::emptyList);
+      Preconditions.checkArgument(
+          inputExpressions.stream().allMatch(exp -> exp.getType() == 
ExpressionContext.Type.LITERAL),
+          "FUNNELCOUNT: " + _name + " parameters must be literals");
+      return inputExpressions.stream().map(exp -> 
exp.getLiteral().getStringValue()).collect(Collectors.toList());
+    }
+
+    public static void validate(List<ExpressionContext> expressions) {
+      final List<String> invalidOptions = expressions.stream()
+          .filter(expression -> 
!Arrays.stream(Option.values()).anyMatch(option -> option.matches(expression)))
+          .map(ExpressionContext::toString)
+          .collect(Collectors.toList());
+
+      if (!invalidOptions.isEmpty()) {
+        throw new IllegalArgumentException("Invalid FUNNELCOUNT options: " + 
String.join(", ", invalidOptions));
+      }
+    }
+  }
+
+  enum Setting {
+    SET("set"),
+    BITMAP("bitmap"),

Review Comment:
   Could we use this enum upstream to also store the MergeStrategy instance, 
instead of keeping all of the strategy instances as fields in the class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to