This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e21c2d3  [SPARK-33094][SQL][2.4] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
e21c2d3 is described below

commit e21c2d35bc87cf0803674640b00a009fa2a2d480
Author: Max Gekk <[email protected]>
AuthorDate: Sat Oct 10 14:18:00 2020 +0900

    [SPARK-33094][SQL][2.4] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
    
    ### What changes were proposed in this pull request?
    Propagate ORC options to Hadoop configs in Hive `OrcFileFormat` and in the regular ORC datasource.
    
    ### Why are the changes needed?
    There is a bug: when running
    ```scala
    spark.read.format("orc").options(conf).load(path)
    ```
    the underlying file system does not receive the options passed in `conf`.
    
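    ### Does this PR introduce _any_ user-facing change?
    Yes. Hadoop configuration entries passed as data source options are now propagated to the underlying file system when the ORC schema is read.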
    
    ### How was this patch tested?
    Added UT to `OrcSourceSuite`.
    
    Closes #29987 from MaxGekk/orc-option-propagation-2.4.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../sql/execution/datasources/orc/OrcFileFormat.scala     |  2 +-
 .../spark/sql/execution/datasources/orc/OrcUtils.scala    |  4 ++--
 .../sql/execution/datasources/orc/OrcSourceSuite.scala    | 15 ++++++++++++++-
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 4574f82..a2a7ed5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -94,7 +94,7 @@ class OrcFileFormat
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    OrcUtils.readSchema(sparkSession, files)
+    OrcUtils.readSchema(sparkSession, files, options)
   }
 
   override def prepareWrite(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 57d2c56..d929ff6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -77,10 +77,10 @@ object OrcUtils extends Logging {
     }
   }
 
-  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
+  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: 
Map[String, String])
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
-    val conf = sparkSession.sessionState.newHadoopConf()
+    val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     // TODO: We need to support merge schema. Please see SPARK-11412.
     files.toIterator.map(file => readSchema(file.getPath, conf, 
ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 70079f0..fa409bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
@@ -261,6 +261,19 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
       assert(version === SPARK_VERSION_SHORT)
     }
   }
+
+  test("SPARK-33094: should propagate Hadoop config from DS options to 
underlying file system") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        val conf = Map("ds_option" -> "value")
+        spark.range(1).write.options(conf).orc(path)
+        checkAnswer(spark.read.options(conf).orc(path), Row(0))
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSQLContext {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to