This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 476aaee Revert "[SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir"
476aaee is described below
commit 476aaee7089d051dae49d2a44de69df3b79248a0
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Mar 19 16:13:50 2020 -0700
Revert "[SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and
spark.sql.warehouse.dir"
This reverts commit 321341a4c3104380035350631c82a4b385f117e4.
Signed-off-by: Dongjoon Hyun <[email protected]>
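For reference, the precedence rule implemented by the restored SharedState code below is: spark.sql.warehouse.dir, when explicitly set, overrides hive.metastore.warehouse.dir; otherwise a hive.metastore.warehouse.dir coming from hive-site.xml is respected; otherwise the default value of spark.sql.warehouse.dir applies. A minimal, self-contained Scala sketch of that rule (not part of this commit; `resolveWarehousePath` is an illustrative name, not Spark API):

    // Hypothetical sketch of the resolution rule the restored code applies.
    object WarehousePathPrecedence {
      def resolveWarehousePath(
          sparkWarehouseDir: Option[String],  // spark.sql.warehouse.dir, if explicitly set
          hiveWarehouseDir: Option[String],   // hive.metastore.warehouse.dir, e.g. from hive-site.xml
          defaultDir: String): String =       // default value of spark.sql.warehouse.dir
        (sparkWarehouseDir, hiveWarehouseDir) match {
          case (Some(sparkDir), _)   => sparkDir   // spark.sql.warehouse.dir wins when set
          case (None, Some(hiveDir)) => hiveDir    // otherwise respect hive.metastore.warehouse.dir
          case (None, None)          => defaultDir // neither set: use the Spark default
        }

      def main(args: Array[String]): Unit = {
        assert(resolveWarehousePath(Some("/spark-wh"), Some("/hive-wh"), "/default-wh") == "/spark-wh")
        assert(resolveWarehousePath(None, Some("/hive-wh"), "/default-wh") == "/hive-wh")
        assert(resolveWarehousePath(None, None, "/default-wh") == "/default-wh")
      }
    }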
---
.../apache/spark/sql/internal/SharedState.scala | 80 +++++++++++-----------
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 -
.../spark/sql/hive/thriftserver/CliSuite.scala | 12 ----
.../spark/sql/hive/HiveSharedStateSuite.scala | 1 +
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
5 files changed, 42 insertions(+), 55 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index eb74e96..5347264 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
+
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*
@@ -54,10 +55,45 @@ private[sql] class SharedState(
SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
+ // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+ // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+ val warehousePath: String = {
+ val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+ if (configFile != null) {
+ logInfo(s"loading hive config file: $configFile")
+ sparkContext.hadoopConfiguration.addResource(configFile)
+ }
+
+ // hive.metastore.warehouse.dir only stays in hadoopConf
+ sparkContext.conf.remove("hive.metastore.warehouse.dir")
+ // Set the Hive metastore warehouse path to the one we use
+ val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
+ if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
+ // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+ // we will respect the value of hive.metastore.warehouse.dir.
+ sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+ logInfo(s"${WAREHOUSE_PATH.key} is not set, but
hive.metastore.warehouse.dir " +
+ s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
+ s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
+ hiveWarehouseDir
+ } else {
+ // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+ // the value of spark.sql.warehouse.dir.
+ // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+ // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+ val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
+ logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to
the value of " +
+ s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+ sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+ sparkWarehouseDir
+ }
+ }
+ logInfo(s"Warehouse path is '$warehousePath'.")
+
+ // These 2 variables should be initialized after `warehousePath`, because we first need to
+ // load hive-site.xml into hadoopConf and determine the warehouse path, which will be set into
+ // both the Spark conf and the Hadoop conf to avoid being affected by any SparkSession-level options.
private val (conf, hadoopConf) = {
- // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
- // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
- SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -130,7 +166,7 @@ private[sql] class SharedState(
val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"default database",
- CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+ CatalogUtils.stringToURI(warehousePath),
Map())
// Create default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -222,40 +258,4 @@ object SharedState extends Logging {
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
-
- /**
- * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
- * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
- */
- def loadHiveConfFile(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
- val hiveWarehouseKey = "hive.metastore.warehouse.dir"
- val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
- if (configFile != null) {
- logInfo(s"loading hive config file: $configFile")
- hadoopConf.addResource(configFile)
- }
- // hive.metastore.warehouse.dir only stay in hadoopConf
- sparkConf.remove(hiveWarehouseKey)
- // Set the Hive metastore warehouse path to the one we use
- val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
- val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
- // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
- // we will respect the value of hive.metastore.warehouse.dir.
- sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
- logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
- s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
- hiveWarehouseDir
- } else {
- // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
- // the value of spark.sql.warehouse.dir.
- // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
- // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
- val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
- logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value
of " +
- s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
- hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
- }
- logInfo(s"Warehouse path is '$warehousePath'.")
- }
}
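The restored block above resolves hive-site.xml from the classpath and merges it into the Hadoop configuration before choosing the warehouse path. A minimal standalone sketch of that lookup using plain Hadoop APIs (an illustration, not the Spark code itself; assumes hadoop-common is on the classpath):

    import org.apache.hadoop.conf.Configuration

    object HiveSiteLookupSketch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        // Same idea as Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
        // above, using the context class loader directly.
        val configFile = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
        if (configFile != null) {
          hadoopConf.addResource(configFile) // merge hive-site.xml into the Hadoop conf
        }
        // Prints null when no hive-site.xml (or other source) sets the key.
        println(hadoopConf.get("hive.metastore.warehouse.dir"))
      }
    }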
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 3ddf4ec..6b76927 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,7 +46,6 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
-import org.apache.spark.sql.internal.SharedState
import org.apache.spark.util.ShutdownHookManager
/**
@@ -88,7 +87,6 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val sparkConf = new SparkConf(loadDefaults = true)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
- SharedState.loadHiveConfFile(sparkConf, hadoopConf)
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index ed77663..43aafc3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -32,7 +32,6 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.test.HiveTestJars
-import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -160,17 +159,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}
}
- test("Pick spark.sql.warehouse.dir first for Spark Cli if set") {
- val sparkWareHouseDir = Utils.createTempDir()
- new File(warehousePath, "metastore_db").delete()
- runCliWithin(
- 1.minute,
- Seq("--conf",
s"${StaticSQLConf.WAREHOUSE_PATH.key}=$sparkWareHouseDir"))(
- "desc database default;" -> sparkWareHouseDir.getAbsolutePath
- )
- sparkWareHouseDir.delete()
- }
-
test("Simple commands") {
val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 78535b0..6e2dcfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -45,6 +45,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
GLOBAL_TEMP_DATABASE.key -> tmpDb)
val state = new SharedState(sc, initialConfigs)
+ assert(state.warehousePath !== invalidPath, "warehouse path can't be determined by session options")
assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
"warehouse conf in session options can't affect application wide spark
conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 8b97489..31ff62e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -787,7 +787,7 @@ object SPARK_18360 {
.enableHiveSupport().getOrCreate()
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
- assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
+ assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
val hiveClient = spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
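As the updated assertion above suggests, the resolved warehouse path ends up as the default database's location. A small usage sketch of observing this from user code, assuming a local SparkSession (the printed values come from public catalog and conf APIs, not from SharedState directly):

    import org.apache.spark.sql.SparkSession

    object DefaultDbLocationDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("default-db-location-demo")
          .getOrCreate()
        // The default database's location is derived from the resolved warehouse path.
        println(spark.catalog.getDatabase("default").locationUri)
        // The effective value of spark.sql.warehouse.dir as seen by this session.
        println(spark.conf.get("spark.sql.warehouse.dir"))
        spark.stop()
      }
    }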
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]