This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new bf02b8c  KYLIN-5082, exact match for percentile
bf02b8c is described below

commit bf02b8c2d33bfd27cc29d4ce8fd5faff92be9c77
Author: leocoder <leocod...@gmail.com>
AuthorDate: Sat Sep 4 10:19:03 2021 +0800

    KYLIN-5082, exact match for percentile
---
 .../measure/percentile/PercentileCounter.java      |  7 ++++
 .../measure/percentile/PercentileSerializer.java   |  4 +++
 .../resources/query/sql_exactly_agg/query11.sql    | 21 ++++++++++++
 .../org/apache/spark/sql/KylinFunctions.scala      |  8 ++---
 .../catalyst/expressions/KylinExpresssions.scala   | 28 ++++++++++++++++
 .../sql/catalyst/expressions/ExpressionUtils.scala | 13 ++++++--
 .../kylin/query/runtime/plans/AggregatePlan.scala  | 37 ++++++++++++++--------
 7 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
index 33433dc..a6115ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
@@ -64,6 +64,13 @@ public class PercentileCounter implements Serializable {
         return registers.quantile(quantileRatio);
     }
 
+    public Double getResultEstimateWithQuantileRatio(double quantileRatio) {
+        if (registers.size() == 0) {
+            return null;
+        }
+        return registers.quantile(quantileRatio);
+    }
+
     public void writeRegisters(ByteBuffer out) {
         registers.compress();
         registers.asSmallBytes(out);
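
The new getResultEstimateWithQuantileRatio accessor returns null for an empty counter rather than asking the underlying t-digest for a meaningless estimate. A minimal round-trip sketch, assuming PercentileCounter's existing compression-based constructor and add(double) method (neither is shown in this diff):

    import org.apache.kylin.measure.percentile.PercentileCounter

    val counter = new PercentileCounter(100)          // compression factor (assumed ctor)
    Seq(1.0, 2.0, 3.0, 4.0, 5.0).foreach(counter.add) // feed raw samples
    // 0.6 quantile over the five samples; a boxed Double
    println(counter.getResultEstimateWithQuantileRatio(0.6))

    val empty = new PercentileCounter(100)
    println(empty.getResultEstimateWithQuantileRatio(0.6)) // null: no registers
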
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index d0ecba7..f83e2d5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -33,6 +33,10 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter>
         this.compression = type.getPrecision();
     }
 
+    public PercentileSerializer(int precision) {
+        this.compression = precision;
+    }
+
     @Override
     public int peekLength(ByteBuffer in) {
         return current().peekLength(in);
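
The precision-only constructor matches what the new Spark expression needs: at decode time it has no Kylin DataType at hand, only the precision captured when the measure was built. A hedged serialize/deserialize round trip (the buffer size is an assumption, chosen generously for a compressed t-digest):

    import java.nio.ByteBuffer
    import org.apache.kylin.measure.percentile.{PercentileCounter, PercentileSerializer}

    val serializer = new PercentileSerializer(100)  // precision == t-digest compression
    val counter = new PercentileCounter(100)
    (1 to 1000).foreach(i => counter.add(i.toDouble))

    val buffer = ByteBuffer.allocate(1 << 20)       // 1 MiB, assumed sufficient
    serializer.serialize(counter, buffer)
    buffer.flip()

    val decoded = serializer.deserialize(buffer)
    println(decoded.getResultEstimateWithQuantileRatio(0.5)) // roughly 500
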
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql
new file mode 100644
index 0000000..1082d5a
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg/query11.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT percentile(price, 0.6) from test_kylin_fact
+group by LSTG_FORMAT_NAME, LSTG_SITE_ID, SLR_SEGMENT_CD
+;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":[14336],"exactlyMatched":[true]}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 63c90dc..00fd8db 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, BinaryExpression,
-  DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In,
-  KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0,
-  TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KylinAddMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RoundBase, ScatterSkewData, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, PreciseCountDistinct}
 
@@ -75,6 +72,9 @@ object KylinFunctions {
   def approx_count_distinct_decode(column: Column, precision: Int): Column =
     Column(ApproxCountDistinctDecode(column.expr, Literal(precision)))
 
+  def k_percentile_decode(column: Column, p: Column, precision: Int): Column =
+    Column(PercentileDecode(column.expr, p.expr, Literal(precision)))
+
   def precise_count_distinct(column: Column): Column =
     Column(PreciseCountDistinct(column.expr).toAggregateExpression())
 
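
k_percentile_decode is the Column-level entry point the query planner uses further below. A hedged usage sketch; the DataFrame cuboidDf and its column name PRICE_PERCENTILE are assumptions for illustration:

    import org.apache.spark.sql.{functions => F, KylinFunctions}

    // cuboidDf is assumed to hold one serialized percentile measure (binary) per group
    val decoded = cuboidDf.select(
      KylinFunctions.k_percentile_decode(
        F.col("PRICE_PERCENTILE"),   // binary measure column
        F.lit(BigDecimal(0.6)),      // quantile; the expression expects DecimalType
        100                          // precision the measure was built with
      ).alias("PCT_60"))
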
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index b47ff90..b9a9112 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import com.esotericsoftware.kryo.io.{Input, KryoDataInput}
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
 import org.apache.kylin.measure.hllc.HLLCounter
+import org.apache.kylin.measure.percentile.PercentileSerializer
 import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
@@ -543,4 +544,31 @@ case class ScatterSkewData(left: Expression, right: Expression) extends BinaryEx
   override def dataType: DataType = StringType
 
   override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)
+}
+
+case class PercentileDecode(bytes: Expression, quantile: Expression, precision: Expression) extends TernaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, DecimalType, IntegerType)
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$")
+    defineCodeGen(ctx, ev, (bytes, quantile, precision) => {
+      s"""$expressionUtils.percentileDecodeHelper($bytes, $quantile, 
$precision)"""
+    })
+  }
+
+  override protected def nullSafeEval(bytes: Any, quantile: Any, precision: Any): Any = {
+    val arrayBytes = bytes.asInstanceOf[Array[Byte]]
+    val serializer = new PercentileSerializer(precision.asInstanceOf[Int]);
+    val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes))
+    counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
+  }
+
+  override def dataType: DataType = DoubleType
+
+  override def prettyName: String = "percentile_decode"
+
+  override def nullable: Boolean = false
+
+  override def children: Seq[Expression] = Seq(bytes, quantile, precision)
 }
\ No newline at end of file
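
PercentileDecode is a plain (non-aggregating) ternary expression: on the exact-match path each row already carries one serialized counter, so decoding is row-local. The interpreted path can be exercised directly; a sketch under the same assumed APIs as above (Literal/eval usage per Catalyst's Expression contract):

    import java.nio.ByteBuffer
    import org.apache.kylin.measure.percentile.{PercentileCounter, PercentileSerializer}
    import org.apache.spark.sql.catalyst.expressions.{Literal, PercentileDecode}
    import org.apache.spark.sql.types.{BinaryType, Decimal, DecimalType}

    val counter = new PercentileCounter(100)
    (1 to 100).foreach(i => counter.add(i.toDouble))
    val buf = ByteBuffer.allocate(1 << 16)
    new PercentileSerializer(100).serialize(counter, buf)
    buf.flip()
    val bytes = new Array[Byte](buf.remaining()); buf.get(bytes)

    val expr = PercentileDecode(
      Literal(bytes, BinaryType),
      Literal(Decimal(0.6), DecimalType(2, 1)),
      Literal(100))
    println(expr.eval())   // roughly 60.0
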
diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index 232f6cc..31d157a 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -18,15 +18,17 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.kylin.measure.percentile.PercentileSerializer
 import scala.util.{Failure, Success, Try}
 import scala.reflect.ClassTag
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder, expressions}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.Decimal
+import java.nio.ByteBuffer
 
-object
-ExpressionUtils {
+object ExpressionUtils {
   def expression[T <: Expression](name: String)
     (implicit tag: ClassTag[T]): (String, (ExpressionInfo, FunctionBuilder)) = {
 
@@ -109,4 +111,11 @@ ExpressionUtils {
       new ExpressionInfo(clazz.getCanonicalName, name)
     }
   }
+
+  def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): Double = {
+    val arrayBytes = bytes.asInstanceOf[Array[Byte]]
+    val serializer = new PercentileSerializer(precision.asInstanceOf[Int]);
+    val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes))
+    counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble)
+  }
 }
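
percentileDecodeHelper mirrors nullSafeEval so the compiled and interpreted paths agree; the code emitted by doGenCode above reduces to a static call like the following (the decode wrapper is a hypothetical name; bytes would be built as in the earlier serializer sketch):

    import org.apache.spark.sql.catalyst.expressions.ExpressionUtils
    import org.apache.spark.sql.types.Decimal

    // bytes: a serialized percentile counter, as produced by PercentileSerializer
    def decode(bytes: Array[Byte]): Double =
      ExpressionUtils.percentileDecodeHelper(bytes, Decimal(0.6), 100)
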
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
index 6f08e62..ebc1fad 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
@@ -59,23 +59,30 @@ object AggregatePlan extends LogEx {
       // exactly match, skip agg, direct project.
       val hash = System.identityHashCode(rel).toString
       val aggCols = rel.getRewriteAggCalls.asScala.zipWithIndex.map {
-        case (call: KylinAggregateCall, index: Int)
-          if OLAPAggregateRel.getAggrFuncName(call).equals("COUNT_DISTINCT") =>
+        case (call: KylinAggregateCall, index: Int) =>
           val dataType = call.getFunc.getReturnDataType
           val funcName = OLAPAggregateRel.getAggrFuncName(call)
          val argNames = call.getArgList.asScala.map(dataFrame.schema.names.apply(_))
           val columnName = argNames.map(col)
          val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, funcName, hash, argNames: _*)
-          if (call.isHllCountDistinctFunc) {
-            KylinFunctions
-              .approx_count_distinct_decode(columnName.head, dataType.getPrecision)
-              .alias(aggName)
-          } else if (call.isBitmapCountDistinctFunc) {
-            // execute count distinct precisely
-            KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName)
-          } else {
-            throw new IllegalArgumentException(
-              s"""Unsupported function name $funcName""")
+          funcName match {
+            case FunctionDesc.FUNC_COUNT_DISTINCT =>
+              if (call.isHllCountDistinctFunc) {
+                KylinFunctions
+                  .approx_count_distinct_decode(columnName.head, dataType.getPrecision)
+                  .alias(aggName)
+              } else if (call.isBitmapCountDistinctFunc) {
+                // execute count distinct precisely
+                KylinFunctions.precise_count_distinct_decode(columnName.head).alias(aggName)
+              } else {
+                throw new IllegalArgumentException(
+                  s"""Unsupported function name $funcName""")
+              }
+            case FunctionDesc.FUNC_PERCENTILE =>
+              val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "PERCENTILE_DECODE", hash, argNames: _*)
+              KylinFunctions.k_percentile_decode(columnName.head, columnName(1), dataType.getPrecision).alias(aggName)
+            case _ =>
+              col(schemaNames.apply(call.getArgList.get(0)))
           }
         case (call: Any, index: Int) =>
           col(schemaNames.apply(call.getArgList.get(0)))
@@ -223,7 +230,7 @@ object AggregatePlan extends LogEx {
       .isSum0
   }
 
-  val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX", "COUNT_DISTINCT")
+  val exactlyMatchSupportedFunctions = List("SUM", "MIN", "MAX", "COUNT_DISTINCT", "PERCENTILE", "PERCENTILE_APPROX")
 
   def isExactlyCuboidMatched(rel: OLAPAggregateRel, groupByList: List[Column]): Boolean = {
     val olapContext = rel.getContext
@@ -241,7 +248,9 @@ object AggregatePlan extends LogEx {
         return false
       }
       // when using intersect_count and intersect_value
-      if (call.getArgList.size() > 1) return false
+      if (call.getArgList.size() > 1 && !OLAPAggregateRel.getAggrFuncName(call).startsWith("PERCENTILE")) {
+        return false
+      }
     }
     val groupByCols = rel.getGroups.asScala.map(_.getIdentity).toSet
     if (groupByCols.isEmpty) return false
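
Taken together: percentile calls carry two arguments (column and quantile), so the old argList.size() > 1 guard, aimed at intersect_count and intersect_value, would have disqualified them from the exact-match path. A standalone restatement of the new guard (function names as returned by OLAPAggregateRel.getAggrFuncName; the helper name is hypothetical):

    def allowsExactMatch(funcName: String, argCount: Int): Boolean =
      argCount <= 1 || funcName.startsWith("PERCENTILE")

    assert(allowsExactMatch("PERCENTILE", 2))        // percentile(price, 0.6)
    assert(!allowsExactMatch("INTERSECT_COUNT", 3))  // still re-aggregated
    assert(allowsExactMatch("SUM", 1))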
