This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0a94038dde79 [SPARK-49352][SQL][3.4] Avoid redundant array transform for identical expression
0a94038dde79 is described below
commit 0a94038dde79bb574a9376965cac8f8a4f229ccf
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Aug 23 22:28:03 2024 -0700
[SPARK-49352][SQL][3.4] Avoid redundant array transform for identical expression
### What changes were proposed in this pull request?
This patch avoids creating an `ArrayTransform` in the `resolveArrayType` function when the resolved element expression is the same as the input parameter.
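In essence, the check looks like the following minimal sketch (simplified stand-ins, not the actual `TableOutputResolver` internals; `resolveArray`, `resolvedElement`, `input`, and `param` are illustrative names):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform,
  Expression, LambdaFunction, NamedExpression, NamedLambdaVariable}

// Sketch: only wrap `input` in an ArrayTransform when the resolved element
// expression differs from the lambda parameter; otherwise the lambda would
// be the identity function and the input array can be reused as-is.
def resolveArray(
    input: Expression,
    param: NamedLambdaVariable,
    resolvedElement: Expression,
    expectedName: String): NamedExpression = {
  if (resolvedElement == param) {
    // Identity lambda: alias the input array directly, no per-element copy.
    Alias(input, expectedName)()
  } else {
    val func = LambdaFunction(resolvedElement, Seq(param))
    Alias(ArrayTransform(input, func), expectedName)()
  }
}
```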
### Why are the changes needed?
Our customer encountered a significant performance regression when migrating from Spark 3.2 to Spark 3.4 on an `Insert Into` query that is analyzed as an `AppendData` on an Iceberg table.
We found that the root cause is that in Spark 3.4, `TableOutputResolver` resolves the query with an additional `ArrayTransform` on an `ArrayType` field. The `ArrayTransform`'s lambda function is actually an identity function, i.e., the transformation is redundant.
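For illustration only, a hypothetical repro (table name, catalog, and types are made up; the actual workload was an Iceberg `AppendData`): pre-fix, the analyzed write plan projected the array column as `transform(arr, x -> x)`, copying every array per row; post-fix the column passes through unchanged.

```scala
// Hypothetical sketch in spark-shell against a DSv2 (e.g. Iceberg) table
// whose array column already matches the incoming element type.
spark.sql("CREATE TABLE cat.db.t (a INT, arr ARRAY<BIGINT>) USING iceberg")
// Pre-fix, the analyzed plan shows a redundant transform(arr, x -> x)
// wrapping `arr`; post-fix, `arr` appears as a plain attribute reference.
spark.sql("EXPLAIN EXTENDED INSERT INTO cat.db.t SELECT 1, array(1L, 2L)")
  .show(truncate = false)
```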
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test and manual e2e test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47862 from viirya/fix_redundant_array_transform_3.4.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/analysis/TableOutputResolver.scala | 12 ++++++--
.../spark/sql/catalyst/util/CharVarcharUtils.scala | 2 +-
.../catalyst/analysis/V2WriteAnalysisSuite.scala | 32 +++++++++++++++++++++-
3 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index e1ee0defa239..908711db8503 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -182,8 +182,16 @@ object TableOutputResolver {
       val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)()
       val res = reorderColumnsByName(Seq(param), Seq(fakeAttr), conf, addError, colPath)
       if (res.length == 1) {
-        val func = LambdaFunction(res.head, Seq(param))
-        Some(Alias(ArrayTransform(input, func), expectedName)())
+        if (res.head == param) {
+          // If the element type is the same, we can reuse the input array directly.
+          Some(
+            Alias(input, expectedName)(
+              nonInheritableMetadataKeys =
+                Seq(CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
+        } else {
+          val func = LambdaFunction(res.head, Seq(param))
+          Some(Alias(ArrayTransform(input, func), expectedName)())
+        }
       } else {
         None
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 448106343584..5324752ba3f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 
 object CharVarcharUtils extends Logging {
 
-  private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
+  private[sql] val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
 
   /**
    * Replaces CharType/VarcharType with StringType recursively in the given struct type. If a
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 69cd838cfb24..7d55dd4e97a6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, LessThanOrEqual, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, AttributeReference, Cast, LessThanOrEqual, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -237,6 +237,36 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
   def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan
 
+  test("SPARK-49352: Avoid redundant array transform for identical expression") {
+    def assertArrayField(fromType: ArrayType, toType: ArrayType, hasTransform: Boolean): Unit = {
+      val table = TestRelation(Seq($"a".int, $"arr".array(toType)))
+      val query = TestRelation(Seq($"arr".array(fromType), $"a".int))
+
+      val writePlan = byName(table, query).analyze
+
+      assertResolved(writePlan)
+      checkAnalysis(writePlan, writePlan)
+
+      val transform = writePlan.children.head.expressions.exists { e =>
+        e.find {
+          case _: ArrayTransform => true
+          case _ => false
+        }.isDefined
+      }
+      if (hasTransform) {
+        assert(transform)
+      } else {
+        assert(!transform)
+      }
+    }
+
+    assertArrayField(ArrayType(LongType), ArrayType(LongType), hasTransform = false)
+    assertArrayField(
+      ArrayType(new StructType().add("x", "int").add("y", "int")),
+      ArrayType(new StructType().add("y", "int").add("x", "byte")),
+      hasTransform = true)
+  }
+
   test("SPARK-33136: output resolved on complex types for V2 write commands") {
     def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
       val table = TestRelation(StructType(Seq(StructField("a", toType))).toAttributes)