This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new e21c2d3 [SPARK-33094][SQL][2.4] Make ORC format propagate Hadoop
config from DS options to underlying HDFS file system
e21c2d3 is described below
commit e21c2d35bc87cf0803674640b00a009fa2a2d480
Author: Max Gekk <[email protected]>
AuthorDate: Sat Oct 10 14:18:00 2020 +0900
[SPARK-33094][SQL][2.4] Make ORC format propagate Hadoop config from DS
options to underlying HDFS file system
### What changes were proposed in this pull request?
Propagate ORC options to Hadoop configs in Hive `OrcFileFormat` and in the
regular ORC datasource.
### Why are the changes needed?
There is a bug: when running
```scala
spark.read.format("orc").options(conf).load(path)
```
the underlying file system does not receive the options passed in `conf`.
### Does this PR introduce _any_ user-facing change?
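Yes. Options passed to the ORC datasource are now propagated to the Hadoop
configuration used when reading the ORC schema.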
### How was this patch tested?
Added a unit test to `OrcSourceSuite`.
Closes #29987 from MaxGekk/orc-option-propagation-2.4.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../sql/execution/datasources/orc/OrcFileFormat.scala | 2 +-
.../spark/sql/execution/datasources/orc/OrcUtils.scala | 4 ++--
.../sql/execution/datasources/orc/OrcSourceSuite.scala | 15 ++++++++++++++-
3 files changed, 17 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 4574f82..a2a7ed5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -94,7 +94,7 @@ class OrcFileFormat
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- OrcUtils.readSchema(sparkSession, files)
+ OrcUtils.readSchema(sparkSession, files, options)
}
override def prepareWrite(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 57d2c56..d929ff6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -77,10 +77,10 @@ object OrcUtils extends Logging {
}
}
- def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
+ def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options:
Map[String, String])
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
- val conf = sparkSession.sessionState.newHadoopConf()
+ val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
// TODO: We need to support merge schema. Please see SPARK-11412.
files.toIterator.map(file => readSchema(file.getPath, conf,
ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 70079f0..fa409bb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row,
SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -261,6 +261,19 @@ abstract class OrcSuite extends OrcTest with
BeforeAndAfterAll {
assert(version === SPARK_VERSION_SHORT)
}
}
+
+ test("SPARK-33094: should propagate Hadoop config from DS options to
underlying file system") {
+ withSQLConf(
+ "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+ "fs.file.impl.disable.cache" -> "true") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ val conf = Map("ds_option" -> "value")
+ spark.range(1).write.options(conf).orc(path)
+ checkAnswer(spark.read.options(conf).orc(path), Row(0))
+ }
+ }
+ }
}
class OrcSourceSuite extends OrcSuite with SharedSQLContext {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]