This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 11b85df331 add theta sketch scalar (#11153) 11b85df331 is described below commit 11b85df3318ab414a7dff30e2b1d979259b984fc Author: Rong Rong <ro...@apache.org> AuthorDate: Thu Jul 27 11:50:36 2023 -0700 add theta sketch scalar (#11153) * add theta sketch scalar functions * fix sketch name to add "theta" specifically to differentiate other sketch types --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../core/function/scalar/SketchFunctions.java | 92 ++++++++++++++++++++++ .../src/test/resources/queries/UDFAggregates.json | 20 +++++ 2 files changed, 112 insertions(+) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java index f6245bec6f..13577e7beb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java @@ -20,8 +20,15 @@ package org.apache.pinot.core.function.scalar; import com.clearspring.analytics.stream.cardinality.HyperLogLog; import java.math.BigDecimal; +import java.util.Base64; import javax.annotation.Nullable; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.theta.AnotB; +import org.apache.datasketches.theta.Intersection; +import org.apache.datasketches.theta.SetOperationBuilder; +import org.apache.datasketches.theta.Sketch; import org.apache.datasketches.theta.Sketches; +import org.apache.datasketches.theta.Union; import org.apache.datasketches.theta.UpdateSketch; import org.apache.datasketches.tuple.aninteger.IntegerSketch; import org.apache.datasketches.tuple.aninteger.IntegerSummary; @@ -65,6 +72,8 @@ import org.apache.pinot.spi.utils.CommonConstants; * } */ public class SketchFunctions { + private static final SetOperationBuilder SET_OPERATION_BUILDER = new SetOperationBuilder(); + private SketchFunctions() { } @@ -184,4 +193,87 @@ public class SketchFunctions { } return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact()); } + + @ScalarFunction(names = {"getThetaSketchEstimate", "get_theta_sketch_estimate"}) + public static long getThetaSketchEstimate(Object sketchObject) { + return Math.round(asThetaSketch(sketchObject).getEstimate()); + } + + @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"}) + public static Sketch thetaSketchUnion(Object o1, Object o2) { + return thetaSketchUnionVar(o1, o2); + } + + @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"}) + public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3) { + return thetaSketchUnionVar(o1, o2, o3); + } + + @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"}) + public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4) { + return thetaSketchUnionVar(o1, o2, o3, o4); + } + + @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"}) + public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4, Object o5) { + return thetaSketchUnionVar(o1, o2, o3, o4, o5); + } + + @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"}) + public static Sketch thetaSketchIntersect(Object o1, Object o2) { + return thetaSketchIntersectVar(o1, o2); + } + + @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"}) + public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3) { + return thetaSketchIntersectVar(o1, o2, o3); + } + + @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"}) + public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4) { + return thetaSketchIntersectVar(o1, o2, o3, o4); + } + + @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"}) + public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4, Object o5) { + return thetaSketchIntersectVar(o1, o2, o3, o4, o5); + } + + @ScalarFunction(names = {"thetaSketchDiff", "theta_sketch_diff"}) + public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB) { + AnotB diff = SET_OPERATION_BUILDER.buildANotB(); + diff.setA(asThetaSketch(sketchObjectA)); + diff.notB(asThetaSketch(sketchObjectB)); + return diff.getResult(false, null, false); + } + + private static Sketch thetaSketchUnionVar(Object... sketchObjects) { + Union union = SET_OPERATION_BUILDER.buildUnion(); + for (Object sketchObj : sketchObjects) { + union.union(asThetaSketch(sketchObj)); + } + return union.getResult(false, null); + } + + private static Sketch thetaSketchIntersectVar(Object... sketchObjects) { + Intersection intersection = SET_OPERATION_BUILDER.buildIntersection(); + for (Object sketchObj : sketchObjects) { + intersection.intersect(asThetaSketch(sketchObj)); + } + return intersection.getResult(false, null); + } + + private static Sketch asThetaSketch(Object sketchObj) { + if (sketchObj instanceof String) { + byte[] decoded = Base64.getDecoder().decode((String) sketchObj); + return Sketches.wrapSketch(Memory.wrap((decoded))); + } else if (sketchObj instanceof Sketch) { + return (Sketch) sketchObj; + } else if (sketchObj instanceof byte[]) { + return Sketches.wrapSketch(Memory.wrap((byte[]) sketchObj)); + } else { + throw new RuntimeException("Exception occurred getting estimate from Theta Sketch, unsupported Object type: " + + sketchObj.getClass()); + } + } } diff --git a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json index f5a765e9ad..1bc39d0953 100644 --- a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json +++ b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json @@ -63,6 +63,26 @@ { "sql": "SELECT SUMPRECISION(decimal_col) FROM {tbl}", "outputs": [["10000000000100000000110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...] + }, + { + "sql": "SELECT string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col", + "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...] + }, + { + "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col", + "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...] + }, + { + "sql": "select GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_col, ''), [...] + "outputs": [[3, 0, 6, 6]] + }, + { + "sql": "select bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_ [...] + "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]] + }, + { + "sql": "select /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_S [...] + "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]] } ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org