This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new fdbd32e  [SPARK-31692][SQL] Pass hadoop confs specified via Spark 
confs to URLStreamHandlerFactory
fdbd32e is described below

commit fdbd32ef451fea115b8f507792b251fb4c2eb9c5
Author: Karuppayya Rajendran <[email protected]>
AuthorDate: Wed May 13 23:18:38 2020 -0700

    [SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to 
URLStreamHandlerFactory
    
    Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory
    
    **BEFORE**
    ```
    ➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf 
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
    
    scala> spark.sharedState
    res0: org.apache.spark.sql.internal.SharedState = 
org.apache.spark.sql.internal.SharedState@5793cd84
    
    scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
    res1: java.io.InputStream = 
org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@22846025
    
    scala> import org.apache.hadoop.fs._
    import org.apache.hadoop.fs._
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, 
spark.sparkContext.hadoopConfiguration)
    res2: org.apache.hadoop.fs.FileSystem = 
org.apache.hadoop.fs.LocalFileSystem@5a930c03
    ```
    
    **AFTER**
    ```
    ➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf 
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
    
    scala> spark.sharedState
    res0: org.apache.spark.sql.internal.SharedState = 
org.apache.spark.sql.internal.SharedState@5c24a636
    
    scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
    res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream@2ba8f528
    
    scala> import org.apache.hadoop.fs._
    import org.apache.hadoop.fs._
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, 
spark.sparkContext.hadoopConfiguration)
    res2: org.apache.hadoop.fs.FileSystem = LocalFS
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, 
spark.sparkContext.hadoopConfiguration).getClass
    res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class 
org.apache.hadoop.fs.RawLocalFileSystem
    ```
    The types of the FileSystem objects created in the above two cases (you 
can check the last statement in the above snippets) are different, which should 
not have been the case.
    
    No
    
    Tested locally.
    Added Unit test
    
    Closes #28516 from karuppayya/SPARK-31692.
    
    Authored-by: Karuppayya Rajendran <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 72601460ada41761737f39d5dff8e69444fce2ba)
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit d639a12ef243e1e8d20bd06d3a97d00e47f05517)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/internal/SharedState.scala    |  6 +--
 .../spark/sql/internal/SharedStateSuite.scala      | 55 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5e8c702..ce64c0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  */
 private[sql] class SharedState(val sparkContext: SparkContext) extends Logging 
{
 
-  SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
+  SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, 
sparkContext.hadoopConfiguration)
 
   // Load hive-site.xml into hadoopConf and determine the warehouse path we 
want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config 
value to sparkConf.
@@ -160,13 +160,13 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
 object SharedState extends Logging {
   @volatile private var fsUrlStreamHandlerFactoryInitialized = false
 
-  private def setFsUrlStreamHandlerFactory(conf: SparkConf): Unit = {
+  private def setFsUrlStreamHandlerFactory(conf: SparkConf, hadoopConf: 
Configuration): Unit = {
     if (!fsUrlStreamHandlerFactoryInitialized &&
         conf.get(DEFAULT_URL_STREAM_HANDLER_FACTORY_ENABLED)) {
       synchronized {
         if (!fsUrlStreamHandlerFactoryInitialized) {
           try {
-            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
+            URL.setURLStreamHandlerFactory(new 
FsUrlStreamHandlerFactory(hadoopConf))
             fsUrlStreamHandlerFactoryInitialized = true
           } catch {
             case NonFatal(_) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
new file mode 100644
index 0000000..cda791a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.net.URL
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+/**
+ * Tests for [[org.apache.spark.sql.internal.SharedState]].
+ */
+class SharedStateSuite extends SparkFunSuite with SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.hadoop.fs.defaultFS", "file:///")
+  }
+
+  test("SPARK-31692: Url handler factory should have the hadoop configs from 
Spark conf") {
+    // Accessing shared state to init the object since it is `lazy val`
+    spark.sharedState
+    val field = classOf[URL].getDeclaredField("factory")
+    field.setAccessible(true)
+    val value = field.get(null)
+    assert(value.isInstanceOf[FsUrlStreamHandlerFactory])
+    val streamFactory = value.asInstanceOf[FsUrlStreamHandlerFactory]
+
+    val confField = classOf[FsUrlStreamHandlerFactory].getDeclaredField("conf")
+    confField.setAccessible(true)
+    val conf = confField.get(streamFactory)
+
+    assert(conf.isInstanceOf[Configuration])
+    assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to