This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 8e9ff72be0ee [SPARK-52614][SQL][4.0] Support RowEncoder inside Product
Encoder
8e9ff72be0ee is described below
commit 8e9ff72be0eeeab5cfff0081f57cb79a0c9bee04
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Mon Oct 6 14:10:34 2025 -0400
[SPARK-52614][SQL][4.0] Support RowEncoder inside Product Encoder
This is a backport of SPARK-52614 #51319 to branch-4.0
### What changes were proposed in this pull request?
This fixes support for using a RowEncoder inside a ProductEncoder.
### Why are the changes needed?
The current code does a dataType check on a path when constructing the RowEncoder
deserializer. But this is not safe and if the RowEncoder is used inside a
ProductEncoder, it will throw because the path Expression is unresolved.
The check was introduced in https://github.com/apache/spark/pull/49785
### Does this PR introduce _any_ user-facing change?
Yes, it makes it possible to use RowEncoder in more cases.
### How was this patch tested?
Existing and new unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52503 from eejbyfeldt/SPARK-52614-4.0.
Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../sql/catalyst/DeserializerBuildHelper.scala | 28 +++++++++-------------
.../catalyst/encoders/ExpressionEncoderSuite.scala | 16 +++++++++++++
2 files changed, 27 insertions(+), 17 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 5d1bbb024074..9dcaba8c2bc4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -241,19 +241,12 @@ object DeserializerBuildHelper {
val walkedTypePath =
WalkedTypePath().recordRoot(enc.clsTag.runtimeClass.getName)
// Assumes we are deserializing the first column of a row.
val input = GetColumnByOrdinal(0, enc.dataType)
- enc match {
- case AgnosticEncoders.RowEncoder(fields) =>
- val children = fields.zipWithIndex.map { case (f, i) =>
- createDeserializer(f.enc, GetStructField(input, i), walkedTypePath)
- }
- CreateExternalRow(children, enc.schema)
- case _ =>
- val deserializer = createDeserializer(
- enc,
- upCastToExpectedType(input, enc.dataType, walkedTypePath),
- walkedTypePath)
- expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
- }
+ val deserializer = createDeserializer(
+ enc,
+ upCastToExpectedType(input, enc.dataType, walkedTypePath),
+ walkedTypePath,
+ isTopLevel = true)
+ expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
}
/**
@@ -265,11 +258,13 @@ object DeserializerBuildHelper {
* external representation.
* @param path The expression which can be used to extract serialized value.
* @param walkedTypePath The paths from top to bottom to access current
field when deserializing.
+ * @param isTopLevel true if we are creating a deserializer for the top
level value.
*/
private def createDeserializer(
enc: AgnosticEncoder[_],
path: Expression,
- walkedTypePath: WalkedTypePath): Expression = enc match {
+ walkedTypePath: WalkedTypePath,
+ isTopLevel: Boolean = false): Expression = enc match {
case ae: AgnosticExpressionPathEncoder[_] =>
ae.fromCatalyst(path)
case _ if isNativeEncoder(enc) =>
@@ -408,13 +403,12 @@ object DeserializerBuildHelper {
NewInstance(cls, arguments, Nil, propagateNull = false, dt,
outerPointerGetter))
case AgnosticEncoders.RowEncoder(fields) =>
- val isExternalRow = !path.dataType.isInstanceOf[StructType]
val convertedFields = fields.zipWithIndex.map { case (f, i) =>
val newTypePath = walkedTypePath.recordField(
f.enc.clsTag.runtimeClass.getName,
f.name)
val deserializer = createDeserializer(f.enc, GetStructField(path, i),
newTypePath)
- if (isExternalRow) {
+ if (!isTopLevel) {
exprs.If(
Invoke(path, "isNullAt", BooleanType, exprs.Literal(i) :: Nil),
exprs.Literal.create(null, externalDataTypeFor(f.enc)),
@@ -460,7 +454,7 @@ object DeserializerBuildHelper {
Literal.create(provider(), ObjectType(classOf[Codec[_, _]])),
"decode",
dataTypeForClass(tag.runtimeClass),
- createDeserializer(encoder, path, walkedTypePath) :: Nil)
+ createDeserializer(encoder, path, walkedTypePath, isTopLevel) :: Nil)
}
private def deserializeArray(
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 1b5f1b109c45..3d738fe985dd 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -659,6 +659,22 @@ class ExpressionEncoderSuite extends
CodegenInterpretedPlanTest with AnalysisTes
assert(fromRow(toRow(new Wrapper(Row(9L, "x")))) == new Wrapper(Row(9L,
"x")))
}
+ test("SPARK-52614: transforming encoder row encoder in product encoder") {
+ val schema = new StructType().add("a", LongType).add("b", StringType)
+ val wrapperEncoder = TransformingEncoder(
+ classTag[Wrapper[Row]],
+ RowEncoder.encoderFor(schema),
+ new WrapperCodecProvider[Row])
+ val encoder = ExpressionEncoder(ProductEncoder(
+ classTag[V[Wrapper[Row]]],
+ Seq(EncoderField("v", wrapperEncoder, nullable = false, Metadata.empty)),
+ None))
+ .resolveAndBind()
+ val toRow = encoder.createSerializer()
+ val fromRow = encoder.createDeserializer()
+ assert(fromRow(toRow(V(new Wrapper(Row(9L, "x"))))) == V(new
Wrapper(Row(9L, "x"))))
+ }
+
// below tests are related to SPARK-49960 and TransformingEncoder usage
test("""Encoder with OptionEncoder of transformation""".stripMargin) {
type T = Option[V[V[Int]]]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]