This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b331ad0ae2b [SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog
b331ad0ae2b is described below
commit b331ad0ae2b6ab566aea2ddbdbddcd3d28f8eaa1
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Dec 29 20:25:17 2022 +0900
[SPARK-41344][SQL] Make error clearer when table not found in SupportsCatalogOptions catalog
### What changes were proposed in this pull request?
Make error clearer when table not found in SupportsCatalogOptions catalog.
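For context, the unclear error comes from calling `.get` on an empty `Option`: `DataSourceV2Utils` used `CatalogV2Util.loadTable(catalog, ident, timeTravel).get`, and `loadTable` maps a missing table to `None`. A minimal standalone sketch of that failure mode (plain Scala, not part of this patch):

```scala
object NoneGetRepro extends App {
  // Option.get on an empty Option throws
  // java.util.NoSuchElementException: None.get -- no table name, no context.
  val maybeTable: Option[String] = None
  maybeTable.get
}
```

This patch extracts the loading logic into a new `CatalogV2Util.getTable` method, which lets the underlying `NoSuchTableException` propagate to the caller instead.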
### Why are the changes needed?

Previously, loading a non-existent table through a `SupportsCatalogOptions` catalog surfaced as an opaque `None.get` error (see the sketch above). Raising the underlying `NoSuchTableException` instead tells the user which table is missing.

### Does this PR introduce _any_ user-facing change?

Yes. The error reported for a missing table changes from `java.util.NoSuchElementException: None.get` to a `NoSuchTableException` that names the missing table.
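A hedged sketch of the user-visible difference, assuming a spark-shell style `spark` session (`org.example.MyCatalogSource` is a hypothetical `SupportsCatalogOptions` implementation used only for illustration):

```scala
// Reading a table that does not exist in the source's catalog.
spark.read
  .format("org.example.MyCatalogSource") // hypothetical SupportsCatalogOptions source
  .option("name", "non_existent_table")
  .load()

// Before: java.util.NoSuchElementException: None.get
// After:  org.apache.spark.sql.catalyst.analysis.NoSuchTableException
//         naming `default`.`non_existent_table`, as asserted by the new test below.
```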
### How was this patch tested?

Added a unit test ("DataFrameReader read non-existent table") to `SupportsCatalogOptionsSuite`.
Closes #38871 from wForget/SPARK-41344.
Lead-authored-by: Zhen Wang <[email protected]>
Co-authored-by: wForget <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connector/catalog/CatalogV2Util.scala | 27 ++++++++++++++--------
.../datasources/v2/DataSourceV2Utils.scala | 2 +-
.../connector/SupportsCatalogOptionsSuite.scala | 9 +++++++-
3 files changed, 26 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index abd43065048..72c557c8d77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -330,22 +330,29 @@ private[sql] object CatalogV2Util {
       ident: Identifier,
       timeTravelSpec: Option[TimeTravelSpec] = None): Option[Table] =
     try {
-      if (timeTravelSpec.nonEmpty) {
-        timeTravelSpec.get match {
-          case v: AsOfVersion =>
-            Option(catalog.asTableCatalog.loadTable(ident, v.version))
-          case ts: AsOfTimestamp =>
-            Option(catalog.asTableCatalog.loadTable(ident, ts.timestamp))
-        }
-      } else {
-        Option(catalog.asTableCatalog.loadTable(ident))
-      }
+      Option(getTable(catalog, ident, timeTravelSpec))
     } catch {
       case _: NoSuchTableException => None
       case _: NoSuchDatabaseException => None
       case _: NoSuchNamespaceException => None
     }
 
+  def getTable(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      timeTravelSpec: Option[TimeTravelSpec] = None): Table = {
+    if (timeTravelSpec.nonEmpty) {
+      timeTravelSpec.get match {
+        case v: AsOfVersion =>
+          catalog.asTableCatalog.loadTable(ident, v.version)
+        case ts: AsOfTimestamp =>
+          catalog.asTableCatalog.loadTable(ident, ts.timestamp)
+      }
+    } else {
+      catalog.asTableCatalog.loadTable(ident)
+    }
+  }
+
   def loadFunction(catalog: CatalogPlugin, ident: Identifier): Option[UnboundFunction] = {
     try {
       Option(catalog.asFunctionCatalog.loadFunction(ident))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f1d1cc5a173..c906e42a9b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -134,7 +134,7 @@ private[sql] object DataSourceV2Utils extends Logging {
           None
         }
         val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
-        (CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident))
+        (CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
       case _ =>
         // TODO: Non-catalog paths for DSV2 are currently not well defined.
         val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index f8278d18b0a..fd4f719417e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
@@ -214,6 +214,13 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     checkV2Identifiers(df.logicalPlan)
   }
 
+  test("DataFrameReader read non-existent table") {
+    val e = intercept[NoSuchTableException] {
+      spark.read.format(format).option("name", "non_existent_table").load()
+    }
+    checkErrorTableNotFound(e, "`default`.`non_existent_table`")
+  }
+
   test("DataFrameWriter creates v2Relation with identifiers") {
     sql(s"create table $catalogName.t1 (id bigint) using $format")