This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 659fe7c68a  Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
659fe7c68a is described below

commit 659fe7c68a62d553e5ba24dfe996cc18f5a3bcea
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Aug 1 23:25:34 2023 -0700

    Support distinctCountBitmap and *TupleSketch functions in v2 (#11245)
---
 .../tests/MultiStageEngineIntegrationTest.java     | 45 +++++++++++--
 .../tests/SumPrecisionIntegrationTest.java         | 14 ----
 ...onTest.java => TupleSketchIntegrationTest.java} | 78 ++++++++--------------
 .../pinot/segment/spi/AggregationFunctionType.java | 34 +++++++---
 4 files changed, 92 insertions(+), 79 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index bd12f0901b..e4d149a547 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -128,6 +128,43 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     testQueryWithMatchingRowCount(pinotQuery, h2Query);
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctCountQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String[] numericResultFunctions = new String[]{
+        "distinctCount", "distinctCountBitmap", "distinctCountHLL", "segmentPartitionedDistinctCount",
+        "distinctCountSmartHLL", "distinctCountThetaSketch", "distinctSum", "distinctAvg"
+    };
+    double[] expectedNumericResults = new double[]{
+        364, 364, 355, 364, 364, 364, 5915969, 16252.662087912087
+    };
+    Assert.assertEquals(numericResultFunctions.length, expectedNumericResults.length);
+
+    for (int i = 0; i < numericResultFunctions.length; i++) {
+      String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", numericResultFunctions[i]);
+      JsonNode jsonNode = postQuery(pinotQuery);
+      Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble(), expectedNumericResults[i]);
+    }
+
+    String[] binaryResultFunctions = new String[]{
+        "distinctCountRawHLL", "distinctCountRawThetaSketch"
+    };
+    int[] expectedBinarySizeResults = new int[]{
+        360,
+        3904
+    };
+    for (int i = 0; i < binaryResultFunctions.length; i++) {
+      String pinotQuery = String.format("SELECT %s(DaysSinceEpoch) FROM mytable", binaryResultFunctions[i]);
+      JsonNode jsonNode = postQuery(pinotQuery);
+      Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
+          expectedBinarySizeResults[i]);
+    }
+
+    setUseMultiStageQueryEngine(true);
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testMultiValueColumnAggregationQuery(boolean useMultiStageQueryEngine)
       throws Exception {
@@ -166,13 +203,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
 
     pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99) FROM mytable";
     jsonNode = postQuery(pinotQuery);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
 
     pinotQuery = "SELECT percentileKLLMV(DivAirportIDs, 99, 100) FROM mytable";
     jsonNode = postQuery(pinotQuery);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 12000);
-    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 15000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() > 10000);
+    Assert.assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asDouble() < 17000);
 
     setUseMultiStageQueryEngine(true);
  }
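For context, a minimal usage sketch of what testDistinctCountQueries above exercises
(illustrative only, not part of the patch; mytable, DaysSinceEpoch and the
postQuery(...) helper come from the integration test, and DISTINCT_COUNT_BITMAP is one
of the snake_case aliases this commit registers in AggregationFunctionType):

    // distinctCountBitmap was previously v1-only; after this change the same query
    // plans on both engines, which is why the test runs it under the
    // "useBothQueryEngines" data provider.
    JsonNode jsonNode = postQuery("SELECT DISTINCT_COUNT_BITMAP(DaysSinceEpoch) FROM mytable");
    // Exact distinct count, computed via roaring bitmaps; 364 in the test data set.
    long distinctDays = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();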
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
index 8b8a8ab41f..f3763dd3e3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
@@ -24,8 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
@@ -104,18 +102,6 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
     }
   }
 
-  private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
-      throws Exception {
-    Map<String, Integer> actualGroupToValueMap = new HashMap<>();
-    JsonNode jsonNode = postQuery(query);
-    jsonNode.get("resultTable").get("rows").forEach(node -> {
-      String group = node.get(0).textValue();
-      int value = node.get(1).intValue();
-      actualGroupToValueMap.put(group, value);
-    });
-    assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
-  }
-
   private File createAvroFile(long totalNumRecords)
       throws IOException {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
similarity index 57%
copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
index 8b8a8ab41f..e0e05d1075 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SumPrecisionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TupleSketchIntegrationTest.java
@@ -22,22 +22,21 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Random;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -45,14 +44,13 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
-public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
-  private static final String DIM_NAME = "dimName";
-  private static final String MET_BIG_DECIMAL_BYTES = "metBigDecimalBytes";
-  private static final String MET_BIG_DECIMAL_STRING = "metBigDecimalString";
-  private static final String MET_DOUBLE = "metDouble";
-  private static final String MET_LONG = "metLong";
+public class TupleSketchIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String MET_TUPLE_SKETCH_BYTES = "metTupleSketchBytes";
+
+  private static final Random RANDOM = new Random();
 
   @BeforeClass
   public void setup()
@@ -67,11 +65,8 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
 
     // create & upload schema AND table config
     Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
-        .addSingleValueDimension(DIM_NAME, FieldSpec.DataType.STRING)
-        .addMetric(MET_BIG_DECIMAL_BYTES, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(MET_BIG_DECIMAL_STRING, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(MET_DOUBLE, FieldSpec.DataType.DOUBLE)
-        .addMetric(MET_LONG, FieldSpec.DataType.LONG).build();
+        .addMetric(MET_TUPLE_SKETCH_BYTES, FieldSpec.DataType.BYTES)
+        .build();
     addSchema(schema);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
     addTableConfig(tableConfig);
@@ -94,26 +89,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
-        String.format("SELECT SUMPRECISION(%s), SUMPRECISION(%s), sum(%s), sum(%s) FROM %s",
-            MET_BIG_DECIMAL_BYTES, MET_BIG_DECIMAL_STRING, MET_DOUBLE, MET_LONG, DEFAULT_TABLE_NAME);
-    double sumResult = 2147484147500L;
-    JsonNode jsonNode = postQuery(query);
-    System.out.println("jsonNode = " + jsonNode.toPrettyString());
-    for (int i = 0; i < 4; i++) {
-      assertEquals(Double.parseDouble(jsonNode.get("resultTable").get("rows").get(0).get(i).asText()), sumResult);
-    }
-  }
-
-  private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
-      throws Exception {
-    Map<String, Integer> actualGroupToValueMap = new HashMap<>();
+        String.format(
+            "SELECT DISTINCT_COUNT_TUPLE_SKETCH(%s), DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH(%s), "
+                + "SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH(%s), AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH(%s) FROM %s",
+            MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES, MET_TUPLE_SKETCH_BYTES,
+            DEFAULT_TABLE_NAME);
     JsonNode jsonNode = postQuery(query);
-    jsonNode.get("resultTable").get("rows").forEach(node -> {
-      String group = node.get(0).textValue();
-      int value = node.get(1).intValue();
-      actualGroupToValueMap.put(group, value);
-    });
-    assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong() > 0);
+    assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asText().length(), 1756);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(2).asLong() > 0);
+    assertTrue(jsonNode.get("resultTable").get("rows").get(0).get(3).asLong() > 0);
   }
 
   private File createAvroFile(long totalNumRecords)
@@ -122,29 +107,16 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
       throws IOException {
     // create avro schema
     org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(ImmutableList.of(
-        new Field(DIM_NAME, org.apache.avro.Schema.create(Type.STRING), null, null),
-        new Field(MET_BIG_DECIMAL_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null),
-        new Field(MET_BIG_DECIMAL_STRING, org.apache.avro.Schema.create(Type.STRING), null, null),
-        new Field(MET_DOUBLE, org.apache.avro.Schema.create(Type.DOUBLE), null, null),
-        new Field(MET_LONG, org.apache.avro.Schema.create(Type.LONG), null, null)));
+        new Field(MET_TUPLE_SKETCH_BYTES, org.apache.avro.Schema.create(Type.BYTES), null, null)));
 
     // create avro file
     File avroFile = new File(_tempDir, "data.avro");
     try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
       fileWriter.create(avroSchema, avroFile);
-      int dimCardinality = 50;
-      BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
       for (int i = 0; i < totalNumRecords; i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(DIM_NAME, "dim" + (RandomUtils.nextInt() % dimCardinality));
-        BigDecimal bigDecimalValue = bigDecimalBase.add(BigDecimal.valueOf(i));
-
-        record.put(MET_BIG_DECIMAL_BYTES, ByteBuffer.wrap(BigDecimalUtils.serialize(bigDecimalValue)));
-        record.put(MET_BIG_DECIMAL_STRING, bigDecimalValue.toPlainString());
-        record.put(MET_DOUBLE, bigDecimalValue.doubleValue());
-        record.put(MET_LONG, bigDecimalValue.longValue());
-
+        record.put(MET_TUPLE_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
         fileWriter.append(record);
       }
@@ -152,6 +124,12 @@ public class SumPrecisionIntegrationTest extends BaseClusterIntegrationTest {
     return avroFile;
   }
 
+  private byte[] getRandomRawValue() {
+    IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum);
+    is.update(RANDOM.nextInt(100), RANDOM.nextInt(100));
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
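A quick sketch of the data the new tuple-sketch aggregations consume, using the same
DataSketches calls as getRandomRawValue() above (the lgK of 4 and Sum mode mirror the
test; the key/value pairs here are made up for illustration):

    // Build an integer-sum tuple sketch: one distinct key (7), summing values 3 and 5.
    IntegerSketch is = new IntegerSketch(4, IntegerSummary.Mode.Sum);
    is.update(7, 3);
    is.update(7, 5); // Sum mode folds this into the existing entry for key 7

    // DISTINCT_COUNT_TUPLE_SKETCH reports the sketch's distinct-key estimate.
    double estimate = is.compact().getEstimate(); // 1.0 for this tiny sketch

    // These bytes are what the BYTES column stores, and
    // DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH returns the merged sketch in the
    // same serialized form (rendered as a string in the JSON result, which is what
    // the length assertion above checks).
    byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());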
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 77a9199727..fae66d1ded 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -80,14 +80,15 @@ public enum AggregationFunctionType {
    * (1) distinct_count only supports single argument;
    * (2) count(distinct ...) supports multi-argument and will be converted into DISTINCT + COUNT
    */
-  DISTINCTCOUNT("distinctCount", null, SqlKind.OTHER_FUNCTION,
+  DISTINCTCOUNT("distinctCount", ImmutableList.of("DISTINCT_COUNT"), SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
-  // TODO: support bitmap and segment partition in V2
-  DISTINCTCOUNTBITMAP("distinctCountBitmap"),
-  SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount", null, SqlKind.OTHER_FUNCTION,
+  DISTINCTCOUNTBITMAP("distinctCountBitmap", ImmutableList.of("DISTINCT_COUNT_BITMAP"), SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT,
-      ReturnTypes.BIGINT),
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
+  SEGMENTPARTITIONEDDISTINCTCOUNT("segmentPartitionedDistinctCount",
+      ImmutableList.of("SEGMENT_PARTITIONED_DISTINCT_COUNT"), SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.ANY, ReturnTypes.BIGINT, ReturnTypes.BIGINT),
   DISTINCTCOUNTHLL("distinctCountHLL", ImmutableList.of("DISTINCT_COUNT_HLL"), SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC), ordinal -> ordinal > 0),
@@ -101,6 +102,7 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
       ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
+  @Deprecated
   FASTHLL("fastHLL"),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null, SqlKind.OTHER_FUNCTION,
       SqlFunctionCategory.USER_DEFINED_FUNCTION,
@@ -131,6 +133,7 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
+  @Deprecated
   PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
   PERCENTILEKLL("percentileKLL", null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
@@ -140,6 +143,7 @@ public enum AggregationFunctionType {
       ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DEPRECATED in v2
+  @Deprecated
   IDSET("idSet"),
 
   // TODO: support histogram requires solving ARRAY constructor and multi-function signature without optional ordinal
@@ -167,17 +171,25 @@ public enum AggregationFunctionType {
       OperandTypes.NUMERIC, ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
   FOURTHMOMENT("fourthMoment"),
 
-  // TODO: revisit support for Tuple sketches in V2
   // DataSketches Tuple Sketch support
-  DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"),
+  DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch", ImmutableList.of("DISTINCT_COUNT_TUPLE_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DataSketches Tuple Sketch support for Integer based Tuple Sketches
-  DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"),
+  DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch",
+      ImmutableList.of("DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH"), SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.VARCHAR_2000,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"),
-  AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"),
+  SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch",
+      ImmutableList.of("SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH"), SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
+  AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch",
+      ImmutableList.of("AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BINARY, ReturnTypes.BIGINT,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  // TODO: revisit support for Geo-spatial agg in V2
   // Geo aggregation functions
   STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), ReturnTypes.explicit(SqlTypeName.OTHER)),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org