This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new bf02b8c KYLIN-5082,exactly match for percentile bf02b8c is described below commit bf02b8c2d33bfd27cc29d4ce8fd5faff92be9c77 Author: leocoder <leocod...@gmail.com> AuthorDate: Sat Sep 4 10:19:03 2021 +0800 KYLIN-5082,exactly match for percentile --- .../measure/percentile/PercentileCounter.java | 7 ++++ .../measure/percentile/PercentileSerializer.java | 4 +++ .../resources/query/sql_exactly_agg/query11.sql | 21 ++++++++++++ .../org/apache/spark/sql/KylinFunctions.scala | 8 ++--- .../catalyst/expressions/KylinExpresssions.scala | 28 ++++++++++++++++ .../sql/catalyst/expressions/ExpressionUtils.scala | 13 ++++++-- .../kylin/query/runtime/plans/AggregatePlan.scala | 37 ++++++++++++++-------- 7 files changed, 98 insertions(+), 20 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java index 33433dc..a6115ea 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java @@ -64,6 +64,13 @@ public class PercentileCounter implements Serializable { return registers.quantile(quantileRatio); } + public Double getResultEstimateWithQuantileRatio(double quantileRatio) { + if (registers.size() == 0) { + return null; + } + return registers.quantile(quantileRatio); + } + public void writeRegisters(ByteBuffer out) { registers.compress(); registers.asSmallBytes(out); diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java index d0ecba7..f83e2d5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java @@ -33,6 +33,10 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter> this.compression = type.getPrecision(); } + public PercentileSerializer(int precision) { + this.compression = precision; + } + @Override public int peekLength(ByteBuffer in) { return current().peekLength(in); diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql new file mode 100644 index 0000000..1082d5a --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +SELECT percentile(price, 0.6) from test_kylin_fact +group by LSTG_FORMAT_NAME, LSTG_SITE_ID, SLR_SEGMENT_CD +;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":[14336],"exactlyMatched":[true]} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala index 63c90dc..00fd8db 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala @@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, BinaryExpression, - DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, - KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0, - TimestampAdd, TimestampDiff, Truncate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KylinAddMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, PreciseCountDistinct} @@ -75,6 +72,9 @@ object KylinFunctions { def approx_count_distinct_decode(column: Column, precision: Int): Column = Column(ApproxCountDistinctDecode(column.expr, Literal(precision))) + def k_percentile_decode(column: Column, p: Column, precision: Int): Column = + Column(PercentileDecode(column.expr, p.expr, Literal(precision))) + def precise_count_distinct(column: Column): Column = Column(PreciseCountDistinct(column.expr).toAggregateExpression()) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala index b47ff90..b9a9112 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import com.esotericsoftware.kryo.io.{Input, KryoDataInput} import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils import org.apache.kylin.measure.hllc.HLLCounter +import org.apache.kylin.measure.percentile.PercentileSerializer import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate @@ -543,4 +544,31 @@ case class ScatterSkewData(left: Expression, right: Expression) extends BinaryEx override def dataType: DataType = StringType override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType) +} + +case class PercentileDecode(bytes: Expression, quantile: Expression, precision: Expression) extends TernaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, DecimalType, IntegerType) + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$") + defineCodeGen(ctx, ev, (bytes, quantile, precision) => { + s"""$expressionUtils.percentileDecodeHelper($bytes, $quantile, $precision)""" + }) + } + + override protected def nullSafeEval(bytes: Any, quantile: Any, precision: Any): Any = { + val arrayBytes = bytes.asInstanceOf[Array[Byte]] + val serializer = new PercentileSerializer(precision.asInstanceOf[Int]); + val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes)) + counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble) + } + + override def dataType: DataType = DoubleType + + override def prettyName: String = "percentile_decode" + + override def nullable: Boolean = false + + override def children: Seq[Expression] = Seq(bytes, quantile, precision) } \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala index 232f6cc..31d157a 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala @@ -18,15 +18,17 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.kylin.measure.percentile.PercentileSerializer import scala.util.{Failure, Success, Try} import scala.reflect.ClassTag import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder, expressions} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.Decimal +import java.nio.ByteBuffer -object -ExpressionUtils { +object ExpressionUtils { def expression[T <: Expression](name: String) (implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = { @@ -109,4 +111,11 @@ ExpressionUtils { new ExpressionInfo(clazz.getCanonicalName, name) } } + + def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): Double = { + val arrayBytes = bytes.asInstanceOf[Array[Byte]] + val serializer = new PercentileSerializer(precision.asInstanceOf[Int]); + val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes)) + counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble) + } } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala index 6f08e62..ebc1fad 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala @@ -59,23 +59,30 @@ object AggregatePlan extends LogEx { // exactly match, skip agg, direct project. val hash = System.identityHashCode(rel).toString val aggCols = rel.getRewriteAggCalls.asScala.zipWithIndex.map { - case (call: KylinAggregateCall, index: Int) - if OLAPAggregateRel.getAggrFuncName(call).equals("COUNT_DISTINCT") => + case (call: KylinAggregateCall, index: Int) => val dataType = call.getFunc.getReturnDataType val funcName = OLAPAggregateRel.getAggrFuncName(call) val argNames = call.getArgList.asScala.map(dataFrame.schema.names.apply(_)) val columnName = argNames.map(col) val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, funcName, hash, argNames: _*) - if (call.isHllCountDistinctFunc) { - KylinFunctions - .approx_count_distinct_decode(columnName.head, dataType.getPrecision) - .alias(aggName) - } else if (call.isBitmapCountDistinctFunc) { - // execute count distinct precisely - KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName) - } else { - throw new IllegalArgumentException( - s"""Unsupported function name $funcName""") + funcName match { + case FunctionDesc.FUNC_COUNT_DISTINCT => + if (call.isHllCountDistinctFunc) { + KylinFunctions + .approx_count_distinct_decode(columnName.head, dataType.getPrecision) + .alias(aggName) + } else if (call.isBitmapCountDistinctFunc) { + // execute count distinct precisely + KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName) + } else { + throw new IllegalArgumentException( + s"""Unsupported function name $funcName""") + } + case FunctionDesc.FUNC_PERCENTILE => + val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "PERCENTILE_DECODE", hash, argNames: _*) + KylinFunctions.k_percentile_decode(columnName.head, columnName(1), dataType.getPrecision).alias(aggName) + case _ => + col(schemaNames.apply(call.getArgList.get(0))) } case (call: Any, index: Int) => col(schemaNames.apply(call.getArgList.get(0))) @@ -223,7 +230,7 @@ object AggregatePlan extends LogEx { .isSum0 } - val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX", "COUNT_DISTINCT") + val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX", "COUNT_DISTINCT", "PERCENTILE", "PERCENTILE_APPROX") def isExactlyCuboidMatched(rel: OLAPAggregateRel, groupByList: List[Column]): Boolean = { val olapContext = rel.getContext @@ -241,7 +248,9 @@ object AggregatePlan extends LogEx { return false } // when using intersect_count and intersect_value - if (call.getArgList.size() > 1) return false + if (call.getArgList.size() > 1 && !OLAPAggregateRel.getAggrFuncName(call).startsWith("PERCENTILE")) { + return false + } } val groupByCols = rel.getGroups.asScala.map(_.getIdentity).toSet if (groupByCols.isEmpty) return false