This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 6d0a271c059 [SPARK-41660][SQL][3.3] Only propagate metadata columns if 
they are used
6d0a271c059 is described below

commit 6d0a271c0595d46384e18f8d292afe2d2e04e2c2
Author: huaxingao <[email protected]>
AuthorDate: Fri Apr 21 07:46:00 2023 -0700

    [SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used
    
    ### What changes were proposed in this pull request?
    backporting https://github.com/apache/spark/pull/39152 to 3.3
    
    ### Why are the changes needed?
    bug fixing
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #40889 from huaxingao/metadata.
    
    Authored-by: huaxingao <[email protected]>
    Signed-off-by: huaxingao <[email protected]>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 16 ++++++++++------
 .../spark/sql/connector/MetadataColumnSuite.scala       | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4a5c0b4aa88..526dfd8ab5e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -925,7 +925,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         if (metaCols.isEmpty) {
           node
         } else {
-          val newNode = addMetadataCol(node)
+          val newNode = node.mapChildren(addMetadataCol(_, 
metaCols.map(_.exprId).toSet))
           // We should not change the output schema of the plan. We should 
project away the extra
           // metadata columns if necessary.
           if (newNode.sameOutput(node)) {
@@ -959,16 +959,20 @@ class Analyzer(override val catalogManager: 
CatalogManager)
       })
     }
 
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case s: ExposesMetadataColumns => s.withMetadataColumns()
-      case p: Project =>
+    private def addMetadataCol(
+        plan: LogicalPlan,
+        requiredAttrIds: Set[ExprId]): LogicalPlan = plan match {
+      case s: ExposesMetadataColumns if s.metadataOutput.exists(a =>
+        requiredAttrIds.contains(a.exprId)) =>
+        s.withMetadataColumns()
+      case p: Project if p.metadataOutput.exists(a => 
requiredAttrIds.contains(a.exprId)) =>
         val newProj = p.copy(
           // Do not leak the qualified-access-only restriction to normal plan 
outputs.
           projectList = p.projectList ++ 
p.metadataOutput.map(_.markAsAllowAnyAccess()),
-          child = addMetadataCol(p.child))
+          child = addMetadataCol(p.child, requiredAttrIds))
         newProj.copyTagsFrom(p)
         newProj
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, 
requiredAttrIds)))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 7f0e74f6bc7..70338bffed0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.struct
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
@@ -232,4 +233,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       )
     }
   }
+
+  test("SPARK-41660: only propagate metadata columns if they are used") {
+    withTable(tbl) {
+      prepareTable()
+      val df = sql(s"SELECT t2.id FROM $tbl t1 JOIN $tbl t2 USING (id)")
+      val scans = df.logicalPlan.collect {
+        case d: DataSourceV2Relation => d
+      }
+      assert(scans.length == 2)
+      scans.foreach { scan =>
+        // The query only access join hidden columns, and scan nodes should 
not expose its metadata
+        // columns.
+        assert(scan.output.map(_.name) == Seq("id", "data"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to