This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new fdbd32e [SPARK-31692][SQL] Pass hadoop confs specified via Spark
confs to URLStreamHandlerFactory
fdbd32e is described below
commit fdbd32ef451fea115b8f507792b251fb4c2eb9c5
Author: Karuppayya Rajendran <[email protected]>
AuthorDate: Wed May 13 23:18:38 2020 -0700
[SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to
URLStreamHandlerFactory
Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory
**BEFORE**
```
➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState =
org.apache.spark.sql.internal.SharedState@5793cd84
scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream =
org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@22846025
scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem =
org.apache.hadoop.fs.LocalFileSystem@5a930c03
```
**AFTER**
```
➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState =
org.apache.spark.sql.internal.SharedState@5c24a636
scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream@2ba8f528
scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem = LocalFS
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration).getClass
res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class
org.apache.hadoop.fs.RawLocalFileSystem
```
The type of FileSystem object created (you can check the last statement in
the above snippets) differs between the above two cases, which should not have
been the case
No
Tested locally.
Added Unit test
Closes #28516 from karuppayya/SPARK-31692.
Authored-by: Karuppayya Rajendran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 72601460ada41761737f39d5dff8e69444fce2ba)
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d639a12ef243e1e8d20bd06d3a97d00e47f05517)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/internal/SharedState.scala | 6 +--
.../spark/sql/internal/SharedStateSuite.scala | 55 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5e8c702..ce64c0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging
{
- SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
+ SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf,
sparkContext.hadoopConfiguration)
// Load hive-site.xml into hadoopConf and determine the warehouse path we
want to use, based on
// the config from both hive and Spark SQL. Finally set the warehouse config
value to sparkConf.
@@ -160,13 +160,13 @@ private[sql] class SharedState(val sparkContext:
SparkContext) extends Logging {
object SharedState extends Logging {
@volatile private var fsUrlStreamHandlerFactoryInitialized = false
- private def setFsUrlStreamHandlerFactory(conf: SparkConf): Unit = {
+ private def setFsUrlStreamHandlerFactory(conf: SparkConf, hadoopConf:
Configuration): Unit = {
if (!fsUrlStreamHandlerFactoryInitialized &&
conf.get(DEFAULT_URL_STREAM_HANDLER_FACTORY_ENABLED)) {
synchronized {
if (!fsUrlStreamHandlerFactoryInitialized) {
try {
- URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
+ URL.setURLStreamHandlerFactory(new
FsUrlStreamHandlerFactory(hadoopConf))
fsUrlStreamHandlerFactoryInitialized = true
} catch {
case NonFatal(_) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
new file mode 100644
index 0000000..cda791a
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.net.URL
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+/**
+ * Tests for [[org.apache.spark.sql.internal.SharedState]].
+ */
+class SharedStateSuite extends SparkFunSuite with SharedSparkSession {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.hadoop.fs.defaultFS", "file:///")
+ }
+
+ test("SPARK-31692: Url handler factory should have the hadoop configs from
Spark conf") {
+ // Accessing shared state to init the object since it is `lazy val`
+ spark.sharedState
+ val field = classOf[URL].getDeclaredField("factory")
+ field.setAccessible(true)
+ val value = field.get(null)
+ assert(value.isInstanceOf[FsUrlStreamHandlerFactory])
+ val streamFactory = value.asInstanceOf[FsUrlStreamHandlerFactory]
+
+ val confField = classOf[FsUrlStreamHandlerFactory].getDeclaredField("conf")
+ confField.setAccessible(true)
+ val conf = confField.get(streamFactory)
+
+ assert(conf.isInstanceOf[Configuration])
+ assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]