Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1932bb683 -> 972106dd3


[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call 
Process.destroyForcibly() if and only if Process.destroy() fails

## What changes were proposed in this pull request?

Utils.terminateProcess should call `destroy()` first and only fall back to 
`destroyForcibly()` if that fails. As it stands, we force-kill executors 
right away -- and only on Java 8. See the JIRA for an example of the impact: 
the process gets no graceful shutdown.

While here: `Utils.waitForProcess` should use the Java 8 method 
`Process.waitFor(long, TimeUnit)` if it is available, instead of a custom 
implementation.

## How was this patch tested?

Existing tests, which cover the force-kill case, plus the Amplab tests, which 
will eventually cover both Java 7 and Java 8. In the meantime, I tested 
locally on Java 8, and the PR builder will exercise Java 7 for this PR.

Author: Sean Owen <[email protected]>

Closes #13973 from srowen/SPARK-16182.

(cherry picked from commit 2075bf8ef6035fd7606bcf20dc2cd7d7b9cda446)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/972106dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/972106dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/972106dd

Branch: refs/heads/branch-2.0
Commit: 972106dd3bdc40b0980949a09783d6d460e8d268
Parents: 1932bb6
Author: Sean Owen <[email protected]>
Authored: Fri Jul 1 09:22:27 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Fri Jul 1 09:22:36 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 76 ++++++++++++--------
 .../org/apache/spark/util/UtilsSuite.scala      |  2 +-
 2 files changed, 47 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/972106dd/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f77cc2f..0c23f3c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Terminates a process waiting for at most the specified duration. Returns 
whether
-   * the process terminated.
+   * Terminates a process waiting for at most the specified duration.
+   *
+   * @return the process exit value if it was successfully terminated, else 
None
    */
   def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
-    try {
-      // Java8 added a new API which will more forcibly kill the process. Use 
that if available.
-      val destroyMethod = process.getClass().getMethod("destroyForcibly");
-      destroyMethod.setAccessible(true)
-      destroyMethod.invoke(process)
-    } catch {
-      case NonFatal(e) =>
-        if (!e.isInstanceOf[NoSuchMethodException]) {
-          logWarning("Exception when attempting to kill process", e)
-        }
-        process.destroy()
-    }
+    // Politely destroy first
+    process.destroy()
+
     if (waitForProcess(process, timeoutMs)) {
+      // Successful exit
       Option(process.exitValue())
     } else {
-      None
+      // Java 8 added a new API which will more forcibly kill the process. Use 
that if available.
+      try {
+        classOf[Process].getMethod("destroyForcibly").invoke(process)
+      } catch {
+        case _: NoSuchMethodException => return None // Not available; give up
+        case NonFatal(e) => logWarning("Exception when attempting to kill 
process", e)
+      }
+      // Wait, again, although this really should return almost immediately
+      if (waitForProcess(process, timeoutMs)) {
+        Option(process.exitValue())
+      } else {
+        logWarning("Timed out waiting to forcibly kill process")
+        None
+      }
     }
   }
 
   /**
    * Wait for a process to terminate for at most the specified duration.
-   * Return whether the process actually terminated after the given timeout.
+   *
+   * @return whether the process actually terminated before the given timeout.
    */
   def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
-    var terminated = false
-    val startTime = System.currentTimeMillis
-    while (!terminated) {
-      try {
-        process.exitValue()
-        terminated = true
-      } catch {
-        case e: IllegalThreadStateException =>
-          // Process not terminated yet
-          if (System.currentTimeMillis - startTime > timeoutMs) {
-            return false
+    try {
+      // Use Java 8 method if available
+      classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, 
classOf[TimeUnit])
+        .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], 
TimeUnit.MILLISECONDS)
+        .asInstanceOf[Boolean]
+    } catch {
+      case _: NoSuchMethodError =>
+        // Otherwise implement it manually
+        var terminated = false
+        val startTime = System.currentTimeMillis
+        while (!terminated) {
+          try {
+            process.exitValue()
+            terminated = true
+          } catch {
+            case e: IllegalThreadStateException =>
+              // Process not terminated yet
+              if (System.currentTimeMillis - startTime > timeoutMs) {
+                return false
+              }
+              Thread.sleep(100)
           }
-          Thread.sleep(100)
-      }
+        }
+        true
     }
-    true
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/972106dd/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index df279b5..f5d0fb0 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
           assert(terminated.isDefined)
           Utils.waitForProcess(process, 5000)
           val duration = System.currentTimeMillis() - start
-          assert(duration < 5000)
+          assert(duration < 6000) // add a little extra time to allow a force 
kill to finish
           assert(!pidExists(pid))
         } finally {
           signal(pid, "SIGKILL")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to