This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 659fe7c68a Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
659fe7c68a is described below

commit 659fe7c68a62d553e5ba24dfe996cc18f5a3bcea
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Aug 1 23:25:34 2023 -0700

    Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
---
 .../tests/MultiStageEngineIntegrationTest.java     | 45 +++++++++++--
 .../tests/SumPrecisionIntegrationTest.java         | 14 ----
 ...onTest.java => TupleSketchIntegrationTest.java} | 78 ++++++++--------------
 .../pinot/segment/spi/AggregationFunctionType.java | 34 +++++++---
 4 files changed, 92 insertions(+), 79 deletions(-)
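
For orientation, the diffstat covers three areas: new v2 coverage for the distinct-count family in MultiStageEngineIntegrationTest, a TupleSketchIntegrationTest copied from SumPrecisionIntegrationTest, and full Calcite signatures (snake_case aliases, operand types, return types) for the affected entries in AggregationFunctionType. For quick reference, these are the calls the change makes executable on the v2 engine, collected from the new tests below; DaysSinceEpoch, metTupleSketchBytes, and mytable are test fixtures, not part of the feature:

    String[] newlySupportedOnV2 = new String[]{
        "SELECT distinctCountBitmap(DaysSinceEpoch) FROM mytable",
        "SELECT DISTINCT_COUNT_TUPLE_SKETCH(metTupleSketchBytes) FROM mytable",
        "SELECT DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FROM mytable",
        "SELECT SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FROM mytable",
        "SELECT AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(metTupleSketchBytes) FROM mytable"
    };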

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index bd12f0901b..e4d149a547 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -128,6 +128,43 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     testQueryWithMatchingRowCount(pinotQuery, h2Query);
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctCountQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String[] numericResultFunctions = new String[]{
+        "distinctCount", "distinctCountBitmap", "distinctCountHLL", 
"segmentPartitionedDistinctCount",
+        "distinctCountSmartHLL", "distinctCountThetaSketch", "distinctSum", 
"distinctAvg"
+    };
+
+    double[] expectedNumericResults = new double[]{
+        364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
+    };
+    Assert.assertEquals(numericResultFunctions.length, expectedNumericResults.length);
+
+    for (int i = 0; i < numericResultFunctions.length; i++) {
+      String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", numericResultFunctions[i]);
+      JsonNode jsonNode = postQuery(pinotQuery);
+      Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(), expectedNumericResults[i]);
+    }
+
+    String[] binaryResultFunctions = new String[]{
+        "distinctCountRawHLL", "distinctCountRawThetaSketch"
+    };
+    int[] expectedBinarySizeResults = new int[]{
+        360,
+        3904
+    };
+    for (int i = 0; i < binaryResultFunctions.length; i++) {
+      String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", binaryResultFunctions[i]);
+      JsonNode jsonNode = postQuery(pinotQuery);
+      Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
+          expectedBinarySizeResults[i]);
+    }
+
+    setUseMultiStageQueryEngine(true);
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testMultiValueColumnAggregationQuery(boolean useMultiStageQueryEngine)
       throws Exception {
@@ -166,13 +203,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
 
     pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99) FROM mytable";
     jsonNode = postQuery(pinotQuery);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
 
     pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99, 100) FROM mytable";
     jsonNode = postQuery(pinotQuery);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
 
     setUseMultiStageQueryEngine(true);
   }
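
Every assertion in this test follows one pattern: POST the SQL to the broker, then navigate the JSON response down to resultTable.rows[0][0]. A minimal standalone sketch of that extraction step; the hard-coded JSON string is a hypothetical stand-in for what postQuery() returns from the broker:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BrokerResponseSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical broker response; the tests obtain the real one via postQuery(pinotQuery).
        String response = "{\"resultTable\":{\"rows\":[[364]]}}";
        JsonNode jsonNode = new ObjectMapper().readTree(response);
        // Same navigation as the assertions above: first cell of the first result row.
        double value = jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble();
        System.out.println(value); // prints 364.0
      }
    }

For the raw-sketch functions (distinctCountRawHLL, distinctCountRawThetaSketch) the first cell is a serialized sketch rendered as a string, which is why the test asserts on its length rather than its value.
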
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
index 8b8a8ab41f..f3763dd3e3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
@@ -24,8 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
@@ -104,18 +102,6 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
     }
   }
 
-  private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
-      throws Exception {
-    Map<String, Integer> actualGroupToValueMap = new HashMap<>();
-    JsonNode jsonNode = postQuery(query);
-    jsonNode.get("resultTable").get("rows").forEach(node -> {
-      String group = node.get(0).textValue();
-      int value = node.get(1).intValue();
-      actualGroupToValueMap.put(group, value);
-    });
-    assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
-  }
-
   private File createAvroFile(long totalNumRecords)
       throws IOException {
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
similarity index 57%
copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
index 8b8a8ab41f..e0e05d1075 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
@@ -22,22 +22,21 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Random;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -45,14 +44,13 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
-public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
-  private static final String DIM_NAME = "dimName";
-  private static final String MET_BIG_DECIMAL_BYTES = "metBigDecimalBytes";
-  private static final String MET_BIG_DECIMAL_STRING = "metBigDecimalString";
-  private static final String MET_DOUBLE = "metDouble";
-  private static final String MET_LONG = "metLong";
+public class TupleSketchIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String MET_TUPLE_SKETCH_BYTES = "metTupleSketchBytes";
+
+  private static final Random RANDOM = new Random();
 
   @BeforeClass
   public void setup()
@@ -67,11 +65,8 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
 
     // create & upload schema AND table config
     Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
-        .addSingleValueDimension(DIM_NAME, FieldSpec.DataType.STRING)
-        .addMetric(MET_BIG_DECIMAL_BYTES, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(MET_BIG_DECIMAL_STRING, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(MET_DOUBLE, FieldSpec.DataType.DOUBLE)
-        .addMetric(MET_LONG, FieldSpec.DataType.LONG).build();
+        .addMetric(MET_TUPLE_SKETCH_BYTES, FieldSpec.DataType.BYTES)
+        .build();
     addSchema(schema);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
     addTableConfig(tableConfig);
@@ -94,26 +89,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
-        String.format("SELECT SUMPRECISION(%s), SUMPRECISION(%s), sum(%s), 
sum(%s) FROM %s",
-            MET_BIG_DECIMAL_BYTES, MET_BIG_DECIMAL_STRING, MET_DOUBLE, 
MET_LONG, DEFAULT_TABLE_NAME);
-    double sumResult = 2147484147500L;
-    JsonNode jsonNode = postQuery(query);
-    System.out.println("jsonNode = " + jsonNode.toPrettyString());
-    for (int i = 0; i < 4; i++) {
-      assertEquals(Double.parseDouble(jsonNode.get("resultTable").get("rows").get(0).get(i).asText()), sumResult);
-    }
-  }
-
-  private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
-      throws Exception {
-    Map<String, Integer> actualGroupToValueMap = new HashMap<>();
+        String.format(
+            "SELECT DISTINCT_COUNT_TUPLE_SKETCH(%s), 
DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(%s), "
+                + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(%s), 
AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(%s) FROM %s",
+            MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, 
MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+            DEFAULT_TABLE_NAME);
     JsonNode jsonNode = postQuery(query);
-    jsonNode.get("resultTable").get("rows").forEach(node -> {
-      String group = node.get(0).textValue();
-      int value = node.get(1).intValue();
-      actualGroupToValueMap.put(group, value);
-    });
-    assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong() > 0);
+    assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asText().length(), 1756);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(2).asLong() > 0);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong() > 0);
   }
 
   private File createAvroFile(long totalNumRecords)
@@ -122,29 +107,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
     // create avro schema
     org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(ImmutableList.of(
-        new Field(DIM_NAME, org.apache.avro.Schema.create(Type.STRING), null, null),
-        new Field(MET_BIG_DECIMAL_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null),
-        new Field(MET_BIG_DECIMAL_STRING, org.apache.avro.Schema.create(Type.STRING), null, null),
-        new Field(MET_DOUBLE, org.apache.avro.Schema.create(Type.DOUBLE), null, null),
-        new Field(MET_LONG, org.apache.avro.Schema.create(Type.LONG), null, null)));
+        new Field(MET_TUPLE_SKETCH_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null)));
 
     // create avro file
     File avroFile = new File(_tempDir, "data.avro");
     try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
       fileWriter.create(avroSchema, avroFile);
-      int dimCardinality = 50;
-      BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
       for (int i = 0; i < totalNumRecords; i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(DIM_NAME, "dim" + (RandomUtils.nextInt() % dimCardinality));
-        BigDecimal bigDecimalValue = bigDecimalBase.add(BigDecimal.valueOf(i));
-
-        record.put(MET_BIG_DECIMAL_BYTES, ByteBuffer.wrap(BigDecimalUtils.serialize(bigDecimalValue)));
-        record.put(MET_BIG_DECIMAL_STRING, bigDecimalValue.toPlainString());
-        record.put(MET_DOUBLE, bigDecimalValue.doubleValue());
-        record.put(MET_LONG, bigDecimalValue.longValue());
-
+        record.put(MET_TUPLE_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
         fileWriter.append(record);
       }
@@ -152,6 +124,12 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
     return avroFile;
   }
 
+  private byte[] getRandomRawValue() {
+    IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum);
+    is.update(RANDOM.nextInt(100), RANDOM.nextInt(100));
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
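
The getRandomRawValue() helper above is where the test data comes from: each Avro record carries one serialized Integer tuple sketch (lgK = 4, summaries combined in Sum mode) holding a single random key/value update. A self-contained sketch of that round trip, assuming the ObjectSerDe also exposes a matching deserialize(byte[]) (the test itself only calls serialize):

    import org.apache.datasketches.tuple.Sketch;
    import org.apache.datasketches.tuple.aninteger.IntegerSketch;
    import org.apache.datasketches.tuple.aninteger.IntegerSummary;
    import org.apache.pinot.core.common.ObjectSerDeUtils;

    public class TupleSketchRoundTrip {
      public static void main(String[] args) {
        // Same construction as getRandomRawValue(): lgK = 4, Sum mode.
        IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum);
        is.update(42L, 7);  // key 42 contributes value 7
        is.update(42L, 3);  // same key again: Sum mode folds this into a summary of 10
        is.update(99L, 1);  // second distinct key

        byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());

        // Assumption: the SerDe can read back what it wrote.
        Sketch<IntegerSummary> restored = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes);
        System.out.println(restored.getEstimate()); // ~2.0 distinct keys
      }
    }

Because each record's sketch holds only one update, DISTINCT_COUNT_TUPLE_SKETCH over the merged column estimates the number of distinct random keys, and SUM_VALUES_/AVG_VALUE_ aggregate the Integer summaries, which is why the test can only assert that the results are positive.
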
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 77a9199727..fae66d1ded 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -80,14 +80,15 @@ public enum AggregationFunctionType {
    * (1) distinct_count only supports single argument;
   * (2) count(distinct ...) support multi-argument and will be converted into DISTINCT + COUNT
    */
-  DISTINCTCOUNT("distinctCount", null, SqlKind.OTHER_FUNCTION,
+  DISTINCTCOUNT("distinctCount", ImmutableList.of("DISTINCT_COUNT"), 
SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, 
ReturnTypes.BIGINT,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
-  // TODO: support bitmap and segment partition in V2
-  DISTINCTCOUNTBITMAP("distinctCountBitmap"),
-  SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount", null, SqlKind.OTHER_FUNCTION,
+  DISTINCTCOUNTBITMAP("distinctCountBitmap", ImmutableList.of("DISTINCT_COUNT_BITMAP"), SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT,
-      ReturnTypes.BIGINT),
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
+  SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount",
+      ImmutableList.of("SEGMENT_PARTITIONED_DISTINCT_COUNT"), SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT, ReturnTypes.BIGINT),
   DISTINCTCOUNTHLL("distinctCountHLL", ImmutableList.of("DISTINCT_COUNT_HLL"), 
SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0),
@@ -101,6 +102,7 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
       ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
+  @Deprecated
   FASTHLL("fastHLL"),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null,
       SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
@@ -131,6 +133,7 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
+  @Deprecated
   PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
   PERCENTILEKLL("percentileKLL", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
@@ -140,6 +143,7 @@ public enum AggregationFunctionType {
       ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DEPRECATED in v2
+  @Deprecated
   IDSET("idSet"),
 
   // TODO: support histogram requires solving ARRAY constructor and multi-function signature without optional ordinal
@@ -167,17 +171,25 @@ public enum AggregationFunctionType {
       OperandTypes.NUMERIC, ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
   FOURTHMOMENT("fourthMoment"),
 
-  // TODO: revisit support for Tuple sketches in V2
   // DataSketches Tuple Sketch support
-  DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"),
+  DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch", ImmutableList.of("DISTINCT_COUNT_TUPLE_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DataSketches Tuple Sketch support for Integer based Tuple Sketches
-  DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"),
+  DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch",
+      ImmutableList.of("DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH"), SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.VARCHAR_2000,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"),
-  AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"),
+  SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch",
+      ImmutableList.of("SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH"), 
SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, 
ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
+  AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch", ImmutableList.of("AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  // TODO: revisit support for Geo-spatial agg in V2
   // Geo aggregation functions
   STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
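
All of the entries touched here follow one registration shape: the camelCase function name, an optional list of snake_case aliases for v2/Calcite resolution, then the Calcite metadata (SqlKind, function category, operand type checker, final return type, and a trailing intermediate return type, which is OTHER whenever the partial result is an opaque serialized sketch or bitmap rather than a SQL value). A simplified, illustrative model of that shape (not Pinot's actual constructor, whose full signature is outside this diff):

    import java.util.List;

    // Illustrative only: mirrors the (name, aliases, finalType, intermediateType)
    // pattern visible above; the real enum also carries SqlKind, SqlFunctionCategory,
    // and Calcite OperandTypes/ReturnTypes inference objects.
    public enum AggregationFunctionShape {
      DISTINCTCOUNTBITMAP("distinctCountBitmap", List.of("DISTINCT_COUNT_BITMAP"), "BIGINT", "OTHER"),
      DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch", List.of("DISTINCT_COUNT_TUPLE_SKETCH"), "BIGINT", "OTHER"),
      FASTHLL("fastHLL", List.of(), null, null); // bare entries carry no Calcite signature

      private final String _name;                   // camelCase name used in queries
      private final List<String> _alternativeNames; // snake_case aliases registered for v2
      private final String _finalReturnType;        // type of the final aggregated result
      private final String _intermediateReturnType; // OTHER == serialized sketch/bitmap blob

      AggregationFunctionShape(String name, List<String> alternativeNames, String finalReturnType,
          String intermediateReturnType) {
        _name = name;
        _alternativeNames = alternativeNames;
        _finalReturnType = finalReturnType;
        _intermediateReturnType = intermediateReturnType;
      }
    }

The practical consequence of a bare entry like FASTHLL("fastHLL") is that the function exists for the v1 engine but has no Calcite signature, so the v2 planner cannot resolve it; this change upgrades distinctCountBitmap and the tuple-sketch functions out of that state.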

