This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d211d89985 Adding tuple sketch scalar functions (#11517)
d211d89985 is described below

commit d211d899855881704931d8c83cfbbc152bb8f16f
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Sep 10 04:24:26 2023 -0700

    Adding tuple sketch scalar functions (#11517)
    
    Adding more sketch integration test
---
 .../core/function/scalar/SketchFunctions.java      |  92 ++++++++++
 .../integration/tests/custom/ThetaSketchTest.java  |  67 +++++++
 .../integration/tests/custom/TupleSketchTest.java  | 198 +++++++++++++++++++++
 3 files changed, 357 insertions(+)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 37244eff70..8b0b72d1d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -32,6 +32,7 @@ import org.apache.datasketches.theta.Union;
 import org.apache.datasketches.theta.UpdateSketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.annotations.ScalarFunction;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -277,4 +278,95 @@ public class SketchFunctions {
           + sketchObj.getClass());
     }
   }
+
+  /**
+   * Nominal entries (k) used by the tuple-sketch union functions when the caller does not supply one:
+   * {@code 2^CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK}. Computed once with an integer shift so no
+   * floating-point conversion is involved.
+   */
+  private static final int DEFAULT_INT_TUPLE_SKETCH_NOMINAL_ENTRIES =
+      1 << CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK;
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Sum} summary set operations, using the
+   * default nominal entries.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intSumTupleSketchUnion", "int_sum_tuple_sketch_union"})
+  public static byte[] intSumTupleSketchUnion(Object o1, Object o2) {
+    return intSumTupleSketchUnion(DEFAULT_INT_TUPLE_SKETCH_NOMINAL_ENTRIES, o1, o2);
+  }
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Sum} summary set operations.
+   *
+   * @param nomEntries nominal entries (k) of the union
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intSumTupleSketchUnion", "int_sum_tuple_sketch_union"})
+  public static byte[] intSumTupleSketchUnion(int nomEntries, Object o1, Object o2) {
+    return intTupleSketchUnionVar(IntegerSummary.Mode.Sum, nomEntries, o1, o2);
+  }
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Min} summary set operations, using the
+   * default nominal entries.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intMinTupleSketchUnion", "int_min_tuple_sketch_union"})
+  public static byte[] intMinTupleSketchUnion(Object o1, Object o2) {
+    return intMinTupleSketchUnion(DEFAULT_INT_TUPLE_SKETCH_NOMINAL_ENTRIES, o1, o2);
+  }
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Min} summary set operations.
+   *
+   * @param nomEntries nominal entries (k) of the union
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intMinTupleSketchUnion", "int_min_tuple_sketch_union"})
+  public static byte[] intMinTupleSketchUnion(int nomEntries, Object o1, Object o2) {
+    return intTupleSketchUnionVar(IntegerSummary.Mode.Min, nomEntries, o1, o2);
+  }
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Max} summary set operations, using the
+   * default nominal entries.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intMaxTupleSketchUnion", "int_max_tuple_sketch_union"})
+  public static byte[] intMaxTupleSketchUnion(Object o1, Object o2) {
+    return intMaxTupleSketchUnion(DEFAULT_INT_TUPLE_SKETCH_NOMINAL_ENTRIES, o1, o2);
+  }
+
+  /**
+   * Unions two Integer tuple sketches with {@code IntegerSummary.Mode.Max} summary set operations.
+   *
+   * @param nomEntries nominal entries (k) of the union
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact union sketch
+   */
+  @ScalarFunction(names = {"intMaxTupleSketchUnion", "int_max_tuple_sketch_union"})
+  public static byte[] intMaxTupleSketchUnion(int nomEntries, Object o1, Object o2) {
+    return intTupleSketchUnionVar(IntegerSummary.Mode.Max, nomEntries, o1, o2);
+  }
+
+  /**
+   * Unions any number of Integer tuple sketches and serializes the compact result.
+   *
+   * @param mode summary mode used for both the union and intersection slots of the set operations
+   * @param nomEntries nominal entries (k) of the union
+   * @param sketchObjects sketches in any form accepted by {@code asIntegerSketch}
+   * @return the serialized compact union sketch
+   */
+  private static byte[] intTupleSketchUnionVar(IntegerSummary.Mode mode, int nomEntries, Object... sketchObjects) {
+    org.apache.datasketches.tuple.Union<IntegerSummary> union =
+        new org.apache.datasketches.tuple.Union<>(nomEntries, new IntegerSummarySetOperations(mode, mode));
+    for (Object sketchObj : sketchObjects) {
+      union.union(asIntegerSketch(sketchObj));
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(union.getResult().compact());
+  }
+
+  /**
+   * Intersects two Integer tuple sketches, combining summaries with {@code IntegerSummary.Mode.Sum}.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact intersection sketch
+   */
+  @ScalarFunction(names = {"intSumTupleSketchIntersect", "int_sum_tuple_sketch_intersect"})
+  public static byte[] intSumTupleSketchIntersect(Object o1, Object o2) {
+    return intTupleSketchIntersectVar(IntegerSummary.Mode.Sum, o1, o2);
+  }
+
+  /**
+   * Intersects two Integer tuple sketches, combining summaries with {@code IntegerSummary.Mode.Min}.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact intersection sketch
+   */
+  @ScalarFunction(names = {"intMinTupleSketchIntersect", "int_min_tuple_sketch_intersect"})
+  public static byte[] intMinTupleSketchIntersect(Object o1, Object o2) {
+    return intTupleSketchIntersectVar(IntegerSummary.Mode.Min, o1, o2);
+  }
+
+  /**
+   * Intersects two Integer tuple sketches, combining summaries with {@code IntegerSummary.Mode.Max}.
+   *
+   * @param o1 first sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 second sketch, in any of the forms accepted for {@code o1}
+   * @return the serialized compact intersection sketch
+   */
+  @ScalarFunction(names = {"intMaxTupleSketchIntersect", "int_max_tuple_sketch_intersect"})
+  public static byte[] intMaxTupleSketchIntersect(Object o1, Object o2) {
+    return intTupleSketchIntersectVar(IntegerSummary.Mode.Max, o1, o2);
+  }
+
+  /**
+   * Intersects any number of Integer tuple sketches and serializes the compact result.
+   *
+   * @param mode summary mode used for both the union and intersection slots of the set operations
+   * @param sketchObjects sketches in any form accepted by {@code asIntegerSketch}
+   * @return the serialized compact intersection sketch
+   */
+  private static byte[] intTupleSketchIntersectVar(IntegerSummary.Mode mode, Object... sketchObjects) {
+    IntegerSummarySetOperations summaryOps = new IntegerSummarySetOperations(mode, mode);
+    org.apache.datasketches.tuple.Intersection<IntegerSummary> accumulator =
+        new org.apache.datasketches.tuple.Intersection<>(summaryOps);
+    for (int idx = 0; idx < sketchObjects.length; idx++) {
+      accumulator.intersect(asIntegerSketch(sketchObjects[idx]));
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(accumulator.getResult().compact());
+  }
+
+  /**
+   * Computes the set difference A-not-B of two Integer tuple sketches.
+   *
+   * <p>Although the Java method name mentions "Sum", the registered SQL names carry no mode and
+   * {@code AnotB} is constructed without summary set operations, so no summary mode is applied.
+   *
+   * @param o1 sketch A: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @param o2 sketch B, in any of the forms accepted for {@code o1}
+   * @return the serialized compact difference sketch
+   */
+  @ScalarFunction(names = {"intTupleSketchDiff", "int_tuple_sketch_diff"})
+  public static byte[] intSumTupleSketchDiff(Object o1, Object o2) {
+    org.apache.datasketches.tuple.Sketch<IntegerSummary> sketchA = asIntegerSketch(o1);
+    org.apache.datasketches.tuple.Sketch<IntegerSummary> sketchB = asIntegerSketch(o2);
+    org.apache.datasketches.tuple.AnotB<IntegerSummary> aNotB = new org.apache.datasketches.tuple.AnotB<>();
+    aNotB.setA(sketchA);
+    aNotB.notB(sketchB);
+    // The AnotB instance is local, so there is no need to reset its state after reading the result.
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(aNotB.getResult(false).compact());
+  }
+
+  /**
+   * Converts a scalar-function argument into an Integer tuple sketch.
+   *
+   * @param sketchObj a Base64-encoded serialized sketch ({@code String}), serialized sketch bytes
+   *                  ({@code byte[]}), or an already-materialized tuple {@code Sketch}
+   * @return the sketch; a {@code Sketch} argument is returned as-is, without copying
+   * @throws RuntimeException if {@code sketchObj} is {@code null} or of any other type
+   */
+  private static org.apache.datasketches.tuple.Sketch<IntegerSummary> asIntegerSketch(Object sketchObj) {
+    if (sketchObj instanceof String) {
+      byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+      return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(decoded);
+    } else if (sketchObj instanceof org.apache.datasketches.tuple.Sketch) {
+      // The summary type is erased at runtime and cannot be checked here; this assumes the caller passed a
+      // Sketch<IntegerSummary>. A sketch with another summary type would only fail later, when used.
+      @SuppressWarnings("unchecked")
+      org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
+          (org.apache.datasketches.tuple.Sketch<IntegerSummary>) sketchObj;
+      return sketch;
+    } else if (sketchObj instanceof byte[]) {
+      return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) sketchObj);
+    } else {
+      // Report null explicitly instead of failing with a NullPointerException on getClass().
+      throw new RuntimeException("Exception occurred reading Tuple Sketch, unsupported Object type: "
+          + (sketchObj == null ? "null" : sketchObj.getClass()));
+    }
+  }
+
+  /**
+   * Returns the distinct-count estimate of an Integer tuple sketch, rounded to the nearest long.
+   *
+   * @param o1 sketch: Base64 {@code String}, serialized {@code byte[]}, or a tuple {@code Sketch}
+   * @return {@code Math.round} of the sketch's estimate
+   */
+  @ScalarFunction(names = {"getIntTupleSketchEstimate", "get_int_tuple_sketch_estimate"})
+  public static long getIntTupleSketchEstimate(Object o1) {
+    double estimate = asIntegerSketch(o1).getEstimate();
+    return Math.round(estimate);
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index c6a96bca70..392ed8738c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -450,6 +450,73 @@ public class ThetaSketchTest extends CustomDataQueryClusterIntegrationTest {
           ImmutableMap.of("Female", 50 + 60 + 70 + 110 + 120 + 130, "Male", 80 
+ 90 + 100 + 140 + 150 + 160);
       runAndAssert(query, expected);
     }
+
+    // union all by gender
+    {
+      String query = "select dimValue, 
distinctCountThetaSketch(thetaSketchCol) from "
+          + "( "
+          + "SELECT dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Female' "
+          + "UNION ALL "
+          + "SELECT dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Male' "
+          + ") "
+          + "GROUP BY dimValue";
+      ImmutableMap<String, Integer> expected =
+          ImmutableMap.of("Female", 50 + 60 + 70 + 110 + 120 + 130, "Male", 80 
+ 90 + 100 + 140 + 150 + 160);
+      runAndAssert(query, expected);
+    }
+
+    // JOIN all by gender
+    {
+      String query = "select a.dimValue, 
distinctCountThetaSketch(b.thetaSketchCol) "
+          + "FROM "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Female') a "
+          + "JOIN "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Male') b "
+          + "ON a.dimName = b.dimName "
+          + "GROUP BY a.dimValue";
+      ImmutableMap<String, Integer> expected =
+          ImmutableMap.of("Female", 80 + 90 + 100 + 140 + 150 + 160);
+      runAndAssert(query, expected);
+    }
+    {
+      String query = "select b.dimValue, 
distinctCountThetaSketch(a.thetaSketchCol) "
+          + "FROM "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Female') a "
+          + "JOIN "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Male') b "
+          + "ON a.dimName = b.dimName "
+          + "GROUP BY b.dimValue";
+      ImmutableMap<String, Integer> expected =
+          ImmutableMap.of("Male", 50 + 60 + 70 + 110 + 120 + 130);
+      runAndAssert(query, expected);
+    }
+    {
+      String query = "SELECT "
+          + "GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT("
+          + "  DISTINCT_COUNT_RAW_THETA_SKETCH(a.thetaSketchCol, ''), "
+          + "  DISTINCT_COUNT_RAW_THETA_SKETCH(b.thetaSketchCol, ''))), "
+          + "GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION("
+          + "  DISTINCT_COUNT_RAW_THETA_SKETCH(a.thetaSketchCol, ''), "
+          + "  DISTINCT_COUNT_RAW_THETA_SKETCH(b.thetaSketchCol, ''))) "
+          + "FROM "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Female') a "
+          + "JOIN "
+          + "(SELECT dimName, dimValue, thetaSketchCol FROM " + getTableName()
+          + " where dimName = 'gender' and dimValue = 'Male') b "
+          + "ON a.dimName = b.dimName";
+      JsonNode jsonNode = postQuery(query);
+      
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).longValue(),
+          0);
+      
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).longValue(),
+          50 + 60 + 70 + 110 + 120 + 130 + 80 + 90 + 100 + 140 + 150 + 160);
+    }
   }
 
   private void runAndAssert(String query, int expected)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index e90b244f11..bb9c175e25 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -26,9 +26,12 @@ import java.util.Base64;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.datasketches.tuple.Intersection;
 import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -42,6 +45,7 @@ import static org.testng.Assert.assertTrue;
 public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
 
   private static final String DEFAULT_TABLE_NAME = "TupleSketchTest";
+  // INT dimension column used by the id-based WHERE / FILTER / JOIN predicates in the queries of this test;
+  // the generated records fill it with RandomUtils.nextInt(10).
+  private static final String ID = "id";
   private static final String MET_TUPLE_SKETCH_BYTES = "metTupleSketchBytes";
 
   @Override
@@ -71,6 +75,196 @@ public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
     assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong() 
> 0);
   }
 
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testTupleUnionQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Single id: the raw-sketch estimate must equal DISTINCT_COUNT_TUPLE_SKETCH, whether the id is selected
+    // with a WHERE clause or with an aggregation FILTER.
+    for (int id = 0; id < 10; id++) {
+      JsonNode whereRow = postQuery("SELECT "
+          + "DISTINCT_COUNT_TUPLE_SKETCH(metTupleSketchBytes), "
+          + "GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes)) "
+          + "FROM " + getTableName()
+          + " WHERE id=" + id).get("resultTable").get("rows").get(0);
+      long expectedCount = whereRow.get(0).asLong();
+      assertEquals(whereRow.get(1).asLong(), expectedCount);
+
+      JsonNode filterRow = postQuery("SELECT "
+          + "GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+          + "FILTER (WHERE id = " + id + ")) "
+          + "FROM " + getTableName()).get("resultTable").get("rows").get(0);
+      assertEquals(filterRow.get(0).asLong(), expectedCount);
+    }
+
+    // Pair of ids: WHERE ... OR ..., FILTER (... OR ...), and INT_SUM_TUPLE_SKETCH_UNION of two single-id
+    // FILTERed raw sketches must all produce the same estimate.
+    for (int left = 0; left < 10; left++) {
+      for (int right = 0; right < 10; right++) {
+        JsonNode orRow = postQuery("SELECT DISTINCT_COUNT_TUPLE_SKETCH(metTupleSketchBytes), "
+            + "GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes)) "
+            + "FROM " + getTableName()
+            + " WHERE id=" + left + " OR id=" + right).get("resultTable").get("rows").get(0);
+        long expectedCount = orRow.get(0).asLong();
+        assertEquals(orRow.get(1).asLong(), expectedCount);
+
+        JsonNode orFilterRow = postQuery("SELECT "
+            + "GET_INT_TUPLE_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+            + "FILTER (WHERE id = " + left + " OR id = " + right + ")) "
+            + "FROM " + getTableName()).get("resultTable").get("rows").get(0);
+        assertEquals(orFilterRow.get(0).asLong(), expectedCount);
+
+        JsonNode unionRow = postQuery("SELECT "
+            + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_UNION( "
+            + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FILTER (WHERE id = " + left + "),"
+            + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FILTER (WHERE id = " + right + ")))"
+            + " FROM " + getTableName()).get("resultTable").get("rows").get(0);
+        assertEquals(unionRow.get(0).asLong(), expectedCount);
+      }
+    }
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testTupleIntersectionQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // For each i, the server-side INT_SUM_TUPLE_SKETCH_INTERSECT of the raw sketches for ids {i-1, i} and
+    // {i, i+1} (column 0) must match a client-side Sum-mode intersection of the same two raw sketches, which
+    // the query also returns (columns 1 and 2).
+    for (int i = 1; i < 9; i++) {
+      String query = "SELECT "
+          + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_INTERSECT( "
+          + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+          + "FILTER (WHERE id = " + (i - 1) + " OR id = " + i + "),"
+          + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+          + "FILTER (WHERE id = " + i + " OR id = " + (i + 1) + "))),"
+          + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+          + "FILTER (WHERE id = " + (i - 1) + " OR id = " + i + "),"
+          + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) "
+          + "FILTER (WHERE id = " + (i + 1) + " OR id = " + i + ")"
+          + " FROM " + getTableName();
+      JsonNode row = postQuery(query).get("resultTable").get("rows").get(0);
+
+      Sketch<IntegerSummary> sketch1 =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(Base64.getDecoder().decode(row.get(1).asText()));
+      Sketch<IntegerSummary> sketch2 =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(Base64.getDecoder().decode(row.get(2).asText()));
+      Intersection<IntegerSummary> intersection = new Intersection<>(new IntegerSummarySetOperations(
+          IntegerSummary.Mode.Sum, IntegerSummary.Mode.Sum));
+      intersection.intersect(sketch1);
+      intersection.intersect(sketch2);
+      // GET_INT_TUPLE_SKETCH_ESTIMATE rounds with Math.round. Truncating with a (long) cast would be off by
+      // one whenever the estimate's fractional part is >= 0.5, which can happen once the sketches are in
+      // estimation mode.
+      long expectedEstimate = Math.round(intersection.getResult().getEstimate());
+      assertEquals(row.get(0).asLong(), expectedEstimate);
+    }
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Aggregate the sketch column over a UNION ALL of four single-id subqueries (ids 4 through 7).
+    String query =
+        String.format(
+            "SELECT "
+                + "DISTINCT_COUNT_TUPLE_SKETCH(%s), "
+                + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(%s), "
+                + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(%s), "
+                + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(%s) "
+                + "FROM "
+                + "("
+                + "SELECT %s FROM %s WHERE %s = 4 "
+                + "UNION ALL "
+                + "SELECT %s FROM %s WHERE %s = 5 "
+                + "UNION ALL "
+                + "SELECT %s FROM %s WHERE %s = 6 "
+                + "UNION ALL "
+                + "SELECT %s FROM %s WHERE %s = 7 "
+                + ")",
+            MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+            MET_TUPLE_SKETCH_BYTES, getTableName(), ID, MET_TUPLE_SKETCH_BYTES, getTableName(), ID,
+            MET_TUPLE_SKETCH_BYTES, getTableName(), ID, MET_TUPLE_SKETCH_BYTES, getTableName(), ID);
+    JsonNode row = postQuery(query).get("resultTable").get("rows").get(0);
+    long distinctCount = row.get(0).asLong();
+    Sketch<IntegerSummary> rawSketch = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(
+        Base64.getDecoder().decode(row.get(1).asText()));
+
+    assertTrue(distinctCount > 0);
+    // The raw sketch's estimate, truncated to long, must equal DISTINCT_COUNT_TUPLE_SKETCH on the same rows.
+    assertEquals((long) rawSketch.getEstimate(), distinctCount);
+    assertTrue(row.get(2).asLong() > 0);
+    assertTrue(row.get(3).asLong() > 0);
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testJoinWithSketchQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Columns 0-3 aggregate the left side (id < 8) and columns 4-7 the right side (id > 3) of an id equi-join.
+    String query =
+        String.format(
+            "SELECT "
+                + "DISTINCT_COUNT_TUPLE_SKETCH(a.%s), "
+                + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+                + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+                + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(a.%s), "
+                + "DISTINCT_COUNT_TUPLE_SKETCH(b.%s), "
+                + "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(b.%s), "
+                + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(b.%s), "
+                + "AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(b.%s) "
+                + "FROM "
+                + "(SELECT * FROM %s WHERE %s < 8 ) a "
+                + "JOIN "
+                + "(SELECT * FROM %s WHERE %s > 3 ) b "
+                + "ON a.%s = b.%s",
+            MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+            MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+            getTableName(), ID, getTableName(), ID, ID, ID);
+    JsonNode row = postQuery(query).get("resultTable").get("rows").get(0);
+    // Run the same four checks on each side's group of columns, left side first.
+    for (int base : new int[]{0, 4}) {
+      long distinctCount = row.get(base).asLong();
+      Sketch<IntegerSummary> rawSketch = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(
+          Base64.getDecoder().decode(row.get(base + 1).asText()));
+      assertTrue(distinctCount > 0);
+      assertEquals((long) rawSketch.getEstimate(), distinctCount);
+      assertTrue(row.get(base + 2).asLong() > 0);
+      assertTrue(row.get(base + 3).asLong() > 0);
+    }
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testJoinAndIntersectionWithSketchQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Raw sketch aggregations over each side of the id equi-join.
+    String leftSketch = "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(a." + MET_TUPLE_SKETCH_BYTES + ")";
+    String rightSketch = "DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(b." + MET_TUPLE_SKETCH_BYTES + ")";
+    String query = "SELECT "
+        + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_INTERSECT( " + leftSketch + ", " + rightSketch + ")), "
+        + "GET_INT_TUPLE_SKETCH_ESTIMATE(INT_SUM_TUPLE_SKETCH_UNION( " + leftSketch + ", " + rightSketch + ")) "
+        + "FROM (SELECT * FROM " + getTableName() + " WHERE id < 8 ) a "
+        + "JOIN (SELECT * FROM " + getTableName() + " WHERE id > 3 ) b "
+        + "ON a.id = b.id";
+    JsonNode row = postQuery(query).get("resultTable").get("rows").get(0);
+    // An intersection can never be estimated larger than the union of the same inputs.
+    assertTrue(row.get(0).asLong() <= row.get(1).asLong());
+  }
+
   @Override
   public String getTableName() {
     return DEFAULT_TABLE_NAME;
@@ -79,6 +273,7 @@ public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
   @Override
   public Schema createSchema() {
     return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        // INT dimension backing the id-based WHERE / FILTER / JOIN predicates used by the tests in this class.
+        .addSingleValueDimension(ID, FieldSpec.DataType.INT)
         .addMetric(MET_TUPLE_SKETCH_BYTES, FieldSpec.DataType.BYTES)
         .build();
   }
@@ -89,6 +284,8 @@ public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(ID, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null,
+            null),
         new org.apache.avro.Schema.Field(MET_TUPLE_SKETCH_BYTES, 
org.apache.avro.Schema.create(
             org.apache.avro.Schema.Type.BYTES), null, null)));
 
@@ -99,6 +296,7 @@ public class TupleSketchTest extends CustomDataQueryClusterIntegrationTest {
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(ID, RandomUtils.nextInt(10));
         record.put(MET_TUPLE_SKETCH_BYTES, 
ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
         fileWriter.append(record);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to