This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3c84c229d16 [SPARK-45328][SQL] Remove Hive support prior to 2.0.0
3c84c229d16 is described below
commit 3c84c229d167a6ab2857649e91fff6f0d57bb12c
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Sep 27 07:20:14 2023 +0900
[SPARK-45328][SQL] Remove Hive support prior to 2.0.0
### What changes were proposed in this pull request?
This PR proposes to remove support for Hive metastore versions prior to 2.0.0 (`spark.sql.hive.metastore.version`).
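For illustration, a minimal sketch (not part of this patch) of pointing a session at a still-supported metastore version; the `2.3.9` value and the `maven` jar source below are example choices, not defaults introduced here:

```scala
import org.apache.spark.sql.SparkSession

// Static confs: must be set before the session is created.
// After this change the version must fall in [2.0.0, 2.3.9] or [3.0.0, 3.1.3].
val spark = SparkSession.builder()
  .appName("metastore-version-example")
  .config("spark.sql.hive.metastore.version", "2.3.9")
  .config("spark.sql.hive.metastore.jars", "maven") // download matching client jars
  .enableHiveSupport()
  .getOrCreate()
```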
### Why are the changes needed?
We dropped JDK 8 and 11 support, and Hive releases prior to 2.0.0 cannot run without them, so these code paths are effectively already dead code.
### Does this PR introduce _any_ user-facing change?
Technically no, because these Hive versions already do not work with Spark.
### How was this patch tested?
No, because there is no longer any way to test these versions.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43116 from HyukjinKwon/SPARK-45328.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
docs/sql-migration-guide.md | 1 +
.../org/apache/spark/sql/hive/HiveUtils.scala | 2 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 6 ---
.../apache/spark/sql/hive/client/HiveShim.scala | 12 +++---
.../sql/hive/client/IsolatedClientLoader.scala | 6 ---
.../org/apache/spark/sql/hive/client/package.scala | 46 +---------------------
.../spark/sql/hive/execution/HiveTempPath.scala | 40 ++-----------------
.../spark/sql/hive/client/HiveClientVersions.scala | 7 +---
.../hive/client/HivePartitionFilteringSuites.scala | 3 +-
9 files changed, 16 insertions(+), 107 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 56a3c8292cd..a28f6fd284d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,6 +26,7 @@ license: |
- Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807` (`Long.MaxValue`).
- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
+- Since Spark 4.0, `spark.sql.hive.metastore.version` drops support for Hive versions prior to 2.0.0, as they require JDK 8, which Spark no longer supports. Users should migrate to higher versions.
## Upgrading from Spark SQL 3.4 to 3.5
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a01246520f3..794838a1190 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -73,7 +73,7 @@ private[spark] object HiveUtils extends Logging {
val HIVE_METASTORE_VERSION =
buildStaticConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
- "<code>0.12.0</code> through <code>2.3.9</code> and " +
+ "<code>2.0.0</code> through <code>2.3.9</code> and " +
"<code>3.0.0</code> through <code>3.1.3</code>.")
.version("1.4.0")
.stringConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f3d7d7e66a5..4e4ef6ce9f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -115,12 +115,6 @@ private[hive] class HiveClientImpl(
private val outputBuffer = new CircularBuffer()
private val shim = version match {
- case hive.v12 => new Shim_v0_12()
- case hive.v13 => new Shim_v0_13()
- case hive.v14 => new Shim_v0_14()
- case hive.v1_0 => new Shim_v1_0()
- case hive.v1_1 => new Shim_v1_1()
- case hive.v1_2 => new Shim_v1_2()
case hive.v2_0 => new Shim_v2_0()
case hive.v2_1 => new Shim_v2_1()
case hive.v2_2 => new Shim_v2_2()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 338498d3d48..e12fe857c88 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -255,7 +255,7 @@ private[client] sealed abstract class Shim {
}
}
-private[client] class Shim_v0_12 extends Shim with Logging {
+private class Shim_v0_12 extends Shim with Logging {
// See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
protected lazy val holdDDLTime = JBoolean.FALSE
// deletes the underlying data along with metadata
@@ -698,7 +698,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
}
}
-private[client] class Shim_v0_13 extends Shim_v0_12 {
+private class Shim_v0_13 extends Shim_v0_12 {
private lazy val setCurrentSessionStateMethod =
findStaticMethod(
@@ -1222,7 +1222,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
}
-private[client] class Shim_v0_14 extends Shim_v0_13 {
+private class Shim_v0_14 extends Shim_v0_13 {
// true if this is an ACID operation
protected lazy val isAcid = JBoolean.FALSE
@@ -1341,9 +1341,9 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
}
-private[client] class Shim_v1_0 extends Shim_v0_14
+private class Shim_v1_0 extends Shim_v0_14
-private[client] class Shim_v1_1 extends Shim_v1_0 {
+private class Shim_v1_1 extends Shim_v1_0 {
// throws an exception if the index does not exist
protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
@@ -1366,7 +1366,7 @@ private[client] class Shim_v1_1 extends Shim_v1_0 {
}
-private[client] class Shim_v1_2 extends Shim_v1_1 {
+private class Shim_v1_2 extends Shim_v1_1 {
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 2765e6af521..d6489f04391 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -90,12 +90,6 @@ private[hive] object IsolatedClientLoader extends Logging {
def hiveVersion(version: String): HiveVersion = {
VersionUtils.majorMinorPatchVersion(version).flatMap {
- case (12, _, _) | (0, 12, _) => Some(hive.v12)
- case (13, _, _) | (0, 13, _) => Some(hive.v13)
- case (14, _, _) | (0, 14, _) => Some(hive.v14)
- case (1, 0, _) => Some(hive.v1_0)
- case (1, 1, _) => Some(hive.v1_1)
- case (1, 2, _) => Some(hive.v1_2)
case (2, 0, _) => Some(hive.v2_0)
case (2, 1, _) => Some(hive.v2_1)
case (2, 2, _) => Some(hive.v2_2)
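For context, the effect of the trimmed matcher above can be sketched standalone; the object and method names below are illustrative, not Spark's:

```scala
object HiveVersionResolutionSketch {
  // Simplified stand-in for IsolatedClientLoader.hiveVersion: 0.x and 1.x
  // version strings now fall through to None and are rejected.
  def resolve(v: String): Option[String] =
    v.split("\\.").take(2).flatMap(_.toIntOption) match {
      case Array(2, m) if m <= 3 => Some(s"Shim_v2_$m")
      case Array(3, m) if m <= 1 => Some(s"Shim_v3_$m")
      case _ => None // 0.12 through 1.2 previously matched here
    }

  def main(args: Array[String]): Unit = {
    println(resolve("2.3.9")) // Some(Shim_v2_3)
    println(resolve("1.2.1")) // None: no longer supported
  }
}
```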
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 9304074e866..a66842de7d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -39,48 +39,6 @@ package object client {
// scalastyle:off
private[hive] object hive {
- case object v12 extends HiveVersion("0.12.0")
- case object v13 extends HiveVersion("0.13.1")
-
- // Do not need Calcite because we disabled hive.cbo.enable.
- //
- // The other excluded dependencies are nowhere to be found, so exclude them explicitly. If
- // they're needed by the metastore client, users will have to dig them out of somewhere and use
- // configuration to point Spark at the correct jars.
- case object v14 extends HiveVersion("0.14.0",
- exclusions = Seq("org.apache.calcite:calcite-core",
- "org.apache.calcite:calcite-avatica",
- "org.pentaho:pentaho-aggdesigner-algorithm"))
-
- case object v1_0 extends HiveVersion("1.0.1",
- exclusions = Seq("eigenbase:eigenbase-properties",
- "org.apache.calcite:calcite-core",
- "org.apache.calcite:calcite-avatica",
- "org.pentaho:pentaho-aggdesigner-algorithm",
- "net.hydromatic:linq4j",
- "net.hydromatic:quidem"))
-
- // The curator dependency was added to the exclusions here because it seems to confuse the ivy
- // library. org.apache.curator:curator is a pom dependency but ivy tries to find the jar for it,
- // and fails.
- case object v1_1 extends HiveVersion("1.1.1",
- exclusions = Seq("eigenbase:eigenbase-properties",
- "org.apache.calcite:calcite-core",
- "org.apache.calcite:calcite-avatica",
- "org.apache.curator:*",
- "org.pentaho:pentaho-aggdesigner-algorithm",
- "net.hydromatic:linq4j",
- "net.hydromatic:quidem"))
-
- case object v1_2 extends HiveVersion("1.2.2",
- exclusions = Seq("eigenbase:eigenbase-properties",
- "org.apache.calcite:calcite-core",
- "org.apache.calcite:calcite-avatica",
- "org.apache.curator:*",
- "org.pentaho:pentaho-aggdesigner-algorithm",
- "net.hydromatic:linq4j",
- "net.hydromatic:quidem"))
-
case object v2_0 extends HiveVersion("2.0.1",
exclusions = Seq("org.apache.calcite:calcite-core",
"org.apache.calcite:calcite-avatica",
@@ -131,8 +89,8 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))
- val allSupportedHiveVersions =
- Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+ val allSupportedHiveVersions: Set[HiveVersion] =
+ Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
}
// scalastyle:on
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index 9981ae4cc31..6fd8892fa1f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -33,7 +33,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.client.HiveVersion
class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
extends Logging {
@@ -44,54 +43,23 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
private def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._
- // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
- // a common scratch directory. After the writing is finished, Hive will simply empty the table
- // directory and move the staging directory to it.
- // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+ // Hive will create the staging directory under the table directory, and when
// moving staging directory to table directory, Hive will still empty the table directory, but
// will exclude the staging directory there.
- // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
- // staging directory under the table director for Hive prior to 1.1, the staging directory will
- // be removed by Hive when Hive is trying to empty the table directory.
- val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
- Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
- // Ensure all the supported versions are considered here.
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
- allSupportedHiveVersions)
val externalCatalog = session.sharedState.externalCatalog
val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
- val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
- if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
- oldVersionExternalTempPath(path, scratchDir)
- } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, stagingDir)
+ if (allSupportedHiveVersions.contains(hiveVersion)) {
+ externalTempPath(path, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
}
}
- // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
- private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
- val extURI: URI = path.toUri
- val scratchPath = new Path(scratchDir, executionId)
- var dirPath = new Path(
- extURI.getScheme,
- extURI.getAuthority,
- scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
- val fs = dirPath.getFileSystem(hadoopConf)
- dirPath = new Path(fs.makeQualified(dirPath).toString())
- stagingDirForCreating = Some(dirPath)
- dirPath
- }
-
// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
- private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
+ private def externalTempPath(path: Path, stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
val qualifiedStagingDir = getStagingDir(path, stagingDir)
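The surviving path keeps what the removed comments call the post-1.1 layout: the staging directory sits under the table directory rather than under a shared scratch directory. A hedged sketch of that layout (the helper and its arguments are illustrative, not Spark's API):

```scala
import org.apache.hadoop.fs.Path

// Illustrative only: every still-supported version (2.0.0+) stages writes
// under the table directory, e.g. /warehouse/db.db/tbl/.hive-staging_<id>,
// so Hive can empty the table directory while excluding the staging dir.
def stagingPathSketch(tableDir: Path, stagingDir: String, executionId: String): Path =
  new Path(tableDir, s"${stagingDir}_$executionId")
```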
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 6648c04a4c5..1dee9e6dcfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -19,16 +19,11 @@ package org.apache.spark.sql.hive.client
import scala.collection.immutable.IndexedSeq
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-
private[client] trait HiveClientVersions {
private val testVersions = sys.env.get("SPARK_TEST_HIVE_CLIENT_VERSIONS")
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
- } else if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
} else {
- IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1",
"2.2", "2.3", "3.0",
- "3.1")
+ IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala
index a43e778b13b..f10e6386542 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala
@@ -23,7 +23,6 @@ import org.scalatest.Suite
class HivePartitionFilteringSuites extends Suite with HiveClientVersions {
override def nestedSuites: IndexedSeq[Suite] = {
- // Hive 0.12 does not provide the partition filtering API we call
- versions.filterNot(_ == "0.12").map(new HivePartitionFilteringSuite(_))
+ versions.map(new HivePartitionFilteringSuite(_))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]