grorge123 commented on code in PR #3706:
URL: https://github.com/apache/datafusion-comet/pull/3706#discussion_r3009476525
##########
spark/src/main/scala/org/apache/comet/serde/arrays.scala:
##########
@@ -200,6 +201,88 @@ object CometArrayDistinct extends
CometExpressionSerde[ArrayDistinct] {
}
}
+object CometSortArray extends CometExpressionSerde[SortArray] {
+ private def containsFloatingPoint(dt: DataType): Boolean = {
+ dt match {
+ case FloatType | DoubleType => true
+ case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+ case StructType(fields) => fields.exists(f =>
containsFloatingPoint(f.dataType))
+ case MapType(keyType, valueType, _) =>
+ containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+ case _ => false
+ }
+ }
+
+ private def canRank(dt: DataType, nestedInArray: Boolean = false): Boolean =
{
+ dt match {
+ case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _:
FloatType |
+ _: DoubleType | _: DecimalType =>
+ true
+ case _: DateType | _: TimestampType | _: TimestampNTZType =>
+ true
+ // DataFusion's array_sort compares nested arrays through Arrow's rank
kernel.
+ // That kernel does not support Struct or Null child values,
+ // so array<array<struct<...>>> and array<array<null>> would fail at
runtime.
+ case _: NullType if !nestedInArray =>
+ true
+ case _: BooleanType | _: BinaryType | _: StringType =>
+ true
+ case ArrayType(elementType, _) =>
+ canRank(elementType, nestedInArray = true)
+ case StructType(fields) if !nestedInArray =>
+ fields.forall(f => canRank(f.dataType))
+ case _ =>
+ false
+ }
+ }
+
+ override def getSupportLevel(expr: SortArray): SupportLevel = {
+ val elementType = expr.base.dataType.asInstanceOf[ArrayType].elementType
+
+ if (!canRank(elementType)) {
+ Unsupported(Some(s"Sort on array element type $elementType is not
supported"))
+ } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
+ containsFloatingPoint(elementType)) {
+ Incompatible(
+ Some(
+ "Sorting on floating-point is not 100% compatible with Spark, and
Comet is running " +
+ s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " +
+ s"${CometConf.COMPAT_GUIDE}"))
+ } else {
+ Compatible()
+ }
+ }
+
+ override def convert(
+ expr: SortArray,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding)
+ val sortDirectionExprProto = expr.ascendingOrder match {
Review Comment:
I have modified, thank you for your review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]