Repository: spark
Updated Branches:
refs/heads/master c5b735581 -> df4ea6614
[SPARK-15864][SQL] Fix Inconsistent Behaviors when Uncaching Non-cached Tables
#### What changes were proposed in this pull request?
There are three different ways to uncache a table:
- _SQL interface_: `UNCACHE TABLE`
- _Dataset API_: `sparkSession.catalog.uncacheTable`
- _Dataset API_: `sparkSession.table(tableName).unpersist()`
When the table is not cached:
- _SQL interface_: `UNCACHE TABLE non-cachedTable` -> **no error message**
- _Dataset API_: `sparkSession.catalog.uncacheTable("non-cachedTable")` ->
**reports a confusing error message:**
```requirement failed: Table [a: int] is not cached```
- _Dataset API_: `sparkSession.table("non-cachedTable").unpersist()` -> **no
error message**
This PR makes the three behaviors consistent: uncaching a table that is not
cached (or has already been uncached) is a no-op.
In addition, this PR removes the old `uncacheQuery`, renames `tryUncacheQuery`
to `uncacheQuery`, and documents that it is a no-op if the table has already
been uncached.
#### How was this patch tested?
Improved the existing test case to verify the behavior when the table has not
been cached.
Also added test cases to verify the behavior when the table does not exist.
Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>
Closes #13593 from gatorsmile/uncacheNonCachedTable.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df4ea661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df4ea661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df4ea661
Branch: refs/heads/master
Commit: df4ea6614d709ee66f1ceb966df6216b125b8ea1
Parents: c5b7355
Author: gatorsmile <[email protected]>
Authored: Tue Jun 14 11:44:37 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jun 14 11:44:37 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../spark/sql/execution/CacheManager.scala | 17 +++-----------
.../spark/sql/execution/command/cache.scala | 2 +-
.../spark/sql/execution/command/ddl.scala | 2 +-
.../spark/sql/execution/command/tables.scala | 2 +-
.../apache/spark/sql/internal/CatalogImpl.scala | 6 ++---
.../org/apache/spark/sql/CachedTableSuite.scala | 6 -----
.../spark/sql/hive/CachedTableSuite.scala | 24 ++++++++++++++++----
8 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5a67fc7..53779df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2316,7 +2316,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def unpersist(blocking: Boolean): this.type = {
- sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
+ sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
this
}
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4e95754..de2503a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -104,22 +104,11 @@ private[sql] class CacheManager extends Logging {
}
}
- /** Removes the data for the given [[Dataset]] from the cache */
- private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true):
Unit = writeLock {
- val planToCache = query.queryExecution.analyzed
- val dataIndex = cachedData.indexWhere(cd =>
planToCache.sameResult(cd.plan))
- require(dataIndex >= 0, s"Table $query is not cached.")
-
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
- cachedData.remove(dataIndex)
- }
-
/**
- * Tries to remove the data for the given [[Dataset]] from the cache
- * if it's cached
+ * Tries to remove the data for the given [[Dataset]] from the cache.
+ * No operation, if it's already uncached.
*/
- private[sql] def tryUncacheQuery(
- query: Dataset[_],
- blocking: Boolean = true): Boolean = writeLock {
+ private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true):
Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd =>
planToCache.sameResult(cd.plan))
val found = dataIndex >= 0
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3e5eed2..5332366 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -53,7 +53,7 @@ case class CacheTableCommand(
case class UncacheTableCommand(tableName: String) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.table(tableName).unpersist(blocking = false)
+ sparkSession.catalog.uncacheTable(tableName)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5fd0b83..fc00912 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -201,7 +201,7 @@ case class DropTableCommand(
case _ =>
})
try {
- sparkSession.sharedState.cacheManager.tryUncacheQuery(
+ sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName.quotedString))
} catch {
case NonFatal(e) => log.warn(e.toString, e)
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 90db785..58bb5cd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -374,7 +374,7 @@ case class TruncateTableCommand(
spark.sessionState.invalidateTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
-
spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table '$tableName'", e)
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f42fd17..601334b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -292,7 +292,7 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
* @since 2.0.0
*/
override def dropTempView(viewName: String): Unit = {
-
sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists =
true)
}
@@ -323,7 +323,7 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
* @since 2.0.0
*/
override def uncacheTable(tableName: String): Unit = {
-
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+ sparkSession.sharedState.cacheManager.uncacheQuery(query =
sparkSession.table(tableName))
}
/**
@@ -367,7 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
- sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking =
true)
+ sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
// Cache it again.
sparkSession.sharedState.cacheManager.cacheQuery(df,
Some(tableIdent.table))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3306ac4..d7df18a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -186,12 +186,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
with SharedSQLContext
assertCached(spark.table("testData"), 0)
}
- test("correct error on uncache of non-cached table") {
- intercept[IllegalArgumentException] {
- spark.catalog.uncacheTable("testData")
- }
- }
-
test("SELECT star from cached table") {
sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
spark.catalog.cacheTable("selectStar")
http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 5121440..e35a719 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
-class CachedTableSuite extends QueryTest with TestHiveSingleton {
+class CachedTableSuite extends QueryTest with SQLTestUtils with
TestHiveSingleton {
import hiveContext._
def rddIdOf(tableName: String): Int = {
@@ -95,9 +97,23 @@ class CachedTableSuite extends QueryTest with
TestHiveSingleton {
sql("DROP TABLE IF EXISTS nonexistantTable")
}
- test("correct error on uncache of non-cached table") {
- intercept[IllegalArgumentException] {
- spark.catalog.uncacheTable("src")
+ test("correct error on uncache of nonexistant tables") {
+ intercept[NoSuchTableException] {
+ spark.catalog.uncacheTable("nonexistantTable")
+ }
+ intercept[NoSuchTableException] {
+ sql("UNCACHE TABLE nonexistantTable")
+ }
+ }
+
+ test("no error on uncache of non-cached table") {
+ val tableName = "newTable"
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(a INT)")
+ // no error will be reported in the following three ways to uncache a
table.
+ spark.catalog.uncacheTable(tableName)
+ sql("UNCACHE TABLE newTable")
+ sparkSession.table(tableName).unpersist()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]