This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new afd9e2cc0a7 Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
afd9e2cc0a7 is described below
commit afd9e2cc0a73069514eef5c5eb7a3ebed8e4b8cf
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Apr 21 10:28:22 2023 +0900
Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
### What changes were proposed in this pull request?
This reverts https://github.com/apache/spark/pull/36625 and its follow-up https://github.com/apache/spark/pull/38321.
### Why are the changes needed?
An external table's location can be arbitrary and has no connection to the database location, so it can be wrong to qualify the external table location based on the database location.

If a table written by old Spark versions does not have a qualified location, there is no way to restore it, as the information is already lost. Users can manually fix such table locations themselves, assuming the tables are on the same HDFS cluster as the database location.
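To illustrate, here is a minimal, self-contained sketch of the reverted qualification logic (the function body mirrors the removed `HiveExternalCatalog.toAbsoluteURI` in the diff below); the cluster names and paths are hypothetical:

```scala
import java.net.URI

// Sketch of the reverted behavior: a relative table URI inherits the
// database URI's scheme/authority (mirrors the removed toAbsoluteURI).
def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
  if (!uri.isAbsolute && absoluteUri.isDefined) {
    val aUri = absoluteUri.get
    new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
      uri.getPath, uri.getQuery, uri.getFragment)
  } else {
    uri
  }
}

// Hypothetical external table whose data actually lives on clusterB,
// but whose stored location is unqualified:
val dbUri = new URI("viewfs://clusterA/user/hive/warehouse/")
println(toAbsoluteURI(new URI("/data/external/t1"), Some(dbUri)))
// prints viewfs://clusterA/data/external/t1 -- wrong if the data is on clusterB
```

A user who knows the right cluster can instead requalify the location manually, e.g. with something like `ALTER TABLE t1 SET LOCATION 'viewfs://clusterB/data/external/t1'` (hypothetical path).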
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes #40871 from cloud-fan/minor.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/hive/HiveExternalCatalog.scala | 27 ++--------------
.../spark/sql/hive/client/HiveClientImpl.scala | 36 +++++-----------------
.../spark/sql/hive/HiveExternalCatalogSuite.scala | 15 ---------
3 files changed, 10 insertions(+), 68 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5cd3b9c3abf..794d94a4f70 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.lang.reflect.InvocationTargetException
-import java.net.URI
import java.util
import java.util.Locale
@@ -852,15 +851,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// source tables. Here we set the table location to `locationUri` field and filter out the
// path option in storage properties, to avoid exposing this concept externally.
val storageWithLocation = {
- val tableLocation = getLocationFromStorageProps(table).map { path =>
- // Before SPARK-19257, created data source table does not use absolute uri.
- // This makes Spark can't read these tables across HDFS clusters.
- // Rewrite table path to absolute uri based on location uri (The location uri has been
- // rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue.
- toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
- }
+ val tableLocation = getLocationFromStorageProps(table)
// We pass None as `newPath` here, to remove the path option in storage properties.
- updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+ updateLocationInStorageProps(table, newPath = None).copy(
+ locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
}
val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1447,19 +1441,4 @@ object HiveExternalCatalog {
isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
case _ => true
}
-
- /** Rewrite uri to absolute location. For example:
- * uri: /user/hive/warehouse/test_table
- * absoluteUri: viewfs://clusterA/user/hive/warehouse/
- * The result is: viewfs://clusterA/user/hive/warehouse/test_table
- */
- private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
- if (!uri.isAbsolute && absoluteUri.isDefined) {
- val aUri = absoluteUri.get
- new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
- uri.getPath, uri.getQuery, uri.getFragment)
- } else {
- uri
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92c3ca0ab3e..becca8eae5e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
import java.io.PrintStream
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
-import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Locale, Map => JMap}
import java.util.concurrent.TimeUnit._
@@ -54,7 +53,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -539,21 +537,7 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
- locationUri = shim.getDataLocation(h).map { loc =>
- val tableUri = stringToURI(loc)
- if (h.getTableType == HiveTableType.VIRTUAL_VIEW) {
- // Data location of SQL view is useless. Do not qualify it even if it's present, as
- // it can be an invalid path.
- tableUri
- } else {
- // Before SPARK-19257, created data source table does not use absolute uri.
- // This makes Spark can't read these tables across HDFS clusters.
- // Rewrite table location to absolute uri based on database uri to fix this issue.
- val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
- .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
- HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
- }
- },
+ locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
// To avoid ClassNotFound exception, we try our best to not get the format class, but get
// the class name directly. However, for non-native tables, there is no interface to get
// the format class name, so we may still throw ClassNotFound in this case.
@@ -793,8 +777,7 @@ private[hive] class HiveClientImpl(
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
- Option(hivePartition)
- .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+ Option(hivePartition).map(fromHivePartition)
}
override def getPartitions(
@@ -816,10 +799,7 @@ private[hive] class HiveClientImpl(
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
s
}
- val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
- .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
- val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
- .map(fromHivePartition(_, absoluteUri))
+ val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
@@ -829,9 +809,8 @@ private[hive] class HiveClientImpl(
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
hiveTable.setOwner(userName)
- val parts =
- shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
- .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+ val parts = shim.getPartitionsByFilter(
+ client, hiveTable, predicates, rawHiveTable.toCatalogTable).map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
@@ -1212,7 +1191,7 @@ private[hive] object HiveClientImpl extends Logging {
/**
* Build the native partition metadata from Hive's Partition.
*/
- def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
+ def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
val properties: Map[String, String] = if (hp.getParameters != null) {
hp.getParameters.asScala.toMap
@@ -1222,8 +1201,7 @@ private[hive] object HiveClientImpl extends Logging {
CatalogTablePartition(
spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
storage = CatalogStorageFormat(
- locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
- stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
+ locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
inputFormat = Option(apiPartition.getSd.getInputFormat),
outputFormat = Option(apiPartition.getSd.getOutputFormat),
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 7c36198c326..e413e0ee73c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,19 +200,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
assert(alteredTable.provider === Some("foo"))
})
}
-
- test("SPARK-39203: Rewrite table location to absolute location based on
database location") {
- val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
- val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
- val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
-
- assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
- .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-
- assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
- .equals(tableLocation1))
-
- assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
- .equals(tableLocation2))
- }
}
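After the revert, a stored table location string is turned into a URI verbatim and is never qualified against the database URI. A hedged sketch of the resulting behavior, using plain java.net.URI as a stand-in for CatalogUtils.stringToURI:

```scala
import java.net.URI

// Post-revert behavior, sketched: the catalog now just does
//   locationUri = tableLocation.map(CatalogUtils.stringToURI(_))
// so a relative, unqualified path stays relative.
val stored = "/user/hive/warehouse/t1" // hypothetical stored location
val uri = new URI(stored)              // stand-in for CatalogUtils.stringToURI
assert(!uri.isAbsolute)                // no scheme/authority is added anymore
```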
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]