This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 6d0a271c059 [SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used
6d0a271c059 is described below
commit 6d0a271c0595d46384e18f8d292afe2d2e04e2c2
Author: huaxingao <[email protected]>
AuthorDate: Fri Apr 21 07:46:00 2023 -0700
[SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used
### What changes were proposed in this pull request?

Backport of https://github.com/apache/spark/pull/39152 to branch-3.3: only propagate metadata columns to a node's children when the node actually references them.

### Why are the changes needed?

Bug fix: metadata columns were previously propagated even when the query never used them, which unnecessarily changed the output of the scan nodes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test in MetadataColumnSuite.
Closes #40889 from huaxingao/metadata.
Authored-by: huaxingao <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 ++++++++++------
.../spark/sql/connector/MetadataColumnSuite.scala | 17 +++++++++++++++++
2 files changed, 27 insertions(+), 6 deletions(-)
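
For readers skimming the diff below: the core of the change is that `addMetadataCol` now receives the set of expression IDs the parent node actually references, and a plan node only exposes its metadata columns when that set intersects its `metadataOutput`. A minimal standalone sketch of that guard, assuming spark-catalyst is on the classpath (the helper name `isMetadataRequired` is illustrative, not from this commit):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId}

// Illustrative helper (not in this commit): metadata columns are only worth
// propagating when at least one of them is referenced by the parent node,
// which the patch tracks by expression ID.
def isMetadataRequired(
    metadataOutput: Seq[Attribute],
    requiredAttrIds: Set[ExprId]): Boolean = {
  metadataOutput.exists(a => requiredAttrIds.contains(a.exprId))
}
```
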
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4a5c0b4aa88..526dfd8ab5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -925,7 +925,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         if (metaCols.isEmpty) {
           node
         } else {
-          val newNode = addMetadataCol(node)
+          val newNode = node.mapChildren(addMetadataCol(_, metaCols.map(_.exprId).toSet))
           // We should not change the output schema of the plan. We should project away the extra
           // metadata columns if necessary.
           if (newNode.sameOutput(node)) {
@@ -959,16 +959,20 @@ class Analyzer(override val catalogManager: CatalogManager)
       })
     }
 
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case s: ExposesMetadataColumns => s.withMetadataColumns()
-      case p: Project =>
+    private def addMetadataCol(
+        plan: LogicalPlan,
+        requiredAttrIds: Set[ExprId]): LogicalPlan = plan match {
+      case s: ExposesMetadataColumns if s.metadataOutput.exists(a =>
+        requiredAttrIds.contains(a.exprId)) =>
+        s.withMetadataColumns()
+      case p: Project if p.metadataOutput.exists(a => requiredAttrIds.contains(a.exprId)) =>
         val newProj = p.copy(
           // Do not leak the qualified-access-only restriction to normal plan outputs.
           projectList = p.projectList ++ p.metadataOutput.map(_.markAsAllowAnyAccess()),
-          child = addMetadataCol(p.child))
+          child = addMetadataCol(p.child, requiredAttrIds))
         newProj.copyTagsFrom(p)
         newProj
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, requiredAttrIds)))
     }
   }
 
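The observable effect of the Analyzer change above: the required IDs are derived from the metadata attributes the node references (`metaCols.map(_.exprId).toSet` in the first hunk), so a scan whose metadata columns are never referenced keeps its original output. A rough, hedged illustration of checking that from user code, assuming a DSv2 catalog `testcat` with a table `testcat.t` whose data columns are `id` and `data` (catalog and table names are illustrative, not from this commit):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Assumes a session configured with a DataSource V2 catalog named `testcat`.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.sql("SELECT t2.id FROM testcat.t t1 JOIN testcat.t t2 USING (id)")

// Collect the v2 scan relations from the analyzed plan and inspect their output.
val scanOutputs = df.queryExecution.analyzed.collect {
  case r: DataSourceV2Relation => r.output.map(_.name)
}
// With this fix each scan keeps only its data columns (e.g. Seq("id", "data")),
// since the query never references the scans' metadata columns.
scanOutputs.foreach(println)
```

The new test below performs essentially this check inside the connector test suite.
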
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 7f0e74f6bc7..70338bffed0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.struct
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
@@ -232,4 +233,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       )
     }
   }
+
+  test("SPARK-41660: only propagate metadata columns if they are used") {
+    withTable(tbl) {
+      prepareTable()
+      val df = sql(s"SELECT t2.id FROM $tbl t1 JOIN $tbl t2 USING (id)")
+      val scans = df.logicalPlan.collect {
+        case d: DataSourceV2Relation => d
+      }
+      assert(scans.length == 2)
+      scans.foreach { scan =>
+        // The query only accesses the join's hidden columns, so the scan nodes should not
+        // expose their metadata columns.
+        assert(scan.output.map(_.name) == Seq("id", "data"))
+      }
+    }
+  }
 }
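
A complementary sanity check, not part of this commit, would assert the inverse case: metadata columns are still propagated when the query does reference them. The sketch below reuses the suite's existing `tbl` and `prepareTable()` helpers; the names `index` and `_partition` are assumed to be the metadata columns of the suite's in-memory test table, as used by its other tests.

```scala
  // Illustrative companion test (not in this commit): referencing metadata
  // columns should still expose them after the fix.
  test("metadata columns are still exposed when referenced (illustrative)") {
    withTable(tbl) {
      prepareTable()
      val df = sql(s"SELECT id, data, index, _partition FROM $tbl")
      assert(df.columns.toSeq == Seq("id", "data", "index", "_partition"))
    }
  }
```
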
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]