Repository: spark
Updated Branches:
  refs/heads/master c5b735581 -> df4ea6614


[SPARK-15864][SQL] Fix Inconsistent Behaviors when Uncaching Non-cached Tables

#### What changes were proposed in this pull request?
To uncache a table, we have three different ways:
- _SQL interface_: `UNCACHE TABLE`
- _Dataset API_: `sparkSession.catalog.uncacheTable`
- _Dataset API_: `sparkSession.table(tableName).unpersist()`

When the table is not cached,
- _SQL interface_: `UNCACHE TABLE non-cachedTable` -> **no error message**
- _Dataset API_: `sparkSession.catalog.uncacheTable("non-cachedTable")` -> 
**report a strange error message:**
```requirement failed: Table [a: int] is not cached```
- _Dataset API_: `sparkSession.table("non-cachedTable").unpersist()` -> **no 
error message**

This PR makes them consistent: uncaching a table that is not cached is a 
no-op and reports no error.

In addition, this PR removes the old `uncacheQuery`, renames `tryUncacheQuery` 
to `uncacheQuery`, and documents that it is a no-op if the table is not 
cached.

#### How was this patch tested?
Improved the existing test case to verify the behavior when the table has not 
been cached.
Also added test cases to verify the behavior when the table does not exist.

Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>

Closes #13593 from gatorsmile/uncacheNonCachedTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df4ea661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df4ea661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df4ea661

Branch: refs/heads/master
Commit: df4ea6614d709ee66f1ceb966df6216b125b8ea1
Parents: c5b7355
Author: gatorsmile <[email protected]>
Authored: Tue Jun 14 11:44:37 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jun 14 11:44:37 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../spark/sql/execution/CacheManager.scala      | 17 +++-----------
 .../spark/sql/execution/command/cache.scala     |  2 +-
 .../spark/sql/execution/command/ddl.scala       |  2 +-
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |  6 ++---
 .../org/apache/spark/sql/CachedTableSuite.scala |  6 -----
 .../spark/sql/hive/CachedTableSuite.scala       | 24 ++++++++++++++++----
 8 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5a67fc7..53779df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2316,7 +2316,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
     this
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4e95754..de2503a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -104,22 +104,11 @@ private[sql] class CacheManager extends Logging {
     }
   }
 
-  /** Removes the data for the given [[Dataset]] from the cache */
-  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): 
Unit = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    val dataIndex = cachedData.indexWhere(cd => 
planToCache.sameResult(cd.plan))
-    require(dataIndex >= 0, s"Table $query is not cached.")
-    
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
-    cachedData.remove(dataIndex)
-  }
-
   /**
-   * Tries to remove the data for the given [[Dataset]] from the cache
-   * if it's cached
+   * Tries to remove the data for the given [[Dataset]] from the cache.
+   * No operation, if it's already uncached.
    */
-  private[sql] def tryUncacheQuery(
-      query: Dataset[_],
-      blocking: Boolean = true): Boolean = writeLock {
+  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): 
Boolean = writeLock {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => 
planToCache.sameResult(cd.plan))
     val found = dataIndex >= 0

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3e5eed2..5332366 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -53,7 +53,7 @@ case class CacheTableCommand(
 case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.table(tableName).unpersist(blocking = false)
+    sparkSession.catalog.uncacheTable(tableName)
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5fd0b83..fc00912 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -201,7 +201,7 @@ case class DropTableCommand(
         case _ =>
       })
       try {
-        sparkSession.sharedState.cacheManager.tryUncacheQuery(
+        sparkSession.sharedState.cacheManager.uncacheQuery(
           sparkSession.table(tableName.quotedString))
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 90db785..58bb5cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -374,7 +374,7 @@ case class TruncateTableCommand(
     spark.sessionState.invalidateTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      
spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+      
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table '$tableName'", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f42fd17..601334b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -292,7 +292,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def dropTempView(viewName: String): Unit = {
-    
sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+    
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
     sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = 
true)
   }
 
@@ -323,7 +323,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    sparkSession.sharedState.cacheManager.uncacheQuery(query = 
sparkSession.table(tableName))
   }
 
   /**
@@ -367,7 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
       // TODO: Use uncacheTable once it supports database name.
       val df = Dataset.ofRows(sparkSession, logicalPlan)
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = 
true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(df, 
Some(tableIdent.table))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3306ac4..d7df18a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -186,12 +186,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
     assertCached(spark.table("testData"), 0)
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("testData")
-    }
-  }
-
   test("SELECT star from cached table") {
     sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
     spark.catalog.cacheTable("selectStar")

http://git-wip-us.apache.org/repos/asf/spark/blob/df4ea661/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 5121440..e35a719 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Utils
 
-class CachedTableSuite extends QueryTest with TestHiveSingleton {
+class CachedTableSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
   import hiveContext._
 
   def rddIdOf(tableName: String): Int = {
@@ -95,9 +97,23 @@ class CachedTableSuite extends QueryTest with 
TestHiveSingleton {
     sql("DROP TABLE IF EXISTS nonexistantTable")
   }
 
-  test("correct error on uncache of non-cached table") {
-    intercept[IllegalArgumentException] {
-      spark.catalog.uncacheTable("src")
+  test("correct error on uncache of nonexistant tables") {
+    intercept[NoSuchTableException] {
+      spark.catalog.uncacheTable("nonexistantTable")
+    }
+    intercept[NoSuchTableException] {
+      sql("UNCACHE TABLE nonexistantTable")
+    }
+  }
+
+  test("no error on uncache of non-cached table") {
+    val tableName = "newTable"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(a INT)")
+      // no error will be reported in the following three ways to uncache a 
table.
+      spark.catalog.uncacheTable(tableName)
+      sql("UNCACHE TABLE newTable")
+      sparkSession.table(tableName).unpersist()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to