This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new f4bcefbe8b33 [SPARK-49352][SQL][3.5] Avoid redundant array transform
for identical expression
f4bcefbe8b33 is described below
commit f4bcefbe8b33f6d8e64d2542eb69ea271e6a97c5
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Aug 23 22:26:39 2024 -0700
[SPARK-49352][SQL][3.5] Avoid redundant array transform for identical
expression
### What changes were proposed in this pull request?
This patch avoids `ArrayTransform` in the `resolveArrayType` function if the
resolution expression is the same as the input param.
### Why are the changes needed?
Our customer encountered a significant performance regression when migrating
from Spark 3.2 to Spark 3.4 on an `Insert Into` query which is analyzed as an
`AppendData` on an Iceberg table.
We found that the root cause is in Spark 3.4, `TableOutputResolver`
resolves the query with additional `ArrayTransform` on an `ArrayType` field.
The `ArrayTransform`'s lambda function is actually an identity function (it
returns its element argument unchanged), i.e., the transformation is redundant.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test and manual e2e test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47863 from viirya/fix_redundant_array_transform_3.5.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/analysis/TableOutputResolver.scala | 12 ++++++--
.../catalyst/analysis/V2WriteAnalysisSuite.scala | 32 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 42abc0eafda7..fabb5634ad10 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -371,8 +371,16 @@ object TableOutputResolver {
resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf,
addError, colPath)
}
if (res.length == 1) {
- val func = LambdaFunction(res.head, Seq(param))
- Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+ if (res.head == param) {
+ // If the element type is the same, we can reuse the input array
directly.
+ Some(
+ Alias(nullCheckedInput, expected.name)(
+ nonInheritableMetadataKeys =
+ Seq(CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
+ } else {
+ val func = LambdaFunction(res.head, Seq(param))
+ Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+ }
} else {
None
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index d91a080d8fe8..21a049e91418 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.QueryContext
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform,
AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull,
LessThanOrEqual, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -304,6 +304,36 @@ abstract class V2WriteAnalysisSuiteBase extends
AnalysisTest {
def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan
+ test("SPARK-49352: Avoid redundant array transform for identical
expression") {
+ def assertArrayField(fromType: ArrayType, toType: ArrayType, hasTransform:
Boolean): Unit = {
+ val table = TestRelation(Seq($"a".int, $"arr".array(toType)))
+ val query = TestRelation(Seq($"arr".array(fromType), $"a".int))
+
+ val writePlan = byName(table, query).analyze
+
+ assertResolved(writePlan)
+ checkAnalysis(writePlan, writePlan)
+
+ val transform = writePlan.children.head.expressions.exists { e =>
+ e.find {
+ case _: ArrayTransform => true
+ case _ => false
+ }.isDefined
+ }
+ if (hasTransform) {
+ assert(transform)
+ } else {
+ assert(!transform)
+ }
+ }
+
+ assertArrayField(ArrayType(LongType), ArrayType(LongType), hasTransform =
false)
+ assertArrayField(
+ ArrayType(new StructType().add("x", "int").add("y", "int")),
+ ArrayType(new StructType().add("y", "int").add("x", "byte")),
+ hasTransform = true)
+ }
+
test("SPARK-33136: output resolved on complex types for V2 write commands") {
def assertTypeCompatibility(name: String, fromType: DataType, toType:
DataType): Unit = {
val table = TestRelation(StructType(Seq(StructField("a", toType))))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]