0lai0 commented on code in PR #3706:
URL: https://github.com/apache/datafusion-comet/pull/3706#discussion_r3006270859
##########
spark/src/main/scala/org/apache/comet/serde/arrays.scala:
##########
@@ -200,6 +201,88 @@ object CometArrayDistinct extends
CometExpressionSerde[ArrayDistinct] {
}
}
+object CometSortArray extends CometExpressionSerde[SortArray] {
+ private def containsFloatingPoint(dt: DataType): Boolean = {
+ dt match {
+ case FloatType | DoubleType => true
+ case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+ case StructType(fields) => fields.exists(f =>
containsFloatingPoint(f.dataType))
+ case MapType(keyType, valueType, _) =>
+ containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+ case _ => false
+ }
+ }
+
+ private def canRank(dt: DataType, nestedInArray: Boolean = false): Boolean =
{
+ dt match {
+ case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _:
FloatType |
+ _: DoubleType | _: DecimalType =>
+ true
+ case _: DateType | _: TimestampType | _: TimestampNTZType =>
+ true
+ // DataFusion's array_sort compares nested arrays through Arrow's rank
kernel.
+ // That kernel does not support Struct or Null child values,
+ // so array<array<struct<...>>> and array<array<null>> would fail at
runtime.
+ case _: NullType if !nestedInArray =>
+ true
+ case _: BooleanType | _: BinaryType | _: StringType =>
+ true
+ case ArrayType(elementType, _) =>
+ canRank(elementType, nestedInArray = true)
+ case StructType(fields) if !nestedInArray =>
+ fields.forall(f => canRank(f.dataType))
+ case _ =>
+ false
+ }
+ }
+
+ override def getSupportLevel(expr: SortArray): SupportLevel = {
+ val elementType = expr.base.dataType.asInstanceOf[ArrayType].elementType
+
+ if (!canRank(elementType)) {
+ Unsupported(Some(s"Sort on array element type $elementType is not
supported"))
+ } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
+ containsFloatingPoint(elementType)) {
+ Incompatible(
+ Some(
+ "Sorting on floating-point is not 100% compatible with Spark, and
Comet is running " +
+ s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " +
+ s"${CometConf.COMPAT_GUIDE}"))
+ } else {
+ Compatible()
+ }
+ }
+
+ override def convert(
+ expr: SortArray,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding)
+ val sortDirectionExprProto = expr.ascendingOrder match {
Review Comment:
nit: In the `convert` method, we pattern match on `expr.ascendingOrder`
twice, once to extract the sort direction and once to extract the null
ordering. We can make this code more concise by combining the two into a
single pattern match that returns both values as a Scala tuple.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]