parthchandra commented on code in PR #4542:
URL: https://github.com/apache/datafusion-comet/pull/4542#discussion_r3470459651
##########
spark/src/main/scala/org/apache/comet/serde/aggregates.scala:
##########
@@ -592,6 +592,79 @@ object CometStddevPop extends
CometAggregateExpressionSerde[StddevPop] with Come
}
}
+object CometPercentile extends CometAggregateExpressionSerde[Percentile] {
+
+ private val arrayOfPercentagesReason = "An array of percentages is not
supported."
+ private val nonLiteralPercentageReason = "The percentage argument must be a
literal."
+ private val frequencyReason = "A frequency argument is not supported."
+ // `reverse` is set when `percentile_cont`/`percentile_disc` is used with
+ // `WITHIN GROUP (ORDER BY ... DESC)` on Spark 4.0+. The native
`percentile_cont` always
+ // interpolates in ascending order, so the descending form would return a
wrong answer.
+ private val descendingReason =
+ "Descending order in `WITHIN GROUP (ORDER BY ... DESC)` is not supported."
+ private val inputTypeReason = "Only numeric input types are supported."
+
+ override def getUnsupportedReasons(): Seq[String] = Seq(
+ arrayOfPercentagesReason,
+ nonLiteralPercentageReason,
+ frequencyReason,
+ descendingReason,
+ inputTypeReason)
+
+ override def getSupportLevel(expr: Percentile): SupportLevel = {
+ // Only the single-percentage, default-frequency, numeric-input, ascending
form is wired
+ // today. It maps to DataFusion's percentile_cont, which uses the same
`index = p * (n - 1)`
+ // linear interpolation as Spark's exact Percentile. Array-of-percentages,
a non-default
+ // frequency argument, descending order, and interval inputs fall back to
Spark.
+ if (expr.percentageExpression.dataType != DoubleType) {
+ return Unsupported(Some(arrayOfPercentagesReason))
+ }
+ if (!expr.percentageExpression.foldable) {
+ return Unsupported(Some(nonLiteralPercentageReason))
+ }
+ expr.frequencyExpression match {
+ case Literal(1L, _) =>
+ case _ => return Unsupported(Some(frequencyReason))
+ }
+ if (expr.reverse) {
+ return Unsupported(Some(descendingReason))
+ }
+ expr.child.dataType match {
+ case _: NumericType => Compatible(None)
+ case _ => Unsupported(Some(inputTypeReason))
+ }
+ }
+
+ override def convert(
+ aggExpr: AggregateExpression,
+ percentile: Percentile,
+ inputs: Seq[Attribute],
+ binding: Boolean,
+ conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
+ // Spark computes the percentile over the values as doubles; cast the
child up front so the
+ // native percentile_cont returns Float64 / DoubleType to match Spark.
+ val childExpr = exprToProto(Cast(percentile.child, DoubleType), inputs,
binding)
Review Comment:
This is probably redundant. DF (percentile_cont.rs) appears to already
coerce the value to `NativeType::Float64`.
FWIW, Spark does a cast to Double too, but at [interpolation
time](https://github.com/apache/spark/blob/420b9e2c966df003e02d8ca0067baa0338b0d41a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala#L225),
which probably reduces the chances of precision loss.
##########
spark/src/main/scala/org/apache/comet/serde/aggregates.scala:
##########
@@ -592,6 +592,79 @@ object CometStddevPop extends
CometAggregateExpressionSerde[StddevPop] with Come
}
}
+object CometPercentile extends CometAggregateExpressionSerde[Percentile] {
+
+ private val arrayOfPercentagesReason = "An array of percentages is not
supported."
+ private val nonLiteralPercentageReason = "The percentage argument must be a
literal."
+ private val frequencyReason = "A frequency argument is not supported."
+ // `reverse` is set when `percentile_cont`/`percentile_disc` is used with
+ // `WITHIN GROUP (ORDER BY ... DESC)` on Spark 4.0+. The native
`percentile_cont` always
+ // interpolates in ascending order, so the descending form would return a
wrong answer.
+ private val descendingReason =
+ "Descending order in `WITHIN GROUP (ORDER BY ... DESC)` is not supported."
+ private val inputTypeReason = "Only numeric input types are supported."
+
+ override def getUnsupportedReasons(): Seq[String] = Seq(
+ arrayOfPercentagesReason,
+ nonLiteralPercentageReason,
+ frequencyReason,
+ descendingReason,
+ inputTypeReason)
+
+ override def getSupportLevel(expr: Percentile): SupportLevel = {
+ // Only the single-percentage, default-frequency, numeric-input, ascending
form is wired
+ // today. It maps to DataFusion's percentile_cont, which uses the same
`index = p * (n - 1)`
+ // linear interpolation as Spark's exact Percentile. Array-of-percentages,
a non-default
+ // frequency argument, descending order, and interval inputs fall back to
Spark.
+ if (expr.percentageExpression.dataType != DoubleType) {
+ return Unsupported(Some(arrayOfPercentagesReason))
+ }
+ if (!expr.percentageExpression.foldable) {
+ return Unsupported(Some(nonLiteralPercentageReason))
+ }
+ expr.frequencyExpression match {
+ case Literal(1L, _) =>
+ case _ => return Unsupported(Some(frequencyReason))
+ }
+ if (expr.reverse) {
+ return Unsupported(Some(descendingReason))
+ }
+ expr.child.dataType match {
+ case _: NumericType => Compatible(None)
Review Comment:
Should this be `Compatible(Some(..))` because interpolation precision is
limited to 6 by DF?
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1920,6 +1920,11 @@ object CometObjectHashAggregateExec
val elementType = cs.children.head.dataType
val nativeStateType = ArrayType(elementType, containsNull = true)
output(bufferIdx) = output(bufferIdx).withDataType(nativeStateType)
+ case _: Percentile =>
+ // DataFusion's percentile_cont keeps all values in a List<Float64>
partial state.
+ // Comet casts the child to double, so the native state is
ArrayType(DoubleType).
+ val nativeStateType = ArrayType(DoubleType, containsNull = true)
Review Comment:
DF's percentile_cont skips nulls by using `.iter().flatten()`, so this can be
`containsNull = false`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]