This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e9219  [SPARK-26617][SQL] Cache manager locks
d0e9219 is described below

commit d0e9219e03373b0db918d51bafe6a97775e2c65f
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Thu Jan 24 10:48:48 2019 +0800

    [SPARK-26617][SQL] Cache manager locks
    
    ## What changes were proposed in this pull request?
    
    Fixed several places in CacheManager where a write lock was being held
    while running the query optimizer. This could cause a very long block
    if query optimization takes a long time. This builds on changes from
    [SPARK-26548] that fixed this issue for one specific case in the
    CacheManager.
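
    The general shape of the fix, as a minimal, hypothetical Scala sketch
    (names such as LockScopeSketch are made up for illustration and are not
    CacheManager code): take the write lock only long enough to snapshot and
    remove the matching entries, do the expensive work (clearing caches,
    re-running the planner) outside the lock, then take the lock again
    briefly to publish the results.

    ```scala
    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    object LockScopeSketch {
      private val lock = new ReentrantReadWriteLock
      private val entries = mutable.Buffer("a", "b", "c")

      // Hold the write lock only while mutating the shared buffer.
      private def withWriteLock[A](body: => A): A = {
        lock.writeLock().lock()
        try body finally lock.writeLock().unlock()
      }

      def removeAndRebuild(matches: String => Boolean): Unit = {
        // 1. Under the lock: snapshot and remove matching entries (cheap).
        val removed = withWriteLock {
          val hits = entries.filter(matches)
          entries --= hits
          hits.toSeq
        }
        // 2. Outside the lock: slow work (stand-in for re-optimizing a query).
        val rebuilt = removed.map(_ + "-rebuilt")
        // 3. Under the lock again: publish the rebuilt entries.
        withWriteLock { entries ++= rebuilt }
      }

      def main(args: Array[String]): Unit = {
        removeAndRebuild(_ == "b")
        println(entries)  // ArrayBuffer(a, c, b-rebuilt)
      }
    }
    ```

    Because the lock is released between removal and re-insertion, another
    thread can re-cache the same plan in the meantime; the patch accounts
    for this with the lookupCachedData check before cachedData.add.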
    
    gatorsmile This is very similar to the PR you approved last week.
    
    ## How was this patch tested?
    
    Has been tested on a live system where the blocking was causing major
    issues and it is working well.
    CacheManager has no explicit unit test but is used in many places
    internally as part of the SharedState.
    
    Closes #23539 from DaveDeCaprio/cache-manager-locks.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 69 ++++++++++++++--------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 728fde5..0e6627c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -120,7 +121,7 @@ class CacheManager extends Logging {
   def uncacheQuery(
       query: Dataset[_],
       cascade: Boolean,
-      blocking: Boolean = true): Unit = writeLock {
+      blocking: Boolean = true): Unit = {
     uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
@@ -136,21 +137,27 @@ class CacheManager extends Logging {
       spark: SparkSession,
       plan: LogicalPlan,
       cascade: Boolean,
-      blocking: Boolean): Unit = writeLock {
+      blocking: Boolean): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
         _.find(_.sameResult(plan)).isDefined
       } else {
         _.sameResult(plan)
       }
-    val it = cachedData.iterator()
-    while (it.hasNext) {
-      val cd = it.next()
-      if (shouldRemove(cd.plan)) {
-        cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
-        it.remove()
+    val plansToUncache = mutable.Buffer[CachedData]()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (shouldRemove(cd.plan)) {
+          plansToUncache += cd
+          it.remove()
+        }
       }
     }
+    plansToUncache.foreach { cd =>
+      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
@@ -160,7 +167,7 @@ class CacheManager extends Logging {
   /**
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
-  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = writeLock {
+  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
     recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
   }
 
@@ -168,26 +175,36 @@ class CacheManager extends Logging {
       spark: SparkSession,
       condition: LogicalPlan => Boolean,
       clearCache: Boolean = true): Unit = {
-    val it = cachedData.iterator()
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    while (it.hasNext) {
-      val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (condition(cd.plan)) {
+          needToRecache += cd
+          // Remove the cache entry before we create a new one, so that we can have a different
+          // physical plan.
+          it.remove()
+        }
+      }
+    }
+    needToRecache.map { cd =>
+      if (clearCache) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
+      }
+      val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+      val newCache = InMemoryRelation(
+        cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+        logicalPlan = cd.plan)
+      val recomputedPlan = cd.copy(cachedRepresentation = newCache)
+      writeLock {
+        if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
+          logWarning("While recaching, data was already added to cache.")
+        } else {
+          cachedData.add(recomputedPlan)
         }
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        it.remove()
-        val plan = spark.sessionState.executePlan(cd.plan).executedPlan
-        val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
-          logicalPlan = cd.plan)
-        needToRecache += cd.copy(cachedRepresentation = newCache)
       }
     }
-
-    needToRecache.foreach(cachedData.add)
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
@@ -225,7 +242,7 @@ class CacheManager extends Logging {
   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
     val (fs, qualifiedPath) = {
       val path = new Path(resourcePath)
       val fs = path.getFileSystem(spark.sessionState.newHadoopConf())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
