This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c341e06b14 Add mode  to allow adding dummy events for non-matching 
steps (#13382)
c341e06b14 is described below

commit c341e06b14dd6dd3e54f945eac025ad37e7e2e89
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Sat Jun 15 10:30:18 2024 -0700

    Add mode  to allow adding dummy events for non-matching steps (#13382)
---
 .../window/FunnelBaseAggregationFunction.java      | 50 +++++++++++----
 .../integration/tests/custom/WindowFunnelTest.java | 72 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 11 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
index 4df7a83a88..502a49cdbe 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
@@ -105,12 +105,18 @@ public abstract class FunnelBaseAggregationFunction<F 
extends Comparable>
       aggregationResultHolder.setValue(stepEvents);
     }
     for (int i = 0; i < length; i++) {
+      boolean stepFound = false;
       for (int j = 0; j < _numSteps; j++) {
         if (stepBlocks.get(j)[i] == 1) {
           stepEvents.add(new FunnelStepEvent(timestampBlock[i], j));
+          stepFound = true;
           break;
         }
       }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event 
with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1));
+      }
     }
   }
 
@@ -124,17 +130,20 @@ public abstract class FunnelBaseAggregationFunction<F 
extends Comparable>
     }
     for (int i = 0; i < length; i++) {
       int groupKey = groupKeyArray[i];
+      boolean stepFound = false;
       for (int j = 0; j < _numSteps; j++) {
         if (stepBlocks.get(j)[i] == 1) {
-          PriorityQueue<FunnelStepEvent> stepEvents = 
groupByResultHolder.getResult(groupKey);
-          if (stepEvents == null) {
-            stepEvents = new PriorityQueue<>();
-            groupByResultHolder.setValueForKey(groupKey, stepEvents);
-          }
+          PriorityQueue<FunnelStepEvent> stepEvents = 
getFunnelStepEvents(groupByResultHolder, groupKey);
           stepEvents.add(new FunnelStepEvent(timestampBlock[i], j));
+          stepFound = true;
           break;
         }
       }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event 
with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        PriorityQueue<FunnelStepEvent> stepEvents = 
getFunnelStepEvents(groupByResultHolder, groupKey);
+        stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1));
+      }
     }
   }
 
@@ -148,20 +157,35 @@ public abstract class FunnelBaseAggregationFunction<F 
extends Comparable>
     }
     for (int i = 0; i < length; i++) {
       int[] groupKeys = groupKeysArray[i];
+      boolean stepFound = false;
       for (int j = 0; j < _numSteps; j++) {
         if (stepBlocks.get(j)[i] == 1) {
           for (int groupKey : groupKeys) {
-            PriorityQueue<FunnelStepEvent> stepEvents = 
groupByResultHolder.getResult(groupKey);
-            if (stepEvents == null) {
-              stepEvents = new PriorityQueue<>();
-              groupByResultHolder.setValueForKey(groupKey, stepEvents);
-            }
+            PriorityQueue<FunnelStepEvent> stepEvents = 
getFunnelStepEvents(groupByResultHolder, groupKey);
             stepEvents.add(new FunnelStepEvent(timestampBlock[i], j));
           }
+          stepFound = true;
           break;
         }
       }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event 
with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        for (int groupKey : groupKeys) {
+          PriorityQueue<FunnelStepEvent> stepEvents = 
getFunnelStepEvents(groupByResultHolder, groupKey);
+          stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1));
+        }
+      }
+    }
+  }
+
+  private static PriorityQueue<FunnelStepEvent> 
getFunnelStepEvents(GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    PriorityQueue<FunnelStepEvent> stepEvents = 
groupByResultHolder.getResult(groupKey);
+    if (stepEvents == null) {
+      stepEvents = new PriorityQueue<>();
+      groupByResultHolder.setValueForKey(groupKey, stepEvents);
     }
+    return stepEvents;
   }
 
   @Override
@@ -233,7 +257,7 @@ public abstract class FunnelBaseAggregationFunction<F 
extends Comparable>
   }
 
   protected enum Mode {
-    STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4);
+    STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4), KEEP_ALL(8);
 
     private final int _value;
 
@@ -272,5 +296,9 @@ public abstract class FunnelBaseAggregationFunction<F 
extends Comparable>
     public boolean hasStrictIncrease() {
       return contains(Mode.STRICT_INCREASE);
     }
+
+    public boolean hasKeepAll() {
+      return contains(Mode.KEEP_ALL);
+    }
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index 0ce6b80b61..e41f235116 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -212,6 +212,78 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     }
   }
 
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "userId, funnelMaxStep(timestampCol, '1000', 3, "
+            + "url = '/product/search', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order', 'keep_all' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", 
getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+
+    query =
+        String.format("SELECT "
+            + "userId, funnelMaxStep(timestampCol, '1000', 3, "
+            + "url = '/product/search', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_order' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", 
getTableName(), getCountStarResult());
+    jsonNode = postQuery(query);
+    rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          assertEquals(row.get(1).intValue(), 3);
+          break;
+        case 1:
+          assertEquals(row.get(1).intValue(), 2);
+          break;
+        case 2:
+          assertEquals(row.get(1).intValue(), 2);
+          break;
+        case 3:
+          assertEquals(row.get(1).intValue(), 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testFunnelMatchStepGroupByQueriesWithMode(boolean 
useMultiStageQueryEngine)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to