Repository: spark
Updated Branches:
  refs/heads/master 92f66331b -> 5cb03220a


[SPARK-14912][SQL] Propagate data source options to Hadoop configuration

## What changes were proposed in this pull request?
We currently have no way for users to propagate options to underlying 
libraries that rely on Hadoop configurations to work. For example, there are 
various options in parquet-mr that users might want to set, but the data source 
API does not expose a per-job way to set them. This patch also propagates the 
user-specified options into the Hadoop Configuration.

## How was this patch tested?
Used a mock data source implementation to test both the read path and the write 
path.

Author: Reynold Xin <[email protected]>

Closes #12688 from rxin/SPARK-14912.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cb03220
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cb03220
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cb03220

Branch: refs/heads/master
Commit: 5cb03220a02c70d343e82d69cfd30edb894595a1
Parents: 92f6633
Author: Reynold Xin <[email protected]>
Authored: Tue Apr 26 10:58:56 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Apr 26 10:58:56 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala | 10 +++---
 .../datasources/FileSourceStrategy.scala        |  7 ++++-
 .../InsertIntoHadoopFsRelation.scala            |  4 +++
 .../datasources/csv/DefaultSource.scala         |  9 +++---
 .../datasources/fileSourceInterfaces.scala      |  3 +-
 .../datasources/json/JSONRelation.scala         | 12 ++++----
 .../datasources/parquet/ParquetRelation.scala   | 23 +++++++-------
 .../datasources/text/DefaultSource.scala        | 12 ++++----
 .../datasources/FileSourceStrategySuite.scala   |  4 ++-
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 17 +++++------
 .../SimpleTextHadoopFsRelationSuite.scala       | 23 ++++++++++++++
 .../spark/sql/sources/SimpleTextRelation.scala  | 32 ++++++++++++--------
 12 files changed, 99 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f07374a..ba2e1e2 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -181,20 +181,20 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
     verifySchema(dataSchema)
     val numFeatures = options("numFeatures").toInt
     assert(numFeatures > 0)
 
     val sparse = options.getOrElse("vectorType", "sparse") == "sparse"
 
-    val broadcastedConf = sparkSession.sparkContext.broadcast(
-      new SerializableConfiguration(
-        new Configuration(sparkSession.sparkContext.hadoopConfiguration)))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val points =
-        new HadoopFileLinesReader(file, broadcastedConf.value.value)
+        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
           .map(_.toString.trim)
           .filterNot(line => line.isEmpty || line.startsWith("#"))
           .map { line =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9e1308b..c26cae8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path}
 
 import org.apache.spark.internal.Logging
@@ -106,13 +107,17 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
       val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
       logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
+      val hadoopConf = new 
Configuration(files.sparkSession.sessionState.hadoopConf)
+      files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, 
v) }
+
       val readFile = files.fileFormat.buildReader(
         sparkSession = files.sparkSession,
         dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
         requiredSchema = prunedDataSchema,
         filters = pushedDownFilters,
-        options = files.options)
+        options = files.options,
+        hadoopConf = hadoopConf)
 
       val plannedPartitions = files.bucketSpec match {
         case Some(bucketing) if 
files.sparkSession.sessionState.conf.bucketingEnabled =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index b2483e6..fa95497 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -106,6 +106,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
       val job = Job.getInstance(hadoopConf)
       job.setOutputKeyClass(classOf[Void])
       job.setOutputValueClass(classOf[InternalRow])
+
+      // Also set the options in Hadoop Configuration
+      options.foreach { case (k, v) => if (v ne null) 
job.getConfiguration.set(k, v) }
+
       FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
 
       val partitionSet = AttributeSet(partitionColumns)

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff..8ca105d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
 
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(conf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val lineIterator = {
-        val conf = broadcastedConf.value.value
+        val conf = broadcastedHadoopConf.value.value
         new HadoopFileLinesReader(file, conf).map { line =>
           new String(line.getBytes, 0, line.getLength, csvOptions.charset)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 2628788..3058e79 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -217,7 +217,8 @@ trait FileFormat {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     // TODO: Remove this default implementation when the other formats have 
been ported
     // Until then we guard in [[FileSourceStrategy]] to only call this method 
on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for 
$this")

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index f9c34c6..b6b3907 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -97,10 +97,10 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     val parsedOptions: JSONOptions = new JSONOptions(options)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
@@ -109,8 +109,8 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
     val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
     val joinedRow = new JoinedRow()
 
-    file => {
-      val lines = new HadoopFileLinesReader(file, 
broadcastedConf.value.value).map(_.toString)
+    (file: PartitionedFile) => {
+      val lines = new HadoopFileLinesReader(file, 
broadcastedHadoopConf.value.value).map(_.toString)
 
       val rows = JacksonParser.parseJson(
         lines,

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581..5be8770 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
-    val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
-    parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
-    parquetConf.set(
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[CatalystReadSupport].getName)
+    hadoopConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
-    parquetConf.set(
+    hadoopConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
 
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
     // This metadata is only useful for detecting optional columns when 
pushdowning filters.
     val dataSchemaToWrite = 
StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
       sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
       None
     }
 
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(parquetConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default 
values.
     // If true, enable using the custom RecordReader for parquet. This only 
works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
           null)
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
-      val hadoopAttemptContext = new 
TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
 
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index a0d680c..348edfc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -89,17 +89,17 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    file => {
+    (file: PartitionedFile) => {
       val unsafeRow = new UnsafeRow(1)
       val bufferHolder = new BufferHolder(unsafeRow)
       val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
 
-      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line 
=>
+      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { 
line =>
         // Writes to an UnsafeRow directly
         bufferHolder.reset()
         unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 9da0af3..f73d485 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
@@ -476,7 +477,8 @@ class TestFileFormat extends FileFormat {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
 
     // Record the arguments so they can be checked in the test case.
     LastArguments.partitionSchema = partitionSchema

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index cb49fc9..4f81967 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -114,22 +114,21 @@ private[sql] class DefaultSource
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = {
-    val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
-
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(filters.toArray).foreach { f =>
-        orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+        hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
 
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(orcConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val conf = broadcastedHadoopConf.value.value
 
       // SPARK-8501: Empty ORC files always have an empty schema stored in 
their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read 
the underlying file
@@ -154,7 +153,7 @@ private[sql] class DefaultSource
           // Specifically would be helpful for partitioned datasets.
           val orcReader = OrcFile.createReader(
             new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
-          new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), 
fileSplit.getLength())
+          new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, 
fileSplit.getLength)
         }
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 71e3457..9ad0887 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest with Predicat
           .load(file.getCanonicalPath))
     }
   }
+
+  test("test hadoop conf option propagation") {
+    withTempPath { file =>
+      // Test write side
+      val df = sqlContext.range(10).selectExpr("cast(id as string)")
+      df.write
+        .option("some-random-write-option", "hahah-WRITE")
+        .option("some-null-value-option", null)  // test null robustness
+        .option("dataSchema", df.schema.json)
+        .format(dataSourceName).save(file.getAbsolutePath)
+      
assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == 
"hahah-WRITE")
+
+      // Test read side
+      val df1 = sqlContext.read
+        .option("some-random-read-option", "hahah-READ")
+        .option("some-null-value-option", null)  // test null robustness
+        .option("dataSchema", df.schema.json)
+        .format(dataSourceName)
+        .load(file.getAbsolutePath)
+      df1.count()
+      
assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == 
"hahah-READ")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5cb03220/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e4bd1f9..0fa1841 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with 
DataSourceRegister {
       sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
-    override def newInstance(
-        path: String,
-        bucketId: Option[Int],
-        dataSchema: StructType,
-        context: TaskAttemptContext): OutputWriter = {
-      new SimpleTextOutputWriter(path, context)
+      dataSchema: StructType): OutputWriterFactory = {
+    SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new SimpleTextOutputWriter(path, context)
+      }
     }
   }
 
@@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with 
DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = {
-
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = 
{
+    SimpleTextRelation.lastHadoopConf = Option(hadoopConf)
     SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
     SimpleTextRelation.pushedFilters = filters.toSet
 
@@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with 
DataSourceRegister {
       inputAttributes.find(_.name == field.name)
     }
 
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val predicate = {
@@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with 
DataSourceRegister {
       val projection = new InterpretedProjection(outputAttributes, 
inputAttributes)
 
       val unsafeRowIterator =
-        new HadoopFileLinesReader(file, broadcastedConf.value.value).map { 
line =>
+        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map 
{ line =>
           val record = line.toString
           new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
             case (v, dataType) =>
@@ -164,4 +167,7 @@ object SimpleTextRelation {
 
   // Used to test failure callback
   var callbackCalled = false
+
+  // Used by the test case to check the value propagated in the hadoop confs.
+  var lastHadoopConf: Option[Configuration] = None
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to